forked from evmos/ethermint
/
pubsub.go
143 lines (114 loc) · 3.05 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package pubsub
import (
"sync"
"sync/atomic"
"github.com/pkg/errors"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)
// UnsubscribeFunc is the cancellation callback handed back by
// EventBus.Subscribe. The in-memory bus in this file returns one that removes
// the subscriber's entry from the bus when called.
type UnsubscribeFunc func()
// EventBus is a topic-based publish/subscribe hub for coretypes.ResultEvent
// values. The per-method notes describe what the in-memory implementation in
// this file does.
type EventBus interface {
// AddTopic registers name with src as the channel its events are read from.
// It returns an error if name is already registered.
AddTopic(name string, src <-chan coretypes.ResultEvent) error
// RemoveTopic unregisters the topic called name.
RemoveTopic(name string)
// Subscribe returns a channel on which events for topic name are delivered,
// together with a function that cancels the subscription. It returns an
// error if name is not a registered topic.
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
// Topics returns the names of the currently registered topics.
Topics() []string
}
// memEventBus is the in-memory EventBus implementation constructed by
// NewEventBus.
type memEventBus struct {
// topics maps a topic name to the source channel its events are read from.
topics map[string]<-chan coretypes.ResultEvent
// topicsMux is locked by the methods that read or modify topics.
topicsMux *sync.RWMutex
// subscribers maps a topic name to that topic's subscriber channels, keyed
// by the ID obtained from GenUniqueID at subscription time.
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
// subscribersMux is locked by the methods that look up or modify subscribers.
subscribersMux *sync.RWMutex
// currentUniqueID is the most recently issued subscriber ID; GenUniqueID
// advances it with an atomic add.
currentUniqueID uint64
}
// NewEventBus builds an in-memory EventBus that starts out with no topics and
// no subscribers.
func NewEventBus() EventBus {
	bus := new(memEventBus)

	bus.topics = map[string]<-chan coretypes.ResultEvent{}
	bus.topicsMux = &sync.RWMutex{}

	bus.subscribers = map[string]map[uint64]chan<- coretypes.ResultEvent{}
	bus.subscribersMux = &sync.RWMutex{}

	return bus
}
// GenUniqueID atomically advances the bus's ID counter by one and returns the
// new counter value.
func (m *memEventBus) GenUniqueID() uint64 {
	next := atomic.AddUint64(&m.currentUniqueID, 1)
	return next
}
// Topics returns the names of all registered topics. The slice is freshly
// allocated on every call and its order follows map iteration, which Go leaves
// unspecified.
func (m *memEventBus) Topics() (topics []string) {
	m.topicsMux.RLock()
	defer m.topicsMux.RUnlock()

	// Allocate the exact length up front and fill it by index.
	topics = make([]string, len(m.topics))
	next := 0
	for name := range m.topics {
		topics[next] = name
		next++
	}
	return topics
}
// AddTopic registers name with src as its event source and starts a goroutine
// (publishTopic) that forwards every event read from src to the topic's
// subscribers.
//
// It returns an error if name is already registered. The existence check and
// the insertion are performed under a single write lock: checking under a read
// lock and inserting under a separate write lock would let two concurrent
// calls with the same name both pass the check, both register, and both start
// a publisher goroutine.
func (m *memEventBus) AddTopic(name string, src <-chan coretypes.ResultEvent) error {
	m.topicsMux.Lock()
	if _, ok := m.topics[name]; ok {
		m.topicsMux.Unlock()
		return errors.New("topic already registered")
	}
	m.topics[name] = src
	m.topicsMux.Unlock()

	// Start the publisher only after the lock is released; it takes the
	// topics lock itself when src is eventually closed.
	go m.publishTopic(name, src)

	return nil
}
// RemoveTopic deletes name from the set of registered topics. Only the topics
// map entry is removed here; the subscribers map is not modified.
func (m *memEventBus) RemoveTopic(name string) {
	mux := m.topicsMux

	mux.Lock()
	defer mux.Unlock()

	delete(m.topics, name)
}
// Subscribe attaches a new subscriber to topic name.
//
// On success it returns an unbuffered channel registered as a subscriber of
// the topic, and an UnsubscribeFunc that removes that registration again. The
// unsubscribe function only deletes the subscriber entry; it does not close
// the channel. If name is not a registered topic, an error is returned and
// both other results are nil.
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
	m.topicsMux.RLock()
	_, registered := m.topics[name]
	m.topicsMux.RUnlock()

	if !registered {
		return nil, nil, errors.Errorf("topic not found: %s", name)
	}

	events := make(chan coretypes.ResultEvent)

	m.subscribersMux.Lock()
	defer m.subscribersMux.Unlock()

	subID := m.GenUniqueID()

	// Lazily create the per-topic subscriber set on first subscription.
	subs, found := m.subscribers[name]
	if !found {
		subs = make(map[uint64]chan<- coretypes.ResultEvent)
		m.subscribers[name] = subs
	}
	subs[subID] = events

	return events, func() {
		m.subscribersMux.Lock()
		defer m.subscribersMux.Unlock()
		// Look the set up again by name: the entry captured above may have
		// been dropped by closeAllSubscribers in the meantime.
		delete(m.subscribers[name], subID)
	}, nil
}
// publishTopic forwards every event received from src to the subscribers of
// topic name. It runs until src is closed; at that point it closes all of the
// topic's subscriber channels and then removes the topic from the registry.
func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
	// Ranging over the channel ends exactly when src has been closed.
	for event := range src {
		m.publishAllSubscribers(name, event)
	}

	m.closeAllSubscribers(name)

	m.topicsMux.Lock()
	defer m.topicsMux.Unlock()
	delete(m.topics, name)
}
// closeAllSubscribers drops the subscriber set of topic name from the bus and
// closes every channel that was in it, all under the subscribers write lock.
func (m *memEventBus) closeAllSubscribers(name string) {
	m.subscribersMux.Lock()
	defer m.subscribersMux.Unlock()

	// Detach the set first, then close each channel it held.
	detached := m.subscribers[name]
	delete(m.subscribers, name)

	for id := range detached {
		close(detached[id])
	}
}
// publishAllSubscribers offers msg to every subscriber of topic name.
//
// Delivery is best-effort: each send is non-blocking, so a subscriber that is
// not ready to receive at that moment simply misses the event.
//
// The read lock is held for the whole iteration, not just the map lookup.
// Subscribe and the unsubscribe callbacks mutate the per-topic subscriber map
// under the write lock; ranging over that map after releasing the lock is a
// data race that can abort the process with "concurrent map iteration and map
// write". Because the sends never block, the lock is held only briefly.
func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEvent) {
	m.subscribersMux.RLock()
	defer m.subscribersMux.RUnlock()

	for _, sub := range m.subscribers[name] {
		select {
		case sub <- msg:
		default:
			// Subscriber not ready; drop the event for it rather than block.
		}
	}
}