Skip to content
Permalink
0fa397581c
Switch branches/tags
Go to file
 
 
Cannot retrieve contributors at this time
729 lines (620 sloc) 21.4 KB
// Package decision implements the decision engine for the bitswap service.
package decision
import (
"context"
"fmt"
"sync"
"time"
"github.com/google/uuid"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
wl "github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
process "github.com/jbenet/goprocess"
peer "github.com/libp2p/go-libp2p-core/peer"
)
// TODO consider taking responsibility for other types of requests. For
// example, there could be a |cancelQueue| for all of the cancellation
// messages that need to go out. There could also be a |wantlistQueue| for
// the local peer's wantlists. Alternatively, these could all be bundled
// into a single, intelligent global queue that efficiently
// batches/combines and takes all of these into consideration.
//
// Right now, messages go onto the network for four reasons:
// 1. an initial `sendwantlist` message to a provider of the first key in a
// request
// 2. a periodic full sweep of `sendwantlist` messages to all providers
// 3. upon receipt of blocks, a `cancel` message to all peers
// 4. draining the priority queue of `blockrequests` from peers
//
// Presently, only `blockrequests` are handled by the decision engine.
// However, there is an opportunity to give it more responsibility! If the
// decision engine is given responsibility for all of the others, it can
// intelligently decide how to combine requests efficiently.
//
// Some examples of what would be possible:
//
// * when sending out the wantlists, include `cancel` requests
// * when handling `blockrequests`, include `sendwantlist` and `cancel` as
// appropriate
// * when handling `cancel`, if we recently received a wanted block from a
// peer, include a partial wantlist that contains a few other high priority
// blocks
//
// In a sense, if we treat the decision engine as a black box, it could do
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).
// log is the structured logger for the decision engine package.
var log = logging.Logger("engine")
const (
	// outboxChanBuffer must be 0 to prevent stale messages from being sent
	outboxChanBuffer = 0
	// targetMessageSize is the ideal size of the batched payload. We try to
	// pop this much data off the request queue, but it may be a little more
	// or less depending on what's in the queue.
	targetMessageSize = 16 * 1024
	// tagFormat is the tag given to peers associated with an engine; the two
	// %s slots are filled with a label ("queued"/"useful") and a UUID.
	tagFormat = "bs-engine-%s-%s"
	// queuedTagWeight is the default weight for peers that have work queued
	// on their behalf.
	queuedTagWeight = 10
	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock = 1024
	// taskWorkerCount is the number of concurrent workers that pull tasks off
	// the request queue.
	taskWorkerCount = 8
)
// Envelope contains a message for a Peer.
type Envelope struct {
	// Peer is the intended recipient.
	Peer peer.ID

	// Message is the payload.
	Message bsmsg.BitSwapMessage

	// Sent is a callback to notify the decision queue that the task is
	// complete.
	Sent func()
}
// PeerTagger covers the methods on the connection manager used by the decision
// engine to tag peers.
type PeerTagger interface {
	// TagPeer attaches a named tag with the given weight to a peer.
	TagPeer(peer.ID, string, int)
	// UntagPeer removes the named tag from a peer.
	UntagPeer(p peer.ID, tag string)
}
// ScorePeerFunc assigns a specific score to a peer.
type ScorePeerFunc func(peer.ID, int)
// ScoreLedger is an external ledger dealing with peer scores.
type ScoreLedger interface {
	// GetReceipt returns aggregated data communication with a given peer.
	GetReceipt(p peer.ID) *Receipt
	// AddToSentBytes increments the sent counter for the given peer.
	AddToSentBytes(p peer.ID, n int)
	// AddToReceivedBytes increments the received counter for the given peer.
	AddToReceivedBytes(p peer.ID, n int)
	// PeerConnected should be called when a new peer connects,
	// meaning the ledger should open accounting.
	PeerConnected(p peer.ID)
	// PeerDisconnected should be called when a peer disconnects to
	// clean up the accounting.
	PeerDisconnected(p peer.ID)
	// Start begins the ledger sampling process; scorePeer is invoked to
	// push score updates for individual peers.
	Start(scorePeer ScorePeerFunc)
	// Stop ends the sampling process.
	Stop()
}
// Engine manages sending requested blocks to peers.
type Engine struct {
	// peerRequestQueue is a priority queue of requests received from peers.
	// Requests are popped from the queue, packaged up, and placed in the
	// outbox.
	peerRequestQueue *peertaskqueue.PeerTaskQueue

	// FIXME it's a bit odd for the client and the worker to both share memory
	// (both modify the peerRequestQueue) and also to communicate over the
	// workSignal channel. consider sending requests over the channel and
	// allowing the worker to have exclusive access to the peerRequestQueue. In
	// that case, no lock would be required.
	workSignal chan struct{}

	// outbox contains outgoing messages to peers. This is owned by the
	// taskWorker goroutine
	outbox chan (<-chan *Envelope)

	// bsm fetches blocks and block sizes from the blockstore on behalf of
	// the engine's workers.
	bsm *blockstoreManager

	// peerTagger tags/untags peers in the connection manager while work is
	// queued for them and according to their usefulness score.
	peerTagger PeerTagger

	// tagQueued and tagUseful are the per-engine tag names (UUID-suffixed)
	// used with peerTagger.
	tagQueued, tagUseful string

	lock sync.RWMutex // protects the fields immediately below

	// ledgerMap lists block-related Ledgers by their Partner key.
	ledgerMap map[peer.ID]*ledger

	// an external ledger dealing with peer scores
	scoreLedger ScoreLedger

	// ticker periodically "thaws" the request queue; see nextEnvelope.
	ticker *time.Ticker

	// taskWorkerLock guards taskWorkerCount; the last exiting worker closes
	// the outbox.
	taskWorkerLock  sync.Mutex
	taskWorkerCount int

	// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
	// bytes up to which we will replace a want-have with a want-block
	maxBlockSizeReplaceHasWithBlock int

	// sendDontHaves controls whether DONT_HAVE responses are produced for
	// missing blocks; see SetSendDontHaves.
	sendDontHaves bool

	// self is the local peer ID, used in log output.
	self peer.ID
}
// NewEngine creates a new block sending engine for the given block store,
// using the default want-have/want-block replacement threshold.
func NewEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID, scoreLedger ScoreLedger) *Engine {
	return newEngine(bs, bstoreWorkerCount, peerTagger, self, maxBlockSizeReplaceHasWithBlock, scoreLedger)
}
// newEngine builds an Engine with an explicit maximum block size for
// replacing a want-have with a want-block. This constructor is used by the
// tests.
func newEngine(bs bstore.Blockstore, bstoreWorkerCount int, peerTagger PeerTagger, self peer.ID,
	maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

	// Fall back to the default score ledger when none was supplied.
	if scoreLedger == nil {
		scoreLedger = NewDefaultScoreLedger()
	}

	engine := &Engine{
		ledgerMap:                       make(map[peer.ID]*ledger),
		scoreLedger:                     scoreLedger,
		bsm:                             newBlockstoreManager(bs, bstoreWorkerCount),
		peerTagger:                      peerTagger,
		outbox:                          make(chan (<-chan *Envelope), outboxChanBuffer),
		workSignal:                      make(chan struct{}, 1),
		ticker:                          time.NewTicker(time.Millisecond * 100),
		maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
		taskWorkerCount:                 taskWorkerCount,
		sendDontHaves:                   true,
		self:                            self,
		// Per-engine unique tag names for the connection manager.
		tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()),
		tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
	}

	engine.peerRequestQueue = peertaskqueue.New(
		peertaskqueue.OnPeerAddedHook(engine.onPeerAdded),
		peertaskqueue.OnPeerRemovedHook(engine.onPeerRemoved),
		peertaskqueue.TaskMerger(newTaskMerger()),
		peertaskqueue.IgnoreFreezing(true))

	return engine
}
// SetSendDontHaves indicates what to do when the engine receives a want-block
// for a block that is not in the blockstore. Either
// - Send a DONT_HAVE message
// - Simply don't respond
// Older versions of Bitswap did not respond, so this allows us to simulate
// those older versions for testing.
//
// NOTE(review): the field is written without holding a lock; presumably this
// is only called before StartWorkers — confirm with callers.
func (e *Engine) SetSendDontHaves(send bool) {
	e.sendDontHaves = send
}
// startScoreLedger starts the score ledger and arranges for it to be stopped
// when the given process closes. Score updates from the ledger are turned
// into connection-manager tag changes.
func (e *Engine) startScoreLedger(px process.Process) {
	scorePeer := func(p peer.ID, score int) {
		// A zero score clears the "useful" tag; anything else (re)tags the
		// peer with its current score.
		if score == 0 {
			e.peerTagger.UntagPeer(p, e.tagUseful)
			return
		}
		e.peerTagger.TagPeer(p, e.tagUseful, score)
	}
	e.scoreLedger.Start(scorePeer)

	// Stop the ledger when the process shuts down.
	px.Go(func(ppx process.Process) {
		<-ppx.Closing()
		e.scoreLedger.Stop()
	})
}
// StartWorkers starts up workers to handle requests from other nodes for the
// data on this node.
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
	// The blockstore manager and score ledger run under the same process
	// tree as the task workers.
	e.bsm.start(px)
	e.startScoreLedger(px)

	for i := 0; i < e.taskWorkerCount; i++ {
		px.Go(func(_ process.Process) {
			e.taskWorker(ctx)
		})
	}
}
// onPeerAdded is invoked by the peer request queue when a peer first has
// work queued; it tags the peer in the connection manager.
func (e *Engine) onPeerAdded(p peer.ID) {
	e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
// onPeerRemoved is invoked by the peer request queue when a peer no longer
// has work queued; it removes the queued tag from the connection manager.
func (e *Engine) onPeerRemoved(p peer.ID) {
	e.peerTagger.UntagPeer(p, e.tagQueued)
}
// WantlistForPeer returns the list of keys that the given peer has asked for.
func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
	l := e.findOrCreate(p)

	// Snapshot the entries under the ledger lock...
	l.lk.Lock()
	es := l.wantList.Entries()
	l.lk.Unlock()

	// ...then sort outside of it; sorting touches only the snapshot.
	wl.SortEntries(es)

	return es
}
// LedgerForPeer returns aggregated data communication with a given peer,
// as recorded by the score ledger.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
	return e.scoreLedger.GetReceipt(p)
}
// taskWorker repeatedly offers one-time-use envelope channels on the outbox,
// then fills each accepted channel with the next envelope built from the
// request queue. Multiple taskWorkers run concurrently; the bitswap workers
// on the other end send the messages to the network.
func (e *Engine) taskWorker(ctx context.Context) {
	defer e.taskWorkerExit()
	for {
		// One-shot channel per envelope; buffered so the send below can
		// never block.
		envChan := make(chan *Envelope, 1)

		select {
		case <-ctx.Done():
			return
		case e.outbox <- envChan:
		}

		// A receiver accepted the channel, so prepare an envelope by
		// acquiring tasks from the priority queue.
		env, err := e.nextEnvelope(ctx)
		if err != nil {
			// Context cancelled while waiting for work.
			close(envChan)
			return
		}

		envChan <- env // buffered: cannot block
		close(envChan)
	}
}
// taskWorkerExit handles cleanup of task workers. The last worker to exit
// closes the outbox so downstream consumers observe shutdown.
func (e *Engine) taskWorkerExit() {
	e.taskWorkerLock.Lock()
	defer e.taskWorkerLock.Unlock()
	e.taskWorkerCount--
	if e.taskWorkerCount == 0 {
		close(e.outbox)
	}
}
// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
//
// It loops until it has produced a non-empty message for some peer: it pops
// up to targetMessageSize of tasks for a single peer, turns them into HAVEs,
// DONT_HAVEs and blocks, and wraps the result in an Envelope whose Sent
// callback marks the tasks done and re-signals the workers.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	for {
		// Pop some tasks off the request queue
		p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
		for len(nextTasks) == 0 {
			// No work available; block until signalled or the ticker fires.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-e.workSignal:
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			case <-e.ticker.C:
				// When a task is cancelled, the queue may be "frozen" for a
				// period of time. We periodically "thaw" the queue to make
				// sure it doesn't get stuck in a frozen state.
				e.peerRequestQueue.ThawRound()
				p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
			}
		}

		// Create a new message
		msg := bsmsg.New(false)

		log.Debugw("Bitswap process tasks", "local", e.self, "taskCount", len(nextTasks))

		// Amount of data in the request queue still waiting to be popped
		msg.SetPendingBytes(int32(pendingBytes))

		// Split out want-blocks, want-haves and DONT_HAVEs
		blockCids := make([]cid.Cid, 0, len(nextTasks))
		blockTasks := make(map[cid.Cid]*taskData, len(nextTasks))
		for _, t := range nextTasks {
			c := t.Topic.(cid.Cid)
			td := t.Data.(*taskData)
			if td.HaveBlock {
				if td.IsWantBlock {
					// Full blocks are fetched from the datastore below.
					blockCids = append(blockCids, c)
					blockTasks[c] = td
				} else {
					// Add HAVES to the message
					msg.AddHave(c)
				}
			} else {
				// Add DONT_HAVEs to the message
				msg.AddDontHave(c)
			}
		}

		// Fetch blocks from datastore
		blks, err := e.bsm.getBlocks(ctx, blockCids)
		if err != nil {
			// we're dropping the envelope but that's not an issue in practice.
			return nil, err
		}

		for c, t := range blockTasks {
			blk := blks[c]
			// If the block was not found (it has been removed)
			if blk == nil {
				// If the client requested DONT_HAVE, add DONT_HAVE to the message
				if t.SendDontHave {
					msg.AddDontHave(c)
				}
			} else {
				// Add the block to the message
				// log.Debugf("  make evlp %s->%s block: %s (%d bytes)", e.self, p, c, len(blk.RawData()))
				msg.AddBlock(blk)
			}
		}

		// If there's nothing in the message, bail out and pop the next batch.
		if msg.Empty() {
			e.peerRequestQueue.TasksDone(p, nextTasks...)
			continue
		}

		log.Debugw("Bitswap engine -> msg", "local", e.self, "to", p, "blockCount", len(msg.Blocks()), "presenceCount", len(msg.BlockPresences()), "size", msg.Size())
		return &Envelope{
			Peer:    p,
			Message: msg,
			Sent: func() {
				// Once the message has been sent, signal the request queue so
				// it can be cleared from the queue
				e.peerRequestQueue.TasksDone(p, nextTasks...)

				// Signal the worker to check for more work
				e.signalNewWork()
			},
		}, nil
	}
}
// Outbox returns a channel of one-time use Envelope channels. Consumers
// receive a channel, then receive the Envelope from it (the inner channel is
// closed after delivery, or without a value on shutdown).
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
	return e.outbox
}
// Peers returns a slice of Peers with whom the local node has active sessions.
func (e *Engine) Peers() []peer.ID {
	e.lock.RLock()
	defer e.lock.RUnlock()

	// One ledger per partner peer, so pre-size the result accordingly.
	peers := make([]peer.ID, 0, len(e.ledgerMap))
	for _, l := range e.ledgerMap {
		peers = append(peers, l.Partner)
	}
	return peers
}
// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the workerTasks).
//
// Cancels remove entries from both the peer's ledger and the request queue.
// Wants for blocks we have become block/HAVE tasks; wants for blocks we do
// not have become DONT_HAVE tasks when both sides permit it.
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
	entries := m.Wantlist()

	if len(entries) > 0 {
		log.Debugw("Bitswap engine <- msg", "local", e.self, "from", p, "entryCount", len(entries))
		for _, et := range entries {
			if !et.Cancel {
				if et.WantType == pb.Message_Wantlist_Have {
					log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", et.Cid)
				} else {
					log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", et.Cid)
				}
			}
		}
	}

	if m.Empty() {
		log.Infof("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		// Wake the task workers once, after all queue updates are done.
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	// Get block sizes
	wants, cancels := e.splitWantsCancels(entries)
	wantKs := cid.NewSet()
	for _, entry := range wants {
		wantKs.Add(entry.Cid)
	}
	blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
	if err != nil {
		log.Info("aborting message processing", err)
		return
	}

	// Get the ledger for the peer
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// If the peer sent a full wantlist, replace the ledger's wantlist
	if m.Full() {
		l.wantList = wl.New()
	}

	var activeEntries []peertask.Task

	// Remove cancelled blocks from the queue
	for _, entry := range cancels {
		log.Debugw("Bitswap engine <- cancel", "local", e.self, "from", p, "cid", entry.Cid)
		if l.CancelWant(entry.Cid) {
			e.peerRequestQueue.Remove(entry.Cid, p)
		}
	}

	// For each want-have / want-block
	for _, entry := range wants {
		c := entry.Cid
		blockSize, found := blockSizes[entry.Cid]

		// Add each want-have / want-block to the ledger
		l.Wants(c, entry.Priority, entry.WantType)

		// If the block was not found
		if !found {
			log.Debugw("Bitswap engine: block not found", "local", e.self, "from", p, "cid", entry.Cid, "sendDontHave", entry.SendDontHave)

			// Only add the task to the queue if the requester wants a DONT_HAVE
			// and this engine is configured to send them.
			if e.sendDontHaves && entry.SendDontHave {
				newWorkExists = true
				isWantBlock := false
				if entry.WantType == pb.Message_Wantlist_Block {
					isWantBlock = true
				}

				activeEntries = append(activeEntries, peertask.Task{
					Topic:    c,
					Priority: int(entry.Priority),
					Work:     bsmsg.BlockPresenceSize(c),
					Data: &taskData{
						BlockSize:    0,
						HaveBlock:    false,
						IsWantBlock:  isWantBlock,
						SendDontHave: entry.SendDontHave,
					},
				})
			}
		} else {
			// The block was found, add it to the queue
			newWorkExists = true

			isWantBlock := e.sendAsBlock(entry.WantType, blockSize)

			log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", entry.Cid, "isWantBlock", isWantBlock)

			// entrySize is the amount of space the entry takes up in the
			// message we send to the recipient. If we're sending a block, the
			// entrySize is the size of the block. Otherwise it's the size of
			// a block presence entry.
			entrySize := blockSize
			if !isWantBlock {
				entrySize = bsmsg.BlockPresenceSize(c)
			}

			activeEntries = append(activeEntries, peertask.Task{
				Topic:    c,
				Priority: int(entry.Priority),
				Work:     entrySize,
				Data: &taskData{
					BlockSize:    blockSize,
					HaveBlock:    true,
					IsWantBlock:  isWantBlock,
					SendDontHave: entry.SendDontHave,
				},
			})
		}
	}

	// Push entries onto the request queue
	if len(activeEntries) > 0 {
		e.peerRequestQueue.PushTasks(p, activeEntries...)
	}
}
// splitWantsCancels partitions entries into want-have/want-block entries and
// cancel entries, preserving their relative order.
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
	wants := make([]bsmsg.Entry, 0, len(es))
	cancels := make([]bsmsg.Entry, 0, len(es))
	for _, entry := range es {
		if !entry.Cancel {
			wants = append(wants, entry)
			continue
		}
		cancels = append(cancels, entry)
	}
	return wants, cancels
}
// ReceiveFrom is called when new blocks are received and added to the block
// store, meaning there may be peers who want those blocks, so we should send
// the blocks to them.
//
// This function also updates the receive side of the ledger.
//
// NOTE(review): the haves parameter is not referenced in this body — confirm
// whether it is intentionally unused.
func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid) {
	if len(blks) == 0 {
		return
	}

	// An empty "from" means the blocks did not come from a peer, so there is
	// no receive accounting to do.
	if from != "" {
		l := e.findOrCreate(from)
		l.lk.Lock()

		// Record how many bytes were received in the ledger
		for _, blk := range blks {
			log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
			e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
		}

		l.lk.Unlock()
	}

	// Get the size of each block
	blockSizes := make(map[cid.Cid]int, len(blks))
	for _, blk := range blks {
		blockSizes[blk.Cid()] = len(blk.RawData())
	}

	// Check each peer to see if it wants one of the blocks we received
	work := false
	e.lock.RLock()
	for _, l := range e.ledgerMap {
		l.lk.RLock()
		for _, b := range blks {
			k := b.Cid()
			if entry, ok := l.WantListContains(k); ok {
				work = true
				blockSize := blockSizes[k]
				isWantBlock := e.sendAsBlock(entry.WantType, blockSize)
				// The entry's cost in the outgoing message: the block itself
				// for a want-block, otherwise a block-presence entry.
				entrySize := blockSize
				if !isWantBlock {
					entrySize = bsmsg.BlockPresenceSize(k)
				}
				e.peerRequestQueue.PushTasks(l.Partner, peertask.Task{
					Topic:    entry.Cid,
					Priority: int(entry.Priority),
					Work:     entrySize,
					Data: &taskData{
						BlockSize:    blockSize,
						HaveBlock:    true,
						IsWantBlock:  isWantBlock,
						SendDontHave: false,
					},
				})
			}
		}
		l.lk.RUnlock()
	}
	e.lock.RUnlock()

	// Wake the task workers once if anything was queued.
	if work {
		e.signalNewWork()
	}
}
// TODO add contents of m.WantList() to my local wantlist? NB: could introduce
// race conditions where I send a message, but MessageSent gets handled after
// MessageReceived. The information in the local wantlist could become
// inconsistent. Would need to ensure that Sends and acknowledgement of the
// send happen atomically

