Bitswap should send multiple blocks per message #4378

Closed
whyrusleeping opened this issue Nov 9, 2017 · 13 comments
Labels
help wanted Seeking public contribution on this issue

Comments

@whyrusleeping
Member

If I'm fetching a large dataset from some other peer, they will currently send me only one block per message. Even with concurrent request pipelining this is a bit inefficient. We should set things up so that multiple blocks can be packed into a given message, especially when they are smaller indirect blocks.

This will be even more important when we move towards using selectors to fetch things in bitswap.

@whyrusleeping whyrusleeping added the help wanted Seeking public contribution on this issue label Nov 9, 2017
@whyrusleeping
Member Author

While we're on the topic of making bitswap faster:
merkledag.FetchGraph used to use GetBlocks to fetch an entire set of child links at a time; now it fetches only one node at a time, then feeds its links back into the machine. We should probably change this to do more concurrent fetching via GetBlocks again (but avoid storing nodes in the todo list; keep it as just a set of CIDs).
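
A minimal sketch of that traversal shape, under loud assumptions: CIDs are plain strings, the DAG is an in-memory map, and getBlocks is a hypothetical stand-in for the real GetBlocks call. None of these names are go-ipfs API. The point is only that the todo list holds CIDs, never nodes, and that each round requests every known link in one batch.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a stand-in for a merkledag node: only its child links matter here.
type node struct {
	links []string
}

// getBlocks is a hypothetical batch fetcher standing in for GetBlocks.
// It reads from an in-memory map instead of the network.
func getBlocks(dag map[string]node, cids []string) map[string]node {
	out := make(map[string]node, len(cids))
	for _, c := range cids {
		if n, ok := dag[c]; ok {
			out[c] = n
		}
	}
	return out
}

// fetchGraph walks everything reachable from root. Each round asks for the
// whole todo set at once; the todo set stores CIDs only.
func fetchGraph(dag map[string]node, root string) []string {
	seen := map[string]bool{root: true}
	todo := []string{root}
	var visited []string
	for len(todo) > 0 {
		fetched := getBlocks(dag, todo)
		var next []string
		for _, c := range todo {
			n, ok := fetched[c]
			if !ok {
				continue // block unavailable; a real fetcher would report this
			}
			visited = append(visited, c)
			for _, l := range n.links {
				if !seen[l] {
					seen[l] = true
					next = append(next, l)
				}
			}
		}
		todo = next
	}
	sort.Strings(visited)
	return visited
}

func main() {
	dag := map[string]node{
		"root": {links: []string{"a", "b"}},
		"a":    {links: []string{"c"}},
		"b":    {links: []string{"c"}},
		"c":    {},
	}
	fmt.Println(fetchGraph(dag, "root"))
}
```

The seen set deduplicates shared children (c above), so each CID is requested at most once.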

@whyrusleeping
Member Author

Just going to dump perf thoughts/observations here:

  • Recommended usage of bitswap sessions is to tell the session about as many hashes as possible as soon as you know about them, and let it schedule and rate limit things. We need to rework the merkledag fetch commands to do this. (Really, FetchGraph needs to just be logic inside of bitswap, but that will come later.)
  • Bitswap sessions should write all the blocks they receive in a batch to optimize for disk hits.
  • The wantlist limit of 16 might be suboptimal for some scenarios.

@whyrusleeping
Member Author

A good approach might be to deprecate the top-level GetBlock(s) calls; we can make them simply create a new session under the hood. Then, in receiveBlockFrom, we can skip putting the new block to the datastore and just pass it back to whichever session is interested in it. The session could then have a size/time-bounded coalescer to write blocks to disk all at once (and maybe even concurrently!).
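
To make the coalescer idea concrete, here is a rough sketch, not an implementation from the codebase. Every name in it (coalescer, newCoalescer, the flush callback) is hypothetical; flush stands in for something like a batched datastore write. It is also simplified: the age bound is only checked when a block arrives, so a real version would need a timer to flush an idle batch.

```go
package main

import (
	"fmt"
	"time"
)

// coalescer buffers blocks and hands them to flush in batches.
// All names here are hypothetical; this is not go-ipfs API.
type coalescer struct {
	maxBytes int                  // flush once this many bytes are pending
	maxAge   time.Duration        // flush once the oldest pending block is this old
	flush    func(batch [][]byte) // e.g. a wrapper around a batched Put

	pending [][]byte
	size    int
	oldest  time.Time
}

func newCoalescer(maxBytes int, maxAge time.Duration, flush func([][]byte)) *coalescer {
	return &coalescer{maxBytes: maxBytes, maxAge: maxAge, flush: flush}
}

// Add buffers one block and flushes if either bound has been reached.
func (c *coalescer) Add(block []byte) {
	if len(c.pending) == 0 {
		c.oldest = time.Now()
	}
	c.pending = append(c.pending, block)
	c.size += len(block)
	if c.size >= c.maxBytes || time.Since(c.oldest) >= c.maxAge {
		c.Flush()
	}
}

// Flush writes out whatever is pending and resets the buffer.
func (c *coalescer) Flush() {
	if len(c.pending) == 0 {
		return
	}
	c.flush(c.pending)
	c.pending = nil
	c.size = 0
}

func main() {
	c := newCoalescer(8, time.Second, func(batch [][]byte) {
		fmt.Printf("writing %d blocks in one batch\n", len(batch))
	})
	for _, b := range []string{"aaa", "bbb", "ccc", "d"} {
		c.Add([]byte(b))
	}
	c.Flush() // write the tail that never reached a bound
}
```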

@kvm2116
Contributor

kvm2116 commented Mar 20, 2018

I would like to work on improving the performance of bitswap. I did a simple directory transfer between two nodes in a private network and found that rsync (7 seconds) did much better, by more than a factor of 2, than ipfs (16 seconds). See #4838.

@whyrusleeping do you think the performance improvement suggestions above still hold? Is there any low-hanging fruit that I could start working on right away?

@whyrusleeping
Member Author

@kvm2116 most of these still hold, yes. I would first make sure you use the new badgerds datastore on your ipfs nodes when benchmarking performance; it will likely change your results a bit. You can set this up with ipfs init --profile=badgerds.

If you do start hacking on some of these perf issues, I would ask that you please make your changes as small, clean, and incremental as possible. It would also help if you write up your approach to a given change in an issue for feedback before submitting a PR. We get a lot of PRs that are really big, hard to review, and contain changes that weren't discussed by the team beforehand, and that makes the whole process go really slowly.

I really need to write this into the contributing guidelines.

As for low-hanging fruit...
I wouldn't say that any of these changes are particularly low-hanging. They require a moderate amount of refactoring to do properly. The easiest change that could have an impact would be to make it possible for the user to change their bitswap session's wantlist limit (default 16). I'm not sure exactly how to wire that through (if I was, I'd just implement it already ;) ); it would likely be something passed in via a context value for now.

@taylormike

@whyrusleeping
Just a thought:
I am thinking of changing peerRequestTask to hold multiple wantlist.Entry values.
This would allow Bitswap to send multiple blocks in bulk per peer request. It also requires changing the Envelope payload type to hold multiple blocks (perhaps a bsmsg.BitSwapMessage?).

Question:
Is this a reasonable approach?
Does this require further discussion? I am happy to submit a pull request and/or share more details and discuss further.

type peerRequestTask struct {
	Entries  []*wantlist.Entry
	Priority int
	Target   peer.ID

	// A callback to signal that this task has been completed
	Done func()

	// created marks the time that the task was added to the queue
	created time.Time
	index   int // book-keeping field used by the pq container
}

// Envelope contains a message for a Peer
type Envelope struct {
	// Peer is the intended recipient
	Peer peer.ID

	// Message is the payload
	Message bsmsg.BitSwapMessage

	// A callback to notify the decision queue that the task is complete
	Sent func()
}

func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	// etc.....
	for {
		nextTask := e.peerRequestQueue.Pop()

		// with a task in hand, we're ready to prepare the envelope...
		msg := bsmsg.New(true)
		for _, entry := range nextTask.Entries {
			block, err := e.bs.Get(entry.Cid)
			if err != nil {
				log.Errorf("tried to execute a task and errored fetching block: %s", err)
				continue
			}
			msg.AddBlock(block)
		}

		return &Envelope{
			Peer:    nextTask.Target,
			Message: msg,
			Sent: func() {
				nextTask.Done()
				select {
				case e.workSignal <- struct{}{}:
					// work completing may mean that our queue will provide new
					// work to be done.
				default:
				}
			},
		}, nil
	}
}

// Push currently adds a new peerRequestTask to the end of the list
func (tl *prq) Push(entries []*wantlist.Entry, to peer.ID) {
	// etc.....
	task := &peerRequestTask{
		Entries: newEntries,
		Target:  to,
		created: time.Now(),
		Done: func() {
			tl.lock.Lock()
			for _, entry := range newEntries {
				partner.TaskDone(entry.Cid)
			}
			tl.pQueue.Update(partner.Index())
			tl.lock.Unlock()
		},
	}

	partner.taskQueue.Push(task)
	tl.taskMap[task.Key()] = task
	partner.requests++
	tl.pQueue.Update(partner.Index())
}

func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
	idmap := logging.LoggableMap{"ID": id}
	defer log.Debug("bitswap task worker shutting down...")
	for {
		log.Event(ctx, "Bitswap.TaskWorker.Loop", idmap)
		select {
		case nextEnvelope := <-bs.engine.Outbox():
			select {
			case envelope, ok := <-nextEnvelope:
				if !ok {
					continue
				}
				// update the BS ledger to reflect sent message
				// TODO: Should only track *useful* messages in ledger
				outgoing := bsmsg.New(false)
				for _, block := range envelope.Message.Blocks() {
					log.Event(ctx, "Bitswap.TaskWorker.Work", logging.LoggableF(func() map[string]interface{} {
						return logging.LoggableMap{
							"ID":     id,
							"Target": envelope.Peer.Pretty(),
							"Block":  block.Cid().String(),
						}
					}))
					outgoing.AddBlock(block)
				}
				bs.engine.MessageSent(envelope.Peer, outgoing)
				bs.wm.SendBlocks(ctx, envelope)
				bs.counterLk.Lock()
				for _, block := range envelope.Message.Blocks() {
					bs.counters.blocksSent++
					bs.counters.dataSent += uint64(len(block.RawData()))
				}
				bs.counterLk.Unlock()
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

@whyrusleeping
Member Author

@taylormike That does seem like a pretty reasonable approach; I think that's roughly what I was envisioning. I'm curious how you're going to select multiple blocks for the envelope (not in a doubting way): we need some sort of metric for how many blocks to put in a given message. That could just be 'up to 500kb per message' or something.

@dignifiedquire
Member

That could just be 'up to 500kb per message'

This is actually what we are doing in js-ipfs-bitswap. We aggregate blocks until we hit 512 * 1024 bytes, and then send the message out.
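
A small sketch of that kind of size-bounded aggregation, written in Go for consistency with the rest of the thread. It is not the js-ipfs-bitswap code; packBlocks and the raw [][]byte block representation are assumptions for illustration. Blocks are added to the current message until the next one would push it past the limit.

```go
package main

import "fmt"

// maxMessageSize is the 512 * 1024 byte limit mentioned above.
const maxMessageSize = 512 * 1024

// packBlocks greedily groups blocks into messages of at most maxBytes.
// A single block larger than maxBytes still gets a message of its own,
// so no message is ever empty.
func packBlocks(blocks [][]byte, maxBytes int) [][][]byte {
	var msgs [][][]byte
	var cur [][]byte
	size := 0
	for _, b := range blocks {
		if len(cur) > 0 && size+len(b) > maxBytes {
			msgs = append(msgs, cur)
			cur, size = nil, 0
		}
		cur = append(cur, b)
		size += len(b)
	}
	if len(cur) > 0 {
		msgs = append(msgs, cur)
	}
	return msgs
}

func main() {
	blocks := [][]byte{
		make([]byte, 256*1024),
		make([]byte, 200*1024),
		make([]byte, 100*1024),
		make([]byte, 4*1024),
	}
	for i, m := range packBlocks(blocks, maxMessageSize) {
		fmt.Printf("message %d: %d blocks\n", i, len(m))
	}
}
```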

@taylormike

@whyrusleeping
Thank you for your time. I was thinking of basing the number of blocks selected for the envelope on the number of wantlist.Entry values in the peerRequestTask. This would allow Bitswap to send multiple blocks in bulk per peer request. However, as you said, this creates issues if the resulting message is larger than 512kb.

Solution: (See SendBlocks below)
I was thinking of adding the '512kb per message' check/split logic just before the Envelope's payload is transmitted, in WantManager.SendBlocks. This would let SendBlocks split envelope payloads larger than 512kb into multiple messages. Thoughts?

const maxMessageSize = 512 * 1024
// etc...

func (pm *WantManager) SendBlocks(ctx context.Context, env *engine.Envelope) {
	// Blocks need to be sent synchronously to maintain proper backpressure
	// throughout the network stack
	defer env.Sent()

	blockSize := 0
	msgSize := 0
	msg := bsmsg.New(false)
	for _, block := range env.Message.Blocks() {
		blockSize = len(block.RawData())
		// Split into messages of max 512 * 1024 bytes. Never flush an
		// empty message, even if a single block exceeds the limit.
		if msgSize > 0 && msgSize+blockSize > maxMessageSize {
			err := pm.network.SendMessage(ctx, env.Peer, msg)
			if err != nil {
				log.Infof("sendblock error: %s", err)
			}
			// Reset message
			msgSize = 0
			msg = bsmsg.New(false)
		}

		msgSize += blockSize
		pm.sentHistogram.Observe(float64(blockSize))

		msg.AddBlock(block)
		log.Infof("Sending block %s to %s", block, env.Peer)
	}

	// Send remaining blocks
	err := pm.network.SendMessage(ctx, env.Peer, msg)
	if err != nil {
		log.Infof("sendblock error: %s", err)
	}
}

func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
	// etc.....
	for {
		nextTask := e.peerRequestQueue.Pop()

		// with a task in hand, we're ready to prepare the envelope...
		msg := bsmsg.New(true)
		for _, entry := range nextTask.Entries {
			block, err := e.bs.Get(entry.Cid)
			if err != nil {
				log.Errorf("tried to execute a task and errored fetching block: %s", err)
				continue
			}
			msg.AddBlock(block)
		}

		return &Envelope{
			Peer:    nextTask.Target,
			Message: msg,
			Sent: func() {
				nextTask.Done()
				select {
				case e.workSignal <- struct{}{}:
					// work completing may mean that our queue will provide new
					// work to be done.
				default:
				}
			},
		}, nil
	}
}

@whyrusleeping
Member Author

@taylormike sorry for the delay, I took some time off.

I think this is close, but not quite the right approach. It means that a peer requesting a large amount of data would end up tying up an entire worker while it sends them all the blocks they requested. Each task unit should be roughly the same size in order for the task scheduler to work correctly.

Your nextEnvelope function looks right. We should either modify the places where we call peerRequestQueue.Push to be smarter about coalescing blocks for a certain peer into fixed-size tasks, or change the peerRequestQueue itself to do that coalescing (the second option might not be good, as it produces potentially unexpected behavior where q.Push() might not affect q.Len()).

@taylormike

@whyrusleeping
I agree. This approach would unfortunately consume an entire worker when a peer requests a large amount of data.
To correct this, I think it makes sense to modify the places where we call peerRequestQueue.Push to be smarter about coalescing blocks for a certain peer into fixed-size tasks.

Solution:
I was thinking of coalescing blocks for a certain peer into fixed-size tasks based on the number of entries (Entry.Cid values).

e.peerRequestQueue.Push(activeEntries, p) //TODO: Coalesce/ Split 'activeEntries'
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
	if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
		log.Debugf("received empty message from %s", p)
	}

	newWorkExists := false
	defer func() {
		if newWorkExists {
			e.signalNewWork()
		}
	}()

	l := e.findOrCreate(p)
	l.lk.Lock()
	defer l.lk.Unlock()
	if m.Full() {
		l.wantList = wl.New()
	}
	var activeEntries []*wl.Entry
	for _, entry := range m.Wantlist() { //TODO: Coalesce/ Split
		if entry.Cancel {
			log.Debugf("%s cancel %s", p, entry.Cid)
			l.CancelWant(entry.Cid)
			e.peerRequestQueue.Remove(entry.Cid, p)
		} else {
			log.Debugf("wants %s - %d", entry.Cid, entry.Priority)
			l.Wants(entry.Cid, entry.Priority)
			if exists, err := e.bs.Has(entry.Cid); err == nil && exists {
				activeEntries = append(activeEntries, entry.Entry)
				newWorkExists = true
			}
		}
	}
	if len(activeEntries) > 0 {
		e.peerRequestQueue.Push(activeEntries, p) //TODO: Coalesce/ Split 'activeEntries'
	}
	for _, block := range m.Blocks() {
		log.Debugf("got block %s %d bytes", block, len(block.RawData()))
		l.ReceivedBytes(len(block.RawData()))
	}
	return nil
}
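
As a sketch of what the Coalesce/Split TODO could do when splitting by entry count: the helper below cuts a list of entries into consecutive tasks of at most perTask items, and each resulting slice would then be pushed as its own task. The name splitEntries, the perTask parameter, and the use of plain strings in place of *wl.Entry are all assumptions for illustration.

```go
package main

import "fmt"

// splitEntries cuts entries into consecutive tasks of at most perTask
// items each. Strings stand in for wantlist entries in this sketch.
func splitEntries(entries []string, perTask int) [][]string {
	if perTask <= 0 {
		perTask = 1 // guard against a zero or negative task size
	}
	var tasks [][]string
	for len(entries) > 0 {
		n := perTask
		if n > len(entries) {
			n = len(entries)
		}
		// The full slice expression caps capacity so that appending to
		// one task can never overwrite the next one.
		tasks = append(tasks, entries[:n:n])
		entries = entries[n:]
	}
	return tasks
}

func main() {
	active := []string{"cid1", "cid2", "cid3", "cid4", "cid5"}
	for i, task := range splitEntries(active, 2) {
		fmt.Printf("task %d: %v\n", i, task)
	}
}
```

Splitting at the call site keeps the queue's behavior predictable: each Push still adds exactly one task.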

@taylormike

@whyrusleeping
I need feedback on one more thing...
My original plan was to coalesce blocks into fixed-size tasks based on the number of entries (Entry.Cid values).

Instead...
I'm thinking I can coalesce blocks based on the size of the block behind each Entry.Cid. This would more precisely ensure that each task unit is roughly the same size.

Solution:
Given that the coalescing would occur where we call peerRequestQueue.Push, I think we need a performant way to map from a Cid to its block size. I believe this would require adding a method to the Blockstore interface: Stat(*cid.Cid) (uint, error)

Question:
Is this a reasonable improvement to my approach?

type Blockstore interface {
	DeleteBlock(*cid.Cid) error
	Has(*cid.Cid) (bool, error)

	Get(*cid.Cid) (blocks.Block, error)

	// Stat returns the size of the block stored under the given CID    <-- Interface change
	Stat(*cid.Cid) (uint, error)

	// Put puts a given block to the underlying datastore
	Put(blocks.Block) error

	// PutMany puts a slice of blocks at the same time using batching
	// capabilities of the underlying datastore whenever possible.
	PutMany([]blocks.Block) error

	// AllKeysChan returns a channel from which
	// the CIDs in the Blockstore can be read. It should respect
	// the given context, closing the channel if it becomes Done.
	AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)

	// HashOnRead specifies if every read block should be
	// rehashed to make sure it matches its CID.
	HashOnRead(enabled bool)
}
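
A rough sketch of how size-based coalescing might use such a Stat method. To stay self-contained it uses string CIDs and a one-method interface (sizer) backed by an in-memory map; those, along with splitBySize, are assumptions for illustration and not the real Blockstore types.

```go
package main

import (
	"errors"
	"fmt"
)

// sizer is the single method this sketch needs. It mirrors the proposed
// Stat, but with string CIDs instead of *cid.Cid.
type sizer interface {
	Stat(cid string) (uint, error)
}

// memStore is a toy in-memory blockstore used only for the example.
type memStore map[string][]byte

func (m memStore) Stat(cid string) (uint, error) {
	b, ok := m[cid]
	if !ok {
		return 0, errors.New("block not found")
	}
	return uint(len(b)), nil
}

// splitBySize groups CIDs into tasks whose total block size stays at or
// below maxBytes. CIDs whose size cannot be determined are skipped, and a
// block larger than maxBytes ends up in a task of its own.
func splitBySize(s sizer, cids []string, maxBytes uint) [][]string {
	var tasks [][]string
	var cur []string
	var size uint
	for _, c := range cids {
		n, err := s.Stat(c)
		if err != nil {
			continue
		}
		if len(cur) > 0 && size+n > maxBytes {
			tasks = append(tasks, cur)
			cur, size = nil, 0
		}
		cur = append(cur, c)
		size += n
	}
	if len(cur) > 0 {
		tasks = append(tasks, cur)
	}
	return tasks
}

func main() {
	store := memStore{"a": []byte("aaaa"), "b": []byte("bbbb"), "c": []byte("cccccc")}
	fmt.Println(splitBySize(store, []string{"a", "b", "c", "missing"}, 8))
}
```

Because only sizes are read, the blocks themselves never have to be loaded when the tasks are built.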

@whyrusleeping
Member Author

@taylormike Yeah, that seems like a reasonable addition to the Blockstore interface. I think we've wanted this before for one reason or another. cc @kevina @magik6k
