/
job.go
124 lines (106 loc) · 2.89 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package worker
import (
"strconv"
"time"
"github.com/robertkrimen/otto"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
)
//JobMeta Struct that represents a job.
type JobMeta struct {
Key string
Stopped bool
vm *otto.Otto
cron *cron.Cron
cronString string
}
func (jm *JobMeta) getVM() *otto.Otto {
return jm.vm
}
func (jm *JobMeta) getStatus(w *worker) (status string) {
status = w.Client.HGet(ctx, jm.Key, "Status").Val()
return
}
func (jm *JobMeta) getCron(w *worker) (status string) {
status = w.Client.HGet(ctx, jm.Key, "Cron").Val()
return
}
func (jm *JobMeta) getState(w *worker) (state string) {
state = w.Client.HGet(ctx, jm.Key, "State").Val()
return
}
func (jm *JobMeta) getSource(w *worker) (source string) {
source = w.Client.HGet(ctx, jm.Key, "Source").Val()
return
}
func (jm *JobMeta) getHeartBeat(w *worker) (hb int, err error) {
hbString := w.Client.HGet(ctx, jm.Key, "Heartbeat").Val()
hb, err = strconv.Atoi(hbString)
return
}
func (jm *JobMeta) getOwner(w *worker) (owner string) {
owner = w.Client.HGet(ctx, jm.Key, "Owner").Val()
return
}
func (jm *JobMeta) schedule(w *worker) {
if jm.cron == nil || jm.cronString != jm.getCron(w) {
log.Info("Setting up job cron for ", jm.Key, " cron: ", jm.getCron(w))
jm.cron = newWithSeconds()
jm.cron.Start()
jm.cron.AddFunc(jm.getCron(w), func() {
go jm.run(w)
})
jm.cronString = jm.getCron(w)
}
}
func (jm *JobMeta) disable(w *worker) {
if jm.getOwner(w) == w.WorkerName && !jm.Stopped {
log.Info("Disabling thread ", jm.Key)
jm.Stopped = true
w.Client.HSet(ctx, jm.Key, "State", STOPPED)
w.Client.HSet(ctx, jm.Key, "Status", DISABLED)
if jm.vm != nil {
jm.vm.Interrupt <- func() {
log.Error("Disabled thread")
return
}
}
}
}
func (jm *JobMeta) run(w *worker) {
log.Info("Starting job ", jm.Key)
if jm.getStatus(w) == DISABLED {
jm.cron.Stop()
jm.cron = nil
log.Info("Job disabled ", jm.Key)
}
if jm.getOwner(w) == "" {
jm.Stopped = false
w.Client.HSet(ctx, jm.Key, "State", RUNNING)
w.Client.HSet(ctx, jm.Key, "Heartbeat", time.Now().UnixNano())
w.Client.HSet(ctx, jm.Key, "Owner", w.WorkerName)
jm.vm = otto.New()
jm.vm.Interrupt = make(chan func(), 1)
applyLibrary(w, jm)
source := jm.getSource(w)
if source == "" {
log.Error("Source empty for thread ", jm.Key)
return
}
//Check one last time to make sure someone didn't beat us.
if jm.getOwner(w) == w.WorkerName {
//Get whole script in memory.
_, err := jm.vm.Run(source)
if err != nil {
w.Client.HSet(ctx, jm.Key, "State", CRASHED)
w.Client.HSet(ctx, jm.Key, "Status", DISABLED)
w.Client.HSet(ctx, jm.Key, "Error", err.Error())
w.Client.HSet(ctx, jm.Key, "ErrorTime", time.Now())
log.WithError(err).Error("Syntax error in script.")
return
}
w.Client.HSet(ctx, jm.Key, "State", STOPPED)
w.Client.HSet(ctx, jm.Key, "Owner", "")
}
}
}