-
Notifications
You must be signed in to change notification settings - Fork 607
/
queue.go
215 lines (192 loc) · 6.81 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package volumequeue
import (
"sync"
"time"
)
// baseRetryInterval is the unit of the exponential backoff: retry attempt n
// (n >= 1) waits baseRetryInterval * 2^n before it is handed out.
const baseRetryInterval = 100 * time.Millisecond

// maxRetryInterval is the maximum amount of time we will wait between retrying
// volume operations.
const maxRetryInterval = 10 * time.Minute

// vqTimerSource is an interface for creating timers for the volumeQueue. It
// exists so that tests can substitute a source they control.
type vqTimerSource interface {
	// NewTimer takes an attempt number and returns a vqTimer which will
	// trigger after a delay derived from that attempt number.
	NewTimer(attempt uint) vqTimer
}

// vqTimer is an interface representing a timer. Instead of exposing the
// trigger channel C directly, it is wrapped in a Done method, so that in
// testing the timer can be substituted for a different object.
type vqTimer interface {
	// Done returns the channel that receives a value when the timer fires.
	Done() <-chan time.Time
	// Stop prevents the timer from firing; the default implementation is
	// time.Timer.Stop.
	Stop() bool
}

// timerSource is an empty struct type which is used to represent the default
// vqTimerSource, which uses time.Timer.
type timerSource struct{}

// NewTimer creates a time.Timer that fires after backoffInterval(attempt).
func (timerSource) NewTimer(attempt uint) vqTimer {
	return timer{Timer: time.NewTimer(backoffInterval(attempt))}
}

// backoffInterval returns how long to wait before retry number attempt.
// Attempt 0 is not delayed at all; attempt n >= 1 waits
// baseRetryInterval * 2^n, capped at maxRetryInterval.
//
// The cap must be checked before shifting: baseRetryInterval << attempt
// overflows int64 once attempt reaches 37 (and is 0 for attempt >= 64), which
// would yield a zero or negative delay and retry with no backoff at all.
func backoffInterval(attempt uint) time.Duration {
	if attempt == 0 {
		return 0
	}
	// maxRetryInterval>>attempt is floor(max / 2^attempt), so for an integer
	// base this is equivalent to base*2^attempt > max, without ever computing
	// the (possibly overflowing) product. Right shifts >= 64 yield 0.
	if baseRetryInterval > maxRetryInterval>>attempt {
		return maxRetryInterval
	}
	return baseRetryInterval << attempt
}

// timer wraps a time.Timer to provide a Done method.
type timer struct {
	*time.Timer
}

