Skip to content

Commit

Permalink
bitswap/server: allow overriding peer ledger with WithPeerLedger (#607)
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Apr 26, 2024
1 parent eeea414 commit 0f223aa
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 26 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ The following emojis are used to highlight certain changes:
* `NewRemoteCarBackend` allows you to create a gateway backend that uses one or multiple Trustless Gateways as backend. These gateways must support CAR requests (`application/vnd.ipld.car`), as well as the extensions describe in [IPIP-402](https://specs.ipfs.tech/ipips/ipip-0402/). With this, we also introduced `NewCarBackend`, `NewRemoteCarFetcher` and `NewRetryCarFetcher`.
* `gateway` now sets the [`Content-Location`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Location) header for requests with non-default content format, as a result of content negotiation. This allows generic and misconfigured HTTP caches to store Deserialized, CAR and Block responses separately, under distinct cache keys.
* `gateway` now supports `car-dups`, `car-order` and `car-version` as query parameters in addition to the `application/vnd.ipld.car` parameters sent via `Accept` header. The parameters in the `Accept` header have always priority, but including them in URL simplifies HTTP caching and allows use in `Content-Location` header on CAR responses to maximize interoperability with wide array of HTTP caches.

* `bitswap/server` now allows to override the default peer ledger with `WithPeerLedger`.

### Changed

### Removed
Expand Down
4 changes: 4 additions & 0 deletions bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func WithScoreLedger(scoreLedger server.ScoreLedger) Option {
return Option{server.WithScoreLedger(scoreLedger)}
}

func WithPeerLedger(peerLedger server.PeerLedger) Option {
return Option{server.WithPeerLedger(peerLedger)}
}

func WithTargetMessageSize(tms int) Option {
return Option{server.WithTargetMessageSize(tms)}
}
Expand Down
2 changes: 2 additions & 0 deletions bitswap/server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ type (
TaskInfo = decision.TaskInfo
ScoreLedger = decision.ScoreLedger
ScorePeerFunc = decision.ScorePeerFunc
PeerLedger = decision.PeerLedger
PeerEntry = decision.PeerEntry
)
47 changes: 45 additions & 2 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,42 @@ type ScoreLedger interface {
Stop()
}

type PeerEntry struct {
Peer peer.ID
Priority int32
WantType pb.Message_Wantlist_WantType
}

// PeerLedger is an external ledger dealing with peers and their want lists.
type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType)

// Peers returns all peers that want [cid.Cid].
Peers(k cid.Cid) []PeerEntry

// CollectPeerIDs returns all peers that the ledger has an active session with.
CollectPeerIDs() []peer.ID

// WantlistSizeForPeer returns the size of the wantlist for [peer.ID].
WantlistSizeForPeer(p peer.ID) int

// WantlistForPeer returns the wantlist for [peer.ID].
WantlistForPeer(p peer.ID) []wl.Entry

// ClearPeerWantlist clears the wantlist for [peer.ID].
ClearPeerWantlist(p peer.ID)

// PeerDisconnected informs the ledger that [peer.ID] is no longer connected.
PeerDisconnected(p peer.ID)
}

// Engine manages sending requested blocks to peers.
type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
Expand Down Expand Up @@ -150,7 +186,7 @@ type Engine struct {
lock sync.RWMutex // protects the fields immediately below

// peerLedger saves which peers are waiting for a Cid
peerLedger *peerLedger
peerLedger PeerLedger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger
Expand Down Expand Up @@ -240,6 +276,13 @@ func WithScoreLedger(scoreledger ScoreLedger) Option {
}
}

// WithPeerLedger sets a custom [PeerLedger] to be used with this [Engine].
func WithPeerLedger(peerLedger PeerLedger) Option {
return func(e *Engine) {
e.peerLedger = peerLedger
}
}

