Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

fix: abort when the context is canceled while getting blocks #240

Merged
merged 1 commit into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 21 additions & 13 deletions decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package decision

import (
"context"
"fmt"
"sync"

blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -50,25 +51,29 @@ func (bsm *blockstoreManager) worker() {
}
}

func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-bsm.px.Closing():
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
return nil
}
}

func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
return res
return res, nil
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
}
} else {
Expand All @@ -77,21 +82,20 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) m
lk.Unlock()
}
})

return res
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
return res
return res, nil
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Get(%s) error: %s", c, err)
}
} else {
Expand All @@ -100,19 +104,23 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[c
lk.Unlock()
}
})

return res
}

func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
var err error
wg := sync.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
bsm.addJob(ctx, func() {
err = bsm.addJob(ctx, func() {
jobFn(c)
wg.Done()
})
if err != nil {
wg.Done()
break
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}
}
wg.Wait()
return err
}
79 changes: 43 additions & 36 deletions decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package decision
import (
"context"
"crypto/rand"
"errors"
"sync"
"testing"
"time"
Expand All @@ -30,7 +29,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
sizes := bsm.getBlockSizes(ctx, cids)
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
Expand All @@ -41,7 +43,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
}
}

blks := bsm.getBlocks(ctx, cids)
blks, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
Expand Down Expand Up @@ -82,7 +87,10 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes := bsm.getBlockSizes(ctx, cids)
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
Expand All @@ -106,7 +114,10 @@ func TestBlockstoreManager(t *testing.T) {
}
}

fetched := bsm.getBlocks(ctx, cids)
fetched, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
Expand Down Expand Up @@ -160,17 +171,16 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
go func(t *testing.T) {
defer wg.Done()

sizes := bsm.getBlockSizes(ctx, ks)
sizes, err := bsm.getBlockSizes(ctx, ks)
if err != nil {
t.Error(err)
}
if len(sizes) != len(blks) {
err = errors.New("Wrong response length")
t.Error("Wrong response length")
}
}(t)
}
wg.Wait()

if err != nil {
t.Fatal(err)
}
}

func TestBlockstoreManagerClose(t *testing.T) {
Expand All @@ -184,7 +194,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

blks := testutil.GenerateBlocksOfSize(3, 1024)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
Expand All @@ -199,34 +209,29 @@ func TestBlockstoreManagerClose(t *testing.T) {

time.Sleep(5 * time.Millisecond)

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()

select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-px.Closed():
before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}

func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()
bsdelay := delay.Fixed(delayTime)

dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 3)
bsm := newBlockstoreManager(context.Background(), bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

blks := testutil.GenerateBlocksOfSize(3, 1024)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
Expand All @@ -237,15 +242,17 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
t.Fatal(err)
}

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()

before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}

select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-ctx.Done():
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}
12 changes: 10 additions & 2 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for _, t := range nextTask.Tasks {
blockCids.Add(t.Identifier.(cid.Cid))
}
blks := e.bsm.getBlocks(ctx, blockCids.Keys())
blks, err := e.bsm.getBlocks(ctx, blockCids.Keys())
if err != nil {
// we're dropping the envelope but that's not an issue in practice.
return nil, err
}

msg := bsmsg.New(true)
for _, b := range blks {
Expand Down Expand Up @@ -437,7 +441,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
wantKs.Add(entry.Cid)
}
}
blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
if err != nil {
log.Info("aborting message processing", err)
return
}

l := e.findOrCreate(p)
l.lk.Lock()
Expand Down