enqueuer.go
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package period

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/gocraft/work"
	"github.com/goharbor/harbor/src/jobservice/common/rds"
	"github.com/goharbor/harbor/src/jobservice/common/utils"
	"github.com/goharbor/harbor/src/jobservice/job"
	"github.com/goharbor/harbor/src/jobservice/lcm"
	"github.com/goharbor/harbor/src/jobservice/logger"
	"github.com/gomodule/redigo/redis"
	"github.com/robfig/cron"
)

const (
	// enqueuerSleep is the base interval between two enqueue checks.
	enqueuerSleep = 2 * time.Minute
	// enqueuerHorizon is how far into the future executions are scheduled on each pass.
	enqueuerHorizon = 4 * time.Minute
	// neverExecuted is used to park the timer while a check is being processed.
	neverExecuted = 365 * 24 * time.Hour

	// PeriodicExecutionMark marks the scheduled job as a periodic execution
	PeriodicExecutionMark = "_job_kind_periodic_"
)

// enqueuer periodically evaluates the registered periodic policies and
// schedules the corresponding job executions.
type enqueuer struct {
	namespace   string
	context     context.Context
	pool        *redis.Pool
	policyStore *policyStore
	ctl         lcm.Controller
	// Distinguishes this node from the other nodes
	nodeID string
	// Tracks the last error that occurred while enqueuing
	lastEnqueueErr error
	// For stopping the enqueuer loop
	stopChan chan bool
}

// newEnqueuer creates a periodic enqueuer for the given namespace, backed by
// the provided redis pool and lifecycle controller.
func newEnqueuer(ctx context.Context, namespace string, pool *redis.Pool, ctl lcm.Controller) *enqueuer {
	nodeID := ctx.Value(utils.NodeID)
	if nodeID == nil {
		// Fail fast: the node ID is required
		panic("missing node ID in the system context of periodic enqueuer")
	}

	return &enqueuer{
		context:     ctx,
		namespace:   namespace,
		pool:        pool,
		policyStore: newPolicyStore(ctx, namespace, pool),
		ctl:         ctl,
		stopChan:    make(chan bool, 1),
		nodeID:      nodeID.(string),
	}
}
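
// Illustrative usage sketch (an assumption, not part of the original file):
// the bootstrap code is expected to seed the context with the node ID before
// constructing the enqueuer, roughly like:
//
//	ctx := context.WithValue(context.Background(), utils.NodeID, "node-1")
//	e := newEnqueuer(ctx, "my_ns", redisPool, lcmController)
//	err := e.start() // blocking; serves policy store events until stopped
//
// where "node-1", "my_ns", redisPool and lcmController are hypothetical
// placeholders supplied by the job service bootstrap.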

// start loads the policies and launches the enqueue loop.
// Blocking call.
func (e *enqueuer) start() error {
	// Load policies first when starting
	if err := e.policyStore.load(); err != nil {
		return err
	}

	go e.loop()
	logger.Info("Periodic enqueuer is started")

	return e.policyStore.serve()
}

// loop runs the periodic check-and-enqueue cycle until stopChan is signaled.
func (e *enqueuer) loop() {
	defer func() {
		logger.Info("Periodic enqueuer is stopped")
	}()

	// Do enqueue immediately when starting
	isHit := e.checkAndEnqueue()

	// Begin the periodic check cycle
	timer := time.NewTimer(e.nextTurn(isHit, e.lastEnqueueErr != nil))
	defer timer.Stop()

	for {
		select {
		case <-e.stopChan:
			// Stop the policy store now
			e.policyStore.stopChan <- true
			return
		case <-timer.C:
			// Park the timer until this round of processing is completed
			timer.Reset(neverExecuted)

			// Check and enqueue.
			// If it hits, the next turn is scheduled with lower priority
			// (a longer round time) to balance the workload across nodes.
			isHit = e.checkAndEnqueue()
			timer.Reset(e.nextTurn(isHit, e.lastEnqueueErr != nil))
		}
	}
}

// checkAndEnqueue checks whether an enqueue pass should run and
// performs the enqueue when the condition is hit.
func (e *enqueuer) checkAndEnqueue() (isHit bool) {
	if isHit = e.shouldEnqueue(); isHit {
		e.enqueue()
	}

	return
}

// nextTurn returns the waiting time before the next check, applying
// priorities to balance the workload across multiple nodes.
func (e *enqueuer) nextTurn(isHit bool, enqErr bool) time.Duration {
	base := enqueuerSleep

	if isHit {
		// Lower the priority of this node by adding more waiting time
		base += 3 * time.Second
		if enqErr {
			// Downgrade the priority further if this node hit an error while enqueuing
			base += 5 * time.Second
		}
	} else {
		// Raise the priority of hitting the next turn
		base -= 3 * time.Second
	}

	// Add random waiting time in [0,5) seconds
	base += time.Duration(rand.Intn(5)) * time.Second

	return base
}
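
// Worked example of the back-off above (derived from the constants in this
// file): a node that hit the previous turn without an enqueue error waits
// 2m + 3s + [0,5)s, i.e. 123-127s; a node that missed waits 2m - 3s + [0,5)s,
// i.e. 117-121s, and is therefore more likely to win the next lock race; a
// node that hit but also recorded an enqueue error waits 2m + 3s + 5s + [0,5)s.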

// enqueue runs one scheduling pass over all registered periodic policies.
func (e *enqueuer) enqueue() {
	conn := e.pool.Get()
	defer func() {
		_ = conn.Close()
	}()

	// Reset error tracking
	e.lastEnqueueErr = nil

	e.policyStore.Iterate(func(id string, p *Policy) bool {
		e.scheduleNextJobs(p, conn)
		return true
	})
}

// scheduleNextJobs schedules executions for the next time slots based on the policy.
func (e *enqueuer) scheduleNextJobs(p *Policy, conn redis.Conn) {
	nowTime := time.Unix(time.Now().Unix(), 0)
	horizon := nowTime.Add(enqueuerHorizon)

	schedule, err := cron.Parse(p.CronSpec)
	if err != nil {
		// The cron spec should have already been validated at the upper layers.
		// Just in case: if an error occurs here, record it and skip this policy.
		e.lastEnqueueErr = err
		logger.Errorf("Invalid cron spec in periodic policy %s %s: %s", p.JobName, p.ID, err)
	} else {
		if p.JobParameters == nil {
			p.JobParameters = make(job.Parameters)
		}

		// Clone job parameters
		wJobParams := make(job.Parameters)
		if len(p.JobParameters) > 0 {
			for k, v := range p.JobParameters {
				wJobParams[k] = v
			}
		}
		// Add extra argument for job running
		// Note: for system use only
		wJobParams[PeriodicExecutionMark] = true

		for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
			epoch := t.Unix()

			// Create an execution (job) based on the periodic job template (policy)
			j := &work.Job{
				Name: p.JobName,
				ID:   p.ID, // Use the ID of the policy to avoid scheduling duplicated periodic job executions.
				// This is technically wrong, but it keeps the bytes identical for the same periodic job instance.
				// If we didn't do this, we'd need a different approach -- probably giving each periodic job its own
				// history of the past 100 periodic jobs, and only scheduling a job if it's not in that history.
				EnqueuedAt: epoch,
				// Pass parameters to the scheduled job here
				Args: wJobParams,
			}

			rawJSON, err := utils.SerializeJob(j)
			if err != nil {
				e.lastEnqueueErr = err
				// This error should not happen if the job struct is well defined
				logger.Errorf("Serialize job object for periodic job %s error: %s", p.ID, err)
				break
			}

			// Persist the execution first.
			// Note that the job has not really been scheduled yet.
			// If the job data fails to persist, scheduling of this execution is abandoned.
			execution := e.createExecution(p, epoch)
			eTracker, err := e.ctl.New(execution)
			if err != nil {
				e.lastEnqueueErr = err
				logger.Errorf("Save stats data of job execution '%s' error: %s", execution.Info.JobID, err)
				break
			}

			// Put the job into the scheduled job queue
			_, err = conn.Do("ZADD", rds.RedisKeyScheduled(e.namespace), epoch, rawJSON)
			if err != nil {
				e.lastEnqueueErr = err
				logger.Errorf("Put the execution of the periodic job '%s' to the scheduled job queue error: %s", p.ID, err)

				// Mark the job status as error.
				// If this happens, the job stats become dirty data on the job service side.
				// On the consumer side, the webhook retries may fix the problem.
				if err := eTracker.Fail(); err != nil {
					e.lastEnqueueErr = err
					logger.Errorf("Mark execution '%s' to failure status error: %s", execution.Info.JobID, err)
				}

				break // The redis connection is probably broken
			}

			logger.Debugf("Scheduled execution for periodic job %s:%s at %d", j.Name, p.ID, epoch)
		}
	}
}
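
// Sizing note (derived from the constants above): with a 4-minute horizon and,
// say, a cron spec that fires every minute, each pass schedules roughly four
// executions. They land in the scheduled-job zset scored by their run epochs
// and are presumably moved to the run queue by the scheduled-job requeuer of
// the underlying work framework once the epoch is due.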

// createExecution creates the execution (job stats) object for the given run time.
func (e *enqueuer) createExecution(p *Policy, runAt int64) *job.Stats {
	eID := fmt.Sprintf("%s@%d", p.ID, runAt)

	return &job.Stats{
		Info: &job.StatsInfo{
			JobID:         eID,
			JobName:       p.JobName,
			WebHookURL:    p.WebHookURL,
			CronSpec:      p.CronSpec,
			UpstreamJobID: p.ID,
			RunAt:         runAt,
			Status:        job.ScheduledStatus.String(),
			JobKind:       job.KindScheduled, // For periodic job executions, the kind is set to 'scheduled'
			EnqueueTime:   time.Now().Unix(),
			RefLink:       fmt.Sprintf("/api/v1/jobs/%s", eID),
			Parameters:    p.JobParameters,
		},
	}
}
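
// For illustration: a policy with ID "p1" due at epoch 1600000000 yields the
// execution ID "p1@1600000000" and the RefLink "/api/v1/jobs/p1@1600000000";
// the "@<epoch>" suffix keeps execution IDs unique per scheduled time slot.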

// shouldEnqueue checks whether this node should run the enqueue pass now.
func (e *enqueuer) shouldEnqueue() bool {
	conn := e.pool.Get()
	defer func() {
		_ = conn.Close()
	}()

	// Acquire a lock before doing the check.
	// If that fails, return false directly.
	lockKey := rds.KeyPeriodicLock(e.namespace)
	if err := rds.AcquireLock(conn, lockKey, e.nodeID, 30); err != nil {
		logger.Errorf("acquire lock for periodic enqueuing error: %s", err)
		return false
	}
	// Lock acquired; make sure it gets released
	defer func() {
		if err := rds.ReleaseLock(conn, lockKey, e.nodeID); err != nil {
			logger.Errorf("release lock for periodic enqueuing error: %s", err)
		}
	}()

	shouldEnq := false
	lastEnqueue, err := redis.Int64(conn.Do("GET", rds.RedisKeyLastPeriodicEnqueue(e.namespace)))
	if err != nil {
		if err.Error() != redis.ErrNil.Error() {
			// Log errors other than the nil-reply error
			logger.Errorf("get timestamp of last enqueue error: %s", err)
		}

		// No valid timestamp recorded: should enqueue
		shouldEnq = true
	} else {
		// Enqueue only if the last enqueue is older than one sleep interval
		shouldEnq = lastEnqueue < (time.Now().Unix() - int64(enqueuerSleep/time.Minute)*60)
	}

	if shouldEnq {
		// Set the last periodic enqueue timestamp
		if _, err := conn.Do("SET", rds.RedisKeyLastPeriodicEnqueue(e.namespace), time.Now().Unix()); err != nil {
			logger.Errorf("set last periodic enqueue timestamp error: %s", err)
		}
		// Proceed anyway even if setting the timestamp failed.
		// The only negative effect of that failure is extra re-enqueues by other nodes.
		return true
	}

	return false
}
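
// Arithmetic note for the check above: int64(enqueuerSleep/time.Minute)*60
// evaluates to 120 for enqueuerSleep = 2 * time.Minute, so a node enqueues
// only when the recorded timestamp is more than 120 seconds old. Combined
// with the redis lock, at most one node performs the enqueue pass per window.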