-
Notifications
You must be signed in to change notification settings - Fork 11
/
cron.go
69 lines (62 loc) · 2.12 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package app
import (
"github.com/go-co-op/gocron/v2"
"github.com/jitsucom/bulker/jitsubase/appbase"
"github.com/jitsucom/bulker/jitsubase/utils"
"math/rand"
"time"
)
// Cron schedules batch consumer jobs on a gocron scheduler.
type Cron struct {
// Embedded base service; Cron's methods call Infof, Warnf and Errorf through it.
appbase.Service
// Application configuration; Close reads ShutdownTimeoutSec from it.
config *Config
// Scheduler that owns the jobs; each job is tagged with its consumer's TopicId.
scheduler gocron.Scheduler
}
// NewCron creates a Cron service named "cron" backed by a gocron scheduler
// configured for the UTC location, and starts the scheduler before returning.
//
// It panics if the scheduler cannot be created, since the returned Cron
// would be unusable without one.
func NewCron(config *Config) *Cron {
	base := appbase.NewServiceBase("cron")
	s, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
	if err != nil {
		// Fail with a descriptive message instead of hitting an opaque nil
		// dereference on s.Start() below.
		panic("cron: cannot create scheduler: " + err.Error())
	}
	s.Start()
	return &Cron{Service: base, scheduler: s, config: config}
}
// AddBatchConsumer registers batchConsumer.RunJob as a job that repeats every
// BatchPeriodSec seconds. The job is tagged with the consumer's TopicId so it
// can be looked up by tag later. It returns the created job, or the error
// reported by the scheduler.
func (c *Cron) AddBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error) {
	jobOpts := []gocron.JobOption{gocron.WithTags(batchConsumer.TopicId())}
	if batchConsumer.BatchPeriodSec() > 1 {
		// Pick a random offset within one period so that consumers do not all
		// run their batches at the same moment.
		offsetSec := rand.Intn(batchConsumer.BatchPeriodSec())
		// Ignore small offsets: gocron rejects a start time in the past, and a
		// tiny offset may already have elapsed by the time the job is created.
		if offsetSec > 5 {
			firstRun := time.Now().Add(time.Duration(offsetSec) * time.Second)
			jobOpts = append(jobOpts, gocron.WithStartAt(gocron.WithStartDateTime(firstRun)))
		}
	}
	interval := time.Duration(batchConsumer.BatchPeriodSec()) * time.Second
	return c.scheduler.NewJob(
		gocron.DurationJob(interval),
		gocron.NewTask(batchConsumer.RunJob),
		jobOpts...,
	)
}
// ReplaceBatchConsumer removes every job tagged with the consumer's TopicId
// and then schedules the consumer again via AddBatchConsumer, returning the
// newly created job or the scheduling error.
func (c *Cron) ReplaceBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error) {
	c.RemoveBatchConsumer(batchConsumer)
	job, err := c.AddBatchConsumer(batchConsumer)
	return job, err
}
// RemoveBatchConsumer removes all scheduled jobs whose tags contain the
// consumer's TopicId. A failed removal is logged and does not stop the
// remaining jobs from being processed.
func (c *Cron) RemoveBatchConsumer(batchConsumer BatchConsumer) {
	for _, scheduled := range c.scheduler.Jobs() {
		// Skip jobs that belong to other topics.
		if !utils.ArrayContains(scheduled.Tags(), batchConsumer.TopicId()) {
			continue
		}
		if err := c.scheduler.RemoveJob(scheduled.ID()); err != nil {
			c.Errorf("Error removing job for[%s] id: %s: %v", batchConsumer.TopicId(), scheduled.ID(), err)
		}
	}
}
// Close shuts the scheduler down, waiting at most config.ShutdownTimeoutSec
// seconds for the shutdown to finish.
//
// Shutdown runs in its own goroutine so that Close can return when the
// timeout expires. An error returned by Shutdown is logged rather than being
// reported as a successful stop.
func (c *Cron) Close() {
	// Buffered so the goroutine can deliver its result and exit even when
	// Close has already given up waiting.
	done := make(chan error, 1)
	go func() {
		done <- c.scheduler.Shutdown()
	}()
	select {
	case err := <-done:
		if err != nil {
			c.Errorf("Cron scheduler stopped with error: %v", err)
			return
		}
		c.Infof("Cron scheduler stopped")
	case <-time.After(time.Duration(c.config.ShutdownTimeoutSec) * time.Second):
		c.Warnf("Shutdown timeout [%ds] expired. Scheduler will be stopped forcibly. Active batches may be interrupted abruptly", c.config.ShutdownTimeoutSec)
	}
}