-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
queue.go
147 lines (125 loc) · 4.21 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package chainview
import "sync"
// blockEventType enumerates the kinds of blockEvent that can be placed on a
// blockEventQueue.
type blockEventType uint8

const (
	// connected is the type of a blockEvent representing a block
	// that was connected to our current chain. It is the zero value of
	// blockEventType.
	connected blockEventType = iota

	// disconnected is the type of a blockEvent representing a
	// block that is stale/disconnected from our current chain.
	disconnected
)
// blockEvent represents a block that was either connected to or disconnected
// from the current chain.
type blockEvent struct {
	// eventType tells whether the block was connected or disconnected.
	eventType blockEventType

	// block is the block this event refers to.
	block *FilteredBlock
}
// blockEventQueue is an ordered queue for block events sent from a
// FilteredChainView. The two types of possible block events are
// connected/new blocks, and disconnected/stale blocks. The
// blockEventQueue keeps the order of these events intact, while
// still being non-blocking. This is important in order for the
// chainView's call to onBlockConnected/onBlockDisconnected to not
// get blocked, and for the consumer of the block events to always
// get the events in the correct order.
type blockEventQueue struct {
	// queueCond is signalled whenever an event is added to the queue or
	// the queue is asked to stop. It is built on top of queueMtx.
	queueCond *sync.Cond

	// queueMtx is the lock backing queueCond; it is held while the queue
	// slice is read or modified.
	queueMtx sync.Mutex

	// queue holds the pending events, oldest first.
	queue []*blockEvent

	// newBlocks is the channel where the consumer of the queue
	// will receive connected/new blocks from the FilteredChainView.
	newBlocks chan *FilteredBlock

	// staleBlocks is the channel where the consumer of the queue will
	// receive disconnected/stale blocks from the FilteredChainView.
	staleBlocks chan *FilteredBlock

	// wg tracks the queueCoordinator goroutine launched by Start.
	wg sync.WaitGroup

	// quit is closed by Stop to tell the queueCoordinator to exit.
	quit chan struct{}
}
// newBlockEventQueue returns an empty blockEventQueue whose output and quit
// channels are unbuffered. The returned queue does nothing until Start is
// called.
func newBlockEventQueue() *blockEventQueue {
	q := new(blockEventQueue)

	// The condition variable shares the queue's own mutex, so locking
	// either one guards the event slice.
	q.queueCond = sync.NewCond(&q.queueMtx)

	q.newBlocks = make(chan *FilteredBlock)
	q.staleBlocks = make(chan *FilteredBlock)
	q.quit = make(chan struct{})

	return q
}
// Start launches the queueCoordinator goroutine, after which events handed to
// the queue are forwarded to the output channels.
func (b *blockEventQueue) Start() {
	// Register the goroutine with the wait group before spawning it; the
	// coordinator calls Done on this wait group when it returns.
	b.wg.Add(1)
	go b.queueCoordinator()
}
// Stop signals the queue coordinator to stop and blocks until it has exited,
// such that the queue is fully shut down once Stop returns. Events that are
// still queued at that point are not delivered.
//
// Stop must be called at most once: it closes the quit channel, and closing
// it a second time panics.
func (b *blockEventQueue) Stop() {
	// Close quit and wake the coordinator while holding the queue lock.
	// The coordinator inspects quit under this same lock right before it
	// parks in Wait, so it either observes the closed channel or is
	// already waiting and receives the signal. Signalling without the
	// lock could fire before the coordinator starts waiting, in which
	// case the wakeup would be lost and the goroutine would sleep forever
	// on an empty queue.
	b.queueCond.L.Lock()
	close(b.quit)
	b.queueCond.Signal()
	b.queueCond.L.Unlock()

	// Wait for the coordinator goroutine (if one was started) to return.
	b.wg.Wait()
}

// queueCoordinator is the queue's main loop, handling incoming block events
// and handing them off to the correct output channel. It returns once the
// quit channel has been closed.
//
// NB: MUST be run as a goroutine from the Start() method.
func (b *blockEventQueue) queueCoordinator() {
	defer b.wg.Done()

	for {
		b.queueCond.L.Lock()

		// Wait until there is an event to hand out. The quit channel
		// is checked on every pass while the lock is held, i.e. both
		// before going to sleep and after every wakeup, so a shutdown
		// request issued before we start waiting is never missed.
		for {
			select {
			case <-b.quit:
				b.queueCond.L.Unlock()
				return
			default:
			}

			if len(b.queue) > 0 {
				break
			}

			b.queueCond.Wait()
		}

		// Grab the first element in the queue, and nil the index to
		// avoid gc leak.
		event := b.queue[0]
		b.queue[0] = nil
		b.queue = b.queue[1:]
		b.queueCond.L.Unlock()

		// In the case this is a connected block, we'll send it on the
		// newBlocks channel. In case it is a disconnected block, we'll
		// send it on the staleBlocks channel. This send will block
		// until it is received by the consumer on the other end, making
		// sure we won't try to send any other block event before the
		// consumer is aware of this one. The lock is not held here, so
		// Add is never blocked by a slow consumer.
		switch event.eventType {
		case connected:
			select {
			case b.newBlocks <- event.block:
			case <-b.quit:
				return
			}

		case disconnected:
			select {
			case b.staleBlocks <- event.block:
			case <-b.quit:
				return
			}
		}
	}
}
// Add appends the given blockEvent to the tail of the queue, so it is handed
// out only after every event added before it. Add never waits for the
// consumer of the queue to read from the other end, which makes it safe to
// call from the FilteredChainView's onBlockConnected/onBlockDisconnected.
func (b *blockEventQueue) Add(event *blockEvent) {
	// Append under the condition's lock, which guards the queue slice.
	mu := b.queueCond.L
	mu.Lock()
	b.queue = append(b.queue, event)
	mu.Unlock()

	// Wake the queueCoordinator in case it is waiting for new events.
	b.queueCond.Signal()
}