forked from spacemeshos/go-spacemesh
-
Notifications
You must be signed in to change notification settings - Fork 0
/
swarmfindnode.go
171 lines (134 loc) · 4.44 KB
/
swarmfindnode.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package p2p
import (
"github.com/spacemeshos/go-spacemesh/crypto"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/dht"
"github.com/spacemeshos/go-spacemesh/p2p/dht/table"
"github.com/spacemeshos/go-spacemesh/p2p/node"
"time"
)
// findNode locates a node by its id and delivers the result on callback.
//
// id is the base58 node id string. Lookup order:
//  1. the swarm's peer store (s.peers),
//  2. the local dht routing table,
//  3. a kad network lookup (kadFindNode), which delivers nil when the node
//     cannot be found.
//
// Every result is sent from a separate goroutine so this method never blocks
// on the callback channel.
//
// Not goroutine safe - should only be called from the swarm event dispatcher.
func (s *swarmImpl) findNode(id string, callback chan node.RemoteNodeData) {
	s.localNode.Info("finding node: %s ...", log.PrettyID(id))

	// look at peer store
	n := s.peers[id]
	if n != nil {
		s.localNode.Info(" known peer")
		// Report the matched peer's data. Previously the local node's own data
		// was sent here, so callers received the wrong node for a known peer.
		go func() { callback <- n.GetRemoteNodeData() }()
		return
	}

	// look for the node at local dht table
	poc := make(table.PeerOpChannel, 1)
	s.routingTable.Find(table.PeerByIDRequest{ID: dht.NewIDFromBase58String(id), Callback: poc})

	// wait for the routing table's reply
	c := <-poc
	if res := c.Peer; res != nil {
		go func() { callback <- res }()
		return
	}

	// use kad to locate the node
	s.kadFindNode(id, callback)
}
// kadFindNode implements the kad algorithm for locating a remote node.
//
// Precondition - the node is not in the local routing table.
// nodeID is the base58 node id string.
// The requested node is delivered on callback, or nil if it was not found.
// Results are sent from a separate goroutine so this method does not block on
// the callback channel.
func (s *swarmImpl) kadFindNode(nodeID string, callback chan node.RemoteNodeData) {
	s.localNode.Info("Kad find node: %s ...", nodeID)

	// kad node location algo
	alpha := int(s.config.RoutingTableAlpha)
	k := int(s.config.RoutingTableBucketSize)
	dhtID := dht.NewIDFromBase58String(nodeID)

	// step 1 - seed the search list with the k (bucket size) nearest peers to
	// the target known to the local routing table
	// NOTE(review): searchList[0] is treated as the closest node below, which
	// assumes NearestPeers returns peers sorted by distance - confirm.
	searchList := s.getNearestPeers(dhtID, k)

	// step 2 - iterative lookups for nodeID using searchList
	for {
		// nothing left to search - target was not found
		if len(searchList) == 0 {
			go func() { callback <- nil }()
			return
		}

		// is the closest known node our target?
		closestNode := searchList[0]
		if closestNode.ID() == nodeID {
			go func() { callback <- closestNode }()
			return
		}

		// pick up to alpha servers to query from the search list
		// servers that have been recently queried will not be returned
		servers := node.PickFindNodeServers(searchList, nodeID, alpha)
		if len(servers) == 0 {
			// no more servers to query - target was not found
			go func() { callback <- nil }()
			return
		}

		// lookup nodeID using the target servers
		res := s.lookupNode(servers, nodeID, closestNode)
		if len(res) > 0 {
			// merge newly found nodes
			searchList = node.Union(searchList, res)

			// sort the merged list by distance from target; sorting only res
			// here would throw away the union computed above
			searchList = node.SortClosestPeers(searchList, dhtID)
		}

		// keep iterating using new servers that were not queried yet from searchList (if any)
	}
}
// lookupNode queries one or more servers in parallel for a target node.
//
// servers are the nodes to send find-node requests to, targetID is the base58
// id of the node being searched for, and closestNode is the closest node to
// the target known so far.
//
// It returns the unique nodes reported by the servers that are closer to
// targetID than closestNode, sorted by distance from the target. An empty
// slice is returned when servers is empty. Collection stops once every server
// has answered or after a single 60 second deadline, whichever comes first;
// responses arriving after the deadline are ignored.
func (s *swarmImpl) lookupNode(servers []node.RemoteNodeData, targetID string, closestNode node.RemoteNodeData) []node.RemoteNodeData {
	l := len(servers)
	if l == 0 {
		return []node.RemoteNodeData{}
	}

	// results channel - buffered with one slot per server so that a response
	// arriving after we stopped listening does not block its sender
	callback := make(chan FindNodeResp, l)

	// queries are run in parallel and results are collected
	for i := 0; i < l; i++ {
		servers[i].SetLastFindNodeCall(targetID, time.Now())
		// find node protocol adds found nodes to the local routing table
		go s.getFindNodeProtocol().FindNode(crypto.UUID(), servers[i].ID(), targetID, callback)
	}

	// One deadline for the whole collection phase. Creating the timer inside
	// the loop would restart the 60 seconds after every response.
	timeout := time.NewTimer(time.Second * 60)
	defer timeout.Stop()

	// nodes keyed by id so duplicates reported by several servers collapse
	idSet := make(map[string]node.RemoteNodeData)

Loop:
	for done := 0; done < l; {
		select {
		case resp := <-callback:
			for _, n := range node.FromNodeInfos(resp.NodeInfos) {
				idSet[n.ID()] = n
			}
			done++
		case <-timeout.C:
			// we expected nodes to return results within a reasonable time frame
			break Loop
		}
	}

	// add unique node ids that are closer to target id than closest node
	targetDhtID := dht.NewIDFromBase58String(targetID)
	closestDhtID := closestNode.DhtID()
	res := make([]node.RemoteNodeData, 0, len(idSet))
	for _, n := range idSet {
		if n.DhtID().Closer(targetDhtID, closestDhtID) {
			res = append(res, n)
		}
	}

	// sort results by distance from target dht id
	return node.SortClosestPeers(res, targetDhtID)
}
// getNearestPeers is a synchronous wrapper over routingTable.NearestPeers.
// It submits a request for dhtID with the given count, blocks until the
// routing table replies on the request's callback channel, and returns the
// peers carried by that reply.
func (s *swarmImpl) getNearestPeers(dhtID dht.ID, count int) []node.RemoteNodeData {
	reply := make(table.PeersOpChannel, 1)
	req := table.NearestPeersReq{ID: dhtID, Count: count, Callback: reply}
	s.routingTable.NearestPeers(req)

	// block until the routing table answers
	result := <-reply
	return result.Peers
}