Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

refactor: remove WantManager #374

Merged
merged 4 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 9 additions & 16 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/internal/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/internal/wantmanager"
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -123,13 +122,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

var wm *bswm.WantManager
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a DONT_HAVE message arriving to the WantManager
wm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
// Simulate a message arriving with DONT_HAVEs
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
Expand All @@ -138,25 +137,23 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
}
notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

bs := &Bitswap{
Expand All @@ -166,7 +163,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
pqm: pqm,
sm: sm,
Expand Down Expand Up @@ -207,9 +203,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
Expand Down Expand Up @@ -357,7 +350,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)
Expand Down Expand Up @@ -480,14 +473,14 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.pm.Connected(p)
bs.engine.PeerConnected(p)
}

// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
Binary file modified docs/go-bitswap.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 6 additions & 8 deletions docs/go-bitswap.puml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ node "Sending Blocks" {
[Engine] --> [TaskWorker (workers.go)]
}

node "Requesting Blocks" {
[Bitswap] --* [WantManager]
[WantManager] --> [BlockPresenceManager]
[WantManager] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}

node "Providing" {
[Bitswap] --* [Provide Collector (workers.go)]
[Provide Collector (workers.go)] --* [Provide Worker (workers.go)]
Expand All @@ -31,14 +24,19 @@ node "Sessions (smart requests)" {
[Bitswap] --* [SessionManager]
[SessionManager] --> [SessionInterestManager]
[SessionManager] --o [Session]
[SessionManager] --> [BlockPresenceManager]
[Session] --* [sessionWantSender]
[Session] --* [SessionPeerManager]
[Session] --> [WantManager]
[Session] --> [ProvideQueryManager]
[Session] --* [sessionWants]
[Session] --> [SessionInterestManager]
[sessionWantSender] --> [BlockPresenceManager]
}

node "Requesting Blocks" {
[SessionManager] --> [PeerManager]
[sessionWantSender] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}

node "Network" {
Expand Down
13 changes: 7 additions & 6 deletions docs/how-bitswap-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ When a message is received, Bitswap
So that the Engine can send responses to the wants
- Informs the Engine of any received blocks
So that the Engine can send the received blocks to any peers that want them
- Informs the WantManager of received blocks, HAVEs and DONT_HAVEs
So that the WantManager can inform interested sessions
- Informs the SessionManager of received blocks, HAVEs and DONT_HAVEs
So that the SessionManager can inform interested sessions

When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`).

Expand All @@ -101,9 +101,10 @@ The PeerTaskQueue prioritizes tasks such that the peers with the least amount of

### Requesting Blocks

When the WantManager is informed of a new message, it
- informs the SessionManager
The SessionManager informs the Sessions that are interested in the received blocks and wants
When the SessionManager is informed of a new message, it
- informs the BlockPresenceManager
The BlockPresenceManager keeps track of which peers have sent HAVES and DONT_HAVEs for each block
- informs the Sessions that are interested in the received blocks and wants
- informs the PeerManager of received blocks
The PeerManager checks if any wants were send to a peer for the received blocks. If so it sends a `CANCEL` message to those peers.

Expand All @@ -114,7 +115,7 @@ The Session starts in "discovery" mode. This means it doesn't have any peers yet
When the client initially requests blocks from a Session, the Session
- informs the SessionInterestManager that it is interested in the want
- informs the sessionWantManager of the want
- tells the WantManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- tells the PeerManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- queries the ProviderQueryManager to discover which peers have the block

When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session.
Expand Down
68 changes: 42 additions & 26 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ const (
broadcastLiveWantsLimit = 64
)

// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, uint64, []cid.Cid)
// RemoveSession removes the session from the WantManager (when the
// session shuts down)
RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
Expand All @@ -47,6 +36,11 @@ type PeerManager interface {
UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid)
}

// SessionPeerManager keeps track of peers in the session
Expand Down Expand Up @@ -97,8 +91,10 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
wm WantManager
bsctx context.Context // context for bitswap
ctx context.Context // context for session
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
providerFinder ProviderFinder
sim *bssim.SessionInterestManager
Expand Down Expand Up @@ -129,9 +125,10 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
id uint64,
wm WantManager,
sprm SessionPeerManager,
providerFinder ProviderFinder,
sim *bssim.SessionInterestManager,
Expand All @@ -144,8 +141,10 @@ func New(ctx context.Context,
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
wm: wm,
pm: pm,
bpm: bpm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
Expand Down Expand Up @@ -301,13 +300,13 @@ func (s *Session) run(ctx context.Context) {
s.sw.WantsSent(oper.keys)
case opBroadcast:
// Broadcast want-haves to all peers
s.broadcastWantHaves(ctx, oper.keys)
s.broadcast(ctx, oper.keys)
default:
panic("unhandled operation")
}
case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast
s.broadcastWantHaves(ctx, nil)
s.broadcast(ctx, nil)
case <-s.periodicSearchTimer.C:
// Periodically search for a random live want
s.handlePeriodicSearch(ctx)
Expand All @@ -325,23 +324,23 @@ func (s *Session) run(ctx context.Context) {
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
// If this broadcast is because of an idle timeout (we haven't received
// any blocks for a while) then broadcast all pending wants
if wants == nil {
wants = s.sw.PrepareBroadcast()
}

// Broadcast a want-have for the live wants to everyone we're connected to
s.wm.BroadcastWantHaves(ctx, s.id, wants)
s.broadcastWantHaves(ctx, wants)

// do not find providers on consecutive ticks
// -- just rely on periodic search widening
if len(wants) > 0 && (s.consecutiveTicks == 0) {
// Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have
// the rest of the blocks also.
log.Debugf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants))
log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
s.findMorePeers(ctx, wants[0])
}
s.resetIdleTick()
Expand All @@ -364,7 +363,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// for new providers for blocks.
s.findMorePeers(ctx, randomWant)

s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
s.broadcastWantHaves(ctx, []cid.Cid{randomWant})

s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
Expand All @@ -390,8 +389,19 @@ func (s *Session) handleShutdown() {
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)

// Remove session's interest in the given blocks.
cancelKs := s.sim.RemoveSessionInterest(s.id)

// Free up block presence tracking for keys that no session is interested
// in anymore
s.bpm.RemoveKeys(cancelKs)

// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down Expand Up @@ -439,11 +449,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
s.broadcastWantHaves(ctx, ks)
}
}

// Send want-haves to all connected peers
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
s.pm.BroadcastWantHaves(ctx, wants)
}

// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
Expand Down