package peermanager

import (
	"bytes"
	"fmt"

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)
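
// Note: this file uses the package-level logger `log`, which is defined
// elsewhere in the peermanager package rather than in this file.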

// Gauge can be used to keep track of a metric that increases and decreases
// incrementally. It is used by the peerWantManager to track the number of
// want-haves and want-blocks that are active (i.e. sent but no response
// received).
type Gauge interface {
	Inc()
	Dec()
}
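
// A minimal Gauge implementation might simply wrap an atomic counter. This is
// an illustrative sketch, not part of the original package; it assumes only
// the Inc/Dec contract above (requires "sync/atomic"):
//
//	type atomicGauge struct{ n atomic.Int64 }
//
//	func (g *atomicGauge) Inc()         { g.n.Add(1) }
//	func (g *atomicGauge) Dec()         { g.n.Add(-1) }
//	func (g *atomicGauge) Count() int64 { return g.n.Load() }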

// peerWantManager keeps track of which want-haves and want-blocks have been
// sent to each peer, so that the PeerManager doesn't send duplicates.
type peerWantManager struct {
	// peerWants maps peers to outstanding wants.
	// A peer's wants is the _union_ of the broadcast wants and the wants in
	// this list.
	peerWants map[peer.ID]*peerWant

	// Reverse index of all wants in peerWants.
	wantPeers map[cid.Cid]map[peer.ID]struct{}

	// broadcastWants tracks all the current broadcast wants.
	broadcastWants *cid.Set

	// Keeps track of the number of active want-haves & want-blocks
	wantGauge Gauge
	// Keeps track of the number of active want-blocks
	wantBlockGauge Gauge
}

type peerWant struct {
	wantBlocks *cid.Set
	wantHaves  *cid.Set
	peerQueue  PeerQueue
}

// newPeerWantManager creates a new peerWantManager with Gauges that keep
// track of the number of active wants and active want-blocks (i.e. sent but
// no response received).
func newPeerWantManager(wantGauge Gauge, wantBlockGauge Gauge) *peerWantManager {
	return &peerWantManager{
		broadcastWants: cid.NewSet(),
		peerWants:      make(map[peer.ID]*peerWant),
		wantPeers:      make(map[cid.Cid]map[peer.ID]struct{}),
		wantGauge:      wantGauge,
		wantBlockGauge: wantBlockGauge,
	}
}
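
// Construction sketch (hypothetical caller, reusing the atomicGauge example
// above). The two gauges are independent, so a caller would typically wire
// them to two separate metrics:
//
//	wantG := &atomicGauge{}      // counts all active wants
//	wantBlockG := &atomicGauge{} // counts active want-blocks only
//	pwm := newPeerWantManager(wantG, wantBlockG)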

// addPeer adds a peer whose wants we need to keep track of. It sends the
// current list of broadcast wants to the peer.
func (pwm *peerWantManager) addPeer(peerQueue PeerQueue, p peer.ID) {
	if _, ok := pwm.peerWants[p]; ok {
		return
	}

	pwm.peerWants[p] = &peerWant{
		wantBlocks: cid.NewSet(),
		wantHaves:  cid.NewSet(),
		peerQueue:  peerQueue,
	}

	// Broadcast any live want-haves to the newly connected peer
	if pwm.broadcastWants.Len() > 0 {
		wants := pwm.broadcastWants.Keys()
		peerQueue.AddBroadcastWantHaves(wants)
	}
}

// removePeer removes a peer and its associated wants from tracking.
func (pwm *peerWantManager) removePeer(p peer.ID) {
	pws, ok := pwm.peerWants[p]
	if !ok {
		return
	}

	// Clean up want-blocks
	_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
		// Clean up want-blocks from the reverse index
		pwm.reverseIndexRemove(c, p)

		// If the removed peer was the last with a pending want-block for the
		// key, decrement the want-block gauge; if nothing wants the key any
		// more, decrement the total wants gauge as well.
		peerCounts := pwm.wantPeerCounts(c)
		if peerCounts.wantBlock == 0 {
			pwm.wantBlockGauge.Dec()
		}
		if !peerCounts.wanted() {
			pwm.wantGauge.Dec()
		}

		return nil
	})

	// Clean up want-haves
	_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
		// Clean up want-haves from the reverse index
		pwm.reverseIndexRemove(c, p)

		// If nothing wants the key any more, decrement the total wants gauge
		peerCounts := pwm.wantPeerCounts(c)
		if !peerCounts.wanted() {
			pwm.wantGauge.Dec()
		}

		return nil
	})

	delete(pwm.peerWants, p)
}

// broadcastWantHaves sends want-haves to any peers that have not yet been
// sent them.
func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
	unsent := make([]cid.Cid, 0, len(wantHaves))
	for _, c := range wantHaves {
		if pwm.broadcastWants.Has(c) {
			// Already a broadcast want, skip it.
			continue
		}
		pwm.broadcastWants.Add(c)
		unsent = append(unsent, c)

		// If no peer has a pending want for the key, increment the total
		// wants gauge
		if _, ok := pwm.wantPeers[c]; !ok {
			pwm.wantGauge.Inc()
		}
	}

	if len(unsent) == 0 {
		return
	}

	// Allocate a single buffer to filter broadcast wants for each peer
	bcstWantsBuffer := make([]cid.Cid, 0, len(unsent))

	// Send broadcast wants to each peer
	for _, pws := range pwm.peerWants {
		peerUnsent := bcstWantsBuffer[:0]
		for _, c := range unsent {
			// If we've already sent a want to this peer, skip it.
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				peerUnsent = append(peerUnsent, c)
			}
		}

		if len(peerUnsent) > 0 {
			pws.peerQueue.AddBroadcastWantHaves(peerUnsent)
		}
	}
}

// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
	fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
	fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

	// Get the existing want-blocks and want-haves for the peer
	pws, ok := pwm.peerWants[p]
	if !ok {
		// In practice this should never happen
		log.Errorf("sendWants() called with peer %s but peer not found in peerWantManager", p)
		return
	}

	// Iterate over the requested want-blocks
	for _, c := range wantBlocks {
		// If the want-block has already been sent to the peer, skip it
		if pws.wantBlocks.Has(c) {
			continue
		}

		// Increment the want gauges
		peerCounts := pwm.wantPeerCounts(c)
		if peerCounts.wantBlock == 0 {
			pwm.wantBlockGauge.Inc()
		}
		if !peerCounts.wanted() {
			pwm.wantGauge.Inc()
		}

		// Make sure the CID is no longer recorded as a want-have
		pws.wantHaves.Remove(c)

		// Record that the CID was sent as a want-block
		pws.wantBlocks.Add(c)

		// Add the CID to the results
		fltWantBlks = append(fltWantBlks, c)

		// Update the reverse index
		pwm.reverseIndexAdd(c, p)
	}

	// Iterate over the requested want-haves
	for _, c := range wantHaves {
		// If we've already broadcast this want, don't bother with a
		// want-have.
		if pwm.broadcastWants.Has(c) {
			continue
		}

		// If the CID has not been sent as a want-block or want-have
		if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
			// If nothing wanted the key before, increment the total wants
			// gauge
			peerCounts := pwm.wantPeerCounts(c)
			if !peerCounts.wanted() {
				pwm.wantGauge.Inc()
			}

			// Record that the CID was sent as a want-have
			pws.wantHaves.Add(c)

			// Add the CID to the results
			fltWantHvs = append(fltWantHvs, c)

			// Update the reverse index
			pwm.reverseIndexAdd(c, p)
		}
	}

	// Send the want-blocks and want-haves to the peer
	pws.peerQueue.AddWants(fltWantBlks, fltWantHvs)
}
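
// Upgrade semantics sketch (hypothetical peer p and non-broadcast CID c1): a
// CID first sent as a want-have and later requested as a want-block is
// upgraded: the CID moves from the peer's wantHaves set to its wantBlocks
// set and is sent again.
//
//	pwm.sendWants(p, nil, []cid.Cid{c1}) // sent to p as a want-have
//	pwm.sendWants(p, []cid.Cid{c1}, nil) // re-sent to p as a want-block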

// sendCancels sends a cancel to each peer to which a corresponding want was
// sent.
func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
	if len(cancelKs) == 0 {
		return
	}

	// Record how many peers have a pending want-block and want-have for each
	// key to be cancelled
	peerCounts := make(map[cid.Cid]wantPeerCnts, len(cancelKs))
	for _, c := range cancelKs {
		peerCounts[c] = pwm.wantPeerCounts(c)
	}

	// Create a buffer to use for filtering cancels per peer, with the
	// broadcast wants at the front of the buffer (broadcast wants are sent to
	// all peers)
	broadcastCancels := make([]cid.Cid, 0, len(cancelKs))
	for _, c := range cancelKs {
		if pwm.broadcastWants.Has(c) {
			broadcastCancels = append(broadcastCancels, c)
		}
	}

	// Send cancels to a particular peer
	send := func(p peer.ID, pws *peerWant) {
		// Start from the broadcast cancels
		toCancel := broadcastCancels

		// For each key to be cancelled
		for _, c := range cancelKs {
			// Check if a want was sent for the key
			if !pws.wantBlocks.Has(c) && !pws.wantHaves.Has(c) {
				continue
			}

			// Unconditionally remove from the want lists.
			pws.wantBlocks.Remove(c)
			pws.wantHaves.Remove(c)

			// If it's a broadcast want, we've already added it to
			// the peer cancels.
			if !pwm.broadcastWants.Has(c) {
				toCancel = append(toCancel, c)
			}
		}

		// Send cancels to the peer
		if len(toCancel) > 0 {
			pws.peerQueue.AddCancels(toCancel)
		}
	}

	if len(broadcastCancels) > 0 {
		// If a broadcast want is being cancelled, send the cancel to all
		// peers
		for p, pws := range pwm.peerWants {
			send(p, pws)
		}
	} else {
		// Only send cancels to peers that received a corresponding want
		cancelPeers := make(map[peer.ID]struct{}, len(pwm.wantPeers[cancelKs[0]]))
		for _, c := range cancelKs {
			for p := range pwm.wantPeers[c] {
				cancelPeers[p] = struct{}{}
			}
		}
		for p := range cancelPeers {
			pws, ok := pwm.peerWants[p]
			if !ok {
				// Should never happen but check just in case
				log.Errorf("sendCancels - peerWantManager index missing peer %s", p)
				continue
			}

			send(p, pws)
		}
	}

	// Decrement the wants gauges
	for _, c := range cancelKs {
		peerCnts := peerCounts[c]

		// If there were any peers that had a pending want-block for the key,
		// decrement the want-block gauge
		if peerCnts.wantBlock > 0 {
			pwm.wantBlockGauge.Dec()
		}

		// If there was a peer that had a pending want, or it was a broadcast
		// want, decrement the total wants gauge
		if peerCnts.wanted() {
			pwm.wantGauge.Dec()
		}
	}

	// Remove cancelled broadcast wants
	for _, c := range broadcastCancels {
		pwm.broadcastWants.Remove(c)
	}

	// Batch-remove the reverse index. There's no need to clear this index
	// peer-by-peer.
	for _, c := range cancelKs {
		delete(pwm.wantPeers, c)
	}
}
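
// Lifecycle sketch (hypothetical caller; peerQueue, p, and the CIDs are
// assumed, not defined in this file). A peer's effective want set is the
// union of the broadcast wants and its targeted wants:
//
//	pwm.addPeer(peerQueue, p)                      // start tracking p
//	pwm.broadcastWantHaves([]cid.Cid{c1})          // want-have to every peer
//	pwm.sendWants(p, []cid.Cid{c2}, []cid.Cid{c3}) // targeted wants to p
//	pwm.sendCancels([]cid.Cid{c1, c2, c3})         // cancel wherever sent
//	pwm.removePeer(p)                              // drop p and its wants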

// wantPeerCnts stores the number of peers that have pending wants for a CID.
type wantPeerCnts struct {
	// number of peers that have a pending want-block for the CID
	wantBlock int
	// number of peers that have a pending want-have for the CID
	wantHave int
	// whether the CID is a broadcast want
	isBroadcast bool
}

// wanted returns true if any peer wants the CID or it's a broadcast want.
func (pwc *wantPeerCnts) wanted() bool {
	return pwc.wantBlock > 0 || pwc.wantHave > 0 || pwc.isBroadcast
}

// wantPeerCounts counts how many peers have a pending want-block and
// want-have for the given CID.
func (pwm *peerWantManager) wantPeerCounts(c cid.Cid) wantPeerCnts {
	blockCount := 0
	haveCount := 0
	for p := range pwm.wantPeers[c] {
		pws, ok := pwm.peerWants[p]
		if !ok {
			log.Errorf("reverse index has extra peer %s for key %s in peerWantManager", p, c)
			continue
		}

		if pws.wantBlocks.Has(c) {
			blockCount++
		} else if pws.wantHaves.Has(c) {
			haveCount++
		}
	}

	return wantPeerCnts{blockCount, haveCount, pwm.broadcastWants.Has(c)}
}

// reverseIndexAdd adds the peer to the set of peers to which a want for the
// given CID has been sent. It returns true if the CID was not already present
// in the index.
func (pwm *peerWantManager) reverseIndexAdd(c cid.Cid, p peer.ID) bool {
	peers, ok := pwm.wantPeers[c]
	if !ok {
		peers = make(map[peer.ID]struct{}, 10)
		pwm.wantPeers[c] = peers
	}
	peers[p] = struct{}{}
	return !ok
}

// reverseIndexRemove removes the peer from the set of peers to which a want
// for the given CID has been sent.
func (pwm *peerWantManager) reverseIndexRemove(c cid.Cid, p peer.ID) {
	if peers, ok := pwm.wantPeers[c]; ok {
		delete(peers, p)
		if len(peers) == 0 {
			delete(pwm.wantPeers, c)
		}
	}
}

// getWantBlocks returns the set of all want-blocks sent to all peers.
func (pwm *peerWantManager) getWantBlocks() []cid.Cid {
	res := cid.NewSet()

	// Iterate over all known peers
	for _, pws := range pwm.peerWants {
		// Iterate over all want-blocks
		_ = pws.wantBlocks.ForEach(func(c cid.Cid) error {
			// Add the CID to the results
			res.Add(c)
			return nil
		})
	}

	return res.Keys()
}

// getWantHaves returns the set of all want-haves sent to all peers.
func (pwm *peerWantManager) getWantHaves() []cid.Cid {
	res := cid.NewSet()

	// Iterate over all peers with active wants.
	for _, pws := range pwm.peerWants {
		// Iterate over all want-haves
		_ = pws.wantHaves.ForEach(func(c cid.Cid) error {
			// Add the CID to the results
			res.Add(c)
			return nil
		})
	}

	// Broadcast wants are sent to every peer as want-haves, so include them
	// as well
	_ = pwm.broadcastWants.ForEach(func(c cid.Cid) error {
		res.Add(c)
		return nil
	})

	return res.Keys()
}

// getWants returns the set of all wants (both want-blocks and want-haves).
func (pwm *peerWantManager) getWants() []cid.Cid {
	res := pwm.broadcastWants.Keys()

	// Iterate over all targeted wants, skipping ones that are also in the
	// broadcast list (those are already included).
	for c := range pwm.wantPeers {
		if pwm.broadcastWants.Has(c) {
			continue
		}
		res = append(res, c)
	}

	return res
}

// String returns a human-readable listing of each peer's outstanding
// want-haves and want-blocks, for debugging.
func (pwm *peerWantManager) String() string {
	var b bytes.Buffer
	for p, ws := range pwm.peerWants {
		b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
		for _, c := range ws.wantHaves.Keys() {
			b.WriteString(fmt.Sprintf(" want-have %s\n", c))
		}
		for _, c := range ws.wantBlocks.Keys() {
			b.WriteString(fmt.Sprintf(" want-block %s\n", c))
		}
	}
	return b.String()
}