-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
spawner.go
322 lines (270 loc) · 8.79 KB
/
spawner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
package job
import (
"context"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
"gopkg.in/guregu/null.v4"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services/postgres"
"github.com/smartcontractkit/chainlink/core/store/models"
"github.com/smartcontractkit/chainlink/core/utils"
)
//go:generate mockery --name Spawner --output ./mocks/ --case=underscore
//go:generate mockery --name Delegate --output ./mocks/ --case=underscore
type (
	// Spawner manages the spinning up and spinning down of the long-running
	// services that perform the work described by job specs. Each active job spec
	// has 1 or more of these services associated with it.
	//
	// At present, Flux Monitor and Offchain Reporting jobs can only have a single
	// "initiator", meaning that they only require a single service. But the older
	// "direct request" model allows for multiple initiators, which imply multiple
	// services.
	Spawner interface {
		// Start begins the spawner's background processing.
		Start()
		// Stop shuts the spawner down.
		Stop()
		// CreateJob persists a new job built from spec under the given name and
		// returns the ID of the created job.
		CreateJob(ctx context.Context, spec Spec, name null.String) (int32, error)
		// DeleteJob removes the job with the given ID.
		DeleteJob(ctx context.Context, jobID int32) error
		// RegisterDelegate makes the spawner aware of the Delegate responsible
		// for one job type.
		RegisterDelegate(delegate Delegate)
	}
	// spawner is the Spawner implementation defined in this file.
	spawner struct {
		// orm is the persistence layer: it claims, creates, deletes and unclaims
		// jobs, records job errors, and provides the new/deleted job listeners.
		orm ORM
		// config supplies TriggerFallbackDBPollInterval for the run loop.
		config Config
		// jobTypeDelegates holds the registered delegates keyed by job type.
		jobTypeDelegates map[Type]Delegate
		// jobTypeDelegatesMu guards jobTypeDelegates.
		jobTypeDelegatesMu sync.RWMutex
		// startUnclaimedServicesWorker wraps startUnclaimedServices; the run loop
		// calls WakeUp on it and destroy calls Stop on it.
		startUnclaimedServicesWorker utils.SleeperTask
		// services holds the started services for each locally running job,
		// keyed by job ID.
		// NOTE(review): no mutex guards this map in this file, yet it is touched
		// both by the run loop (stopService) and by startUnclaimedServices, which
		// is driven by the sleeper task — confirm these cannot run concurrently.
		services map[int32][]Service
		// chStopJob carries job IDs from DeleteJob to the run loop, which stops
		// the corresponding services.
		chStopJob chan int32
		// StartStopOnce presumably provides the OkayToStart / OkayToStop guards
		// used by Start and Stop — defined in utils, not visible here.
		utils.StartStopOnce
		// chStop is closed by Stop to signal shutdown.
		chStop chan struct{}
		// chDone is closed by the run loop when it has exited.
		chDone chan struct{}
	}
	// TODO(spook): I can't wait for Go generics
	//
	// Delegate adapts one job type to the spawner: it converts specs to and from
	// their database representation and builds the services that run them.
	Delegate interface {
		// JobType returns the job type this delegate is registered under.
		JobType() Type
		// ToDBRow converts spec into the database row that is persisted.
		ToDBRow(spec Spec) models.JobSpecV2
		// FromDBRow converts a database row back into a Spec. The spawner treats
		// a nil result as "this row is not owned by this delegate".
		FromDBRow(spec models.JobSpecV2) Spec
		// ServicesForSpec returns the services that should be started for spec.
		ServicesForSpec(spec Spec) ([]Service, error)
	}
)
// checkForDeletedJobsPollInterval is the period of the run loop's ticker that
// triggers checkForDeletedJobs.
const checkForDeletedJobsPollInterval = 5 * time.Minute

