-
Notifications
You must be signed in to change notification settings - Fork 81
/
subscriber.go
95 lines (75 loc) · 1.51 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package broker
import (
"errors"
"fmt"
"sync"
)
// Subscriber is the value type stored in SubscriberMap and SubscriberSyncMap.
// NOTE(review): presumably one Subscriber represents a single active topic
// subscription created by a broker implementation — confirm against the
// concrete implementations.
type Subscriber interface {
	// Options returns the SubscribeOptions associated with this subscriber.
	// NOTE(review): presumably the options it was created with — confirm.
	Options() SubscribeOptions
	// Topic returns the topic name of this subscriber.
	Topic() string
	// Unsubscribe stops the subscription and reports any error encountered.
	// In this file SubscriberSyncMap.Remove passes removeFromManager=true and
	// SubscriberSyncMap.Clear passes false.
	// NOTE(review): the flag name suggests that, when true, the
	// implementation also removes itself from the map that tracks it —
	// verify against the implementations.
	Unsubscribe(removeFromManager bool) error
}
// SubscriberMap maps a topic name to its Subscriber. It provides no
// synchronization of its own; SubscriberSyncMap wraps it with a lock.
type SubscriberMap map[string]Subscriber
// SubscriberSyncMap is a SubscriberMap guarded by an embedded sync.RWMutex.
// Because the mutex is embedded, its Lock/Unlock/RLock/RUnlock methods are
// promoted onto SubscriberSyncMap; the methods of this type take the lock
// themselves. Construct values with NewSubscriberSyncMap, which allocates the
// map, and pass them by pointer: a struct containing a mutex must not be copied.
type SubscriberSyncMap struct {
	sync.RWMutex
	// m holds the subscribers keyed by topic; access it only while holding the lock.
	m SubscriberMap
}
// NewSubscriberSyncMap returns a pointer to an empty SubscriberSyncMap whose
// internal map is already allocated.
func NewSubscriberSyncMap() *SubscriberSyncMap {
	registry := new(SubscriberSyncMap)
	registry.m = make(SubscriberMap)
	return registry
}
// Add stores sub under topic, replacing any subscriber previously stored for
// that topic. A replaced subscriber is not unsubscribed.
//
// Add also works on a zero-value SubscriberSyncMap: the internal map is
// allocated on first use instead of panicking on a write to a nil map.
func (sm *SubscriberSyncMap) Add(topic string, sub Subscriber) {
	sm.Lock()
	defer sm.Unlock()

	// Writing to a nil map panics, so allocate lazily under the write lock.
	if sm.m == nil {
		sm.m = make(SubscriberMap)
	}
	sm.m[topic] = sub
}
// Remove deletes the subscriber stored under topic and then calls its
// Unsubscribe(true), returning that call's error. If no subscriber is stored
// for topic, nothing is changed and a "topic[...] not found" error is returned.
//
// The map entry is deleted while the write lock is held, but Unsubscribe is
// invoked only after the lock has been released. sync.RWMutex is not
// reentrant, so calling foreign code under the lock would deadlock if the
// subscriber calls back into this map (for example via RemoveOnly).
// NOTE(review): the removeFromManager=true argument suggests implementations
// do exactly that — verify against the Subscriber implementations.
func (sm *SubscriberSyncMap) Remove(topic string) error {
	sm.Lock()
	sub, ok := sm.m[topic]
	if ok {
		delete(sm.m, topic)
	}
	sm.Unlock()

	if !ok {
		// errors.New(fmt.Sprintf(...)) is kept on purpose: this is the only
		// use of both the errors and fmt imports in the file.
		return errors.New(fmt.Sprintf("topic[%s] not found", topic))
	}
	return sub.Unsubscribe(true)
}
// RemoveOnly deletes the entry for topic without calling Unsubscribe on the
// stored subscriber. It reports whether an entry for topic was present.
func (sm *SubscriberSyncMap) RemoveOnly(topic string) bool {
	sm.Lock()
	defer sm.Unlock()

	_, found := sm.m[topic]
	if found {
		delete(sm.m, topic)
	}
	return found
}
// Clear calls Unsubscribe(false) on every stored subscriber and then replaces
// the internal map with a fresh, empty one. The write lock is held for the
// whole operation, including the Unsubscribe calls.
func (sm *SubscriberSyncMap) Clear() {
	sm.Lock()
	defer sm.Unlock()

	for _, subscriber := range sm.m {
		// Best effort: an Unsubscribe error is deliberately discarded so the
		// remaining subscribers are still processed.
		_ = subscriber.Unsubscribe(false)
	}

	sm.m = make(SubscriberMap)
}
// ForceClear drops every entry by swapping in a fresh, empty map. Unlike
// Clear, it does not call Unsubscribe on the subscribers it discards.
func (sm *SubscriberSyncMap) ForceClear() {
	sm.Lock()
	sm.m = SubscriberMap{}
	sm.Unlock()
}
// Get returns the subscriber stored under topic, or nil when no entry exists
// for that topic. The lookup is performed under the read lock.
func (sm *SubscriberSyncMap) Get(topic string) Subscriber {
	sm.RLock()
	found := sm.m[topic]
	sm.RUnlock()
	return found
}
// Foreach invokes fnc once for every (topic, subscriber) pair in the map.
// Iteration order is that of a Go map range and is therefore unspecified.
//
// The read lock is held for the entire iteration, so fnc must not call
// methods of this map that take the write lock (Add, Remove, RemoveOnly,
// Clear, ForceClear); doing so would block forever.
func (sm *SubscriberSyncMap) Foreach(fnc func(topic string, sub Subscriber)) {
	sm.RLock()
	defer sm.RUnlock() // deferred so the lock is released even if fnc panics

	for name, subscriber := range sm.m {
		fnc(name, subscriber)
	}
}