This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Merge pull request #261 from ipfs/fix/prune-session-peers
Prune peers that send too many consecutive DONT_HAVEs
Stebalien committed Feb 14, 2020
2 parents f4c9702 + 4d2bdc2 commit 9ddd13e
Showing 2 changed files with 237 additions and 10 deletions.
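
In outline, the change works like this: the session keeps a per-peer count of consecutive DONT_HAVE responses; any HAVE or block from a peer resets its count, and once the count has reached peerDontHaveLimit the next DONT_HAVE gets the peer pruned from the session. The sketch below models just that counting rule; the type and method names are invented for this summary and are not part of the package.

package main

import "fmt"

// dontHaveStreak is a hypothetical, stripped-down model of the bookkeeping
// this commit adds to sessionWantSender; it is not the session code itself.
type dontHaveStreak struct {
	limit  int
	counts map[string]int // peer -> consecutive DONT_HAVEs received
}

// onDontHave mirrors the check in processUpdates: once a peer's count has
// reached the limit, the next DONT_HAVE reports the peer for pruning;
// otherwise the streak grows by one.
func (d *dontHaveStreak) onDontHave(peer string) (prune bool) {
	if d.counts[peer] == d.limit {
		return true
	}
	d.counts[peer]++
	return false
}

// onHaveOrBlock breaks the streak: only consecutive DONT_HAVEs count.
func (d *dontHaveStreak) onHaveOrBlock(peer string) {
	delete(d.counts, peer)
}

func main() {
	d := &dontHaveStreak{limit: 16, counts: make(map[string]int)}
	for i := 0; i < 16; i++ {
		d.onDontHave("p") // builds the streak up to the limit
	}
	fmt.Println(d.onDontHave("p")) // true: the 17th in a row trips the prune
	d.onHaveOrBlock("p")           // a HAVE resets the count...
	fmt.Println(d.onDontHave("p")) // ...so this prints false
}
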
internal/session/sessionwantsender.go (52 changes: 42 additions & 10 deletions)
@@ -9,8 +9,13 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
)

const (
// Maximum number of changes to accept before blocking
changesBufferSize = 128
// If the session receives this many DONT_HAVEs in a row from a peer,
// it prunes the peer from the session
peerDontHaveLimit = 16
)

// BlockPresence indicates whether a peer has a block.
// Note that the order is important; we decide which peer to send a want to
@@ -76,13 +81,14 @@ type sessionWantSender struct {
changes chan change
// Information about each want indexed by CID
wants map[cid.Cid]*wantInfo
// Keeps track of how many consecutive DONT_HAVEs a peer has sent
peerConsecutiveDontHaves map[peer.ID]int
// Tracks which peers we have sent want-block to
swbt *sentWantBlocksTracker
// Maintains a list of peers and whether they are connected
peerAvlMgr *peerAvailabilityManager
// Tracks the number of blocks each peer sent us
peerRspTrkr *peerResponseTracker

// Sends wants to peers
pm PeerManager
// Keeps track of which peer has / doesn't have a block
Expand All @@ -97,13 +103,14 @@ func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, bpm *
onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

spm := sessionWantSender{
ctx: ctx,
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
peerConsecutiveDontHaves: make(map[peer.ID]int),
swbt: newSentWantBlocksTracker(),
peerAvlMgr: newPeerAvailabilityManager(),
peerRspTrkr: newPeerResponseTracker(),

pm: pm,
bpm: bpm,
@@ -258,13 +265,22 @@ func (spm *sessionWantSender) processAvailability(availability map[peer.ID]bool) []peer.ID {
if isNowAvailable {
newlyAvailable = append(newlyAvailable, p)
}
// Reset the count of consecutive DONT_HAVEs received from the
// peer
delete(spm.peerConsecutiveDontHaves, p)
}
}
}

return newlyAvailable
}

// isAvailable indicates whether the peer is available and whether
// it's been tracked by the Session (used by the tests)
func (spm *sessionWantSender) isAvailable(p peer.ID) (bool, bool) {
return spm.peerAvlMgr.isAvailable(p)
}

// trackWant creates a new entry in the map of CID -> want info
func (spm *sessionWantSender) trackWant(c cid.Cid) {
// fmt.Printf("trackWant %s\n", lu.C(c))
@@ -285,6 +301,7 @@ func (spm *sessionWantSender) trackWant(c cid.Cid) {

// processUpdates processes incoming blocks and HAVE / DONT_HAVEs
func (spm *sessionWantSender) processUpdates(updates []update) {
prunePeers := make(map[peer.ID]struct{})
dontHaves := cid.NewSet()
for _, upd := range updates {
// TODO: If there is a timeout for the want from the peer, remove want.sentTo
@@ -308,12 +325,20 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.setWantSentTo(c, "")
}
}

// Track the number of consecutive DONT_HAVEs each peer receives
if spm.peerConsecutiveDontHaves[upd.from] == peerDontHaveLimit {
prunePeers[upd.from] = struct{}{}
} else {
spm.peerConsecutiveDontHaves[upd.from]++
}
}

// For each HAVE
for _, c := range upd.haves {
// Update the block presence for the peer
spm.updateWantBlockPresence(c, upd.from)
delete(spm.peerConsecutiveDontHaves, upd.from)
}

// For each received block
@@ -325,6 +350,7 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
// us the block
spm.peerRspTrkr.receivedBlockFrom(upd.from)
}
delete(spm.peerConsecutiveDontHaves, upd.from)
}
}

@@ -337,6 +363,12 @@ func (spm *sessionWantSender) processUpdates(updates []update) {
spm.onPeersExhausted(newlyExhausted)
}
}

// If any peers have sent us too many consecutive DONT_HAVEs, remove them
// from the session
for p := range prunePeers {
spm.SignalAvailability(p, false)
}
}

// convenience structs for passing around want-blocks and want-haves for a peer
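
A subtlety that TestConsecutiveDontHaveReinstateAfterRemoval below exercises: processUpdates collects offenders into prunePeers during the scan and only signals afterwards, and the prune itself is delivered as SignalAvailability(p, false). The resulting availability change is also what deletes the peer's counter entry, so a pruned peer that later announces a HAVE rejoins the session with a fresh count. A minimal model of that round trip (again with invented names, not the package's API):

package main

import "fmt"

// prunableSession is an illustrative model of prune-and-reinstate; the real
// session routes the prune through SignalAvailability/processAvailability.
type prunableSession struct {
	limit     int
	available map[string]bool
	streak    map[string]int
}

func (s *prunableSession) dontHave(p string) {
	if s.streak[p] == s.limit {
		s.available[p] = false // pruned from the session
		delete(s.streak, p)    // the availability change clears the counter
		return
	}
	s.streak[p]++
}

func (s *prunableSession) have(p string) {
	s.available[p] = true // a HAVE (re-)adds the peer
	delete(s.streak, p)   // and breaks any DONT_HAVE streak
}

func main() {
	s := &prunableSession{limit: 16, available: map[string]bool{}, streak: map[string]int{}}
	s.have("p")
	for i := 0; i < 17; i++ {
		s.dontHave("p")
	}
	fmt.Println(s.available["p"]) // false: 17 consecutive DONT_HAVEs prune it
	s.have("p")
	fmt.Println(s.available["p"], s.streak["p"]) // true 0: reinstated, fresh count
}
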
internal/session/sessionwantsender_test.go (195 changes: 195 additions & 0 deletions)
@@ -346,3 +346,198 @@ func TestPeersExhausted(t *testing.T) {
t.Fatal("Wrong keys")
}
}

func TestConsecutiveDontHaveLimit(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that do not exceed limit
for _, c := range cids[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}

func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVE then HAVE then DONT_HAVE from peer,
// where consecutive DONT_HAVEs would have exceeded limit
// (but they are not consecutive)
for _, c := range cids[1:peerDontHaveLimit] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}
for _, c := range cids[peerDontHaveLimit : peerDontHaveLimit+1] {
// HAVEs
bpm.ReceiveFrom(p, []cid.Cid{c}, []cid.Cid{})
spm.Update(p, []cid.Cid{}, []cid.Cid{c}, []cid.Cid{}, false)
}
for _, c := range cids[peerDontHaveLimit+1:] {
// DONT_HAVEs
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}
}

func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
cids := testutil.GenerateCids(peerDontHaveLimit + 10)
p := testutil.GeneratePeers(1)[0]
sid := uint64(1)
pm := newMockPeerManager()
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, bpm, onSend, onPeersExhausted)

go spm.Run()

// Add all cids as wants
spm.Add(cids)

// Receive a HAVE from peer (adds it to the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids[1 : peerDontHaveLimit+2] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}

// Receive a HAVE from peer (adds it back into the session)
bpm.ReceiveFrom(p, cids[:1], []cid.Cid{})
spm.Update(p, []cid.Cid{}, cids[:1], []cid.Cid{}, true)

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

cids2 := testutil.GenerateCids(peerDontHaveLimit + 10)

// Receive DONT_HAVEs from peer that don't exceed limit
for _, c := range cids2[1:peerDontHaveLimit] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Peer should be available
if avail, ok := spm.isAvailable(p); !ok || !avail {
t.Fatal("Expected peer to be available")
}

// Receive DONT_HAVEs from peer that exceed limit
for _, c := range cids2[peerDontHaveLimit:] {
bpm.ReceiveFrom(p, []cid.Cid{}, []cid.Cid{c})
spm.Update(p, []cid.Cid{}, []cid.Cid{}, []cid.Cid{c}, false)
}

// Wait for processing to complete
time.Sleep(5 * time.Millisecond)

// Session should remove peer
if avail, _ := spm.isAvailable(p); avail {
t.Fatal("Expected peer not to be available")
}
}
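
A note on synchronization: each test starts the sender's event loop with go spm.Run() and then sleeps 5 ms so the loop can drain the changes channel before the assertions run. On a slow or heavily loaded machine a fixed sleep can race; a polling helper along these lines (hypothetical, not part of the package, relying on the file's existing testing and time imports) would make the assertions timing-independent:

// waitFor polls cond until it returns true or the timeout elapses, failing
// the test on timeout. Hypothetical helper, not part of this test file.
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(time.Millisecond)
	}
	t.Fatal("condition not met before timeout")
}

// Used in place of a fixed sleep, e.g.:
//
//	waitFor(t, 100*time.Millisecond, func() bool {
//		avail, ok := spm.isAvailable(p)
//		return ok && avail
//	})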
