/
director.go
235 lines (206 loc) 路 5.34 KB
/
director.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package director
import (
// "fmt"
"sync"
"time"
)
// Special iteration counts understood by the loopers in this file.
const (
// FOREVER, used as a looper's Count, disables the iteration limit: the
// Loop methods skip counting entirely when Count equals FOREVER, so the
// loop only ends on an error from fn or on Quit.
FOREVER = -1
// ONCE is the value 1; used as a Count it makes a loop stop after a single
// successful iteration.
ONCE = 1
)
// A Looper is used in place of a direct call to "for {}" and implements some
// controls over how the loop will be run. The Loop() function is the main call
// used by dependent routines. Common patterns like Quit and Done channels are
// easily implemented in a Looper.
type Looper interface {
// Loop runs fn repeatedly. In the implementations in this file the loop
// ends when fn returns a non-nil error, when the configured iteration
// count is reached, or after Quit has been called.
Loop(fn func() error)
// Wait blocks until the loop's final result is available and returns it.
Wait() error
// WaitWithoutError blocks until the looper has finished, without
// reporting the result.
WaitWithoutError()
// Done records that the loop has finished with the given result (nil on
// success).
Done(err error)
// Quit asks the loop to stop instead of running further iterations.
Quit()
}
// A TimedLooper is a Looper that runs on a timed schedule, using a
// time.Ticker underneath. It also implements Quit and Done channels to allow
// external routines to more easily control and synchronize the loop.
//
// If you pass in a DoneChan at creation time, Done will try to send nil on
// the channel when the loop has completed successfully, or the error if the
// loop resulted in an error condition. The send never blocks: if the channel
// cannot accept the value at that moment, the value is dropped.
type TimedLooper struct {
// Count is the number of iterations Loop runs before stopping on its own;
// FOREVER disables the limit.
Count int
// Interval is the period of the ticker that paces the iterations.
Interval time.Duration
// DoneChan, when non-nil, is where Done reports the loop's final result.
DoneChan chan error
// quitChan is closed by Quit to ask Loop to stop.
quitChan chan struct{}
// Immediate makes Loop run one iteration right away instead of waiting
// for the first tick.
Immediate bool
// finalizedChan is closed the first time Done is called;
// WaitWithoutError blocks on it.
finalizedChan chan struct{}
// finalize ensures finalizedChan is closed only once.
finalize sync.Once
}
// NewTimedLooper builds a TimedLooper that runs count iterations (or loops
// without a limit when count is FOREVER), one per tick of interval. The first
// iteration only happens after the first tick.
//
// done may be nil, in which case Done does not report a result anywhere; when
// non-nil it becomes the looper's DoneChan.
func NewTimedLooper(count int, interval time.Duration, done chan error) *TimedLooper {
	looper := new(TimedLooper)
	looper.Count = count
	looper.Interval = interval
	looper.DoneChan = done
	looper.Immediate = false
	looper.quitChan = make(chan struct{})
	looper.finalizedChan = make(chan struct{})
	return looper
}
// NewImmediateTimedLooper builds a TimedLooper whose Immediate flag is set:
// Loop executes one iteration as soon as it is called, rather than waiting
// for the first tick, and then continues once per tick of interval.
//
// count, interval and done are stored as the looper's Count, Interval and
// DoneChan respectively.
func NewImmediateTimedLooper(count int, interval time.Duration, done chan error) *TimedLooper {
	var looper TimedLooper
	looper.Count, looper.Interval, looper.DoneChan = count, interval, done
	looper.Immediate = true
	looper.quitChan = make(chan struct{})
	looper.finalizedChan = make(chan struct{})
	return &looper
}
// Wait blocks until Done delivers the loop's result on DoneChan and returns
// it: nil when the loop completed or was quit, otherwise the error that
// stopped it.
//
// When the looper has no DoneChan there is no result to receive (and a
// receive from a nil channel would block forever), so Wait instead blocks
// until the looper has been finalized and returns nil.
//
// Done sends without blocking, so with an unbuffered DoneChan the caller must
// already be waiting when the loop finishes or the result is dropped. A
// result is received by a single waiter only; use WaitWithoutError when
// several goroutines need to wait.
func (l *TimedLooper) Wait() error {
	if l.DoneChan == nil {
		<-l.finalizedChan
		return nil
	}
	return <-l.DoneChan
}
// WaitWithoutError blocks until the looper has been finalized, i.e. until
// its finalized channel has been closed. Unlike Wait() it can safely be
// called from multiple goroutines, because a closed channel releases every
// receiver. It does not, however, return the last error.
func (l *TimedLooper) WaitWithoutError() {
	finalized := l.finalizedChan
	<-finalized
}
// Done signals dependent routines that the looper has finished its work,
// with err as the result (nil on success).
//
// The first call closes the finalized channel, which releases every
// WaitWithoutError caller; later calls leave it closed. If the looper has a
// DoneChan, err is then offered on it without blocking: when the channel
// cannot take the value right away, the value is discarded.
func (l *TimedLooper) Done(err error) {
	if l.finalizedChan != nil {
		l.finalize.Do(func() { close(l.finalizedChan) })
	}

	if l.DoneChan == nil {
		return
	}

	select {
	case l.DoneChan <- err:
		// Result handed over.
	default:
		// Nobody can receive right now; drop the result instead of blocking.
	}
}
// Loop is the main method of the Looper. It calls fn once per tick of
// l.Interval until one of the following happens, then reports the outcome
// through Done and returns:
//
//   - fn returns a non-nil error: Done receives that error;
//   - l.Count successful iterations have run (never the case when Count is
//     FOREVER): Done receives nil;
//   - Quit has been called: Done receives nil.
//
// When l.Immediate is set, the first iteration runs right away instead of
// waiting for the first tick. Loop blocks the calling goroutine. fn is never
// interrupted part-way; a quit request takes effect between iterations.
//
// l.Interval must be positive, because time.NewTicker panics otherwise.
func (l *TimedLooper) Loop(fn func() error) {
	completed := 0

	// runIteration calls fn once and reports whether the loop is finished.
	// When it reports true, Done has already been invoked.
	runIteration := func() bool {
		if err := fn(); err != nil {
			l.Done(err)
			return true
		}

		// Only count when there is a limit, so that a FOREVER loop can never
		// stop because the counter wrapped around.
		if l.Count == FOREVER {
			return false
		}

		completed++
		if completed >= l.Count {
			l.Done(nil)
			return true
		}
		return false
	}

	// Run right away if we were instantiated via NewImmediateTimedLooper.
	// If that single iteration already ends the loop there is no need to
	// create a ticker at all.
	if l.Immediate && runIteration() {
		return
	}

	ticker := time.NewTicker(l.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-l.quitChan:
			l.Done(nil)
			return
		case <-ticker.C:
			// A tick and a quit request can be ready at the same time, and
			// select picks among ready cases at random. Re-check here so a
			// pending Quit always wins over starting another iteration.
			select {
			case <-l.quitChan:
				l.Done(nil)
				return
			default:
			}

			if runIteration() {
				return
			}
		}
	}
}
// Quit signals the Looper not to run the next iteration and to call Done and
// return as quickly as possible. It does so by closing the quit channel that
// Loop watches between iterations; an iteration that is already running is
// not interrupted. Quit itself never blocks.
//
// Calling Quit again after an earlier call has returned is a no-op instead
// of a "close of closed channel" panic. The check and the close are not
// atomic, so simultaneous calls from several goroutines are not protected.
func (l *TimedLooper) Quit() {
	select {
	case <-l.quitChan:
		// Already closed by an earlier Quit; closing again would panic.
	default:
		close(l.quitChan)
	}
}
// A FreeLooper is like a TimedLooper but doesn't wait between iterations:
// its Loop calls the function again as soon as the previous call returns.
type FreeLooper struct {
// Count is the number of iterations Loop runs before stopping on its own;
// FOREVER disables the limit.
Count int
// DoneChan, when non-nil, is where Done reports the loop's final result
// using a non-blocking send.
DoneChan chan error
// quitChan is closed by Quit to ask Loop to stop.
quitChan chan struct{}
// finalizedChan is closed the first time Done is called;
// WaitWithoutError blocks on it.
finalizedChan chan struct{}
// finalize ensures finalizedChan is closed only once.
finalize sync.Once
}
// NewFreeLooper builds a FreeLooper that runs count iterations back to back
// (or without a limit when count is FOREVER).
//
// done may be nil, in which case Done does not report a result anywhere; when
// non-nil it becomes the looper's DoneChan.
func NewFreeLooper(count int, done chan error) *FreeLooper {
	looper := new(FreeLooper)
	looper.Count = count
	looper.DoneChan = done
	looper.quitChan = make(chan struct{})
	looper.finalizedChan = make(chan struct{})
	return looper
}
// Wait blocks until Done delivers the loop's result on DoneChan and returns
// it: nil when the loop completed or was quit, otherwise the error that
// stopped it.
//
// When the looper has no DoneChan there is no result to receive (and a
// receive from a nil channel would block forever), so Wait instead blocks
// until the looper has been finalized and returns nil.
//
// Done sends without blocking, so with an unbuffered DoneChan the caller must
// already be waiting when the loop finishes or the result is dropped. A
// result is received by a single waiter only; use WaitWithoutError when
// several goroutines need to wait.
func (l *FreeLooper) Wait() error {
	if l.DoneChan == nil {
		<-l.finalizedChan
		return nil
	}
	return <-l.DoneChan
}
// WaitWithoutError blocks until the looper has been finalized, i.e. until
// its finalized channel has been closed. Because a closed channel releases
// every receiver, any number of goroutines may wait this way. The loop's
// result is not returned.
func (l *FreeLooper) WaitWithoutError() {
	finalized := l.finalizedChan
	<-finalized
}
// Done marks the looper as finished with err as its result (nil on success).
// Loop calls it on every exit path. Calling it from outside does not stop a
// running Loop; use Quit for that.
//
// The first call closes the finalized channel, which releases every
// WaitWithoutError caller; later calls leave it closed. If the looper has a
// DoneChan, err is then offered on it without blocking: when the channel
// cannot take the value right away, the value is discarded.
func (l *FreeLooper) Done(err error) {
	if l.finalizedChan != nil {
		l.finalize.Do(func() { close(l.finalizedChan) })
	}

	if l.DoneChan == nil {
		return
	}

	select {
	case l.DoneChan <- err:
		// Result handed over.
	default:
		// Nobody can receive right now; drop the result instead of blocking.
	}
}
// Loop calls fn back to back, with no delay between iterations, until one of
// the following happens; it then reports the outcome through Done and
// returns:
//
//   - fn returns a non-nil error: Done receives that error;
//   - l.Count successful iterations have run (never the case when Count is
//     FOREVER): Done receives nil;
//   - Quit has been called: Done receives nil.
//
// The quit request is only checked after an iteration has completed, so fn
// always runs at least once.
func (l *FreeLooper) Loop(fn func() error) {
	for completed := 0; ; {
		if err := fn(); err != nil {
			l.Done(err)
			return
		}

		// Counting is skipped for FOREVER loops so that the counter can
		// never wrap around and end the loop by accident.
		if l.Count != FOREVER {
			if completed++; completed >= l.Count {
				l.Done(nil)
				return
			}
		}

		select {
		case <-l.quitChan:
			l.Done(nil)
			return
		default:
			// No quit requested; start the next iteration immediately.
		}
	}
}
// Quit signals the Looper not to run the next iteration and to call Done and
// return as quickly as possible. It does so by closing the quit channel that
// Loop checks after each iteration; an iteration that is already running is
// not interrupted. It is a non-blocking operation.
//
// Calling Quit again after an earlier call has returned is a no-op instead
// of a "close of closed channel" panic. The check and the close are not
// atomic, so simultaneous calls from several goroutines are not protected.
func (l *FreeLooper) Quit() {
	select {
	case <-l.quitChan:
		// Already closed by an earlier Quit; closing again would panic.
	default:
		close(l.quitChan)
	}
}