/
schedule.go
77 lines (70 loc) · 2.04 KB
/
schedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package alerting
import (
"time"
"github.com/raintank/worldping-api/pkg/alerting/jobqueue"
"github.com/raintank/worldping-api/pkg/log"
m "github.com/raintank/worldping-api/pkg/models"
"github.com/raintank/worldping-api/pkg/services/sqlstore"
"github.com/raintank/worldping-api/pkg/util"
)
// getJobs retrieves all jobs for which lastPointAt % their freq == their offset.
func getJobs(lastPointAt int64) ([]*m.AlertingJob, error) {
checks, err := sqlstore.GetChecksForAlerts(lastPointAt)
if err != nil {
return nil, err
}
jobs := make([]*m.AlertingJob, 0)
for i := range checks {
check := &checks[i]
if check.HealthSettings == nil {
continue
}
if check.Frequency == 0 || check.HealthSettings.Steps == 0 || check.HealthSettings.NumProbes == 0 {
continue
}
jobs = append(jobs, &m.AlertingJob{CheckForAlertDTO: check})
}
return jobs, nil
}
func dispatchJobs(jobQ *jobqueue.JobQueue) {
ticker := time.NewTicker(time.Second)
offsetTicker := time.NewTicker(time.Minute)
newOffsetChan := make(chan int)
offset := LoadOrSetOffset()
log.Info("Alerting using offset %d", offset)
next := time.Now().Unix() - int64(offset)
for {
select {
case lastPointAt := <-ticker.C:
for next <= lastPointAt.Unix()-int64(offset) {
pre := time.Now()
jobs, err := getJobs(next)
next++
dispatcherNumGetSchedules.Inc()
dispatcherGetSchedules.Value(util.Since(pre))
if err != nil {
log.Error(0, "Alerting failed to get jobs from DB: %q", err)
continue
}
log.Debug("%d jobs found for TS: %d", len(jobs), next)
for _, job := range jobs {
job.GeneratedAt = time.Now()
job.LastPointTs = time.Unix(next-1, 0)
jobQ.QueueJob(job)
dispatcherJobsScheduled.Inc()
}
}
case <-offsetTicker.C:
// run this in a separate goroutine so we dont block the scheduler.
go func() {
newOffset := LoadOrSetOffset()
if newOffset != offset {
newOffsetChan <- newOffset
}
}()
case newOffset := <-newOffsetChan:
log.Info("Alerting offset updated to %d", offset)
offset = newOffset
}
}
}