-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
cron.go
90 lines (76 loc) · 2.3 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package cron
import (
"context"
"fmt"
"github.com/robfig/cron/v3"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
// Cron is a job service that runs a job's pipeline on a cron schedule.
// It registers the schedule from the job's CronSpec with a robfig/cron
// scheduler and executes the job's pipeline each time that schedule fires.
type Cron struct {
	// cronRunner is the scheduler on which the job's schedule is registered.
	cronRunner *cron.Cron
	// logger is the logger used for all messages emitted by this service.
	logger logger.Logger
	// jobSpec is the job whose CronSpec schedule and PipelineSpec are used.
	jobSpec job.Job
	// pipelineRunner executes a pipeline run each time the schedule fires.
	pipelineRunner pipeline.Runner
	// chStop is the stop channel from which every run derives its context.
	chStop utils.StopChan
}
// NewCronFromJobSpec builds a Cron service for the given job. The supplied
// logger is named "Cron" and tagged with the job ID and the job's cron
// schedule. Nothing is scheduled here; the returned error is always nil.
func NewCronFromJobSpec(
	jobSpec job.Job,
	pipelineRunner pipeline.Runner,
	logger logger.Logger,
) (*Cron, error) {
	// Derive the service-scoped logger first, in two explicit steps.
	named := logger.Named("Cron")
	scoped := named.With(
		"jobID", jobSpec.ID,
		"schedule", jobSpec.CronSpec.CronSchedule,
	)

	svc := new(Cron)
	svc.cronRunner = cronRunner()
	svc.logger = scoped
	svc.jobSpec = jobSpec
	svc.pipelineRunner = pipelineRunner
	svc.chStop = make(chan struct{})
	return svc, nil
}
// Start implements the job.Service interface. It registers runPipeline under
// the job's cron schedule and then starts the scheduler. If the schedule
// cannot be registered, the error is logged and returned, and the scheduler
// is not started.
func (cr *Cron) Start(context.Context) error {
	cr.logger.Debug("Starting")

	id := cr.jobSpec.ID
	schedule := cr.jobSpec.CronSpec.CronSchedule
	if _, addErr := cr.cronRunner.AddFunc(schedule, cr.runPipeline); addErr != nil {
		msg := fmt.Sprintf("Error running cron job %d", id)
		cr.logger.Errorw(msg, "err", addErr, "schedule", schedule, "jobID", id)
		return addErr
	}

	cr.cronRunner.Start()
	return nil
}
// Close implements the job.Service interface. It stops the scheduler so that
// no further runs are triggered, then closes the stop channel so that runs
// still in flight are signalled to stop (runPipeline derives each run's
// context from chStop; presumably that context is cancelled once chStop is
// closed — verify against utils.StopChan). It always returns nil.
func (cr *Cron) Close() error {
	cr.logger.Debug("Closing")
	cr.cronRunner.Stop()

	// Closing an already-closed channel panics, so only close chStop if it is
	// still open. This makes a repeated Close a no-op.
	// NOTE(review): this check is not safe against concurrent Close calls.
	select {
	case <-cr.chStop:
	default:
		close(cr.chStop)
	}
	return nil
}
// runPipeline executes a single run of the job's pipeline. It is the function
// registered with the scheduler in Start. The run's context is derived from
// chStop, and the pipeline variables expose the job's database ID, external
// job ID and name under "jobSpec", plus an empty "jobRun.meta" map. Failures
// are logged rather than returned, because the scheduler callback has no
// return value.
func (cr *Cron) runPipeline() {
	// Dereferencing a nil pipeline spec below would panic inside the
	// scheduler's goroutine, so bail out with a log message instead.
	if cr.jobSpec.PipelineSpec == nil {
		cr.logger.Errorf("Cannot execute run for jobSpec ID %v: pipeline spec is nil", cr.jobSpec.ID)
		return
	}

	ctx, cancel := cr.chStop.NewCtx()
	defer cancel()

	vars := pipeline.NewVarsFrom(map[string]interface{}{
		"jobSpec": map[string]interface{}{
			"databaseID":    cr.jobSpec.ID,
			"externalJobID": cr.jobSpec.ExternalJobID,
			"name":          cr.jobSpec.Name.ValueOrZero(),
		},
		"jobRun": map[string]interface{}{
			"meta": map[string]interface{}{},
		},
	})

	run := pipeline.NewRun(*cr.jobSpec.PipelineSpec, vars)

	_, err := cr.pipelineRunner.Run(ctx, &run, cr.logger, false, nil)
	if err != nil {
		// Attach the error itself so the cause of the failure is not lost.
		cr.logger.Errorw(fmt.Sprintf("Error executing new run for jobSpec ID %v", cr.jobSpec.ID), "err", err)
	}
}
// cronRunner returns a new robfig/cron scheduler created with the WithSeconds
// option (per the library documentation, this makes schedules parse with a
// leading seconds field).
func cronRunner() *cron.Cron {
	withSeconds := cron.WithSeconds()
	return cron.New(withSeconds)
}