/
peermanager.go
101 lines (84 loc) · 2.37 KB
/
peermanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package peermanager
import (
"context"
"sync"
"github.com/ipfs/fs-repo-migrations/ipfs-10-to-11/_vendor/github.com/libp2p/go-libp2p-core/peer"
)
// PeerProcess is any process that provides services for a peer
type PeerProcess interface {
Startup()
Shutdown()
}
// PeerProcessFactory provides a function that will create a PeerQueue.
type PeerProcessFactory func(ctx context.Context, p peer.ID) PeerProcess
type peerProcessInstance struct {
refcnt int
process PeerProcess
}
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
peerProcesses map[peer.ID]*peerProcessInstance
peerProcessesLk sync.RWMutex
createPeerProcess PeerProcessFactory
ctx context.Context
}
// New creates a new PeerManager, given a context and a peerQueueFactory.
func New(ctx context.Context, createPeerQueue PeerProcessFactory) *PeerManager {
return &PeerManager{
peerProcesses: make(map[peer.ID]*peerProcessInstance),
createPeerProcess: createPeerQueue,
ctx: ctx,
}
}
// ConnectedPeers returns a list of peers this PeerManager is managing.
func (pm *PeerManager) ConnectedPeers() []peer.ID {
pm.peerProcessesLk.RLock()
defer pm.peerProcessesLk.RUnlock()
peers := make([]peer.ID, 0, len(pm.peerProcesses))
for p := range pm.peerProcesses {
peers = append(peers, p)
}
return peers
}
// Connected is called to add a new peer to the pool
func (pm *PeerManager) Connected(p peer.ID) {
pm.peerProcessesLk.Lock()
pq := pm.getOrCreate(p)
pq.refcnt++
pm.peerProcessesLk.Unlock()
}
// Disconnected is called to remove a peer from the pool.
func (pm *PeerManager) Disconnected(p peer.ID) {
pm.peerProcessesLk.Lock()
pq, ok := pm.peerProcesses[p]
if !ok {
pm.peerProcessesLk.Unlock()
return
}
pq.refcnt--
if pq.refcnt > 0 {
pm.peerProcessesLk.Unlock()
return
}
delete(pm.peerProcesses, p)
pm.peerProcessesLk.Unlock()
pq.process.Shutdown()
}
// GetProcess returns the process for the given peer
func (pm *PeerManager) GetProcess(
p peer.ID) PeerProcess {
pm.peerProcessesLk.Lock()
pqi := pm.getOrCreate(p)
pm.peerProcessesLk.Unlock()
return pqi.process
}
func (pm *PeerManager) getOrCreate(p peer.ID) *peerProcessInstance {
pqi, ok := pm.peerProcesses[p]
if !ok {
pq := pm.createPeerProcess(pm.ctx, p)
pq.Startup()
pqi = &peerProcessInstance{0, pq}
pm.peerProcesses[p] = pqi
}
return pqi
}