Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bitswap.decision.Engine) send only the freshest messages #601

Merged
merged 2 commits into from Jan 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions exchange/bitswap/bitswap.go
Expand Up @@ -277,10 +277,16 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
case nextEnvelope := <-bs.engine.Outbox():
select {
case <-ctx.Done():
return
case envelope, ok := <-nextEnvelope:
if !ok {
continue
}
bs.send(ctx, envelope.Peer, envelope.Message)
}
}
}
}
Expand Down
66 changes: 39 additions & 27 deletions exchange/bitswap/decision/engine.go
Expand Up @@ -44,7 +44,8 @@ import (
var log = eventlog.Logger("engine")

const (
sizeOutboxChan = 4
// outboxChanBuffer must be 0 to prevent stale messages from being sent
outboxChanBuffer = 0
)

// Envelope contains a message for a Peer
Expand All @@ -68,8 +69,9 @@ type Engine struct {
// that case, no lock would be required.
workSignal chan struct{}

// outbox contains outgoing messages to peers
outbox chan Envelope
// outbox contains outgoing messages to peers. This is owned by the
// taskWorker goroutine
outbox chan (<-chan *Envelope)

bs bstore.Blockstore

Expand All @@ -83,53 +85,63 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
ledgerMap: make(map[peer.ID]*ledger),
bs: bs,
peerRequestQueue: newPRQ(),
outbox: make(chan Envelope, sizeOutboxChan),
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}),
}
go e.taskWorker(ctx)
return e
}

func (e *Engine) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap.Engine.taskWorker")
defer close(e.outbox) // because taskWorker uses the channel exclusively
for {
oneTimeUse := make(chan *Envelope, 1) // buffer to prevent blocking
select {
case <-ctx.Done():
return
case e.outbox <- oneTimeUse:
}
// receiver is ready for an outoing envelope. let's prepare one. first,
// we must acquire a task from the PQ...
envelope, err := e.nextEnvelope(ctx)
if err != nil {
close(oneTimeUse)
return // ctx cancelled
}
oneTimeUse <- envelope // buffered. won't block
close(oneTimeUse)
}
}

// nextEnvelope runs in the taskWorker goroutine. Returns an error if the
// context is cancelled before the next Envelope can be created.
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
// No tasks in the list?
// Wait until there are!
for nextTask == nil {
select {
case <-ctx.Done():
log.Debugf("exiting: %s", ctx.Err())
return
return nil, ctx.Err()
case <-e.workSignal:
log.Debugf("woken up")
nextTask = e.peerRequestQueue.Pop()
}
continue
}
log := log.Prefix("%s", nextTask)
log.Debugf("processing")

// with a task in hand, we're ready to prepare the envelope...

block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
continue
}
// construct message here so we can make decisions about any additional
// information we may want to include at this time.
m := bsmsg.New()

m := bsmsg.New() // TODO: maybe add keys from our wantlist?
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
log.Debugf("sending...")
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
log.Debugf("sent")
}
return &Envelope{Peer: nextTask.Target, Message: m}, nil
}
}

func (e *Engine) Outbox() <-chan Envelope {
// Outbox returns a channel of one-time use Envelope channels.
func (e *Engine) Outbox() <-chan (<-chan *Envelope) {
return e.outbox
}

Expand Down
53 changes: 35 additions & 18 deletions exchange/bitswap/decision/engine_test.go
@@ -1,6 +1,8 @@
package decision

import (
"errors"
"fmt"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -104,7 +106,8 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
for _ = range e.Outbox() {
for nextEnvelope := range e.Outbox() {
<-nextEnvelope
}
wg.Done()
}()
Expand All @@ -116,6 +119,10 @@ func TestOutboxClosedWhenEngineClosed(t *testing.T) {
}

func TestPartnerWantsThenCancels(t *testing.T) {
numRounds := 10
if testing.Short() {
numRounds = 1
}
alphabet := strings.Split("abcdefghijklmnopqrstuvwxyz", "")
vowels := strings.Split("aeiou", "")

Expand All @@ -129,23 +136,31 @@ func TestPartnerWantsThenCancels(t *testing.T) {
},
}

for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)

bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)
for _, letter := range set {
block := blocks.NewBlock([]byte(letter))
bs.Put(block)
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
for _, letter := range alphabet {
block := blocks.NewBlock([]byte(letter))
if err := bs.Put(block); err != nil {
t.Fatal(err)
}
partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
assertPoppedInOrder(t, e, keeps)
}

for i := 0; i < numRounds; i++ {
for _, testcase := range testcases {
set := testcase[0]
cancels := testcase[1]
keeps := stringsComplement(set, cancels)

e := NewEngine(context.Background(), bs)
partner := testutil.RandPeerIDFatal(t)

partnerWants(e, set, partner)
partnerCancels(e, cancels, partner)
if err := checkHandledInOrder(t, e, keeps); err != nil {
t.Logf("run #%d of %d", i, numRounds)
t.Fatal(err)
}
}
}
}

func partnerWants(e *Engine, keys []string, partner peer.ID) {
Expand All @@ -166,15 +181,17 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
e.MessageReceived(partner, cancels)
}

func assertPoppedInOrder(t *testing.T, e *Engine, keys []string) {
func checkHandledInOrder(t *testing.T, e *Engine, keys []string) error {
for _, k := range keys {
envelope := <-e.Outbox()
next := <-e.Outbox()
envelope := <-next
received := envelope.Message.Blocks()[0]
expected := blocks.NewBlock([]byte(k))
if received.Key() != expected.Key() {
t.Fatal("received", string(received.Data), "expected", string(expected.Data))
return errors.New(fmt.Sprintln("received", string(received.Data), "expected", string(expected.Data)))
}
}
return nil
}

func stringsComplement(set, subset []string) []string {
Expand Down