This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Prune peers that send too many consecutive DONT_HAVEs #261

Merged 1 commit on Feb 14, 2020
52 changes: 42 additions & 10 deletions internal/session/sessionwantsender.go
@@ -9,8 +9,13 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

const (
// Maximum number of changes to accept before blocking
changesBufferSize = 128
// If the session receives this many DONT_HAVEs in a row from a peer,
// it prunes the peer from the session
peerDontHaveLimit = 16
)

Review comment (Member): I'd consider dropping this to something really small for now (4?).
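
Editor's note on the threshold semantics: the counter is checked before it is incremented (see processUpdates below), so a peer is pruned on the DONT_HAVE received after the limit is reached (the 17th in a row with the default of 16). A minimal standalone sketch of that behavior (hypothetical, not part of this diff):

package main

import "fmt"

// limit is a stand-in for peerDontHaveLimit.
const limit = 16

func main() {
	consecutive := 0
	for i := 1; ; i++ {
		// Same check-then-increment order as processUpdates:
		// prune only once the streak has already hit the limit.
		if consecutive == limit {
			fmt.Printf("peer pruned on consecutive DONT_HAVE #%d\n", i) // prints #17
			break
		}
		consecutive++
	}
}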

// BlockPresence indicates whether a peer has a block.
// Note that the order is important; we decide which peer to send a want to
@@ -76,13 +81,14 @@ type sessionWantSender struct {
changes chan change
// Information about each want indexed by CID
wants map[cid.Cid]*wantInfo
// Keeps track of how many consecutive DONT_HAVEs a peer has sent
peerConsecutiveDontHaves map[peer.ID]int
// Tracks which peers we have sent want-blocks to
swbt *sentWantBlocksTracker
// Maintains a list of peers and whether they are connected
peerAvlMgr *peerAvailabilityManager
// Tracks the number of blocks each peer sent us
peerRspTrkr *peerResponseTracker

// Sends wants to peers
pm PeerManager
// Keeps track of which peer has / doesn't have a block
@@ -97,13 +103,14 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, bpm *
onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

spm := sessionWantSender{
ctx: ctx,
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
peerConsecutiveDontHaves: make(map[peer.ID]int),
swbt: newSentWantBlocksTracker(),
peerAvlMgr: newPeerAvailabilityManager(),
peerRspTrkr: newPeerResponseTracker(),

pm: pm,
bpm: bpm,
@@ -258,13 +265,22 @@ func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool)
if isNowAvailable {
newlyAvailable = append(newlyAvailable, p)
}
// Reset the count of consecutive DONT_HAVEs received from the
// peer
delete(spm.peerConsecutiveDontHaves, p)
}
}
}

return newlyAvailable
}

// isAvailable indicates whether the peer is available and whether
// it's been tracked by the Session (used by the tests)
func (spm *sessionWantSender) isAvailable(p peer.ID) (bool, bool) {
return spm.peerAvlMgr.isAvailable(p)
}

// trackWant creates a new entry in the map of CID -> want info
func (spm *sessionWantSender) trackWant(c cid.Cid) {
// fmt.Printf("trackWant %s\n", lu.C(c))
@@ -285,6 +301,7 @@ func (spm *sessionWantSender) trackWant(c cid.Cid) {

// processUpdates processes incoming blocks and HAVE / DONT_HAVEs
func (spm *sessionWantSender) processUpdates(updates []update) {
prunePeers := make(map[peer.ID]struct{})
dontHaves := cid.NewSet()
for _, upd := range updates {
// TODO: If there is a timeout for the want from the peer, remove want.sentTo
@@ -308,12 +325,20 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.setWantSentTo(c, "")
}
}

// Track the number of consecutive DONT_HAVEs received from each peer
if spm.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
prunePeers[upd.from] = struct{}{}
} else {
spm.peerConsecutiveDontHaves[upd.from]++
}
}

// For each HAVE
for _, c := range upd.haves {
// Update the block presence for the peer
spm.updateWantBlockPresence(c, upd.from)
delete(spm.peerConsecutiveDontHaves, upd.from)
}

// For each received block
@@ -325,6 +350,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
// us the block
spm.peerRspTrkr.receivedBlockFrom(upd.from)
}
delete(spm.peerConsecutiveDontHaves, upd.from)
}
}

@@ -337,6 +363,12 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.onPeersExhausted(newlyExhausted)
}
}

// If any peers have sent us too many consecutive DONT_HAVEs, remove them
// from the session
for p := range prunePeers {
spm.SignalAvailability(p, false)
}
}
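
To summarize the rules processUpdates now implements: a DONT_HAVE extends the peer's streak, any HAVE or received block resets it, and a streak that hits the limit marks the peer for pruning; an availability change also clears the count, so a reinstated peer starts fresh. A condensed model of those rules (illustrative only; the type and method names are hypothetical, not part of this diff, and it assumes the peer package imported above):

// dontHaveTracker is a hypothetical model of the pruning rules above.
type dontHaveTracker struct {
	counts map[peer.ID]int // peer -> current consecutive DONT_HAVE streak
	limit  int             // prune threshold (cf. peerDontHaveLimit)
}

// onDontHave extends the streak and reports whether the peer should be pruned.
func (t *dontHaveTracker) onDontHave(p peer.ID) bool {
	if t.counts[p] == t.limit {
		return true
	}
	t.counts[p]++
	return false
}

// onHaveOrBlock resets the streak: any useful response forgives past DONT_HAVEs.
func (t *dontHaveTracker) onHaveOrBlock(p peer.ID) {
	delete(t.counts, p)
}

// onAvailabilityChange clears the streak so a reinstated peer starts fresh.
func (t *dontHaveTracker) onAvailabilityChange(p peer.ID) {
	delete(t.counts, p)
}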

// convenience structs for passing around want-blocks and want-haves for a peer
195 changes: 195 additions & 0 deletions internal/session/sessionwantsender_test.go
@@ -346,3 +346,198 @@ func TestPeersExhausted(t *testing.T) {
t.Fatal("Wrong keys")
}
}

func TestConsecutiveDontHaveLimit(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that do not exceed limit
for _, c := range cids[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}

func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVE then HAVE then DONT_HAVE from peer,
// where consecutive DONT_HAVEs would have exceeded limit
// (but they are not consecutive)
for _, c := range cids[1:peerDontHaveLimit] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] {
// HAVEs
bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false)
}
for _, c := range cids[peerDontHaveLimit+1:] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
}

func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[1 : peerDontHaveLimit+2] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}

// Receive a HAVE from peer (adds it back into the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

cids2 := testutil.GenerateCids(peerDontHaveLimit + 10)

// Receive DONT_HAVEs from peer that don't exceed limit
for _, c := range cids2[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids2[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}
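
A note on these tests: each relies on a fixed time.Sleep(5 * time.Millisecond) to give the Run loop time to drain the changes channel, which can be flaky on slow or loaded CI machines. A hedged alternative (hypothetical helper, not part of this PR) is to poll the condition against a deadline:

// waitFor polls cond roughly once per millisecond until it returns true
// or the timeout elapses. Hypothetical test helper, not part of this PR.
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatal("condition not met before timeout")
}

Each sleep-then-check pair above could then become a single call, e.g. waitFor(t, 100*time.Millisecond, func() bool { avail, ok := spm.isAvailable(p); return ok && avail }).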