// WithBlockstoreWorkerCount sets the number of worker threads used for
// blockstore operations in the decision engine
func WithBlockstoreWorkerCount(count int) Option {
Expand Down Expand Up @@ -359,7 +402,7 @@ func newEngine(
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
sendDontHaves: true,
self: self,
peerLedger: newPeerLedger(),
peerLedger: NewDefaultPeerLedger(),
pendingGauge: bmetrics.PendingEngineGauge(ctx),
activeGauge: bmetrics.ActiveEngineGauge(ctx),
targetMessageSize: defaultTargetMessageSize,
Expand Down
43 changes: 20 additions & 23 deletions bitswap/server/internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

type peerLedger struct {
// thoses two maps are inversions of each other
type DefaultPeerLedger struct {
// these two maps are inversions of each other
peers map[peer.ID]map[cid.Cid]entry
cids map[cid.Cid]map[peer.ID]entry
}

func newPeerLedger() *peerLedger {
return &peerLedger{
func NewDefaultPeerLedger() *DefaultPeerLedger {
return &DefaultPeerLedger{
peers: make(map[peer.ID]map[cid.Cid]entry),
cids: make(map[cid.Cid]map[peer.ID]entry),
}
}

func (l *peerLedger) Wants(p peer.ID, e wl.Entry) {
func (l *DefaultPeerLedger) Wants(p peer.ID, e wl.Entry) {
cids, ok := l.peers[p]
if !ok {
cids = make(map[cid.Cid]entry)
Expand All @@ -37,8 +37,7 @@ func (l *peerLedger) Wants(p peer.ID, e wl.Entry) {
m[p] = entry{e.Priority, e.WantType}
}

// CancelWant returns true if the cid was present in the wantlist.
func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
wants, ok := l.peers[p]
if !ok {
return false
Expand All @@ -52,8 +51,7 @@ func (l *peerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return true
}

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
wants, ok := l.peers[p]
if !ok {
return
Expand All @@ -74,7 +72,7 @@ func (l *peerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wan
l.removePeerFromCid(p, k)
}

func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
func (l *DefaultPeerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
m, ok := l.cids[k]
if !ok {
return
Expand All @@ -85,41 +83,40 @@ func (l *peerLedger) removePeerFromCid(p peer.ID, k cid.Cid) {
}
}

type entryForPeer struct {
Peer peer.ID
entry
}

type entry struct {
Priority int32
WantType pb.Message_Wantlist_WantType
}

func (l *peerLedger) Peers(k cid.Cid) []entryForPeer {
func (l *DefaultPeerLedger) Peers(k cid.Cid) []PeerEntry {
m, ok := l.cids[k]
if !ok {
return nil
}
peers := make([]entryForPeer, 0, len(m))
peers := make([]PeerEntry, 0, len(m))
for p, e := range m {
peers = append(peers, entryForPeer{p, e})
peers = append(peers, PeerEntry{
Peer: p,
Priority: e.Priority,
WantType: e.WantType,
})
}
return peers
}

func (l *peerLedger) CollectPeerIDs() []peer.ID {
func (l *DefaultPeerLedger) CollectPeerIDs() []peer.ID {
peers := make([]peer.ID, 0, len(l.peers))
for p := range l.peers {
peers = append(peers, p)
}
return peers
}

func (l *peerLedger) WantlistSizeForPeer(p peer.ID) int {
func (l *DefaultPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return len(l.peers[p])
}

func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
func (l *DefaultPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
cids, ok := l.peers[p]
if !ok {
return nil
Expand All @@ -139,7 +136,7 @@ func (l *peerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
// ClearPeerWantlist does not take an effort to fully erase it from memory.
// This is intended when the peer is still connected and the map capacity could
// be reused. If the memory should be freed use PeerDisconnected instead.
func (l *peerLedger) ClearPeerWantlist(p peer.ID) {
func (l *DefaultPeerLedger) ClearPeerWantlist(p peer.ID) {
cids, ok := l.peers[p]
if !ok {
return
Expand All @@ -150,7 +147,7 @@ func (l *peerLedger) ClearPeerWantlist(p peer.ID) {
}
}

func (l *peerLedger) PeerDisconnected(p peer.ID) {
func (l *DefaultPeerLedger) PeerDisconnected(p peer.ID) {
l.ClearPeerWantlist(p)
delete(l.peers, p)
}
8 changes: 8 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func WithScoreLedger(scoreLedger decision.ScoreLedger) Option {
}
}

// WithPeerLedger configures the engine with a custom [decision.PeerLedger].
func WithPeerLedger(peerLedger decision.PeerLedger) Option {
o := decision.WithPeerLedger(peerLedger)
return func(bs *Server) {
bs.engineOptions = append(bs.engineOptions, o)
}
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
func (bs *Server) LedgerForPeer(p peer.ID) *decision.Receipt {
Expand Down

0 comments on commit 0f223aa

Please sign in to comment.