forked from weaveworks/weave
-
Notifications
You must be signed in to change notification settings - Fork 0
/
peer.go
116 lines (106 loc) · 3.5 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package router
import (
"fmt"
"sort"
"strconv"
)
// PeerUID is the numeric identifier stored in a Peer's UID field. NewPeer
// treats a zero PeerUID as "not supplied" and substitutes a random one.
type PeerUID uint64

// ParsePeerUID parses s as an unsigned base-10 integer that fits in 64
// bits and returns it as a PeerUID.
//
// On failure the error produced by strconv.ParseUint is returned unchanged
// and the PeerUID is always zero. In particular, an out-of-range input
// does not leak the saturated maximum value that strconv.ParseUint reports
// alongside its range error.
func ParsePeerUID(s string) (PeerUID, error) {
	uid, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		// The numeric result is meaningless on error; do not hand it out.
		return 0, err
	}
	return PeerUID(uid), nil
}
// Peer holds what is known about a single peer: its identity (name,
// nickname and UID), a version number, and its connections to other
// peers.
type Peer struct {
// Name is the peer's PeerName; it is the key under which other peers
// store their connection to this peer.
Name PeerName
// NameByte is the result of Name.Bin(), computed once by NewPeer.
NameByte []byte
// NickName is the label shown in parentheses by String.
NickName string
// UID is the peer's PeerUID; NewPeer substitutes a random value when
// it is given zero.
UID PeerUID
// version is the value reported as "v<version>" by Info.
version uint64
localRefCount uint64 // maintained by Peers
// connections is keyed by the name of the peer at the other end of
// each connection.
connections map[PeerName]Connection
}
// ConnectionSet is a set of Connection values, represented as a map whose
// values are empty structs.
type ConnectionSet map[Connection]struct{}
// NewPeer builds a Peer with the given name, nickname and version and an
// empty connection table. NameByte is filled in from name.Bin(). A zero
// uid means "none supplied": it is replaced by a value obtained from
// randUint64.
func NewPeer(name PeerName, nickName string, uid PeerUID, version uint64) *Peer {
	assigned := uid
	if assigned == 0 {
		assigned = PeerUID(randUint64())
	}

	p := new(Peer)
	p.Name = name
	p.NameByte = name.Bin()
	p.NickName = nickName
	p.UID = assigned
	p.version = version
	p.connections = map[PeerName]Connection{}
	return p
}
// String renders the peer as its name immediately followed by the
// nickname in parentheses, e.g. "<name>(<nickname>)".
func (peer *Peer) String() string {
	return fmt.Sprintf("%v(%s)", peer.Name, peer.NickName)
}
// Info extends String with the peer's version and UID, in the form
// "<name>(<nickname>) (v<version>) (UID <uid>)".
func (peer *Peer) Info() string {
	description := peer.String()
	return fmt.Sprintf("%s (v%v) (UID %v)", description, peer.version, peer.UID)
}
// Routes computes the routing table from this peer to every peer that is
// reachable from it. The result is a "next hop" map of PeerNameX ->
// PeerNameY, meaning "to send a message to X, hand it to neighbour Y".
// The starting peer itself is mapped to UnknownPeerName.
//
// Connections between peers currently carry no weightings, so a
// weighted algorithm such as minimum spanning tree is unnecessary; a
// simpler and cheaper breadth-first widening is used instead. Each
// frontier is sorted before it is processed, which keeps the computation
// deterministic: different peers running it over the same data obtain
// the same result. That matters because otherwise we risk message loss
// or routing cycles.
//
// If establishedAndSymmetric is set, only connections that are marked
// 'established' and are symmetric (both ends report a connection to the
// other) are followed.
//
// If stopAt is non-nil, the widening stops as soon as that peer is taken
// from the frontier; the boolean result reports whether this happened.
// In that case the returned map only covers what was discovered so far.
//
// NB: callers should generally hold a read lock on Peers and LocalPeer
// while invoking this method.
func (peer *Peer) Routes(stopAt *Peer, establishedAndSymmetric bool) (bool, map[PeerName]PeerName) {
	nextHops := map[PeerName]PeerName{peer.Name: UnknownPeerName}
	frontier := []*Peer{peer}

	for len(frontier) != 0 {
		// Take ownership of the current level and start collecting the
		// next one; sorting fixes the order in which peers are expanded.
		level := frontier
		sort.Sort(ListOfPeers(level))
		frontier = []*Peer{}

		for _, hop := range level {
			if hop == stopAt {
				return true, nextHops
			}
			// nextHops doubles as the "already visited" set, so each peer
			// is discovered (and assigned a next hop) exactly once.
			hop.ForEachConnectedPeer(establishedAndSymmetric, nextHops, func(neighbour *Peer) {
				frontier = append(frontier, neighbour)
				target := neighbour.Name
				if hop == peer {
					// Direct neighbours of the starting peer are reached
					// by sending to them directly.
					nextHops[target] = target
					return
				}
				// Anything further away is reached the same way as the
				// peer through which it was discovered.
				nextHops[target] = nextHops[hop.Name]
			})
		}
	}

	return false, nextHops
}
// ForEachConnectedPeer calls f once for the remote peer of each of this
// peer's connections, skipping any remote whose name is a key of exclude.
//
// When establishedAndSymmetric is set, a remote is additionally skipped
// unless the connection to it is established and the remote holds an
// established connection back to this peer.
//
// The connections are visited in map iteration order, so the order of
// calls to f is unspecified.
func (peer *Peer) ForEachConnectedPeer(establishedAndSymmetric bool, exclude map[PeerName]PeerName, f func(*Peer)) {
	for name, outbound := range peer.connections {
		if establishedAndSymmetric && !outbound.Established() {
			continue
		}
		if _, excluded := exclude[name]; excluded {
			continue
		}

		remote := outbound.Remote()
		inbound, hasInbound := remote.connections[peer.Name]
		if establishedAndSymmetric && !(hasInbound && inbound.Established()) {
			// The remote side does not (yet) agree about the connection.
			continue
		}
		f(remote)
	}
}