/
subscription_topic_handler.go
90 lines (79 loc) · 1.96 KB
/
subscription_topic_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package sync
import (
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)
// subTopicHandler stores pubsub subscriptions keyed by topic and offers basic
// CRUD operations on that topic map. The methods defined on it in this file
// take the embedded RWMutex before touching either map, so they can be called
// from multiple goroutines.
type subTopicHandler struct {
sync.RWMutex
// subTopics maps a topic string to the subscription registered for it.
subTopics map[string]*pubsub.Subscription
// digestMap counts, per 4-byte digest extracted from a topic string, how
// many added topics carry that digest.
digestMap map[[4]byte]int
}
// newSubTopicHandler returns an empty handler whose topic and digest maps are
// already allocated, so it can be written to right away.
func newSubTopicHandler() *subTopicHandler {
	handler := new(subTopicHandler)
	handler.subTopics = make(map[string]*pubsub.Subscription)
	handler.digestMap = make(map[[4]byte]int)
	return handler
}
// addTopic stores sub under topic and counts the topic's digest.
//
// The subscription is always stored, replacing any earlier entry for the same
// topic. The digest counter is incremented only when the topic was not already
// tracked: counting a re-added topic again would leave the counter above zero
// after the topic is removed once, and digestExists would keep reporting a
// digest that has no topics left.
//
// If the digest cannot be extracted from topic, the error is logged and the
// topic stays tracked without a digest count.
func (s *subTopicHandler) addTopic(topic string, sub *pubsub.Subscription) {
	s.Lock()
	defer s.Unlock()

	_, tracked := s.subTopics[topic]
	s.subTopics[topic] = sub
	if tracked {
		// The digest was already counted when the topic was first added.
		return
	}

	digest, err := p2p.ExtractGossipDigest(topic)
	if err != nil {
		log.WithError(err).Error("Could not retrieve digest")
		return
	}
	s.digestMap[digest]++
}
// topicExists reports whether topic currently has an entry in the topic map.
func (s *subTopicHandler) topicExists(topic string) bool {
	s.RLock()
	_, found := s.subTopics[topic]
	s.RUnlock()
	return found
}
// removeTopic stops tracking topic and releases its digest count.
//
// A topic that is not tracked is ignored entirely: nothing was counted for it,
// so decrementing here would take a count away from another tracked topic that
// shares the same digest. For a tracked topic, the entry is deleted and the
// digest counter is decremented; the digest entry is dropped once no topics
// reference it. If the digest cannot be extracted from topic, the error is
// logged and the digest counts are left untouched.
func (s *subTopicHandler) removeTopic(topic string) {
	s.Lock()
	defer s.Unlock()

	if _, tracked := s.subTopics[topic]; !tracked {
		return
	}
	delete(s.subTopics, topic)

	digest, err := p2p.ExtractGossipDigest(topic)
	if err != nil {
		log.WithError(err).Error("Could not retrieve digest")
		return
	}

	// A count of one means this was the last topic for the digest. A missing
	// or non-positive count should never be possible; it is handled
	// defensively by clearing the entry as well.
	count := s.digestMap[digest]
	if count <= 1 {
		delete(s.digestMap, digest)
		return
	}
	s.digestMap[digest] = count - 1
}
// digestExists reports whether at least one counted topic carries digest.
func (s *subTopicHandler) digestExists(digest [4]byte) bool {
	s.RLock()
	defer s.RUnlock()
	// A missing key reads as zero, so a single comparison covers both an
	// absent digest and a non-positive count.
	return s.digestMap[digest] > 0
}
// allTopics returns every topic that currently has an entry in the topic map.
//
// The returned slice is freshly allocated, so callers may modify it freely.
// Its order follows map iteration and is therefore unspecified. The result is
// nil when no topics are tracked.
func (s *subTopicHandler) allTopics() []string {
	s.RLock()
	defer s.RUnlock()

	if len(s.subTopics) == 0 {
		return nil
	}
	// The final length is known, so allocate once instead of growing the
	// slice through repeated appends.
	topics := make([]string, 0, len(s.subTopics))
	for topic := range s.subTopics {
		topics = append(topics, topic)
	}
	return topics
}
// subForTopic returns the subscription stored for topic, or nil when the topic
// has no entry in the topic map.
func (s *subTopicHandler) subForTopic(topic string) *pubsub.Subscription {
	s.RLock()
	sub := s.subTopics[topic]
	s.RUnlock()
	return sub
}