forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fifoqueue.go
137 lines (116 loc) · 3.97 KB
/
fifoqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package fifoqueue
import (
"fmt"
mathbits "math/bits"
"sync"
"github.com/ef-ds/deque"
)
// CapacityUnlimited specifies the largest possible capacity for a FifoQueue.
// maximum value for platform-specific int: https://yourbasic.org/golang/max-min-int-uint/
const CapacityUnlimited = 1<<(mathbits.UintSize-1) - 1
// FifoQueue implements a FIFO queue with max capacity and length observer.
// Elements that exceeds the queue's max capacity are silently dropped.
// By default, the theoretical capacity equals to the largest `int` value
// (platform dependent). Capacity can be set at construction time via the
// option `WithCapacity`.
// Each time the queue's length changes, the QueueLengthObserver is called
// with the new length. By default, the QueueLengthObserver is a NoOp.
// A single QueueLengthObserver can be set at construction time via the
// option `WithLengthObserver`.
//
// Caution:
// * the QueueLengthObserver must be non-blocking
type FifoQueue struct {
mu sync.RWMutex
queue deque.Deque
maxCapacity int
lengthObserver QueueLengthObserver
}
// ConstructorOptions are optional arguments for the `NewFifoQueue`
// constructor to specify properties of the FifoQueue.
type ConstructorOption func(*FifoQueue) error
// QueueLengthObserver is a optional callback that can be provided to
// the `NewFifoQueue` constructor (via `WithLengthObserver` option).
type QueueLengthObserver func(int)
// WithLengthObserver is a constructor option for NewFifoQueue. Each time the
// queue's length changes, the queue calls the provided callback with the new
// length. By default, the QueueLengthObserver is a NoOp.
// CAUTION:
// - QueueLengthObserver implementations must be non-blocking
// - The values published to queue length observer might be in different order
// than the actual length values at the time of insertion. This is a
// performance optimization, which allows to reduce the duration during
// which the queue is internally locked when inserting elements.
func WithLengthObserver(callback QueueLengthObserver) ConstructorOption {
return func(queue *FifoQueue) error {
if callback == nil {
return fmt.Errorf("nil is not a valid QueueLengthObserver")
}
queue.lengthObserver = callback
return nil
}
}
// NewFifoQueue is the Constructor for FifoQueue
func NewFifoQueue(maxCapacity int, options ...ConstructorOption) (*FifoQueue, error) {
if maxCapacity < 1 {
return nil, fmt.Errorf("capacity for Fifo queue must be positive")
}
queue := &FifoQueue{
maxCapacity: maxCapacity,
lengthObserver: func(int) { /* noop */ },
}
for _, opt := range options {
err := opt(queue)
if err != nil {
return nil, fmt.Errorf("failed to apply constructor option to fifoqueue queue: %w", err)
}
}
return queue, nil
}
// Push appends the given value to the tail of the queue.
// If queue capacity is reached, the message is silently dropped.
func (q *FifoQueue) Push(element interface{}) bool {
length, pushed := q.push(element)
if pushed {
q.lengthObserver(length)
}
return pushed
}
func (q *FifoQueue) push(element interface{}) (int, bool) {
q.mu.Lock()
defer q.mu.Unlock()
length := q.queue.Len()
if length < q.maxCapacity {
q.queue.PushBack(element)
return q.queue.Len(), true
}
return length, false
}
// Front peeks message at the head of the queue (without removing the head).
func (q *FifoQueue) Head() (interface{}, bool) {
q.mu.RLock()
defer q.mu.RUnlock()
return q.queue.Front()
}
// Pop removes and returns the queue's head element.
// If the queue is empty, (nil, false) is returned.
func (q *FifoQueue) Pop() (interface{}, bool) {
event, length, ok := q.pop()
if !ok {
return nil, false
}
q.lengthObserver(length)
return event, true
}
func (q *FifoQueue) pop() (interface{}, int, bool) {
q.mu.Lock()
defer q.mu.Unlock()
event, ok := q.queue.PopFront()
return event, q.queue.Len(), ok
}
// Len returns the current length of the queue.
func (q *FifoQueue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return q.queue.Len()
}