Skip to content
126 changes: 96 additions & 30 deletions go/src/pythia/backend/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package backend

import (
"container/list"
"encoding/json"
"flag"
"fmt"
"log"
"pythia"
"strings"
"sync"
"time"
)

func init() {
Expand All @@ -41,7 +43,7 @@ type queueClient struct {
Id int

// The response channel.
Response chan<- pythia.Message
Response chan<- pythia.Message `json:"-"`

// The number of parallel jobs this pool can handle.
Capacity int
Expand All @@ -67,7 +69,7 @@ type queueJob struct {
// The client having submitted this job.
Origin *queueClient

// Element of the queue.waiting list pointing to this job, or nil if the job
// Element of the queue.Waiting list pointing to this job, or nil if the job
// is currently running.
WaitingElement *list.Element

Expand Down Expand Up @@ -96,6 +98,17 @@ const (
quitMsg pythia.MsgType = "-quit"
)

// A queueStatus is an internal structure required to marshal the state of the Queue
// in a semantically right JSON.
type QueueStatus struct {
Capacity int `json:"capacity"`
Available int `json:"available"`
Clients []*queueClient `json:"clients, omitempty"`
Jobs []*queueJob `json:"jobs, omitempty"`
Waiting *list.List `json:"waiting"`
CreationDate time.Time `json:"creation_date"`
}

// The Queue is the central component of Pythia.
// It receives jobs (tasks with inputs) from front-ends and dispatches them
// to the sandboxes.
Expand All @@ -116,20 +129,24 @@ type Queue struct {
wg sync.WaitGroup

// Active connections
clients map[int]*queueClient
Clients map[int]*queueClient

// Jobs to be processed/currently processing
jobs map[string]*queueJob
Jobs map[string]*queueJob

// List of jobs (*queueJob) waiting to be assigned.
waiting *list.List
Waiting *list.List

// Get the Queue creation datetime
CreationDate time.Time
}

// NewQueue returns a new queue with default parameters.
func NewQueue() *Queue {
queue := new(Queue)
queue.Capacity = 500
queue.quit = make(chan bool, 1)
queue.CreationDate = time.Now()
return queue
}

Expand All @@ -149,6 +166,7 @@ func (queue *Queue) Run() {
closing := false
master := make(chan queueMessage)
queue.master = master

go func() {
<-queue.quit
closing = true
Expand Down Expand Up @@ -192,29 +210,29 @@ func (queue *Queue) Shutdown() {
// Main goroutine responsible for scheduling the jobs.
func (queue *Queue) main(master <-chan queueMessage) {
defer queue.wg.Done()
queue.clients = make(map[int]*queueClient)
queue.jobs = make(map[string]*queueJob)
queue.waiting = list.New()
queue.Clients = make(map[int]*queueClient)
queue.Jobs = make(map[string]*queueJob)
queue.Waiting = list.New()
for qm := range master {
switch qm.Msg.Message {
case connectMsg:
log.Print("Client ", qm.Client.Id, ": connected.")
queue.clients[qm.Client.Id] = qm.Client
queue.Clients[qm.Client.Id] = qm.Client
case pythia.RegisterPoolMsg:
log.Print("Client ", qm.Client.Id, ": pool capacity ",
qm.Msg.Capacity)
qm.Client.Capacity = qm.Msg.Capacity
case pythia.LaunchMsg:
id := qm.Msg.Id
if _, ok := queue.jobs[id]; ok {
if _, ok := queue.Jobs[id]; ok {
log.Print("Job ", id, ": already launched, rejecting.")
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Id: id,
Status: pythia.Fatal,
Output: "Job already launched",
}
} else if queue.waiting.Len() >= queue.Capacity {
} else if queue.Waiting.Len() >= queue.Capacity {
log.Print("Job ", id, ": queue full, rejecting.")
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Expand All @@ -229,14 +247,14 @@ func (queue *Queue) main(master <-chan queueMessage) {
Origin: qm.Client,
}
qm.Client.Submitted[id] = job
queue.jobs[id] = job
job.WaitingElement = queue.waiting.PushBack(job)
queue.Jobs[id] = job
job.WaitingElement = queue.Waiting.PushBack(job)
log.Print("Job ", id, ": queued.")
}
case pythia.DoneMsg:
id := qm.Msg.Id
log.Print("Job ", id, ": done.")
job := queue.jobs[id]
job := queue.Jobs[id]
if job == nil {
log.Println("Ignoring message for unknown job", qm.Msg)
break
Expand All @@ -246,7 +264,7 @@ func (queue *Queue) main(master <-chan queueMessage) {
log.Println("Ignoring message from wrong source", qm.Msg)
break
}
delete(queue.jobs, id)
delete(queue.Jobs, id)
delete(pool.Running, id)
if job.Origin != nil {
// job.Origin is nil if the submitting client has disconnected
Expand All @@ -257,35 +275,51 @@ func (queue *Queue) main(master <-chan queueMessage) {
case closedMsg:
log.Print("Client ", qm.Client.Id, ": disconnected.")
close(qm.Client.Response)
delete(queue.clients, qm.Client.Id)
delete(queue.Clients, qm.Client.Id)
for _, job := range qm.Client.Running {
if job.Origin == nil {
// Submitter disconnected, we can discard the job.
delete(queue.jobs, job.Id)
delete(queue.Jobs, job.Id)
} else {
// Otherwise, reschedule it.
job.Pool = nil
job.WaitingElement = queue.waiting.PushFront(job)
job.WaitingElement = queue.Waiting.PushFront(job)
}
}
for _, job := range qm.Client.Submitted {
if job.WaitingElement != nil {
// Job is in waiting queue, discard it.
queue.waiting.Remove(job.WaitingElement)
delete(queue.jobs, job.Id)
queue.Waiting.Remove(job.WaitingElement)
delete(queue.Jobs, job.Id)
} else if job.Pool != nil {
// Job is running, abort it.
job.Origin = nil
job.Pool.Response <- pythia.Message{
Message: pythia.AbortMsg,
Id: job.Id,
}
// Keep job in queue.jobs to handle abort result
// Keep job in queue.Jobs to handle abort result
}
}
case quitMsg:
log.Println("Quitting.")
goto quit

case pythia.StatusMsg:
status := fillQueueStatus(queue)
id := qm.Msg.Id
serializedStatus, err := json.Marshal(status)
if err != nil {
log.Fatal("Queue is in an invalid state")
log.Fatal(err)
}
qm.Client.Response <- pythia.Message{
Message: pythia.DoneMsg,
Id: id,
Status: pythia.Success,
Output: string(serializedStatus),
}
log.Println("Client ", qm.Client.Id, " : Status sent")
default:
log.Fatal("Invalid internal message", qm.Msg)
}
Expand All @@ -295,19 +329,19 @@ func (queue *Queue) main(master <-chan queueMessage) {
}

quit:
if len(queue.clients) == 0 {
if len(queue.Clients) == 0 {
return
}
for _, client := range queue.clients {
for _, client := range queue.Clients {
close(client.Response)
}
// Wait for all clients to quit. We flush messages from the master channel
// Wait for all Clients to quit. We flush messages from the master channel
// to ensure no connection handler is in a deadlock.
for qm := range master {
switch qm.Msg.Message {
case closedMsg:
delete(queue.clients, qm.Client.Id)
if len(queue.clients) == 0 {
delete(queue.Clients, qm.Client.Id)
if len(queue.Clients) == 0 {
return
}
default:
Expand All @@ -320,17 +354,17 @@ quit:
// This function shall be called from the main goroutine, as it manipulates
// the queue data structures.
func (queue *Queue) schedule() {
if queue.waiting.Len() == 0 {
if queue.Waiting.Len() == 0 {
return
}
for _, client := range queue.clients {
for _, client := range queue.Clients {
for len(client.Running) < client.Capacity {
job := queue.waiting.Remove(queue.waiting.Front()).(*queueJob)
job := queue.Waiting.Remove(queue.Waiting.Front()).(*queueJob)
job.WaitingElement = nil
job.Pool = client
client.Running[job.Id] = job
client.Response <- job.Msg
if queue.waiting.Len() == 0 {
if queue.Waiting.Len() == 0 {
return
}
}
Expand Down Expand Up @@ -361,6 +395,8 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan
queue.master <- queueMessage{msg, client}
case pythia.DoneMsg:
queue.master <- queueMessage{msg, client}
case pythia.StatusMsg:
queue.master <- queueMessage{msg, client}
default:
log.Println("Ignoring message", msg)
}
Expand All @@ -374,10 +410,40 @@ func (queue *Queue) handle(conn *pythia.Conn, client *queueClient, response chan
case pythia.DoneMsg:
msg.Id = msg.Id[strings.Index(msg.Id, ":")+1:]
conn.Send(msg)
case pythia.StatusMsg:
conn.Send(msg)
default:
log.Fatal("Invalid internal message", msg)
}
}
}

func convertClientsToSlice(clients map[int]*queueClient) (clientsSlice []*queueClient) {
clientsSlice = make([]*queueClient, 0)
for _, element := range clients {
clientsSlice = append(clientsSlice, element)
}
return clientsSlice
}

func convertJobsToSlice(jobs map[string]*queueJob) (jobsSlice []*queueJob) {
jobsSlice = make([]*queueJob, 0)
for _, element := range jobs {
jobsSlice = append(jobsSlice, element)
}
return jobsSlice
}

// Return a QueueStatus struct filled with information coming from the Queue
func fillQueueStatus(queue *Queue) (status QueueStatus) {
status.Capacity = queue.Capacity
status.Available = queue.Capacity - len(queue.Jobs) - queue.Waiting.Len()
status.Clients = convertClientsToSlice(queue.Clients)
status.Jobs = convertJobsToSlice(queue.Jobs)
status.Waiting = queue.Waiting
status.CreationDate = queue.CreationDate

return status
}

// vim:set ts=4 sw=4 noet:
45 changes: 45 additions & 0 deletions go/src/pythia/backend/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package backend

import (
"encoding/json"
"pythia"
"reflect"
"strconv"
"testing"
"testutils"
"testutils/pytest"
Expand Down Expand Up @@ -114,4 +117,46 @@ func TestQueueSimple(t *testing.T) {
f.TearDown()
}

func TestQueueStatus(t *testing.T) {
f := SetupQueueFixture(t, 500, 2)
frontend := f.Clients[0]

frontend.Send(pythia.Message{
Message: pythia.StatusMsg,
Id: "test",
})

// Removing Clients array from Message as this part will differ due to client list
// referencing the test client as a client but said client is no longer connected
// when the status is emitted
status := fillQueueStatus(f.Queue)
status.Clients = make([]*queueClient, 0)

msg := <-frontend.Conn.Receive()
if msg.Message != pythia.DoneMsg || msg.Id != "test" || msg.Status != pythia.Success {
t.Fatal("Message content mismatching")
}

var expected QueueStatus
expected.Clients = make([]*queueClient, 0)
json.Unmarshal([]byte(msg.Output), &expected)

// The content of the Waiting list is not compared because it's not efficient
// and it's not really interesting because the list is supposed to be empty
if expected.Capacity != status.Capacity ||
expected.Available != status.Available ||
!reflect.DeepEqual(expected.Jobs, status.Jobs) ||
!reflect.DeepEqual(expected.Waiting.Len(), status.Waiting.Len()) ||
!expected.CreationDate.Equal(status.CreationDate) {

t.Error("Capacity : " + strconv.FormatBool(expected.Capacity != status.Capacity))
t.Error("Available : " + strconv.FormatBool(expected.Available != status.Available))
t.Error("Jobs : " + strconv.FormatBool(!reflect.DeepEqual(expected.Jobs, status.Jobs)))
t.Error("Waiting : " + strconv.FormatBool(!reflect.DeepEqual(expected.Waiting, status.Waiting)))
t.Error("CreationDate : " + strconv.FormatBool(!expected.CreationDate.Equal(status.CreationDate)))
}

f.TearDown()
}

// vim:set sw=4 ts=4 noet:
25 changes: 25 additions & 0 deletions go/src/pythia/frontend/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (server *Server) Run() {
}()
// Start the web server
http.HandleFunc("/execute", handler)
http.HandleFunc("/status", statusHandler)
log.Println("Server listening on", server.Port)
if err := http.ListenAndServe(fmt.Sprint(":", server.Port), nil); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -140,4 +141,28 @@ func handler(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}

// Handle for /status route to get the status of the Queue
func statusHandler(rw http.ResponseWriter, req *http.Request) {
log.Println("Client connected: ", req.URL)
if req.Method != "GET" {
rw.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Connection to the pool
conn := pythia.DialRetry(pythia.QueueAddr)
defer conn.Close()
conn.Send(pythia.Message{
Message: pythia.StatusMsg,
})
if msg, ok := <-conn.Receive(); ok {
switch msg.Status {
case "success":
rw.Header().Set("Content-Type", "application/json")
fmt.Fprintf(rw, msg.Output)
}
return
}
rw.WriteHeader(http.StatusInternalServerError)
}

// vim:set sw=4 ts=4 noet:
Loading