diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f27f0cc363e..dfa72ff2ff2 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -281,7 +281,10 @@ func (bs *bitswap) taskWorker(ctx context.Context) { select { case <-ctx.Done(): return - case envelope := <-nextEnvelope: + case envelope, ok := <-nextEnvelope: + if !ok { + continue + } bs.send(ctx, envelope.Peer, envelope.Message) } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index b84732e8228..05687b3128f 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -71,7 +71,7 @@ type Engine struct { // outbox contains outgoing messages to peers. This is owned by the // taskWorker goroutine - outbox chan (<-chan Envelope) + outbox chan (<-chan *Envelope) bs bstore.Blockstore @@ -85,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ledgerMap: make(map[peer.ID]*ledger), bs: bs, peerRequestQueue: newPRQ(), - outbox: make(chan (<-chan Envelope), outboxChanBuffer), + outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}), } go e.taskWorker(ctx) @@ -95,7 +95,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { func (e *Engine) taskWorker(ctx context.Context) { defer close(e.outbox) // because taskWorker uses the channel exclusively for { - oneTimeUse := make(chan Envelope, 1) // buffer to prevent blocking + oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking select { case <-ctx.Done(): return @@ -108,7 +108,7 @@ func (e *Engine) taskWorker(ctx context.Context) { close(oneTimeUse) return // ctx cancelled } - oneTimeUse <- *envelope // buffered. won't block + oneTimeUse <- envelope // buffered. won't block close(oneTimeUse) } } @@ -141,7 +141,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { } // Outbox returns a channel of one-time use Envelope channels. -func (e *Engine) Outbox() <-chan (<-chan Envelope) { +func (e *Engine) Outbox() <-chan (<-chan *Envelope) { return e.outbox }