// MessageSent is called when a message has successfully been sent out, to
// record changes: sent bytes are credited to the peer's score ledger and the
// satisfied wants are removed from the peer's wantlist.
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()

	// Each sent block satisfies the corresponding want-block.
	for _, blk := range m.Blocks() {
		e.scoreLedger.AddToSentBytes(l.Partner, len(blk.RawData()))
		l.wantList.RemoveType(blk.Cid(), pb.Message_Wantlist_Block)
	}

	// Each sent HAVE satisfies the corresponding want-have. No sent bytes
	// are recorded for block presences; that accounting is reserved for
	// data blocks.
	for _, presence := range m.BlockPresences() {
		if presence.Type == pb.Message_Have {
			l.wantList.RemoveType(presence.Cid, pb.Message_Wantlist_Have)
		}
	}
}
// PeerConnected is called when a new peer connects, meaning we should start
// sending blocks.
func (e *Engine) PeerConnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()

	// Lazily create the block ledger for this peer.
	if _, exists := e.ledgerMap[p]; !exists {
		e.ledgerMap[p] = newLedger(p)
	}

	// Open score accounting for the peer as well.
	e.scoreLedger.PeerConnected(p)
}
// PeerDisconnected is called when a peer disconnects. It drops the peer's
// block ledger and closes its score accounting.
func (e *Engine) PeerDisconnected(p peer.ID) {
	e.lock.Lock()
	defer e.lock.Unlock()
	delete(e.ledgerMap, p)
	e.scoreLedger.PeerDisconnected(p)
}
// sendAsBlock reports whether a want should be answered with the full block.
// Want-blocks are always sent as blocks; a want-have is upgraded to a block
// when the block is at or below the configured replacement threshold.
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
	if wantType == pb.Message_Wantlist_Block {
		return true
	}
	return blockSize <= e.maxBlockSizeReplaceHasWithBlock
}
// numBytesSentTo returns the total bytes sent to the given peer, as recorded
// by the score ledger.
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Sent
}
// numBytesReceivedFrom returns the total bytes received from the given peer,
// as recorded by the score ledger.
func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
	return e.LedgerForPeer(p).Recv
}
// findOrCreate lazily instantiates the ledger for a peer.
func (e *Engine) findOrCreate(p peer.ID) *ledger {
	// Fast path: a read lock is cheaper and usually suffices.
	e.lock.RLock()
	if l, ok := e.ledgerMap[p]; ok {
		e.lock.RUnlock()
		return l
	}
	e.lock.RUnlock()

	// Slow path: take the write lock and re-check, since another goroutine
	// may have created the ledger between the two lock acquisitions.
	e.lock.Lock()
	defer e.lock.Unlock()

	l, ok := e.ledgerMap[p]
	if !ok {
		l = newLedger(p)
		e.ledgerMap[p] = l
	}
	return l
}
// signalNewWork wakes a task worker blocked on workSignal. The channel has a
// one-slot buffer and the send is non-blocking, so repeated signals collapse
// into a single pending wakeup.
func (e *Engine) signalNewWork() {
	// Signal task generation to restart (if stopped!)
	select {
	case e.workSignal <- struct{}{}:
	default:
	}
}