// Compile-time check that *spawner satisfies the Spawner interface.
var _ Spawner = (*spawner)(nil)
// NewSpawner builds a spawner backed by the given ORM and config. All maps and
// channels are allocated up front, and the sleeper task is wired to
// startUnclaimedServices. The returned spawner is not running until Start is
// called.
func NewSpawner(orm ORM, config Config) *spawner {
	js := new(spawner)
	js.orm = orm
	js.config = config
	js.jobTypeDelegates = make(map[Type]Delegate)
	js.services = make(map[int32][]Service)
	js.chStopJob = make(chan int32)
	js.chStop = make(chan struct{})
	js.chDone = make(chan struct{})

	// The worker needs a method value bound to js, so it can only be set up
	// after js itself exists.
	worker := utils.SleeperTaskFuncWorker(js.startUnclaimedServices)
	js.startUnclaimedServicesWorker = utils.NewSleeperTask(worker)
	return js
}
// Start launches the run loop in its own goroutine. If OkayToStart reports
// false, an error is logged and nothing is started.
func (js *spawner) Start() {
	if js.OkayToStart() {
		go js.runLoop()
		return
	}
	logger.Error("Job spawner has already been started")
}
// Stop signals the run loop to exit by closing chStop and then blocks until
// the run loop has closed chDone. If OkayToStop reports false, an error is
// logged and nothing else happens.
func (js *spawner) Stop() {
	if js.OkayToStop() {
		close(js.chStop)
		<-js.chDone
		return
	}
	logger.Error("Job spawner has already been stopped")
}
// destroy tears down everything the spawner owns: it stops every running job
// service and then stops the sleeper task, logging any error from the latter.
func (js *spawner) destroy() {
	js.stopAllServices()
	if stopErr := js.startUnclaimedServicesWorker.Stop(); stopErr != nil {
		logger.Error(stopErr)
	}
}
// RegisterDelegate records delegate as the handler for its job type. It holds
// the delegates write lock for the duration of the call and panics if a
// delegate has already been registered for the same job type.
func (js *spawner) RegisterDelegate(delegate Delegate) {
	js.jobTypeDelegatesMu.Lock()
	defer js.jobTypeDelegatesMu.Unlock()

	_, alreadyRegistered := js.jobTypeDelegates[delegate.JobType()]
	if alreadyRegistered {
		panic("registered job type " + string(delegate.JobType()) + " more than once")
	}

	logger.Infof("Registered job type '%v'", delegate.JobType())
	js.jobTypeDelegates[delegate.JobType()] = delegate
}
// runLoop is the spawner's main goroutine. It subscribes to job-created and
// job-deleted events from the ORM, sets up two fallback poll tickers, and then
// dispatches events until chStop is closed.
//
// Deferred calls run in reverse order, so on exit the tickers and listeners
// are released first, then destroy stops all services, and only then is
// chDone closed — which is what unblocks Stop.
func (js *spawner) runLoop() {
	defer close(js.chDone)
	defer js.destroy()
	// Initialize the Postgres event listener for created and deleted jobs
	// If subscribing fails the channel variable stays nil; receiving from a nil
	// channel blocks forever, so the matching select case simply never fires.
	var newJobEvents <-chan postgres.Event
	newJobs, err := js.orm.ListenForNewJobs()
	if err != nil {
		logger.Warn("Job spawner could not subscribe to new job events, falling back to polling")
	} else {
		defer newJobs.Close()
		newJobEvents = newJobs.Events()
	}
	var pgDeletedJobEvents <-chan postgres.Event
	deletedJobs, err := js.orm.ListenForDeletedJobs()
	if err != nil {
		logger.Warn("Job spawner could not subscribe to deleted job events")
	} else {
		defer deletedJobs.Close()
		pgDeletedJobEvents = deletedJobs.Events()
	}
	// Initialize the DB poll ticker
	dbPollTicker := time.NewTicker(utils.WithJitter(js.config.TriggerFallbackDBPollInterval()))
	defer dbPollTicker.Stop()
	// Initialize the poll that checks for deleted jobs and removes them
	// This is only necessary as a fallback in case the event doesn't fire for some reason
	// It doesn't need to run very often
	deletedPollTicker := time.NewTicker(checkForDeletedJobsPollInterval)
	defer deletedPollTicker.Stop()
	// NOTE(review): ctx is presumably cancelled when chStop is closed — confirm
	// against utils.CombinedContext.
	ctx, cancel := utils.CombinedContext(js.chStop)
	defer cancel()
	// Wake the worker once up front rather than waiting for the first event or
	// tick.
	js.startUnclaimedServicesWorker.WakeUp()
	for {
		select {
		case <-newJobEvents:
			// A job was created: wake the worker that claims and starts jobs.
			js.startUnclaimedServicesWorker.WakeUp()
		case <-dbPollTicker.C:
			// Fallback poll for new jobs.
			js.startUnclaimedServicesWorker.WakeUp()
		case jobID := <-js.chStopJob:
			// Sent by DeleteJob after the job was removed through this spawner.
			js.stopService(jobID)
		case <-deletedPollTicker.C:
			// Fallback poll for deleted jobs.
			js.checkForDeletedJobs(ctx)
		case deleteJobEvent := <-pgDeletedJobEvents:
			js.handlePGDeleteEvent(ctx, deleteJobEvent)
		case <-js.chStop:
			// Stop was called; the deferred calls perform the teardown.
			return
		}
	}
}
// startUnclaimedServices claims jobs through the ORM and starts their
// services. It is the function wrapped by the spawner's sleeper task.
//
// For every claimed row that is not already running locally, each registered
// delegate is asked to interpret the row; delegates that return a nil spec are
// skipped. Services that fail to be built are logged and recorded against the
// job via the ORM; services that fail to start are logged. Only services that
// started successfully are stored in js.services.
func (js *spawner) startUnclaimedServices() {
	ctx, cancel := utils.CombinedContext(js.chStop, 5*time.Second)
	defer cancel()

	claimedRows, claimErr := js.orm.ClaimUnclaimedJobs(ctx)
	if claimErr != nil {
		logger.Errorf("Couldn't fetch unclaimed jobs: %v", claimErr)
		return
	}

	// Hold the read lock for the whole pass so the delegate set cannot change
	// while jobs are being matched against it.
	js.jobTypeDelegatesMu.RLock()
	defer js.jobTypeDelegatesMu.RUnlock()

	for _, row := range claimedRows {
		if _, running := js.services[row.ID]; running {
			logger.Warnw("Job spawner ORM attempted to claim locally-claimed job, skipping", "jobID", row.ID)
			continue
		}

		// Collect the services from every delegate that owns this row.
		var pending []Service
		for _, delegate := range js.jobTypeDelegates {
			spec := delegate.FromDBRow(row)
			if spec == nil {
				// This spec isn't owned by this delegate
				continue
			}

			built, buildErr := delegate.ServicesForSpec(spec)
			if buildErr != nil {
				logger.Errorw("Error creating services for job", "jobID", row.ID, "error", buildErr)
				js.orm.RecordError(ctx, row.ID, buildErr.Error())
				continue
			}
			pending = append(pending, built...)
		}

		logger.Infow("Starting services for job", "jobID", row.ID, "count", len(pending))
		for _, svc := range pending {
			if startErr := svc.Start(); startErr != nil {
				logger.Errorw("Error creating service for job", "jobID", row.ID, "error", startErr)
				continue
			}
			js.services[row.ID] = append(js.services[row.ID], svc)
		}
	}
}
// stopAllServices stops the services of every job currently tracked in
// js.services. stopService deletes entries from the map while it is being
// ranged over, which Go permits.
func (js *spawner) stopAllServices() {
	for id := range js.services {
		js.stopService(id)
	}
}
// stopService closes every service registered for jobID, logging the outcome
// of each Close call, and then removes the job from js.services. A failing
// Close does not prevent the remaining services from being closed. Calling it
// for an unknown job ID is a no-op.
func (js *spawner) stopService(jobID int32) {
	running := js.services[jobID]
	for i := range running {
		if closeErr := running[i].Close(); closeErr != nil {
			logger.Errorw("Error stopping job service", "jobID", jobID, "error", closeErr)
			continue
		}
		logger.Infow("Stopped job service", "jobID", jobID)
	}
	delete(js.services, jobID)
}
// checkForDeletedJobs asks the ORM for deleted jobs and unloads each job ID it
// returns. An ORM error is logged and the pass is abandoned.
func (js *spawner) checkForDeletedJobs(ctx context.Context) {
	deletedIDs, err := js.orm.CheckForDeletedJobs(ctx)
	if err != nil {
		logger.Errorw("failed to CheckForDeletedJobs", "err", err)
		return
	}

	for _, deletedID := range deletedIDs {
		js.unloadDeletedJob(ctx, deletedID)
	}
}
// unloadDeletedJob stops the local services of a job that has been deleted and
// then asks the ORM to unclaim it. The unclaim call is bounded by a 5 second
// timeout derived from ctx. A failure to unclaim is logged, including the
// underlying error, but is otherwise not propagated.
func (js *spawner) unloadDeletedJob(ctx context.Context, jobID int32) {
	logger.Infow("Unloading deleted job", "jobID", jobID)
	js.stopService(jobID)

	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	if err := js.orm.UnclaimJob(ctx, jobID); err != nil {
		// Include the error itself; without it the log line is not actionable.
		logger.Errorw("Unexpected error unclaiming job", "jobID", jobID, "error", err)
	}
}
// handlePGDeleteEvent handles a Postgres "job deleted" notification. The event
// payload is expected to be the decimal job ID, which must fit in 32 bits.
//
// If the payload cannot be parsed, the event is logged and dropped. Continuing
// would be wrong: strconv.ParseInt returns 0 on a syntax error and the clamped
// extreme value on a range error, so the spawner would stop and unclaim a job
// that the event never referred to.
func (js *spawner) handlePGDeleteEvent(ctx context.Context, ev postgres.Event) {
	jobIDString := ev.Payload
	jobID64, err := strconv.ParseInt(jobIDString, 10, 32)
	if err != nil {
		logger.Errorw("Unexpected error decoding deleted job event payload, expected 32-bit integer", "payload", jobIDString, "channel", ev.Channel, "error", err)
		return
	}

	// Safe narrowing: ParseInt was called with bitSize 32.
	js.unloadDeletedJob(ctx, int32(jobID64))
}
// CreateJob persists a new job described by spec under the given name and
// returns the ID of the created job.
//
// The delegate registered for spec's job type converts the spec to its
// database row. If no delegate is registered for that type an error is
// returned and nothing is written. The ORM call runs under a context combining
// the caller's ctx with the spawner's stop channel.
//
// Only a read lock is taken on the delegate map, and only for the lookup
// itself: the map is not modified here, and holding the lock across the
// database round trip would needlessly serialise job creation and stall other
// readers of the map.
func (js *spawner) CreateJob(ctx context.Context, spec Spec, name null.String) (int32, error) {
	jobType := spec.JobType()

	js.jobTypeDelegatesMu.RLock()
	delegate, exists := js.jobTypeDelegates[jobType]
	js.jobTypeDelegatesMu.RUnlock()

	if !exists {
		logger.Errorf("job type '%s' has not been registered with the job.Spawner", jobType)
		return 0, errors.Errorf("job type '%s' has not been registered with the job.Spawner", jobType)
	}

	ctx, cancel := utils.CombinedContext(js.chStop, ctx)
	defer cancel()

	specDBRow := delegate.ToDBRow(spec)
	specDBRow.Name = name
	if err := js.orm.CreateJob(ctx, &specDBRow, spec.TaskDAG()); err != nil {
		logger.Errorw("Error creating job", "type", jobType, "error", err)
		return 0, err
	}

	logger.Infow("Created job", "type", jobType, "jobID", specDBRow.ID)
	return specDBRow.ID, nil
}
// DeleteJob deletes the job with the given ID through the ORM and then hands
// the ID to the run loop so the job's local services are stopped. A job ID of
// 0 is rejected. The ORM call runs under a context combining the caller's ctx
// with the spawner's stop channel.
//
// After a successful delete, the call blocks until either the run loop accepts
// the job ID or the spawner's stop channel is closed.
func (js *spawner) DeleteJob(ctx context.Context, jobID int32) error {
	if jobID == 0 {
		return errors.New("will not delete job with 0 ID")
	}

	ctx, cancel := utils.CombinedContext(js.chStop, ctx)
	defer cancel()

	if err := js.orm.DeleteJob(ctx, jobID); err != nil {
		logger.Errorw("Error deleting job", "jobID", jobID, "error", err)
		return err
	}
	logger.Infow("Deleted job", "jobID", jobID)

	// Hand the job over to the run loop, unless the spawner is shutting down.
	select {
	case js.chStopJob <- jobID:
	case <-js.chStop:
	}
	return nil
}