Skip to content

Commit

Permalink
bitswap/server/internal/decision: add filtering on CIDs
Browse files Browse the repository at this point in the history
- Ignore cids that are too big.
- Kill connection for peers that are using inline CIDs.
  • Loading branch information
Jorropo committed Feb 17, 2023
1 parent 9cb5cb5 commit 62cbac4
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 4 deletions.
6 changes: 6 additions & 0 deletions bitswap/internal/defaults/defaults.go
@@ -1,6 +1,7 @@
package defaults

import (
"encoding/binary"
"time"
)

Expand All @@ -27,4 +28,9 @@ const (

// Maximum size of the wantlist we are willing to keep in memory.
MaxQueuedWantlistEntiresPerPeer = 1024

// Copied from github.com/ipfs/go-verifcid#maximumHashLength
// FIXME: expose this in go-verifcid.
MaximumHashLength = 128
MaximumAllowedCid = binary.MaxVarintLen64*4 + MaximumHashLength
)
2 changes: 1 addition & 1 deletion bitswap/network/ipfs_impl.go
Expand Up @@ -370,7 +370,7 @@ func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
}

func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error {
panic("Not implemented: DisconnectFrom() is only used by tests")
return bsnet.host.Network().ClosePeer(p)
}

// FindProvidersAsync returns a channel of providers for the given key.
Expand Down
6 changes: 6 additions & 0 deletions bitswap/options.go
Expand Up @@ -33,6 +33,12 @@ func MaxQueuedWantlistEntriesPerPeer(count uint) Option {
return Option{server.MaxQueuedWantlistEntriesPerPeer(count)}
}

// MaxCidSize only affects the server.
// If it is 0 no limit is applied.
func MaxCidSize(n uint) Option {
return Option{server.MaxCidSize(n)}
}

func TaskWorkerCount(count int) Option {
return Option{server.TaskWorkerCount(count)}
}
Expand Down
45 changes: 43 additions & 2 deletions bitswap/server/internal/decision/engine.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ipfs/go-peertaskqueue/peertracker"
process "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p/core/peer"
mh "github.com/multiformats/go-multihash"
)

// TODO consider taking responsibility for other types of requests. For
Expand Down Expand Up @@ -187,6 +188,7 @@ type Engine struct {
maxOutstandingBytesPerPeer int

maxQueuedWantlistEntriesPerPeer uint
maxCidSize uint
}

// TaskInfo represents the details of a request from a peer.
Expand Down Expand Up @@ -272,13 +274,20 @@ func WithMaxOutstandingBytesPerPeer(count int) Option {

// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
// If a peer send us more than this we will truncate newest entries.
// It defaults to DefaultMaxQueuedWantlistEntiresPerPeer.
func WithMaxQueuedWantlistEntriesPerPeer(count uint) Option {
return func(e *Engine) {
e.maxQueuedWantlistEntriesPerPeer = count
}
}

// WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
// If a peer send us more than this we will truncate newest entries.
func WithMaxCidSize(n uint) Option {
return func(e *Engine) {
e.maxCidSize = n
}
}

func WithSetSendDontHave(send bool) Option {
return func(e *Engine) {
e.sendDontHaves = send
Expand Down Expand Up @@ -357,6 +366,7 @@ func newEngine(
tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()),
tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
maxQueuedWantlistEntriesPerPeer: defaults.MaxQueuedWantlistEntiresPerPeer,
maxCidSize: defaults.MaximumAllowedCid,
}

for _, opt := range opts {
Expand Down Expand Up @@ -617,7 +627,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the workerTasks)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) {
entries := m.Wantlist()

if len(entries) > 0 {
Expand Down Expand Up @@ -676,10 +686,40 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
wants = wants[:available]
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
}
clear := wants[len(filteredWants):]
for i := range clear {
clear[i] = bsmsg.Entry{} // early GC
}
wants = filteredWants
for _, entry := range cancels {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer canceled an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}

log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
Expand Down Expand Up @@ -765,6 +805,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
e.peerRequestQueue.PushTasksTruncated(e.maxQueuedWantlistEntriesPerPeer, p, activeEntries...)
e.updateMetrics()
}
return false
}

// Split the want-have / want-block entries from the cancel entries
Expand Down
16 changes: 15 additions & 1 deletion bitswap/server/server.go
Expand Up @@ -220,6 +220,17 @@ func MaxQueuedWantlistEntriesPerPeer(count uint) Option {
}
}

// MaxCidSize limits how big CIDs we are willing to serve.
// We will ignore CIDs over this limit.
// It defaults to [defaults.MaxCidSize].
// If it is 0 no limit is applied.
func MaxCidSize(n uint) Option {
o := decision.WithMaxCidSize(n)
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, o)
}
}

// HasBlockBufferSize configure how big the new blocks buffer should be.
func HasBlockBufferSize(count int) Option {
if count < 0 {
Expand Down Expand Up @@ -511,7 +522,10 @@ func (bs *Server) provideWorker(px process.Process) {
func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(ctx, p, incoming)
mustKillConnection := bs.engine.MessageReceived(ctx, p, incoming)
if mustKillConnection {
bs.network.DisconnectFrom(ctx, p)
}
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

Expand Down

0 comments on commit 62cbac4

Please sign in to comment.