diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index fe6b8d7c497..dfa72ff2ff2 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -277,10 +277,16 @@ func (bs *bitswap) taskWorker(ctx context.Context) { case <-ctx.Done(): log.Debugf("exiting") return - case envelope := <-bs.engine.Outbox(): - log.Debugf("message to %s sending...", envelope.Peer) - bs.send(ctx, envelope.Peer, envelope.Message) - log.Debugf("message to %s sent", envelope.Peer) + case nextEnvelope := <-bs.engine.Outbox(): + select { + case <-ctx.Done(): + return + case envelope, ok := <-nextEnvelope: + if !ok { + continue + } + bs.send(ctx, envelope.Peer, envelope.Message) + } } } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index ea0491c2c87..05687b3128f 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -44,7 +44,8 @@ import ( var log = eventlog.Logger("engine") const ( - sizeOutboxChan = 4 + // outboxChanBuffer must be 0 to prevent stale messages from being sent + outboxChanBuffer = 0 ) // Envelope contains a message for a Peer @@ -68,8 +69,9 @@ type Engine struct { // that case, no lock would be required. workSignal chan struct{} - // outbox contains outgoing messages to peers - outbox chan Envelope + // outbox contains outgoing messages to peers. This is owned by the + // taskWorker goroutine + outbox chan (<-chan *Envelope) bs bstore.Blockstore @@ -83,7 +85,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { ledgerMap: make(map[peer.ID]*ledger), bs: bs, peerRequestQueue: newPRQ(), - outbox: make(chan Envelope, sizeOutboxChan), + outbox: make(chan (<-chan *Envelope), outboxChanBuffer), workSignal: make(chan struct{}), } go e.taskWorker(ctx) @@ -91,45 +93,55 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) taskWorker(ctx context.Context) { - log := log.Prefix("bitswap.Engine.taskWorker") + defer close(e.outbox) // because taskWorker uses the channel exclusively + for { + oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking + select { + case <-ctx.Done(): + return + case e.outbox <- oneTimeUse: + } + // receiver is ready for an outoing envelope. let's prepare one. first, + // we must acquire a task from the PQ... + envelope, err := e.nextEnvelope(ctx) + if err != nil { + close(oneTimeUse) + return // ctx cancelled + } + oneTimeUse <- envelope // buffered. won't block + close(oneTimeUse) + } +} + +// nextEnvelope runs in the taskWorker goroutine. Returns an error if the +// context is cancelled before the next Envelope can be created. +func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) { for { nextTask := e.peerRequestQueue.Pop() - if nextTask == nil { - // No tasks in the list? - // Wait until there are! + for nextTask == nil { select { case <-ctx.Done(): - log.Debugf("exiting: %s", ctx.Err()) - return + return nil, ctx.Err() case <-e.workSignal: - log.Debugf("woken up") + nextTask = e.peerRequestQueue.Pop() } - continue } - log := log.Prefix("%s", nextTask) - log.Debugf("processing") + + // with a task in hand, we're ready to prepare the envelope... block, err := e.bs.Get(nextTask.Entry.Key) if err != nil { - log.Warning("engine: task exists to send block, but block is not in blockstore") continue } - // construct message here so we can make decisions about any additional - // information we may want to include at this time. - m := bsmsg.New() + + m := bsmsg.New() // TODO: maybe add keys from our wantlist? m.AddBlock(block) - // TODO: maybe add keys from our wantlist? - log.Debugf("sending...") - select { - case <-ctx.Done(): - return - case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}: - log.Debugf("sent") - } + return &Envelope{Peer: nextTask.Target, Message: m}, nil } } -func (e *Engine) Outbox() <-chan Envelope { +// Outbox returns a channel of one-time use Envelope channels. +func (e *Engine) Outbox() <-chan (<-chan *Envelope) { return e.outbox } diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index b2583a020ee..8e5ab672c0b 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -1,6 +1,8 @@ package decision import ( + "errors" + "fmt" "math" "strings" "sync" @@ -104,7 +106,8 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) { var wg sync.WaitGroup wg.Add(1) go func() { - for _ = range e.Outbox() { + for nextEnvelope := range e.Outbox() { + <-nextEnvelope } wg.Done() }() @@ -116,6 +119,10 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) { } func TestPartnerWantsThenCancels(t *testing.T) { + numRounds := 10 + if testing.Short() { + numRounds = 1 + } alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "") vowels := strings.Split("aeiou", "") @@ -129,23 +136,31 @@ func TestPartnerWantsThenCancels(t *testing.T) { }, } - for _, testcase := range testcases { - set := testcase[0] - cancels := testcase[1] - keeps := stringsComplement(set, cancels) - - bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - e := NewEngine(context.Background(), bs) - partner := testutil.RandPeerIDFatal(t) - for _, letter := range set { - block := blocks.NewBlock([]byte(letter)) - bs.Put(block) + bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) + for _, letter := range alphabet { + block := blocks.NewBlock([]byte(letter)) + if err := bs.Put(block); err != nil { + t.Fatal(err) } - partnerWants(e, set, partner) - partnerCancels(e, cancels, partner) - assertPoppedInOrder(t, e, keeps) } + for i := 0; i < numRounds; i++ { + for _, testcase := range testcases { + set := testcase[0] + cancels := testcase[1] + keeps := stringsComplement(set, cancels) + + e := NewEngine(context.Background(), bs) + partner := testutil.RandPeerIDFatal(t) + + partnerWants(e, set, partner) + partnerCancels(e, cancels, partner) + if err := checkHandledInOrder(t, e, keeps); err != nil { + t.Logf("run #%d of %d", i, numRounds) + t.Fatal(err) + } + } + } } func partnerWants(e *Engine, keys []string, partner peer.ID) { @@ -166,15 +181,17 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) { e.MessageReceived(partner, cancels) } -func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) { +func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error { for _, k := range keys { - envelope := <-e.Outbox() + next := <-e.Outbox() + envelope := <-next received := envelope.Message.Blocks()[0] expected := blocks.NewBlock([]byte(k)) if received.Key() != expected.Key() { - t.Fatal("received", string(received.Data), "expected", string(expected.Data)) + return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data))) } } + return nil } func stringsComplement(set, subset []string) []string {