-
Notifications
You must be signed in to change notification settings - Fork 79
/
subscriptions.go
67 lines (61 loc) · 1.95 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package mempool
import "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
// RunSubscriptions starts the notification dispatcher goroutine. It panics if
// mp.subscriptionsEnabled is false. Calling it while the subscriptions flag is
// already set is a no-op. You should manually free the resources by calling
// StopSubscriptions on mempool shutdown.
func (mp *Pool) RunSubscriptions() {
	if !mp.subscriptionsEnabled {
		panic("subscriptions are disabled")
	}
	// Swap sets the flag and reports its previous value in one atomic step,
	// so concurrent callers cannot both observe "off" and start two
	// dispatchers (a separate Load followed by Store allowed that).
	if !mp.subscriptionsOn.Swap(true) {
		go mp.notificationDispatcher()
	}
}
// StopSubscriptions stops the mempool events loop by closing mp.stopCh. It
// panics if mp.subscriptionsEnabled is false. Calling it while the
// subscriptions flag is not set is a no-op.
func (mp *Pool) StopSubscriptions() {
	if !mp.subscriptionsEnabled {
		panic("subscriptions are disabled")
	}
	// Swap clears the flag and reports its previous value in one atomic step,
	// so only one of several concurrent callers gets to close stopCh; closing
	// an already closed channel would panic.
	if mp.subscriptionsOn.Swap(false) {
		close(mp.stopCh)
	}
}
// SubscribeForTransactions adds the given channel to mempool event
// broadcasting, so when a new transaction is added to the mempool or an
// existing transaction is removed from it you'll receive the event via this
// channel. It does nothing if subscriptions are not currently running.
func (mp *Pool) SubscribeForTransactions(ch chan<- mempoolevent.Event) {
	if !mp.subscriptionsOn.Load() {
		return
	}
	select {
	case mp.subCh <- ch:
	case <-mp.stopCh:
		// Subscriptions were stopped after the check above, so the
		// dispatcher may no longer be receiving from subCh; give up instead
		// of risking a send that blocks forever.
	}
}
// UnsubscribeFromTransactions unsubscribes the given channel from new mempool
// notifications, you can close it afterwards. Passing a non-subscribed channel
// is a no-op, and so is calling it while subscriptions are not running.
func (mp *Pool) UnsubscribeFromTransactions(ch chan<- mempoolevent.Event) {
	if !mp.subscriptionsOn.Load() {
		return
	}
	select {
	case mp.unsubCh <- ch:
	case <-mp.stopCh:
		// Subscriptions were stopped after the check above, so the
		// dispatcher may no longer be receiving from unsubCh; give up instead
		// of risking a send that blocks forever.
	}
}
// notificationDispatcher manages subscriptions to events and broadcasts every
// event received from mp.events to all currently subscribed channels. It
// returns once it is signalled via mp.stopCh.
func (mp *Pool) notificationDispatcher() {
	// This is just a set of subscribers, modelled as a map for ease of
	// management (not a lot of subscriptions is really expected, but maps are
	// convenient for adding/deleting elements).
	txFeed := make(map[chan<- mempoolevent.Event]struct{})
	for {
		select {
		case <-mp.stopCh:
			return
		case sub := <-mp.subCh:
			txFeed[sub] = struct{}{}
		case unsub := <-mp.unsubCh:
			delete(txFeed, unsub)
		case event := <-mp.events:
			for ch := range txFeed {
				// Delivery may block on a subscriber that is not reading;
				// watch stopCh as well so that such a subscriber cannot keep
				// this goroutine alive after StopSubscriptions.
				select {
				case ch <- event:
				case <-mp.stopCh:
					return
				}
			}
		}
	}
}