-
Notifications
You must be signed in to change notification settings - Fork 200
/
peersOnChannel.go
116 lines (96 loc) · 3.15 KB
/
peersOnChannel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package libp2p
import (
"sync"
"time"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/libp2p/go-libp2p-core/peer"
)
// peersOnChannel manages peers on topics
// it buffers the data and refresh the peers list continuously (in refreshInterval intervals)
type peersOnChannel struct {
mutPeers sync.RWMutex
peers map[string][]core.PeerID
lastUpdated map[string]time.Time
refreshInterval time.Duration
ttlInterval time.Duration
fetchPeersHandler func(topic string) []peer.ID
getTimeHandler func() time.Time
}
// newPeersOnChannel returns a new peersOnChannel object
func newPeersOnChannel(
fetchPeersHandler func(topic string) []peer.ID,
refreshInterval time.Duration,
ttlInterval time.Duration,
) (*peersOnChannel, error) {
if fetchPeersHandler == nil {
return nil, p2p.ErrNilFetchPeersOnTopicHandler
}
if refreshInterval == 0 {
return nil, p2p.ErrInvalidDurationProvided
}
if ttlInterval == 0 {
return nil, p2p.ErrInvalidDurationProvided
}
poc := &peersOnChannel{
peers: make(map[string][]core.PeerID),
lastUpdated: make(map[string]time.Time),
refreshInterval: refreshInterval,
ttlInterval: ttlInterval,
fetchPeersHandler: fetchPeersHandler,
}
poc.getTimeHandler = poc.clockTime
go poc.refreshPeersOnAllKnownTopics()
return poc, nil
}
func (poc *peersOnChannel) clockTime() time.Time {
return time.Now()
}
// ConnectedPeersOnChannel returns the known peers on a topic
// if the list was not initialized, it will trigger a manual fetch
func (poc *peersOnChannel) ConnectedPeersOnChannel(topic string) []core.PeerID {
poc.mutPeers.RLock()
peers := poc.peers[topic]
poc.mutPeers.RUnlock()
if peers != nil {
return peers
}
return poc.refreshPeersOnTopic(topic)
}
// updateConnectedPeersOnTopic updates the connected peers on a topic and the last update timestamp
func (poc *peersOnChannel) updateConnectedPeersOnTopic(topic string, connectedPeers []core.PeerID) {
poc.mutPeers.Lock()
poc.peers[topic] = connectedPeers
poc.lastUpdated[topic] = poc.getTimeHandler()
poc.mutPeers.Unlock()
}
// refreshPeersOnAllKnownTopics iterates each topic, fetching its last timestamp
// it the timestamp + ttlInterval < time.Now, will trigger a fetch of connected peers on topic
func (poc *peersOnChannel) refreshPeersOnAllKnownTopics() {
for {
listTopicsToBeRefreshed := make([]string, 0)
//build required topic list
poc.mutPeers.RLock()
for topic, lastRefreshed := range poc.lastUpdated {
needsToBeRefreshed := poc.getTimeHandler().Sub(lastRefreshed) > poc.ttlInterval
if needsToBeRefreshed {
listTopicsToBeRefreshed = append(listTopicsToBeRefreshed, topic)
}
}
poc.mutPeers.RUnlock()
for _, topic := range listTopicsToBeRefreshed {
_ = poc.refreshPeersOnTopic(topic)
}
time.Sleep(poc.refreshInterval)
}
}
// refreshPeersOnTopic
func (poc *peersOnChannel) refreshPeersOnTopic(topic string) []core.PeerID {
list := poc.fetchPeersHandler(topic)
connectedPeers := make([]core.PeerID, len(list))
for i, pid := range list {
connectedPeers[i] = core.PeerID(pid)
}
poc.updateConnectedPeersOnTopic(topic, connectedPeers)
return connectedPeers
}