-
Notifications
You must be signed in to change notification settings - Fork 0
/
manage_job.go
98 lines (84 loc) · 2.06 KB
/
manage_job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package commands
import (
"encoding/json"
"time"
"github.com/jobatator/jobatator/pkg/store"
)
// ListJobs - List all jobs in a queue
//
// The queue slug is read from cmd.Parts[1] and resolved through
// store.FindQueueBySlug together with cmd.User.CurrentGroup. On success the
// queue's Jobs field is sent back to the client as a JSON-encoded string.
// A lookup failure or a JSON encoding failure is reported with ReturnError.
func ListJobs(cmd CmdInterface) {
	queue, err := store.FindQueueBySlug(cmd.Parts[1], cmd.User.CurrentGroup, false)
	if err != nil {
		ReturnError(cmd, err.Error())
		return
	}

	rawJSON, err := json.Marshal(queue.Jobs)
	if err != nil {
		// Surface the encoding failure instead of replying with an empty payload.
		ReturnError(cmd, err.Error())
		return
	}
	ReturnString(cmd, string(rawJSON))
}
// UpdateJob - Update the state of the job
// Arguments: JOB_ID, JOB_STATUS
//
// JOB_ID is read from cmd.Parts[1] and JOB_STATUS from cmd.Parts[2].
// JOB_STATUS must equal store.JobInProgress, store.JobDone or
// store.JobErrored; any other value is answered with the "unknown-state"
// error and nothing is updated. On success the reply is "OK".
func UpdateJob(cmd CmdInterface) {
	job, err := store.FindJob(cmd.Parts[1])
	if err != nil {
		ReturnError(cmd, err.Error())
		return
	}

	// Apply the requested transition to the job itself.
	requested := cmd.Parts[2]
	switch requested {
	case store.JobInProgress:
		job.State = store.JobInProgress
		job.StartedProcessingAt = time.Now()
	case store.JobDone:
		job.State = store.JobDone
		job.EndProcessingAt = time.Now()
	case store.JobErrored:
		job.State = store.JobErrored
		job.Attempts++
	default:
		ReturnError(cmd, "unknown-state")
		return
	}

	// NOTE(review): store.UpdateSession is called before cmd.User.Status is
	// changed below — confirm this ordering is intended.
	store.UpdateSession(cmd.User)
	job.Update()

	// A worker that just picked up a job becomes busy; in every other case it
	// is released and a new dispatch is scheduled after `delay` seconds.
	workerFreed := requested != store.JobInProgress
	delay := 0
	if workerFreed {
		cmd.User.Status = store.WorkerAvailable
		switch {
		case store.Options.DelayPolicy == "IGNORE":
			// the configured policy disables every waiting period
			delay = 0
		case job.State == store.JobErrored:
			// wait 30 minutes before trying again
			delay = 1800
		default:
			delay = 2
		}
	} else {
		cmd.User.Status = store.WorkerBusy
	}

	// Mirror the new status onto this worker's entry in the job's queue,
	// matching the entry by address.
	for idx := range job.Queue.Workers {
		if job.Queue.Workers[idx].Addr == cmd.User.Addr {
			job.Queue.Workers[idx] = cmd.User
		}
	}
	job.Queue.UpdateAndKeep([]string{"Jobs"})

	if workerFreed {
		go DispatchUniversalWithDelay(delay)
	}
	if job.State == store.JobDone {
		go job.Expire(store.Options.JobTimeout)
	}

	ReturnString(cmd, "OK")
}
// DeleteJob - Delete a job
//
// The job identifier is read from cmd.Parts[1]. If the job cannot be found,
// or its Delete method returns an error, that error's message is sent back
// with ReturnError; otherwise the reply is "OK".
func DeleteJob(cmd CmdInterface) {
	target, err := store.FindJob(cmd.Parts[1])
	if err != nil {
		ReturnError(cmd, err.Error())
		return
	}

	if err = target.Delete(); err != nil {
		ReturnError(cmd, err.Error())
		return
	}

	ReturnString(cmd, "OK")
}