diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69986b36b..29356a8a1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -35,7 +35,7 @@ The following emojis are used to highlight certain changes:
 ### Changed
 
 - `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
-- `bitswap/client` supports additional tracing 
+- `bitswap/client` supports additional tracing
 
 ### Removed
 
@@ -45,6 +45,7 @@ The following emojis are used to highlight certain changes:
 
 - `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
 - `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
+- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.
 
 ## [v0.20.0]
 
diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go
index 3f6a2f622..e9b8f7c54 100644
--- a/bitswap/client/internal/messagequeue/messagequeue_test.go
+++ b/bitswap/client/internal/messagequeue/messagequeue_test.go
@@ -19,7 +19,7 @@ import (
     "github.com/libp2p/go-libp2p/p2p/protocol/ping"
 )
 
-const collectTimeout = 100 * time.Millisecond
+const collectTimeout = 200 * time.Millisecond
 
 type fakeMessageNetwork struct {
     connectError error
diff --git a/bitswap/server/internal/decision/engine.go b/bitswap/server/internal/decision/engine.go
index 7114a1a41..a40345d8f 100644
--- a/bitswap/server/internal/decision/engine.go
+++ b/bitswap/server/internal/decision/engine.go
@@ -2,14 +2,15 @@ package decision
 
 import (
+    "cmp"
     "context"
+    "errors"
     "fmt"
-    "math/bits"
+    "slices"
     "sync"
     "time"
 
     "github.com/google/uuid"
-
     wl "github.com/ipfs/boxo/bitswap/client/wantlist"
     "github.com/ipfs/boxo/bitswap/internal/defaults"
     bsmsg "github.com/ipfs/boxo/bitswap/message"
@@ -132,9 +133,11 @@ type PeerEntry struct {
 
 // PeerLedger is an external ledger dealing with peers and their want lists.
 type PeerLedger interface {
     // Wants informs the ledger that [peer.ID] wants [wl.Entry].
-    Wants(p peer.ID, e wl.Entry)
+    // If the peer ledger exceeds its internal limit, then the entry is not
+    // added and false is returned.
+    Wants(p peer.ID, e wl.Entry) bool
 
-    // CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
+    // CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
     CancelWant(p peer.ID, k cid.Cid) bool
 
     // CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
@@ -315,8 +318,11 @@ func WithMaxOutstandingBytesPerPeer(count int) Option {
     }
 }
 
-// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
-// If a peer send us more than this we will truncate newest entries.
+// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each
+// peer is allowed to send. If a peer sends more than this, then the lowest
+// priority entries are truncated to this limit. If there is insufficient space
+// to enqueue new entries, then older existing wants with no associated blocks,
+// and lower priority wants, are canceled to make room for the new wants.
 func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
     return func(e *Engine) {
         e.maxQueuedWantlistEntriesPerPeer = count
@@ -402,7 +408,6 @@ func newEngine(
         taskWorkerCount:   defaults.BitswapEngineTaskWorkerCount,
         sendDontHaves:     true,
         self:              self,
-        peerLedger:        NewDefaultPeerLedger(),
         pendingGauge:      bmetrics.PendingEngineGauge(ctx),
         activeGauge:       bmetrics.ActiveEngineGauge(ctx),
         targetMessageSize: defaultTargetMessageSize,
@@ -416,6 +421,11 @@ func newEngine(
         opt(e)
     }
 
+    // If peerLedger was not set by option, then create a default instance.
+    if e.peerLedger == nil {
+        e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer)
+    }
+
     e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx))
 
     // default peer task queue options
@@ -676,14 +686,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
         return false
     }
 
-    newWorkExists := false
-    defer func() {
-        if newWorkExists {
-            e.signalNewWork()
-        }
-    }()
-
-    wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
+    wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
+    if err != nil {
+        // This is a truly broken client, let's kill the connection.
+        log.Warnw(err.Error(), "local", e.self, "remote", p)
+        return true
+    }
 
     // Get block sizes
     wantKs := cid.NewSet()
@@ -702,56 +710,35 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
         e.peerLedger.ClearPeerWantlist(p)
     }
 
-    s := uint(e.peerLedger.WantlistSizeForPeer(p))
-    if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
-        log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
-        // truncate wantlist to avoid overflow
-        available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
-        if o != 0 {
-            available = 0
+    var overflow []bsmsg.Entry
+    if len(wants) != 0 {
+        filteredWants := wants[:0] // shift inplace
+        for _, entry := range wants {
+            if !e.peerLedger.Wants(p, entry.Entry) {
+                // Cannot add entry because it would exceed size limit.
+                overflow = append(overflow, entry)
+                continue
+            }
+            filteredWants = append(filteredWants, entry)
         }
-        wants = wants[:available]
+        // Clear truncated entries - early GC.
+        clear(wants[len(filteredWants):])
+        wants = filteredWants
     }
 
-    filteredWants := wants[:0] // shift inplace
-
-    for _, entry := range wants {
-        if entry.Cid.Prefix().MhType == mh.IDENTITY {
-            // This is a truely broken client, let's kill the connection.
-            e.lock.Unlock()
-            log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
-            return true
-        }
-        if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
-            // Ignore requests about CIDs that big.
-            continue
-        }
-
-        e.peerLedger.Wants(p, entry.Entry)
-        filteredWants = append(filteredWants, entry)
+    if len(overflow) != 0 {
+        log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
+        wants = e.handleOverflow(ctx, p, overflow, wants)
     }
-    // Clear truncated entries - early GC.
-    clear(wants[len(filteredWants):])
-    wants = filteredWants
 
     for _, entry := range cancels {
         c := entry.Cid
-        if c.Prefix().MhType == mh.IDENTITY {
-            // This is a truely broken client, let's kill the connection.
-            e.lock.Unlock()
-            log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
-            return true
-        }
-        if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
-            // Ignore requests about CIDs that big.
-            continue
-        }
-
         log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c)
         if e.peerLedger.CancelWant(p, c) {
             e.peerRequestQueue.Remove(c, p)
         }
     }
+    e.lock.Unlock()
 
     var activeEntries []peertask.Task
@@ -761,13 +748,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
         // Only add the task to the queue if the requester wants a DONT_HAVE
         if e.sendDontHaves && entry.SendDontHave {
             c := entry.Cid
-
-            newWorkExists = true
-            isWantBlock := false
-            if entry.WantType == pb.Message_Wantlist_Block {
-                isWantBlock = true
-            }
-
             activeEntries = append(activeEntries, peertask.Task{
                 Topic:    c,
                 Priority: int(entry.Priority),
@@ -775,7 +755,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
                 Data: &taskData{
                     BlockSize:    0,
                     HaveBlock:    false,
-                    IsWantBlock:  isWantBlock,
+                    IsWantBlock:  entry.WantType == pb.Message_Wantlist_Block,
                     SendDontHave: entry.SendDontHave,
                 },
             })
@@ -800,8 +780,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
             continue
         }
         // The block was found, add it to the queue
-        newWorkExists = true
-
         isWantBlock := e.sendAsBlock(entry.WantType, blockSize)
 
         log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
@@ -827,19 +805,96 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
         })
     }
 
-    // Push entries onto the request queue
-    if len(activeEntries) > 0 {
+    // Push entries onto the request queue and signal network that new work is ready.
+    if len(activeEntries) != 0 {
         e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
         e.updateMetrics()
+        e.signalNewWork()
     }
     return false
 }
 
+// handleOverflow processes incoming wants that could not be added to the peer
+// ledger without exceeding the peer want limit. These are handled by trying to
+// make room by canceling existing wants for which there is no block. If this
+// does not make sufficient room, then any lower priority wants that have
+// blocks are canceled.
+//
+// Important: handleOverflow must be called while e.lock is locked.
+func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry {
+    // Sort overflow from most to least important.
+    slices.SortFunc(overflow, func(a, b bsmsg.Entry) int {
+        return cmp.Compare(b.Entry.Priority, a.Entry.Priority)
+    })
+    // Sort existing wants from least to most important, to try to replace
+    // lowest priority items first.
+    existingWants := e.peerLedger.WantlistForPeer(p)
+    slices.SortFunc(existingWants, func(a, b wl.Entry) int {
+        return cmp.Compare(a.Priority, b.Priority)
+    })
+
+    queuedWantKs := cid.NewSet()
+    for _, entry := range existingWants {
+        queuedWantKs.Add(entry.Cid)
+    }
+    queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys())
+    if err != nil {
+        log.Info("aborting overflow processing", err)
+        return wants
+    }
+
+    // Remove entries for blocks that are not present to make room for overflow.
+    var removed []int
+    for i, w := range existingWants {
+        if _, found := queuedBlockSizes[w.Cid]; !found {
+            // Cancel lowest priority dont-have.
+            if e.peerLedger.CancelWant(p, w.Cid) {
+                e.peerRequestQueue.Remove(w.Cid, p)
+            }
+            removed = append(removed, i)
+            // Pop highest priority overflow.
+            firstOver := overflow[0]
+            overflow = overflow[1:]
+            // Add highest priority overflow to wants.
+            e.peerLedger.Wants(p, firstOver.Entry)
+            wants = append(wants, firstOver)
+            if len(overflow) == 0 {
+                return wants
+            }
+        }
+    }
+
+    // Replace existing entries, that are a lower priority, with overflow
+    // entries.
+    var replace int
+    for _, overflowEnt := range overflow {
+        // Do not compare with removed existingWants entry.
+        for len(removed) != 0 && replace == removed[0] {
+            replace++
+            removed = removed[1:]
+        }
+        if overflowEnt.Entry.Priority < existingWants[replace].Priority {
+            // All overflow entries have too low of priority to replace any
+            // existing wants.
+            break
+        }
+        entCid := existingWants[replace].Cid
+        replace++
+        if e.peerLedger.CancelWant(p, entCid) {
+            e.peerRequestQueue.Remove(entCid, p)
+        }
+        e.peerLedger.Wants(p, overflowEnt.Entry)
+        wants = append(wants, overflowEnt)
+    }
+
+    return wants
+}
+
 // Split the want-havek entries from the cancel and deny entries.
-func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) {
+func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) {
     entries := m.Wantlist() // creates copy; safe to modify
     if len(entries) == 0 {
-        return nil, nil, nil
+        return nil, nil, nil, nil
     }
 
     log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
@@ -848,23 +903,35 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
     var cancels, denials []bsmsg.Entry
 
     for _, et := range entries {
+        c := et.Cid
+        if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
+            // Ignore requests about CIDs that big.
+            continue
+        }
+        if c.Prefix().MhType == mh.IDENTITY {
+            return nil, nil, nil, errors.New("peer canceled an identity CID")
+        }
+
         if et.Cancel {
             cancels = append(cancels, et)
            continue
         }
 
         if et.WantType == pb.Message_Wantlist_Have {
-            log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
+            log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
         } else {
-            log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
+            log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
         }
 
-        if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) {
+        if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
             denials = append(denials, et)
             continue
         }
 
-        wants = append(wants, et)
+        // Do not take more wants than can be handled.
+        if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
+            wants = append(wants, et)
+        }
     }
 
     if len(wants) == 0 {
@@ -874,7 +941,7 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
     // Clear truncated entries.
     clear(entries[len(wants):])
 
-    return wants, cancels, denials
+    return wants, cancels, denials, nil
 }
 
 // ReceivedBlocks is called when new blocks are received from the network.
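
The engine.go hunks above change the overflow path of MessageReceived: splitWantsCancelsDenials now drops oversized CIDs and returns an error for identity CIDs, PeerLedger.Wants reports whether an entry fit under the per-peer limit, and whatever did not fit is handed to handleOverflow. As a reading aid, the replacement policy can be summarized in isolation by the sketch below; the `want` type and `applyOverflow` helper are hypothetical illustrations under simplified assumptions, not part of the patch or of the boxo API.

package main

import (
    "cmp"
    "fmt"
    "slices"
)

// want is a hypothetical, simplified stand-in for a wantlist entry.
type want struct {
    cid      string
    priority int32
    hasBlock bool // whether the server already has the block
}

// applyOverflow mirrors the documented policy: overflow entries are considered
// from highest to lowest priority; existing wants with no local block are
// evicted first, then existing wants with a strictly lower priority.
func applyOverflow(existing, overflow []want) []want {
    slices.SortFunc(overflow, func(a, b want) int {
        return cmp.Compare(b.priority, a.priority) // most important first
    })
    slices.SortFunc(existing, func(a, b want) int {
        return cmp.Compare(a.priority, b.priority) // least important first
    })

    // Candidate victims: wants with no block first (cheapest to drop), then
    // lower-priority wants that do have blocks.
    var victims []int
    for i, ex := range existing {
        if !ex.hasBlock {
            victims = append(victims, i)
        }
    }
    for i, ex := range existing {
        if ex.hasBlock {
            victims = append(victims, i)
        }
    }

    for _, ov := range overflow {
        if len(victims) == 0 {
            break
        }
        i := victims[0]
        // A want that has a block is only evicted for a higher-priority want.
        if existing[i].hasBlock && ov.priority <= existing[i].priority {
            break
        }
        existing[i] = ov
        victims = victims[1:]
    }
    return existing
}

func main() {
    existing := []want{{"a", 5, true}, {"b", 1, false}, {"c", 3, true}}
    overflow := []want{{"d", 10, false}, {"e", 2, false}}
    // "d" replaces the no-block want "b"; "e" cannot displace "c" (priority 3).
    fmt.Println(applyOverflow(existing, overflow))
}

The real handleOverflow additionally consults the blockstore via getBlockSizes to decide which queued wants lack blocks, and it updates the peer ledger and peer request queue as it replaces entries.
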
diff --git a/bitswap/server/internal/decision/engine_test.go b/bitswap/server/internal/decision/engine_test.go
index c25e3508d..b83342302 100644
--- a/bitswap/server/internal/decision/engine_test.go
+++ b/bitswap/server/internal/decision/engine_test.go
@@ -14,6 +14,7 @@ import (
     "time"
 
     "github.com/benbjohnson/clock"
+    wl "github.com/ipfs/boxo/bitswap/client/wantlist"
     "github.com/ipfs/boxo/bitswap/internal/testutil"
     message "github.com/ipfs/boxo/bitswap/message"
     pb "github.com/ipfs/boxo/bitswap/message/pb"
@@ -1733,3 +1734,235 @@ func TestKillConnectionForInlineCid(t *testing.T) {
         t.Fatal("connection was not killed when receiving inline in cancel")
     }
 }
+
+func TestWantlistBlocked(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    const limit = 32
+
+    bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
+
+    // Generate a set of blocks that the server has.
+    haveCids := make([]cid.Cid, limit)
+    var blockNum int
+    for blockNum < limit {
+        block := blocks.NewBlock([]byte(fmt.Sprint(blockNum)))
+        if blockNum != 0 { // do not put first block in blockstore.
+            if err := bs.Put(context.Background(), block); err != nil {
+                t.Fatal(err)
+            }
+        }
+        haveCids[blockNum] = block.Cid()
+        blockNum++
+    }
+
+    fpt := &fakePeerTagger{}
+    e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit))
+    e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
+    warsaw := engineSet{
+        Peer:       peer.ID("warsaw"),
+        PeerTagger: fpt,
+        Blockstore: bs,
+        Engine:     e,
+    }
+    riga := newTestEngine(ctx, "riga")
+    if warsaw.Peer == riga.Peer {
+        t.Fatal("Sanity Check: Peers have same Key!")
+    }
+
+    m := message.New(false)
+    dontHaveCids := make([]cid.Cid, limit)
+    for i := 0; i < limit; i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+        dontHaveCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl := warsaw.Engine.WantlistForPeer(riga.Peer)
+    // Check that all the dontHave wants are on the wantlist.
+    for _, c := range dontHaveCids {
+        if !findCid(c, wl) {
+            t.Fatal("Expected all dontHaveCids to be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "dont-have CIDs are on wantlist")
+
+    m = message.New(false)
+    for _, c := range haveCids {
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    // Check that all the haveCids are on the wantlist.
+    for _, c := range haveCids {
+        if !findCid(c, wl) {
+            t.Fatal("Missing expected want. Expected all haveCids to be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "new have CIDs are now on wantlist")
+
+    m = message.New(false)
+    for i := 0; i < limit; i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 1, pb.Message_Wantlist_Block, true)
+        dontHaveCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    // Check that all the new dontHave wants are not on the wantlist.
+    for _, c := range dontHaveCids {
+        if findCid(c, wl) {
+            t.Fatal("No new dontHaveCids should be on wantlist")
+        }
+    }
+    t.Log("All", len(wl), "new dont-have CIDs are not on wantlist")
+}
+
+func TestWantlistOverflow(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    const limit = 32
+
+    bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
+
+    origCids := make([]cid.Cid, limit)
+    var blockNum int
+    m := message.New(false)
+    for blockNum < limit {
+        block := blocks.NewBlock([]byte(fmt.Sprint(blockNum)))
+        if blockNum != 0 { // do not put first block in blockstore.
+            if err := bs.Put(context.Background(), block); err != nil {
+                t.Fatal(err)
+            }
+        }
+        m.AddEntry(block.Cid(), 1, pb.Message_Wantlist_Block, true)
+        origCids[blockNum] = block.Cid()
+        blockNum++
+    }
+
+    fpt := &fakePeerTagger{}
+    e := newEngineForTesting(ctx, bs, fpt, "localhost", 0, WithScoreLedger(NewTestScoreLedger(shortTerm, nil, clock.New())), WithBlockstoreWorkerCount(4), WithMaxQueuedWantlistEntriesPerPeer(limit))
+    e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
+    warsaw := engineSet{
+        Peer:       peer.ID("warsaw"),
+        PeerTagger: fpt,
+        Blockstore: bs,
+        Engine:     e,
+    }
+    riga := newTestEngine(ctx, "riga")
+    if warsaw.Peer == riga.Peer {
+        t.Fatal("Sanity Check: Peers have same Key!")
+    }
+
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    // Check that the wantlist is at the size limit.
+    wl := warsaw.Engine.WantlistForPeer(riga.Peer)
+    if len(wl) != limit {
+        t.Fatal("wantlist size", len(wl), "does not match limit", limit)
+    }
+    t.Log("Sent message with", limit, "medium-priority wants and", limit-1, "have blocks present")
+
+    m = message.New(false)
+    lowPrioCids := make([]cid.Cid, 5)
+    for i := 0; i < cap(lowPrioCids); i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
+        lowPrioCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    if len(wl) != limit {
+        t.Fatal("wantlist size", len(wl), "does not match limit", limit)
+    }
+    // Check that one low priority entry is on the wantlist, since there is one
+    // existing entry without a block and none at a lower priority.
+    var count int
+    for _, c := range lowPrioCids {
+        if findCid(c, wl) {
+            count++
+        }
+    }
+    if count != 1 {
+        t.Fatal("Expected 1 low priority entry on wantlist, found", count)
+    }
+    t.Log("Sent message with", len(lowPrioCids), "low-priority wants. One accepted as replacement for existing want without block.")
+
+    m = message.New(false)
+    highPrioCids := make([]cid.Cid, 5)
+    for i := 0; i < cap(highPrioCids); i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 10, pb.Message_Wantlist_Block, true)
+        highPrioCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    if len(wl) != limit {
+        t.Fatal("wantlist size", len(wl), "does not match limit", limit)
+    }
+    // Check that all high priority entries are on the wantlist, since there
+    // were existing entries with lower priority.
+    for _, c := range highPrioCids {
+        if !findCid(c, wl) {
+            t.Fatal("expected high priority entry on wantlist")
+        }
+    }
+    t.Log("Sent message with", len(highPrioCids), "high-priority wants. All accepted replacing wants without block or low priority.")
+
+    // These new wants should overflow and some of them should replace existing
+    // wants that do not have blocks (the high-priority wants from the
+    // previous message).
+    m = message.New(false)
+    blockCids := make([]cid.Cid, len(highPrioCids)+2)
+    for i := 0; i < cap(blockCids); i++ {
+        c := blocks.NewBlock([]byte(fmt.Sprint(blockNum))).Cid()
+        blockNum++
+        m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
+        blockCids[i] = c
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    if len(wl) != limit {
+        t.Fatal("wantlist size", len(wl), "does not match limit", limit)
+    }
+
+    count = 0
+    for _, c := range blockCids {
+        if findCid(c, wl) {
+            count++
+        }
+    }
+    if count != len(highPrioCids) {
+        t.Fatal("expected", len(highPrioCids), "of the new blocks, found", count)
+    }
+    t.Log("Sent message with", len(blockCids), "low-priority wants.", count, "accepted replacing wants without blocks from previous message")
+
+    // Send the original wants. Some should replace the existing wants that do
+    // not have blocks associated, and the rest should overwrite the existing
+    // ones.
+    m = message.New(false)
+    for _, c := range origCids {
+        m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
+    }
+    warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
+    wl = warsaw.Engine.WantlistForPeer(riga.Peer)
+    for _, c := range origCids {
+        if !findCid(c, wl) {
+            t.Fatal("missing low-priority original wants to overwrite existing")
+        }
+    }
+    t.Log("Sent message with", len(origCids), "original wants at low priority. All accepted overwriting existing wants.")
+}
+
+func findCid(c cid.Cid, wantList []wl.Entry) bool {
+    for i := range wantList {
+        if wantList[i].Cid == c {
+            return true
+        }
+    }
+    return false
+}
diff --git a/bitswap/server/internal/decision/peer_ledger.go b/bitswap/server/internal/decision/peer_ledger.go
index b79db226d..227e50de1 100644
--- a/bitswap/server/internal/decision/peer_ledger.go
+++ b/bitswap/server/internal/decision/peer_ledger.go
@@ -12,20 +12,31 @@ type DefaultPeerLedger struct {
     // these two maps are inversions of each other
     peers map[peer.ID]map[cid.Cid]entry
     cids  map[cid.Cid]map[peer.ID]entry
+    // value 0 means no limit
+    maxEntriesPerPeer int
 }
 
-func NewDefaultPeerLedger() *DefaultPeerLedger {
+func NewDefaultPeerLedger(maxEntriesPerPeer uint) *DefaultPeerLedger {
     return &DefaultPeerLedger{
         peers: make(map[peer.ID]map[cid.Cid]entry),
         cids:  make(map[cid.Cid]map[peer.ID]entry),
+
+        maxEntriesPerPeer: int(maxEntriesPerPeer),
     }
 }
 
-func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) {
+// Wants adds an entry to the peer ledger. If adding the entry would make the
+// peer ledger exceed the maxEntriesPerPeer limit, then the entry is not added
+// and false is returned.
+func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) bool {
     cids, ok := l.peers[p]
     if !ok {
         cids = make(map[cid.Cid]entry)
         l.peers[p] = cids
+    } else if l.maxEntriesPerPeer != 0 && len(cids) == l.maxEntriesPerPeer {
+        if _, ok = cids[e.Cid]; !ok {
+            return false // cannot add to peer ledger
+        }
     }
 
     cids[e.Cid] = entry{e.Priority, e.WantType}
@@ -35,6 +46,8 @@ func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) {
         l.cids[e.Cid] = m
     }
     m[p] = entry{e.Priority, e.WantType}
+
+    return true
 }
 
 func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
@@ -42,13 +55,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
     wants, ok := l.peers[p]
     if !ok {
         return false
     }
+    _, had := wants[k]
     delete(wants, k)
     if len(wants) == 0 {
         delete(l.peers, p)
     }
     l.removePeerFromCid(p, k)
-    return true
+    return had
 }
 
 func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
diff --git a/out b/out
new file mode 100644
index 000000000..e69de29bb
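
The peer_ledger.go changes give DefaultPeerLedger a per-peer cap (a constructor argument, with 0 meaning no limit), make Wants report whether the entry was accepted, and make CancelWant report whether anything was actually removed. The test sketch below is illustrative only: it assumes it would live in the decision package next to peer_ledger.go, TestDefaultPeerLedgerLimit is a hypothetical name, and the called identifiers come from the patch.

package decision

import (
    "fmt"
    "testing"

    wl "github.com/ipfs/boxo/bitswap/client/wantlist"
    pb "github.com/ipfs/boxo/bitswap/message/pb"
    blocks "github.com/ipfs/go-block-format"
    "github.com/libp2p/go-libp2p/core/peer"
)

func TestDefaultPeerLedgerLimit(t *testing.T) {
    const limit = 2
    l := NewDefaultPeerLedger(limit)
    p := peer.ID("test-peer")

    newEntry := func(i int) wl.Entry {
        return wl.Entry{
            Cid:      blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(),
            Priority: 1,
            WantType: pb.Message_Wantlist_Block,
        }
    }
    first, second, third := newEntry(0), newEntry(1), newEntry(2)

    // Up to the limit, Wants reports success.
    if !l.Wants(p, first) || !l.Wants(p, second) {
        t.Fatal("expected wants within the limit to be accepted")
    }
    // At the limit, a new CID is rejected...
    if l.Wants(p, third) {
        t.Fatal("expected want beyond the limit to be rejected")
    }
    // ...but re-sending an already-tracked CID is still accepted.
    if !l.Wants(p, first) {
        t.Fatal("expected existing want to be updatable at the limit")
    }
    // CancelWant now reports whether the CID was actually removed.
    if !l.CancelWant(p, first.Cid) {
        t.Fatal("expected cancel of a tracked want to report removal")
    }
    if l.CancelWant(p, first.Cid) {
        t.Fatal("expected cancel of an untracked want to report no removal")
    }
    // After canceling, there is room for the previously rejected entry.
    if !l.Wants(p, third) {
        t.Fatal("expected want to be accepted after making room")
    }
}

Note that a CID the peer already wants is still accepted at the limit: the size check in Wants only rejects CIDs that would grow the per-peer map.
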