-
Notifications
You must be signed in to change notification settings - Fork 0
/
alternate_policy.go
251 lines (204 loc) · 5.83 KB
/
alternate_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
package policy
import (
"errors"
"fmt"
"sync"
"time"
"github.com/vmware/harbor/src/common/scheduler/task"
"github.com/vmware/harbor/src/common/utils/log"
)
const (
oneDay = 24 * 3600
)
//AlternatePolicyConfiguration store the related configurations for alternate policy.
type AlternatePolicyConfiguration struct {
//Duration is the interval of executing attached tasks.
//E.g: 24*3600 for daily
// 7*24*3600 for weekly
Duration time.Duration
//An integer to indicate the the weekday of the week. Please be noted that Sunday is 7.
//Use default value 0 to indicate weekday is not set.
//To support by weekly function.
Weekday int8
//OffsetTime is the execution time point of each turn
//It's a number to indicate the seconds offset to the 00:00 of UTC time.
OffsetTime int64
}
//AlternatePolicy is a policy that repeatedly executing tasks with specified duration during a specified time scope.
type AlternatePolicy struct {
//To sync the related operations.
*sync.RWMutex
//Keep the attached tasks.
tasks task.Store
//Policy configurations.
config *AlternatePolicyConfiguration
//To indicated whether policy is enabled or not.
isEnabled bool
//Channel used to send evaluation result signals.
evaluation chan bool
//Channel used to notify policy termination.
done chan bool
//Channel used to receive terminate signal.
terminator chan bool
//Unique name of this policy to support multiple instances
name string
}
//NewAlternatePolicy is constructor of creating AlternatePolicy.
//Accept name and configuration as parameters.
func NewAlternatePolicy(name string, config *AlternatePolicyConfiguration) *AlternatePolicy {
return &AlternatePolicy{
RWMutex: new(sync.RWMutex),
tasks: task.NewDefaultStore(),
config: config,
isEnabled: false,
terminator: make(chan bool),
name: name,
}
}
//GetConfig returns the current configuration options of this policy.
func (alp *AlternatePolicy) GetConfig() *AlternatePolicyConfiguration {
return alp.config
}
//Name is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Name() string {
return alp.name
}
//Tasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Tasks() []task.Task {
return alp.tasks.GetTasks()
}
//Done is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Done() <-chan bool {
return alp.done
}
//AttachTasks is an implementation of same method in policy interface.
func (alp *AlternatePolicy) AttachTasks(tasks ...task.Task) error {
if len(tasks) == 0 {
return errors.New("No tasks can be attached")
}
alp.tasks.AddTasks(tasks...)
return nil
}
//Disable is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Disable() error {
alp.Lock()
if !alp.isEnabled {
alp.Unlock()
return fmt.Errorf("Instance of policy %s is not enabled", alp.Name())
}
//Set state to disabled
alp.isEnabled = false
alp.Unlock()
//Stop the evaluation goroutine
alp.terminator <- true
return nil
}
//Evaluate is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Evaluate() (<-chan bool, error) {
//Lock for state changing
defer alp.Unlock()
alp.Lock()
//Check if configuration is valid
if !alp.isValidConfig() {
return nil, errors.New("Policy configuration is not valid")
}
//Check if policy instance is still running
if alp.isEnabled {
return nil, fmt.Errorf("Instance of policy %s is still running", alp.Name())
}
//Keep idempotent
if alp.evaluation != nil {
return alp.evaluation, nil
}
alp.done = make(chan bool)
alp.evaluation = make(chan bool)
go func() {
var (
waitingTime int64
)
timeNow := time.Now().UTC()
//Reach the execution time point?
//Weekday is set
if alp.config.Weekday > 0 {
targetWeekday := (alp.config.Weekday + 7) % 7
currentWeekday := timeNow.Weekday()
weekdayDiff := (int)(targetWeekday - (int8)(currentWeekday))
if weekdayDiff < 0 {
weekdayDiff += 7
}
waitingTime = (int64)(weekdayDiff * oneDay)
}
//Time
utcTime := (int64)(timeNow.Hour()*3600 + timeNow.Minute()*60)
diff := alp.config.OffsetTime - utcTime
if waitingTime > 0 {
waitingTime += diff
} else {
waitingTime = diff
if waitingTime < 0 {
waitingTime += oneDay
}
}
//Let's wait for a while
if waitingTime > 0 {
//Wait for a while.
log.Infof("Waiting for %d seconds after comparing offset %d and utc time %d\n", diff, alp.config.OffsetTime, utcTime)
select {
case <-time.After(time.Duration(waitingTime) * time.Second):
case <-alp.terminator:
return
}
}
//Trigger the first tick.
alp.evaluation <- true
//Start the ticker for repeat checking.
tk := time.NewTicker(alp.config.Duration)
defer func() {
if tk != nil {
tk.Stop()
}
}()
for {
select {
case <-tk.C:
if alp.IsEnabled() {
alp.evaluation <- true
}
case <-alp.terminator:
return
}
}
}()
//Enabled
alp.isEnabled = true
return alp.evaluation, nil
}
//Equal is an implementation of same method in policy interface.
func (alp *AlternatePolicy) Equal(p Policy) bool {
if p == nil {
return false
}
pl, ok := p.(*AlternatePolicy)
if !ok {
return false
}
cfg := pl.GetConfig()
cfg2 := alp.GetConfig()
if (cfg == nil && cfg2 != nil) || (cfg != nil && cfg2 == nil) {
return false
}
return cfg == nil ||
(cfg.Duration == cfg2.Duration &&
cfg.OffsetTime == cfg2.OffsetTime &&
cfg.Weekday == cfg2.Weekday)
}
//IsEnabled is an implementation of same method in policy interface.
func (alp *AlternatePolicy) IsEnabled() bool {
defer alp.RUnlock()
alp.RLock()
return alp.isEnabled
}
//Check if the config is valid. At least it should have the configurations for supporting daily policy.
func (alp *AlternatePolicy) isValidConfig() bool {
return alp.config != nil && alp.config.Duration > 0 && alp.config.OffsetTime >= 0
}