-
Notifications
You must be signed in to change notification settings - Fork 10
/
job.go
219 lines (187 loc) · 5.35 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package scheduler
import (
"fmt"
"sync"
"time"
"github.com/n9e/n9e-agentd/staging/datadog-agent/pkg/collector/check"
"github.com/n9e/n9e-agentd/staging/datadog-agent/pkg/status/health"
"k8s.io/klog/v2"
)
// jobBucket is one slot of a jobQueue: the checks it holds are all
// dispatched together when the queue's ticker reaches this bucket.
type jobBucket struct {
	// jobs holds the checks assigned to this bucket, in insertion order.
	jobs []check.Check
	// mu guards jobs; readers (size, the queue's per-tick snapshot) take the
	// read lock, while addJob/removeJob take the write lock.
	mu sync.RWMutex // to protect critical sections in struct's fields
}
// size reports how many checks are currently stored in the bucket.
// It only needs the read lock since it does not mutate jobs.
func (jb *jobBucket) size() int {
	jb.mu.RLock()
	count := len(jb.jobs)
	jb.mu.RUnlock()
	return count
}
// addJob appends c to the end of the bucket while holding the write lock.
func (jb *jobBucket) addJob(c check.Check) {
	jb.mu.Lock()
	jb.jobs = append(jb.jobs, c)
	jb.mu.Unlock()
}
// removeJob deletes the first check in the bucket whose ID equals id.
// It reports true when such a check was found (and therefore removed) and
// false when the bucket did not contain it.
func (jb *jobBucket) removeJob(id check.ID) bool {
	jb.mu.Lock()
	defer jb.mu.Unlock()

	pos := -1
	for i := range jb.jobs {
		if jb.jobs[i].ID() == id {
			pos = i
			break
		}
	}
	if pos < 0 {
		return false
	}

	// Shift the tail left over the removed slot, then clear the now-duplicated
	// last element so the backing array no longer references the check and it
	// can be garbage collected (pattern from the Go SliceTricks wiki).
	last := len(jb.jobs) - 1
	copy(jb.jobs[pos:], jb.jobs[pos+1:])
	jb.jobs[last] = nil
	jb.jobs = jb.jobs[:last]
	return true
}
// jobQueue contains a list of checks (called jobs) that need to be
// scheduled at a certain interval.
//
// The checks are spread across buckets (one per whole second of the interval,
// at least one). Every second the queue dispatches the checks of the current
// bucket and moves on to the next one, so each check is dispatched roughly once
// per interval.
type jobQueue struct {
	// interval is the scheduling period shared by every check in this queue.
	interval time.Duration
	// stop is received from by process to end the queue's run loop.
	stop chan bool // to stop this queue
	// stopped receives true from the run goroutine once its loop has exited.
	stopped chan bool // signals that this queue has stopped
	// buckets holds the checks; newJobQueue sizes it from interval.
	buckets []*jobBucket
	// bucketTicker fires every second; each tick dispatches one bucket.
	bucketTicker *time.Ticker
	// lastTick is the time of the previous tick (zero before the first one),
	// used to log when dispatching a bucket took more than two seconds.
	lastTick time.Time
	// sparseStep is the stride addJob uses to pick the next bucket, so that
	// consecutive checks land in buckets spread out over the interval.
	sparseStep uint
	// currentBucketIdx is the bucket that process dispatches on the next tick.
	currentBucketIdx uint
	// schedulingBucketIdx is the bucket addJob places the next check into.
	schedulingBucketIdx uint
	// NOTE(review): running is not read or written by the code visible here;
	// presumably maintained by the Scheduler — confirm.
	running bool
	// health is the liveness handle registered in newJobQueue; process drains
	// its channel and deregisters it when the queue stops.
	health *health.Handle
	// mu guards the bucket indexes, lastTick and the buckets slice itself.
	mu sync.RWMutex // to protect critical sections in struct's fields
}
// newJobQueue builds a jobQueue for checks running every interval.
//
// The queue gets one bucket per whole second of interval (a single bucket for
// intervals of one second or less). It also gets a one-second bucket ticker and
// a "collector-queue" liveness handle.
//
// It also picks sparseStep, the stride addJob uses when assigning checks to
// buckets. For more than two buckets the stride is chosen to be coprime with the
// bucket count, so stepping by it visits every bucket before repeating.
func newJobQueue(interval time.Duration) *jobQueue {
	jq := &jobQueue{
		interval:     interval,
		stop:         make(chan bool),
		stopped:      make(chan bool),
		health:       health.RegisterLiveness("collector-queue"),
		bucketTicker: time.NewTicker(time.Second),
	}

	bucketCount := 1
	if interval > time.Second {
		bucketCount = int(interval.Truncate(time.Second).Seconds())
	}

	jq.buckets = make([]*jobBucket, bucketCount)
	for i := range jq.buckets {
		jq.buckets[i] = &jobBucket{}
	}

	half := bucketCount / 2
	switch {
	case bucketCount <= 2:
		jq.sparseStep = 1
	case bucketCount%2 == 1:
		// Odd n = 2k+1: gcd(k, 2k+1) == 1.
		jq.sparseStep = uint(half)
	case half%2 == 0:
		// n = 2h with h even: h-1 is odd, so gcd(h-1, 2h) == 1.
		jq.sparseStep = uint(half - 1)
	default:
		// n = 2h with h odd: h-2 is odd, so gcd(h-2, 2h) == 1.
		jq.sparseStep = uint(half - 2)
	}

	return jq
}
// addJob places c into the queue's current scheduling bucket and advances
// the scheduling index by sparseStep (wrapping around the bucket count), so
// successive checks are spread across the interval rather than bunched up.
func (jq *jobQueue) addJob(c check.Check) {
	jq.mu.Lock()
	defer jq.mu.Unlock()

	target := jq.buckets[jq.schedulingBucketIdx]
	target.addJob(c)

	next := jq.schedulingBucketIdx + jq.sparseStep
	jq.schedulingBucketIdx = next % uint(len(jq.buckets))
}
// removeJob deletes the check identified by id from whichever bucket holds
// it. It returns nil on success and an error when no bucket contains the check.
func (jq *jobQueue) removeJob(id check.ID) error {
	jq.mu.Lock()
	defer jq.mu.Unlock()

	for i := range jq.buckets {
		if jq.buckets[i].removeJob(id) {
			return nil
		}
	}
	return fmt.Errorf("check with id %s is not in this Job Queue", id)
}
// stats returns a summary of the queue with the keys:
//   - "Interval": the interval divided by time.Second, i.e. whole seconds.
//     NOTE(review): the value keeps type time.Duration, so formatting it with
//     %v would render it as nanoseconds (e.g. "10ns"); presumably consumers
//     serialize it as a plain number — confirm.
//   - "Buckets": the number of buckets.
//   - "Size": the total number of checks across all buckets.
func (jq *jobQueue) stats() map[string]interface{} {
	jq.mu.RLock()
	defer jq.mu.RUnlock()

	var total int
	for _, b := range jq.buckets {
		total += b.size()
	}

	return map[string]interface{}{
		"Interval": jq.interval / time.Second,
		"Buckets":  len(jq.buckets),
		"Size":     total,
	}
}
// run starts a goroutine that keeps calling process, posting the queue's
// checks to the scheduler's execution pipeline, until process reports that
// the queue was stopped. The goroutine then sends true on jq.stopped.
// run itself returns immediately.
func (jq *jobQueue) run(s *Scheduler) {
	go func() {
		klog.V(5).Infof("Job queue is running...")
		keepGoing := true
		for keepGoing {
			keepGoing = jq.process(s)
		}
		jq.stopped <- true
	}()
}
// process handles one event of the queue and reports whether the run loop
// should keep going: false once a stop request is received, true otherwise.
//
// On each bucketTicker tick it does three things:
//   - Logs when more than two seconds passed since the previous tick.
//   - Snapshots the current bucket and sends every check that s still reports
//     as scheduled to s.checksPipe. A send blocks until received, but a stop
//     request still interrupts it.
//   - Advances to the next bucket (wrapping around).
//
// A receive on jq.health.C is otherwise a no-op.
func (jq *jobQueue) process(s *Scheduler) bool {
	select {
	case <-jq.stop:
		jq.health.Deregister() //nolint:errcheck
		return false
	case t := <-jq.bucketTicker.C:
		// currentBucketIdx is read under mu like every other access to it.
		jq.mu.Lock()
		klog.V(6).Infof("Bucket ticked... current index: %v", jq.currentBucketIdx)
		if !jq.lastTick.IsZero() && t.After(jq.lastTick.Add(2*time.Second)) {
			klog.V(5).Infof("Previous bucket took over %v to schedule. Next checks will be running behind the schedule.", t.Sub(jq.lastTick))
		}
		jq.lastTick = t
		bucket := jq.buckets[jq.currentBucketIdx]
		jq.mu.Unlock()

		// Copy the bucket's checks so its lock is not held while blocking on
		// checksPipe below; holding it would stall addJob/removeJob on this
		// bucket, which need the write lock.
		bucket.mu.RLock()
		jobs := make([]check.Check, len(bucket.jobs))
		copy(jobs, bucket.jobs)
		bucket.mu.RUnlock()
		klog.V(6).Infof("Jobs in bucket: %v", jobs)

		// The loop variable is named c so it does not shadow the check package.
		for _, c := range jobs {
			// Skip checks the scheduler no longer reports as scheduled; the
			// snapshot may be stale by the time we get here.
			if !s.IsCheckScheduled(c.ID()) {
				continue
			}
			select {
			// blocking, we'll be here as long as it takes
			case s.checksPipe <- c:
			case <-jq.stop:
				jq.health.Deregister() //nolint:errcheck
				return false
			}
		}

		jq.mu.Lock()
		jq.currentBucketIdx = (jq.currentBucketIdx + 1) % uint(len(jq.buckets))
		jq.mu.Unlock()
	case <-jq.health.C:
		// Nothing to do. NOTE(review): presumably receiving here is what keeps
		// the liveness handle reporting healthy between ticks — confirm.
	}
	return true
}