-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
Copy pathsweeper.go
97 lines (80 loc) · 2.33 KB
/
sweeper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright 2018 The Harbor Authors. All rights reserved.
package period
import (
"fmt"
"time"
"github.com/gocraft/work"
"github.com/garyburd/redigo/redis"
"github.com/vmware/harbor/src/jobservice/logger"
"github.com/vmware/harbor/src/jobservice/utils"
)
//Sweeper take charge of clearing the outdated data such as scheduled jobs etc..
//Currently, only used in redis worker pool.
type Sweeper struct {
redisPool *redis.Pool
client *work.Client
namespace string
}
// NewSweeper creates a Sweeper bound to the given namespace, redis
// connection pool and work client.
func NewSweeper(namespace string, pool *redis.Pool, client *work.Client) *Sweeper {
	sweeper := new(Sweeper)
	sweeper.namespace = namespace
	sweeper.redisPool = pool
	sweeper.client = client

	return sweeper
}
// ClearOutdatedScheduledJobs clears the outdated scheduled jobs.
//
// It first tries to take a redis lock (SET ... EX 30 NX) on the periodic lock
// key of the namespace so that only one worker pool runs the sweep at a time.
// If the lock is already taken, the call does nothing and returns nil.
//
// Once the lock is held, the entries of the scheduled zset queried with the
// score range 0..now (Unix seconds) are deserialized and deleted one by one.
// The sweep is best effort: a job that cannot be deserialized or deleted is
// skipped and the remaining jobs are still processed. All collected failures
// are returned: a single failure as-is, several merged into one error whose
// message joins the individual messages with ", ".
func (s *Sweeper) ClearOutdatedScheduledJobs() error {
	conn := s.redisPool.Get()
	defer conn.Close()

	lockKey := utils.KeyPeriodicLock(s.namespace)

	// Lock. With NX the reply is nil when the key already exists, i.e. when
	// another worker pool is holding the lock.
	r, err := conn.Do("SET", lockKey, time.Now().Unix(), "EX", 30, "NX")
	if err != nil {
		return err
	}
	if r == nil {
		// Action is already locked by other workerpool
		logger.Info("Ignore clear outdated scheduled jobs")
		return nil
	}

	// Unlock. The release is registered only after the lock was really
	// acquired, otherwise a worker that lost the race (or failed to issue
	// the SET) would delete the lock owned by somebody else.
	// NOTE(review): if the sweep runs longer than the 30s expiry, this DEL may
	// still remove a lock re-acquired by another worker; a unique token with a
	// compare-and-delete would be needed to rule that out.
	defer func() {
		if _, err := conn.Do("DEL", lockKey); err != nil {
			logger.Errorf("Unlock key '%s' failed with error: %s\n", lockKey, err.Error())
		}
	}()

	nowEpoch := time.Now().Unix()
	jobScores, err := utils.GetZsetByScore(s.redisPool, utils.RedisKeyScheduled(s.namespace), []int64{0, nowEpoch})
	if err != nil {
		return err
	}

	var allErrors []error
	for _, jobScore := range jobScores {
		j, err := utils.DeSerializeJob(jobScore.JobBytes)
		if err != nil {
			allErrors = append(allErrors, err)
			continue
		}

		if err = s.client.DeleteScheduledJob(jobScore.Score, j.ID); err != nil {
			// Do not report the job as cleared when the deletion failed.
			allErrors = append(allErrors, err)
			continue
		}

		logger.Infof("Clear outdated scheduled job: %s run at %#v\n", j.ID, time.Unix(jobScore.Score, 0).String())
	}

	switch len(allErrors) {
	case 0:
		return nil
	case 1:
		// Keep the original error value so that callers can still inspect it.
		return allErrors[0]
	}

	errorSummary := allErrors[0].Error()
	for _, e := range allErrors[1:] {
		errorSummary = fmt.Sprintf("%s, %s", errorSummary, e)
	}

	return fmt.Errorf("%s", errorSummary)
}