// Done returns the timer's C channel, which receives the current time when
// the timer expires.
func (t timer) Done() <-chan time.Time {
	return t.C
}
// VolumeQueue manages exponential backoff for retrying volume operations. It
// behaves somewhat like a priority queue, except that entries which are ready
// to (re)process are delivered over an unbuffered channel. When several are
// ready at once, the order in which they are handed out is therefore up to the
// Go scheduler; in practice, this does not matter.
type VolumeQueue struct {
	// the embedded mutex guards access to outstanding.
	sync.Mutex

	// next carries each volumeQueueEntry to a waiter once it is ready.
	next chan *volumeQueueEntry

	// outstanding holds every pending volumeQueueEntry, keyed by volume ID.
	outstanding map[string]*volumeQueueEntry

	// stopChan is closed to stop the queue and cancel all entries.
	stopChan chan struct{}

	// timerSource builds the timer for each volumeQueueEntry; tests replace
	// it with an implementation they control.
	timerSource vqTimerSource
}
// volumeQueueEntry is a single pending item in the volumeQueue.
type volumeQueueEntry struct {
	// id identifies the volume. Only the ID is stored because the CSI manager
	// looks up the latest revision of the volume before doing any work on it.
	id string

	// attempt is the retry attempt number this entry was scheduled for.
	attempt uint

	// cancel aborts this entry's pending retry.
	cancel func()
}
// NewVolumeQueue constructs an empty, running VolumeQueue backed by the
// default time.Timer-based timer source.
func NewVolumeQueue() *VolumeQueue {
	vq := new(VolumeQueue)
	vq.next = make(chan *volumeQueueEntry)
	vq.outstanding = map[string]*volumeQueueEntry{}
	vq.stopChan = make(chan struct{})
	vq.timerSource = timerSource{}
	return vq
}
// Enqueue adds an entry to the VolumeQueue with the specified retry attempt;
// Wait will return it once the backoff delay for that attempt has elapsed.
// If an entry for the specified id already exists, Enqueue cancels and
// removes it before creating the new entry. Enqueue on a stopped VolumeQueue
// does nothing.
func (vq *VolumeQueue) Enqueue(id string, attempt uint) {
	// we must lock the volumeQueue when we add entries, because we will be
	// accessing the outstanding map. holding the lock also orders this call
	// against Stop, which closes stopChan under the same lock.
	vq.Lock()
	defer vq.Unlock()

	// once the queue is stopped, nothing will cancel a new entry, so its
	// goroutine could block indefinitely trying to write to vq.next.
	// returning here also guarantees we never call cancel on an entry that
	// Stop has already canceled (closing its channel twice would panic).
	select {
	case <-vq.stopChan:
		return
	default:
	}

	if entry, ok := vq.outstanding[id]; ok {
		entry.cancel()
		delete(vq.outstanding, id)
	}

	cancelChan := make(chan struct{})
	v := &volumeQueueEntry{
		id:      id,
		attempt: attempt,
		cancel: func() {
			close(cancelChan)
		},
	}

	t := vq.timerSource.NewTimer(attempt)

	// this goroutine is the meat of the volumeQueue. when the timer triggers,
	// the volume queue entry is written out to the next channel.
	//
	// the nature of the select statement, and of goroutines and of
	// asynchronous operations means that this is not actually strictly
	// ordered. if several entries are ready, then the one that actually gets
	// dequeued is at the mercy of the golang scheduler.
	//
	// however, the flip side of this is that canceling an entry truly cancels
	// it. because we're blocking on a write attempt, if we cancel, we don't
	// do that write attempt, and there's no need to try to remove from the
	// queue a ready-but-now-canceled entry before it is processed.
	go func() {
		select {
		case <-t.Done():
			// once the timer fires, we will try to write this entry to the
			// next channel. however, because next is unbuffered, if we ended
			// up in a situation where no read occurred, we would be
			// deadlocked. to avoid this, we select on both a vq.next write and
			// on a read from cancelChan, which allows us to abort our write
			// attempt.
			select {
			case vq.next <- v:
			case <-cancelChan:
			}
		case <-cancelChan:
			// the documentation for timer recommends draining the channel like
			// this.
			if !t.Stop() {
				<-t.Done()
			}
		}
	}()

	vq.outstanding[id] = v
}
// Wait blocks until a queued volume is ready to (re)process and returns its ID
// and attempt number. If the volumeQueue is stopped, Wait returns "", 0.
func (vq *VolumeQueue) Wait() (string, uint) {
	var ready *volumeQueueEntry
	select {
	case <-vq.stopChan:
		// once stopped there may be no more writes to next, so report an
		// empty result rather than blocking.
		return "", 0
	case ready = <-vq.next:
	}

	vq.Lock()
	defer vq.Unlock()

	// only forget the entry if it is still the one we received. between the
	// receive above and acquiring the lock, Enqueue may have replaced it with
	// a newer entry for the same ID; that newer entry must stay outstanding.
	if current, found := vq.outstanding[ready.id]; found && current == ready {
		delete(vq.outstanding, ready.id)
	}
	return ready.id, ready.attempt
}
// Outstanding returns the number of entries currently tracked in the queue's
// outstanding set.
func (vq *VolumeQueue) Outstanding() int {
	// outstanding is mutated by Enqueue, Wait and Stop while holding the
	// lock, so it must be read under the lock too; an unguarded len() on a
	// concurrently written map is a data race.
	vq.Lock()
	defer vq.Unlock()
	return len(vq.outstanding)
}
// Stop stops the volumeQueue: it closes the stop channel that Wait selects on
// and cancels and removes all outstanding entries. Calling Stop more than once
// is a no-op.
func (vq *VolumeQueue) Stop() {
	vq.Lock()
	defer vq.Unlock()

	select {
	case <-vq.stopChan:
		// already stopped; closing stopChan a second time would panic.
		return
	default:
	}
	close(vq.stopChan)

	// cancel and forget every entry. a canceled entry left in the map could
	// later be canceled again by Enqueue for the same ID, closing an
	// already-closed channel and panicking. deleting during range is allowed.
	for id, entry := range vq.outstanding {
		entry.cancel()
		delete(vq.outstanding, id)
	}
}