Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bitswap): wantlist overflow handling #629

Merged
merged 17 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -45,6 +45,7 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

## [v0.20.0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const collectTimeout = 100 * time.Millisecond
const collectTimeout = 200 * time.Millisecond

type fakeMessageNetwork struct {
connectError error
Expand Down
211 changes: 139 additions & 72 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
package decision

import (
"cmp"
"context"
"errors"
"fmt"
"math/bits"
"slices"
"sync"
"time"

"github.com/google/uuid"

wl "github.com/ipfs/boxo/bitswap/client/wantlist"
"github.com/ipfs/boxo/bitswap/internal/defaults"
bsmsg "github.com/ipfs/boxo/bitswap/message"
Expand Down Expand Up @@ -132,9 +133,11 @@
// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
gammazero marked this conversation as resolved.
Show resolved Hide resolved
Wants(p peer.ID, e wl.Entry)
// If peer ledger exceed internal limit, then the entry is not added
// and false is returned.
Wants(p peer.ID, e wl.Entry) bool

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
Expand Down Expand Up @@ -315,8 +318,11 @@
}
}

// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
// If a peer send us more than this we will truncate newest entries.
// WithMaxQueuedWantlistEntriesPerPeer limits how many individual entries each
// peer is allowed to send. If a peer sends more than this, then the lowest
// priority entries are truncated to this limit. If there is insufficient space
// to enqueue new entries, then older existing wants with no associated blocks,
// and lower priority wants, are canceled to make room for the new wants.
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
return func(e *Engine) {
e.maxQueuedWantlistEntriesPerPeer = count
Expand Down Expand Up @@ -402,7 +408,6 @@
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
peerLedger: NewDefaultPeerLedger(),
pendingGauge: bmetrics.PendingEngineGauge(ctx),
activeGauge: bmetrics.ActiveEngineGauge(ctx),
targetMessageSize: defaultTargetMessageSize,
Expand All @@ -416,6 +421,11 @@
opt(e)
}

// If peerLedger was not set by option, then create a default instance.
if e.peerLedger == nil {
e.peerLedger = NewDefaultPeerLedger(e.maxQueuedWantlistEntriesPerPeer)
}

e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx))

// default peer task queue options
Expand Down Expand Up @@ -676,14 +686,12 @@
return false
}

newWorkExists := false
defer func() {
if newWorkExists {
e.signalNewWork()
}
}()

wants, cancels, denials := e.splitWantsCancelsDenials(p, m)
wants, cancels, denials, err := e.splitWantsCancelsDenials(p, m)
if err != nil {
// This is a truely broken client, let's kill the connection.
log.Warnw(err.Error(), "local", e.self, "remote", p)
return true
}

// Get block sizes
wantKs := cid.NewSet()
Expand All @@ -702,56 +710,35 @@
e.peerLedger.ClearPeerWantlist(p)
}

s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
var overflow []bsmsg.Entry
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if !e.peerLedger.Wants(p, entry.Entry) {
// Cannot add entry because it would exceed size limit.
overflow = append(overflow, entry)
continue
}
filteredWants = append(filteredWants, entry)
}
wants = wants[:available]
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])
wants = filteredWants
gammazero marked this conversation as resolved.
Show resolved Hide resolved
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
if len(overflow) != 0 {
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
wants = e.handleOverflow(ctx, p, overflow, wants)
}
// Clear truncated entries - early GC.
clear(wants[len(filteredWants):])

wants = filteredWants
for _, entry := range cancels {
c := entry.Cid
if c.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", c)
if e.peerLedger.CancelWant(p, c) {
e.peerRequestQueue.Remove(c, p)
}
}

e.lock.Unlock()

