/
queue.go
389 lines (334 loc) · 10.2 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
package pubsub
import (
"context"
"fmt"
"io"
"sync"
"github.com/tychoish/fun"
"github.com/tychoish/fun/ers"
)
// stolen shamelessly from https://github.com/tendermint/tendermint/tree/master/internal/libs/queue
// Capacity-related sentinel errors. They are declared as constants of type
// ers.Error; compare against them with errors.Is.
const (
// ErrQueueFull is returned by the Add method of a queue when the queue has
// reached its hard capacity limit.
ErrQueueFull = ers.Error("queue is full")
// ErrQueueNoCredit is returned by the Add method of a queue when the queue has
// exceeded its soft quota and there is insufficient burst credit.
ErrQueueNoCredit = ers.Error("insufficient burst credit")
)
var (
// ErrQueueClosed is returned by the Add method of a closed queue, and by
// the Wait method of a closed empty queue. It wraps io.EOF, so
// errors.Is(err, io.EOF) also reports true for it.
ErrQueueClosed = fmt.Errorf("queue is closed: %w", io.EOF)
// Sentinel errors reported by QueueOptions.Validate (and therefore by the
// NewQueue constructor). Both wrap ers.ErrMalformedConfiguration.
errHardLimit = fmt.Errorf("hard limit must be > 0 and ≥ soft quota: %w", ers.ErrMalformedConfiguration)
errBurstCredit = fmt.Errorf("burst credit must be non-negative: %w", ers.ErrMalformedConfiguration)
)
// A Queue is a limited-capacity FIFO queue of arbitrary data items.
//
// A queue has a soft quota and a hard limit on the number of items that may be
// contained in the queue. Adding items in excess of the hard limit will fail
// unconditionally.
//
// For items in excess of the soft quota, a credit system applies: Each queue
// maintains a burst credit score. Adding an item in excess of the soft quota
// costs 1 unit of burst credit. If there is not enough burst credit, the add
// will fail.
//
// The initial burst credit is assigned when the queue is constructed. Removing
// items from the queue adds additional credit if the resulting queue length is
// less than the current soft quota. Burst credit is capped by the hard limit.
//
// A Queue is safe for concurrent use by multiple goroutines.
type Queue[T any] struct {
mu sync.Mutex // guards every field declared below it
// tracker does the length/capacity accounting: it is consulted on every
// add and remove, and reports the current length and capacity.
tracker queueLimitTracker
// closed is set by Close; once set, adds are rejected with ErrQueueClosed.
closed bool
// nempty is the condition variable that consumers blocked on an empty
// queue wait on. It shares mu as its lock.
nempty *sync.Cond
// nupdates is the condition variable notified when an item is added or
// removed and when the queue is closed. Non-destructive iterators and
// blocked BlockingAdd callers wait on it. It shares mu as its lock.
nupdates *sync.Cond
// The queue is singly-linked. Front points to the sentinel and back points
// to the newest entry. The oldest entry is front.link if it exists. When
// the queue is empty, back and front both point to the sentinel.
back *entry[T]
front *entry[T]
}
// NewQueue builds an empty, bounded queue configured by opts.
//
// The options are validated first; validation also fills in defaulted fields
// on the local copy of opts. If validation fails, NewQueue returns a nil
// queue and the validation error.
func NewQueue[T any](opts QueueOptions) (*Queue[T], error) {
	err := opts.Validate()
	if err != nil {
		return nil, err
	}

	tracker := newQueueLimitTracker(opts)
	return makeQueue[T](tracker), nil
}
// NewUnlimitedQueue returns an empty queue backed by the no-limit tracker,
// i.e. a queue without a configured capacity bound.
func NewUnlimitedQueue[T any]() *Queue[T] {
	tracker := new(queueNoLimitTrackerImpl)
	return makeQueue[T](tracker)
}
// makeQueue assembles an empty queue around the given limit tracker. The
// list starts as a lone sentinel entry that both front and back refer to,
// and both condition variables are bound to the queue's mutex.
func makeQueue[T any](tracker queueLimitTracker) *Queue[T] {
	q := new(Queue[T])
	q.tracker = tracker

	// An empty queue is just the sentinel.
	q.front = new(entry[T])
	q.back = q.front

	q.nempty = sync.NewCond(&q.mu)
	q.nupdates = sync.NewCond(&q.mu)
	return q
}
// Add appends item to the back of the queue without blocking. The item is
// not enqueued, and an error is returned, when the queue is closed, is full,
// or is over its soft quota without enough burst credit.
func (q *Queue[T]) Add(item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	err := q.doAdd(item)
	return err
}
// Len reports how many items the queue currently holds. The count is read
// from the queue's tracker under the queue lock rather than by walking the
// list.
func (q *Queue[T]) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()

	size := q.tracker.len()
	return size
}
// doAdd links item onto the back of the queue and notifies waiters.
//
// Preconditions: the caller holds q.mu.
//
// It returns ErrQueueClosed when the queue is closed, or whatever error the
// tracker reports when it refuses the add; in either case the item is not
// enqueued.
func (q *Queue[T]) doAdd(item T) error {
	if q.closed {
		return ErrQueueClosed
	}

	err := q.tracker.add()
	if err != nil {
		return err
	}

	// Append the new node after the current tail and make it the tail.
	node := &entry[T]{item: item}
	tail := q.back
	tail.link = node
	q.back = node

	// A length of exactly one means the queue was empty before this add, so
	// a consumer may be blocked waiting for an item.
	if wasEmpty := q.tracker.len() == 1; wasEmpty {
		q.nempty.Signal()
	}

	// Iterators wait on nupdates for any change to the queue.
	q.nupdates.Signal()

	return nil
}
// BlockingAdd attempts to add an item to the queue, as with Add, but if the
// queue is at capacity it blocks until the queue has capacity, the queue is
// closed, or ctx is done.
//
// It returns ErrQueueClosed if the queue is (or becomes) closed, ctx.Err()
// if the context ends while waiting, and otherwise the result of adding the
// item.
func (q *Queue[T]) BlockingAdd(ctx context.Context, item T) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.closed {
		return ErrQueueClosed
	}
	if q.tracker.cap() > q.tracker.len() {
		return q.doAdd(item)
	}

	cond := q.nupdates

	// If the context terminates, wake the waiter. The broadcast is issued
	// while holding q.mu: this goroutine can then only run while the waiter
	// is either parked in cond.Wait (and is woken) or has not yet checked
	// the context (and will observe it as done). Broadcasting without the
	// lock could fire in between those two points and be missed.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		<-ctx.Done()
		q.mu.Lock()
		defer q.mu.Unlock()
		cond.Broadcast()
	}()

	for q.tracker.cap() <= q.tracker.len() {
		// Close broadcasts on nupdates; without this check a producer
		// blocked on a full, closed queue would go straight back to sleep.
		if q.closed {
			return ErrQueueClosed
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		cond.Wait()
	}

	return q.doAdd(item)
}
// Remove takes the frontmost (oldest) item off the queue without blocking.
// The boolean result reports whether an item was available; when the queue
// is empty, Remove returns the zero value of T and false.
func (q *Queue[T]) Remove() (out T, _ bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.tracker.len() != 0 {
		out = q.popFront()
		return out, true
	}
	return out, false
}
// Wait blocks until q is non-empty or closed, then removes and returns the
// frontmost (oldest) item. If ctx ends before an item becomes available,
// Wait returns the zero value of T and the context's error. If the queue is
// closed while it is still empty, Wait returns the zero value of T and
// ErrQueueClosed.
//
// Wait is destructive: every item it returns has been removed from the queue.
func (q *Queue[T]) Wait(ctx context.Context) (out T, _ error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	err := q.unsafeWaitWhileEmpty(ctx)
	if err == nil {
		out = q.popFront()
	}
	return out, err
}
// unsafeWaitWhileEmpty blocks while the queue is empty, without modifying
// the queue itself.
//
// Preconditions: the caller holds q.mu. The lock is released while blocked
// and is held again when the function returns.
//
// It returns nil once the queue holds at least one item, ErrQueueClosed if
// the queue is empty and closed, and ctx.Err() if ctx ends while the queue
// is still empty.
func (q *Queue[T]) unsafeWaitWhileEmpty(ctx context.Context) error {
	// Fast path: when the outcome is already known there is no need to
	// derive a context and start a watcher goroutine.
	if q.tracker.len() != 0 {
		return nil
	}
	if q.closed {
		return ErrQueueClosed
	}

	// If the context terminates, wake the waiter. The broadcast is issued
	// while holding q.mu so that it cannot slip in between the waiter's
	// context check and its call to Wait, which would leave the waiter
	// asleep after cancellation.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		<-ctx.Done()
		q.mu.Lock()
		defer q.mu.Unlock()
		q.nempty.Broadcast()
	}()

	for q.tracker.len() == 0 {
		if q.closed {
			return ErrQueueClosed
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		q.nempty.Wait()
	}

	return nil
}
// waitForNew blocks until the back of the queue differs from what it was
// when the call started, the queue is closed, or ctx ends. It acquires q.mu
// itself, so the caller must not hold the lock.
//
// The back pointer changes when an item is added and also when the last
// remaining item is removed, so a nil return does not by itself guarantee
// that a new item is available; callers must re-check. It returns
// ErrQueueClosed if the queue is closed while nothing has changed, and
// ctx.Err() if the context ends first.
func (q *Queue[T]) waitForNew(ctx context.Context) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Wake waiters when the context ends, which includes the deferred cancel
	// that runs when this function returns. The broadcast is issued while
	// holding q.mu so that a cancellation cannot land between this waiter's
	// context check and its call to Wait and be missed.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	go func() {
		<-ctx.Done()
		q.mu.Lock()
		defer q.mu.Unlock()
		q.nupdates.Broadcast()
	}()

	head := q.back
	for head == q.back && q.back.link != q.front {
		if q.closed {
			return ErrQueueClosed
		}
		if err := ctx.Err(); err != nil {
			return err
		}
		q.nupdates.Wait()
	}

	return nil
}
// Close marks the queue as closed and wakes every goroutine blocked on it.
// Subsequent Add calls report an error, while items enqueued before the
// close remain available to Remove and Wait. Calling Wait on a closed, empty
// queue reports an error without blocking. Close always returns nil.
func (q *Queue[T]) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.closed = true

	// Release both iterator/producer waiters and consumer waiters so they
	// can observe the closed flag.
	q.nupdates.Broadcast()
	q.nempty.Broadcast()

	return nil
}
// popFront unlinks the oldest entry from the queue and returns its item,
// updating the tracker and notifying nupdates waiters along the way. The
// removed entry keeps its own link field untouched.
//
// Preconditions: The caller holds q.mu and q is not empty.
func (q *Queue[T]) popFront() T {
	oldest := q.front.link

	// Removing the only entry leaves just the sentinel, which then serves as
	// both front and back.
	if q.back == oldest {
		q.back = q.front
	}
	q.front.link = oldest.link

	q.tracker.remove()
	q.nupdates.Broadcast()

	return oldest.item
}
// QueueOptions are the initial settings for a Queue or Deque. Call Validate
// to check the values and to fill in the defaults described below.
type QueueOptions struct {
// The maximum number of items the queue will ever be
// permitted to hold. This value must be positive, and greater
// than or equal to SoftQuota. The hard limit is fixed and
// does not change as the queue is used.
//
// The hard limit should be chosen to exceed the largest burst
// size expected under normal operating conditions.
HardLimit int
// The initial expected maximum number of items the queue
// should contain on an average workload. If this value is
// zero or negative, Validate sets it to the hard limit. The
// soft quota is adjusted from the initial value dynamically
// as the queue is used.
SoftQuota int
// The initial burst credit score. This value must be greater
// than or equal to zero. If it is zero, Validate sets it to
// the soft quota.
BurstCredit float64
}
// Validate checks that the options are mutually consistent and fills in
// defaults. It is exported as a convenience.
//
// It returns an error wrapping ers.ErrMalformedConfiguration when HardLimit
// is not positive or is smaller than SoftQuota, or when BurstCredit is
// negative. Otherwise it updates the receiver in place: a non-positive
// SoftQuota becomes HardLimit, and a zero BurstCredit becomes the (possibly
// just defaulted) SoftQuota.
func (opts *QueueOptions) Validate() error {
	switch {
	case opts.HardLimit <= 0, opts.HardLimit < opts.SoftQuota:
		return errHardLimit
	case opts.BurstCredit < 0:
		return errBurstCredit
	}

	// The soft quota must be defaulted first because the burst credit
	// default is derived from it.
	if opts.SoftQuota <= 0 {
		opts.SoftQuota = opts.HardLimit
	}
	if opts.BurstCredit == 0 {
		opts.BurstCredit = float64(opts.SoftQuota)
	}

	return nil
}
// entry is one node of the queue's singly-linked list.
type entry[T any] struct {
// item is the value stored in this node.
item T
// link points to the next (newer) node, or is nil for the newest node.
link *entry[T]
}
// Iterator returns an iterator over the queue's underlying linked list,
// built from the function returned by Producer. It takes the Queue's mutex
// itself, so it can be used alongside concurrent queue operations without
// additional locking. The iterator does not modify or remove items from the
// queue, and it ends once the queue has been closed via the Close() method
// and no further items remain.
//
// To create a "consuming" iterator, use a Distributor.
func (q *Queue[T]) Iterator() *fun.Iterator[T] {
	producer := q.Producer()
	return producer.Iterator()
}
// Distributor creates an object used to process the items in the queue.
// Items obtained through it are removed from the queue.
//
//   - push adds an item with Add, so it does not block.
//   - pop returns an item that is already available, and otherwise blocks in
//     Wait until an item arrives, the queue is closed, or ctx ends.
//   - size reports the current queue length.
func (q *Queue[T]) Distributor() Distributor[T] {
	return Distributor[T]{
		push: fun.MakeProcessor(q.Add),
		pop: func(ctx context.Context) (T, error) {
			// Fast path: take an item that is already queued without
			// the extra setup that Wait performs.
			if msg, ok := q.Remove(); ok {
				return msg, nil
			}
			return q.Wait(ctx)
		},
		// Use Len rather than the tracker's own method so the length is
		// read while holding q.mu, like every other access to the tracker.
		size: q.Len,
	}
}
// Producer returns a function that yields the queue's items one at a time,
// oldest first. It is not destructive: items are read, never removed.
//
// Each call returns the next item after the one most recently yielded. When
// there is none, the call returns io.EOF if the queue is closed, and
// otherwise blocks until the queue changes, the queue is closed, or ctx
// ends (returning the corresponding error).
//
// If the most recently yielded entry was removed from the queue while it was
// the newest entry, the producer resumes from the current front of the
// queue.
func (q *Queue[T]) Producer() fun.Producer[T] {
	// cursor is the entry most recently yielded, or the sentinel before the
	// first item has been yielded. It is nil until the first call.
	var cursor *entry[T]

	return func(ctx context.Context) (o T, _ error) {
		for {
			q.mu.Lock()
			if cursor == nil {
				cursor = q.front
			}

			// In the live list only q.back has a nil link. A cursor with a
			// nil link that is not q.back was therefore popped while it
			// was the newest entry, and nothing will ever be linked after
			// it. Everything older was removed before it, so restarting at
			// the sentinel cannot repeat an item.
			if cursor.link == nil && cursor != q.back {
				cursor = q.front
			}

			if following := cursor.link; following != nil {
				cursor = following
				item := cursor.item
				q.mu.Unlock()
				return item, nil
			}

			closed := q.closed
			q.mu.Unlock()

			if closed {
				return o, io.EOF
			}

			// NOTE(review): an item added between the Unlock above and
			// waitForNew taking the lock is not noticed until the next
			// queue update; closing that window needs the wait to happen
			// under the same critical section as the check.
			if err := q.waitForNew(ctx); err != nil {
				return o, err
			}
			// waitForNew also returns when the last item is removed, so
			// loop and re-check instead of assuming a new entry exists.
		}
	}
}