Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #240 from ipfs/fix/abort-ctx
Browse files Browse the repository at this point in the history
fix: abort when the context is canceled while getting blocks
  • Loading branch information
dirkmc committed Jan 23, 2020
2 parents 84f8ab6 + 0bc3d5a commit dcfe40e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 51 deletions.
34 changes: 21 additions & 13 deletions decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package decision

import (
"context"
"fmt"
"sync"

blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -50,25 +51,29 @@ func (bsm *blockstoreManager) worker() {
}
}

func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-bsm.px.Closing():
return fmt.Errorf("shutting down")
case bsm.jobs <- job:
return nil
}
}

func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (map[cid.Cid]int, error) {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
return res
return res, nil
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.GetSize(%s) error: %s", c, err)
}
} else {
Expand All @@ -77,21 +82,20 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) m
lk.Unlock()
}
})

return res
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
return res
return res, nil
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
// Note: this isn't a fatal error. We shouldn't abort the request
log.Errorf("blockstore.Get(%s) error: %s", c, err)
}
} else {
Expand All @@ -100,19 +104,23 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[c
lk.Unlock()
}
})

return res
}

func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
var err error
wg := sync.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
bsm.addJob(ctx, func() {
err = bsm.addJob(ctx, func() {
jobFn(c)
wg.Done()
})
if err != nil {
wg.Done()
break
}
}
wg.Wait()
return err
}
79 changes: 43 additions & 36 deletions decision/blockstoremanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package decision
import (
"context"
"crypto/rand"
"errors"
"sync"
"testing"
"time"
Expand All @@ -30,7 +29,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
sizes := bsm.getBlockSizes(ctx, cids)
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
Expand All @@ -41,7 +43,10 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
}
}

blks := bsm.getBlocks(ctx, cids)
blks, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
Expand Down Expand Up @@ -82,7 +87,10 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes := bsm.getBlockSizes(ctx, cids)
sizes, err := bsm.getBlockSizes(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
Expand All @@ -106,7 +114,10 @@ func TestBlockstoreManager(t *testing.T) {
}
}

fetched := bsm.getBlocks(ctx, cids)
fetched, err := bsm.getBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
}
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
Expand Down Expand Up @@ -160,17 +171,16 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
go func(t *testing.T) {
defer wg.Done()

sizes := bsm.getBlockSizes(ctx, ks)
sizes, err := bsm.getBlockSizes(ctx, ks)
if err != nil {
t.Error(err)
}
if len(sizes) != len(blks) {
err = errors.New("Wrong response length")
t.Error("Wrong response length")
}
}(t)
}
wg.Wait()

if err != nil {
t.Fatal(err)
}
}

func TestBlockstoreManagerClose(t *testing.T) {
Expand All @@ -184,7 +194,7 @@ func TestBlockstoreManagerClose(t *testing.T) {
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

blks := testutil.GenerateBlocksOfSize(3, 1024)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
Expand All @@ -199,34 +209,29 @@ func TestBlockstoreManagerClose(t *testing.T) {

time.Sleep(5 * time.Millisecond)

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()

select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-px.Closed():
before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}

func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()
bsdelay := delay.Fixed(delayTime)

dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 3)
bsm := newBlockstoreManager(context.Background(), bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

blks := testutil.GenerateBlocksOfSize(3, 1024)
blks := testutil.GenerateBlocksOfSize(10, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
Expand All @@ -237,15 +242,17 @@ func TestBlockstoreManagerCtxDone(t *testing.T) {
t.Fatal(err)
}

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()

before := time.Now()
_, err = bsm.getBlockSizes(ctx, ks)
if err == nil {
t.Error("expected an error")
}

select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-ctx.Done():
// would expect to wait delayTime*10 if we didn't cancel.
if time.Since(before) > delayTime*2 {
t.Error("expected a fast timeout")
}
}
12 changes: 10 additions & 2 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for _, t := range nextTask.Tasks {
blockCids.Add(t.Identifier.(cid.Cid))
}
blks := e.bsm.getBlocks(ctx, blockCids.Keys())
blks, err := e.bsm.getBlocks(ctx, blockCids.Keys())
if err != nil {
// we're dropping the envelope but that's not an issue in practice.
return nil, err
}

msg := bsmsg.New(true)
for _, b := range blks {
Expand Down Expand Up @@ -437,7 +441,11 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
wantKs.Add(entry.Cid)
}
}
blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
if err != nil {
log.Info("aborting message processing", err)
return
}

l := e.findOrCreate(p)
l.lk.Lock()
Expand Down

0 comments on commit dcfe40e

Please sign in to comment.