var activeEntries []peertask.Task
Expand All @@ -761,21 +748,14 @@
// Only add the task to the queue if the requester wants a DONT_HAVE
if e.sendDontHaves && entry.SendDontHave {
c := entry.Cid

newWorkExists = true
isWantBlock := false
if entry.WantType == pb.Message_Wantlist_Block {
isWantBlock = true
}

activeEntries = append(activeEntries, peertask.Task{
Topic: c,
Priority: int(entry.Priority),
Work: bsmsg.BlockPresenceSize(c),
Data: &taskData{
BlockSize: 0,
HaveBlock: false,
IsWantBlock: isWantBlock,
IsWantBlock: entry.WantType == pb.Message_Wantlist_Block,
SendDontHave: entry.SendDontHave,
},
})
Expand All @@ -800,8 +780,6 @@
continue
}
// The block was found, add it to the queue
newWorkExists = true

isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
Expand All @@ -827,19 +805,96 @@
})
}

// Push entries onto the request queue
if len(activeEntries) > 0 {
// Push entries onto the request queue and signal network that new work is ready.
if len(activeEntries) != 0 {
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
e.updateMetrics()
e.signalNewWork()
}
return false
}

// handleOverflow processes incoming wants that could not be addded to the peer
// ledger without exceeding the peer want limit. These are handled by trying to
// make room by canceling existing wants for which there is no block. If this
// does not make sufficient room, then any lower priority wants that have
// blocks are canceled.
//
// Important: handleOverflwo must be called e.lock is locked.
func (e *Engine) handleOverflow(ctx context.Context, p peer.ID, overflow, wants []bsmsg.Entry) []bsmsg.Entry {
// Sort overflow from most to least important.
slices.SortFunc(overflow, func(a, b bsmsg.Entry) int {
return cmp.Compare(b.Entry.Priority, a.Entry.Priority)
})
// Sort existing wants from least to most important, to try to replace
// lowest priority items first.
existingWants := e.peerLedger.WantlistForPeer(p)
slices.SortFunc(existingWants, func(a, b wl.Entry) int {
return cmp.Compare(b.Priority, a.Priority)
})

queuedWantKs := cid.NewSet()
for _, entry := range existingWants {
queuedWantKs.Add(entry.Cid)
}
queuedBlockSizes, err := e.bsm.getBlockSizes(ctx, queuedWantKs.Keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I get what the idea is here and if this is necessary / if we can make this much cheaper

  • Is this meant as "is there a DONT_HAVE response queued up that we should replace".
    • While I get this it might also be overkill, and it might be fine to respect the user priority in responding with DONT_HAVEs, HAVEs, and blocks in the same way.
  • Is this meant as "I previously sent a DONT_HAVE and now this is sitting on my list as a subscription".
    • As discussed this definitely seems like something we should want to knock off our list if out of space

In either case it seems like we could add some extra data to the in-memory structs here rather than going to the blockstore to see if we have the data (and being at the mercy of whatever caching, bloom filters, etc. are used there)

Copy link
Contributor Author

@gammazero gammazero Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is that there is a DONT_HAVE message queued for the peer, but it has not been sent yet and is blocking new messages from being queued for the peer. So, cancel the unsent DONT_HAVE and try to enqueue something possibly more important. Either a delayed HAVE message will replace the pending DONT_HAVE, or the peer can ask again later. This should keep messages moving, even if there is some backup sending DONT_HAVE messages to peers.

This also handles the case where a DONT_HAVE message has been sent, but is not removed from the queue. Once a message is sent, the want is removed from the message queue and peer ledger only when blocks have been sent or when block presence has been sent. If a DONT_HAVE was sent the want remains on the queue and peer ledger as a place-holder should a block arrive later, and this is stopping new wants from being accepted. This is what the 5th bullet in #527 is referring to by:

This is because the bitswap server never cleanup entries after sending DONT_HAVE

So, in short, it handles both cases.

it seems like we could add some extra data to the in-memory structs here rather than going to the blockstore.

Yes, the wants for which block is found can be recorded in the peer ledger so that these can be ignored in overflow handling. However, that would need to be done in every call to engine.MessageReceived, and seems less preferable than doing something more expensive only during the exceptional case.

The task queue does already have this info, but this would require locking the tasqueue and the peer tracker for each overflow want CID to look at. Or, this would require a new taskqueue API to get a list of wants with HaveBlock set to true for a given peer. This last option might be less expensive than looking at the blockstore, but I was not comfortable with that amount of new plumbing for handling this bitswap exceptional case. WDYT?

if err != nil {
log.Info("aborting overflow processing", err)
return wants
}

Check warning on line 844 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L842-L844

Added lines #L842 - L844 were not covered by tests

// Remove entries for blocks that are not present to make room for overflow.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to above is this about blocks that aren't present or subscriptions?

Note: the reason I'm pushing on the difference is that from my perspective subscriptions are much more expensive by virtue of occupying memory for an indefinite amount of time rather than a transient "while I'm sending out a response". Not sure if that's enough to justify different lists, but it's how I'm thinking in my review here (but lmk if you disagree or think I'm missing the point).

Copy link
Contributor Author

@gammazero gammazero Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the engine perspective, I do not think there is any need for distinction between subscription and request-response since that I think only determines how long a peer is in the task queue/ledger.

Overall, it probably does make more sense to only do this overflow handling for subscriptions. I was thinking/hoping this would handle itself by subscriptions being the ones primarily affected in the first place and needing to do overflow handling. I think some real-world use is necessary to determine this. I will add logging that can be used to determine when overflow handling is happening.

var removed []int
for i, w := range existingWants {
if _, found := queuedBlockSizes[w.Cid]; !found {
// Cancel lowest priority dont-have.
if e.peerLedger.CancelWant(p, w.Cid) {
e.peerRequestQueue.Remove(w.Cid, p)
}
removed = append(removed, i)
// Pop hoghest priority overflow.
firstOver := overflow[0]
overflow = overflow[1:]
// Add highest priority overflow to wants.
e.peerLedger.Wants(p, firstOver.Entry)
wants = append(wants, firstOver)
if len(overflow) == 0 {
return wants
}
}
}

// Replace existing entries, that are a lower priority, with overflow
// entries.
var replace int
for _, overflowEnt := range overflow {
// Do not compare with removed existingWants entry.
for len(removed) != 0 && replace == removed[0] {
replace++
removed = removed[1:]
}
if overflowEnt.Entry.Priority < existingWants[replace].Priority {
// All overflow entries have too low of priority to replace any
// existing wants.
break
}
entCid := existingWants[replace].Cid
replace++
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
}
e.peerLedger.Wants(p, overflowEnt.Entry)
wants = append(wants, overflowEnt)
}

return wants
}

// Split the want-havek entries from the cancel and deny entries.
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry) {
func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]bsmsg.Entry, []bsmsg.Entry, []bsmsg.Entry, error) {
entries := m.Wantlist() // creates copy; safe to modify
if len(entries) == 0 {
return nil, nil, nil
return nil, nil, nil, nil
}

log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
Expand All @@ -848,23 +903,35 @@
var cancels, denials []bsmsg.Entry

for _, et := range entries {
c := et.Cid
if e.maxCidSize != 0 && uint(c.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
if c.Prefix().MhType == mh.IDENTITY {
return nil, nil, nil, errors.New("peer canceled an identity CID")
}

if et.Cancel {
cancels = append(cancels, et)
continue
}

if et.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
} else {
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
}

if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, et.Cid) {
if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
denials = append(denials, et)
continue
}

wants = append(wants, et)
// Do not take more wants that can be handled.
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
wants = append(wants, et)
}
}

if len(wants) == 0 {
Expand All @@ -874,7 +941,7 @@
// Clear truncated entries.
clear(entries[len(wants):])

return wants, cancels, denials
return wants, cancels, denials, nil
}

// ReceivedBlocks is called when new blocks are received from the network.
Expand Down
Loading
Loading