fixes races in blocks fetcher #5068

Merged (1 commit, Mar 11, 2020)
91 changes: 54 additions & 37 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -28,7 +28,7 @@ const fetchRequestsBuffer = 8 // number of pending fetch requests

var (
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
errCtxIsDone = errors.New("fetcher's context is done, reinitialize")
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
errStartSlotIsTooHigh = errors.New("start slot is bigger than highest finalized slot")
)

@@ -42,14 +42,14 @@ type blocksFetcherConfig struct {
// On an incoming request, the requested block range is evenly divided
// among available peers (for fair network load distribution).
type blocksFetcher struct {
ctx context.Context
cancel context.CancelFunc
headFetcher blockchain.HeadFetcher
p2p p2p.P2P
rateLimiter *leakybucket.Collector
requests chan *fetchRequestParams // incoming fetch requests from downstream clients
receivedFetchResponses chan *fetchRequestResponse // responses from peers are forwarded to downstream clients
quit chan struct{} // termination notifier
ctx context.Context
cancel context.CancelFunc
headFetcher blockchain.HeadFetcher
p2p p2p.P2P
rateLimiter *leakybucket.Collector
fetchRequests chan *fetchRequestParams
fetchResponses chan *fetchRequestResponse
quit chan struct{} // termination notifier
}

// fetchRequestParams holds parameters necessary to schedule a fetch request.
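
For context, the races this PR fixes follow a well-known shape: the loop goroutine owns the response channel and closes it on shutdown, while handler goroutines may still be in flight and try to send into it. A minimal standalone sketch (made-up names, not Prysm code) that reproduces the resulting "send on closed channel" panic:

package main

import "time"

// Illustration of the bug class only; names and timings are invented.
func main() {
    responses := make(chan int)

    // Simulates an in-flight handleRequest goroutine.
    go func() {
        time.Sleep(10 * time.Millisecond) // still working when shutdown starts
        responses <- 1                    // panics: send on closed channel
    }()

    // Simulates a shutdown path that closes the channel without waiting for
    // in-flight handlers, which is exactly what the WaitGroup added to loop()
    // below prevents.
    close(responses)

    time.Sleep(50 * time.Millisecond) // keep main alive long enough to observe the panic
}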
@@ -74,25 +74,25 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetcher {
rateLimiter := leakybucket.NewCollector(
allowedBlocksPerSecond, /* rate */
allowedBlocksPerSecond, /* capacity */
false /* deleteEmptyBuckets */)
false /* deleteEmptyBuckets */)

return &blocksFetcher{
ctx: ctx,
cancel: cancel,
headFetcher: cfg.headFetcher,
p2p: cfg.p2p,
rateLimiter: rateLimiter,
requests: make(chan *fetchRequestParams, fetchRequestsBuffer),
receivedFetchResponses: make(chan *fetchRequestResponse),
quit: make(chan struct{}),
ctx: ctx,
cancel: cancel,
headFetcher: cfg.headFetcher,
p2p: cfg.p2p,
rateLimiter: rateLimiter,
fetchRequests: make(chan *fetchRequestParams, fetchRequestsBuffer),
fetchResponses: make(chan *fetchRequestResponse),
quit: make(chan struct{}),
}
}

// start boots up the fetcher, which starts listening for incoming fetch requests.
func (f *blocksFetcher) start() error {
select {
case <-f.ctx.Done():
return errCtxIsDone
return errFetcherCtxIsDone
default:
go f.loop()
return nil
@@ -107,21 +107,32 @@ func (f *blocksFetcher) stop() {

// requestResponses exposes a channel into which fetcher pushes generated request responses.
func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
return f.receivedFetchResponses
return f.fetchResponses
}
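
Because requestResponses returns a receive-only channel, downstream code can drain responses but can neither close the channel nor push into it. A hypothetical consumer, assuming the fetcher types shown in this diff (drainResponses is an invented helper, not code from this PR):

// Hypothetical consumer of the fetcher's responses.
func drainResponses(f *blocksFetcher) {
    // The range ends once the fetcher's loop closes the response channel.
    for resp := range f.requestResponses() {
        if resp.err != nil {
            log.WithError(resp.err).Debug("Fetch request failed")
            continue
        }
        // resp.blocks (and resp.peers) carry the fetched batch; hand them
        // to whatever component processes blocks downstream.
        _ = resp.blocks
    }
}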

// loop is the main fetcher loop: it listens for incoming requests/cancellations and forwards outgoing responses.
func (f *blocksFetcher) loop() {
defer close(f.receivedFetchResponses)
defer close(f.quit)

// Wait for all loop's goroutines to finish, and safely release resources.
wg := &sync.WaitGroup{}
defer func() {
wg.Wait()
close(f.fetchResponses)
}()

for {
select {
case req := <-f.requests:
go f.handleRequest(req.ctx, req.start, req.count)
case <-f.ctx.Done():
log.Debug("Context closed, exiting goroutine")
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
return
case req := <-f.fetchRequests:
wg.Add(1)
go func() {
defer wg.Done()

f.handleRequest(req.ctx, req.start, req.count)
}()
}
}
}
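
The rewritten loop applies the standard fix for the send-on-closed-channel panic: only the owning goroutine closes the channel, and it does so only after a sync.WaitGroup confirms that every handler it spawned has returned. A minimal standalone version of the same ownership pattern (illustrative, not the Prysm code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    responses := make(chan int)
    var handlers sync.WaitGroup

    // Owner goroutine: spawns handlers, waits for all of them, then closes.
    go func() {
        defer func() {
            handlers.Wait()  // no handler can still be sending after this
            close(responses) // safe: every producer has returned
        }()
        for i := 0; i < 3; i++ {
            handlers.Add(1)
            go func(id int) {
                defer handlers.Done()
                responses <- id // always sends on an open channel
            }(i)
        }
    }()

    // Consumer: the range ends once the owner closes the channel.
    for id := range responses {
        fmt.Println("got response from handler", id)
    }
}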
@@ -130,9 +141,9 @@ func (f *blocksFetcher) loop() {
func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
select {
case <-f.ctx.Done():
return errCtxIsDone
return errFetcherCtxIsDone
default:
f.requests <- &fetchRequestParams{
f.fetchRequests <- &fetchRequestParams{
ctx: ctx,
start: start,
count: count,
@@ -146,12 +157,18 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
defer span.End()

if ctx.Err() != nil {
f.receivedFetchResponses <- &fetchRequestResponse{
start: start,
count: count,
err: ctx.Err(),
// sendResponse ensures that response is not sent to a closed channel (when context is done).
sendResponse := func(ctx context.Context, response *fetchRequestResponse) {
if ctx.Err() != nil {
log.WithError(ctx.Err()).Debug("Can not send fetch request response")
return
}

f.fetchResponses <- response
}

if ctx.Err() != nil {
sendResponse(ctx, nil)
return
}
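
The sendResponse closure drops a response once the request's context has been cancelled, logging instead of pushing into a channel whose consumer may already be gone. For comparison only, the same intent is often expressed as a single select that races the send against cancellation; this is a general Go idiom, not the code this PR adds:

// Generic sketch of the select-based variant; sendWithContext is invented.
func sendWithContext(ctx context.Context, out chan<- *fetchRequestResponse, response *fetchRequestResponse) {
    select {
    case out <- response:
        // Delivered while the request was still live.
    case <-ctx.Done():
        log.WithError(ctx.Err()).Debug("Can not send fetch request response")
    }
}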

@@ -173,31 +190,31 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
if start > highestFinalizedSlot {
log.WithError(errStartSlotIsTooHigh).Debug("Block fetch request failed")
f.receivedFetchResponses <- &fetchRequestResponse{
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
err: errStartSlotIsTooHigh,
}
})
return
}

resp, err := f.collectPeerResponses(ctx, root, finalizedEpoch, start, 1, count, peers)
if err != nil {
log.WithError(err).Debug("Block fetch request failed")
f.receivedFetchResponses <- &fetchRequestResponse{
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
err: err,
}
})
return
}

f.receivedFetchResponses <- &fetchRequestResponse{
sendResponse(ctx, &fetchRequestResponse{
start: start,
count: count,
blocks: resp,
peers: peers,
}
})
}

// collectPeerResponses orchestrates block fetching from the available peers.
95 changes: 62 additions & 33 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -44,13 +44,13 @@ func TestBlocksFetcherInitStartStop(t *testing.T) {
select {
case <-fetcher.requestResponses():
default:
t.Error("receivedFetchResponses channel is leaked")
t.Error("fetchResponses channel is leaked")
}
})

t.Run("re-starting of stopped fetcher", func(t *testing.T) {
if err := fetcher.start(); err == nil {
t.Errorf("expected error not returned: %v", errCtxIsDone)
t.Errorf("expected error not returned: %v", errFetcherCtxIsDone)
}
})
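
The leak check in TestBlocksFetcherInitStartStop relies on a property of closed channels: a receive from a closed channel never blocks and yields the zero value immediately, so after stop() the receive case fires instead of default. A standalone illustration (not Prysm code):

package main

import "fmt"

func main() {
    ch := make(chan int)
    close(ch)

    // Receiving from a closed channel completes immediately with the zero
    // value and ok == false; an open, empty channel would block instead,
    // which is what the test's default case detects as a leak.
    v, ok := <-ch
    fmt.Println(v, ok) // prints: 0 false
}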

@@ -405,6 +405,20 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
}
}

func TestBlocksFetcherScheduleRequest(t *testing.T) {
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: nil,
p2p: nil,
})
cancel()
if err := fetcher.scheduleRequest(ctx, 1, blockBatchSize); err == nil {
t.Errorf("expected error: %v", errFetcherCtxIsDone)
}
})
}

func TestBlocksFetcherHandleRequest(t *testing.T) {
chainConfig := struct {
expectedBlockSlots []uint64
@@ -427,43 +441,58 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {

hook := logTest.NewGlobal()
mc, p2p, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
defer dbtest.TeardownDB(t, beaconDB)

t.Run("context cancellation", func(t *testing.T) {
hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
p2p: p2p,
})

requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
go fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)

var blocks []*eth.SignedBeaconBlock
select {
case <-ctx.Done():
t.Error(ctx.Err())
case resp := <-fetcher.requestResponses():
if resp.err != nil {
t.Error(resp.err)
} else {
blocks = resp.blocks
}
}
if len(blocks) != blockBatchSize {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
}
testutil.AssertLogsContain(t, hook, "Received blocks")
cancel()
fetcher.handleRequest(ctx, 1, blockBatchSize)
testutil.AssertLogsContain(t, hook, "Can not send fetch request response")
testutil.AssertLogsContain(t, hook, "context canceled")
})

var receivedBlockSlots []uint64
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots); len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
t.Run("receive blocks", func(t *testing.T) {
hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
p2p: p2p,
})

dbtest.TeardownDB(t, beaconDB)
requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
go fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)

var blocks []*eth.SignedBeaconBlock
select {
case <-ctx.Done():
t.Error(ctx.Err())
case resp := <-fetcher.requestResponses():
if resp.err != nil {
t.Error(resp.err)
} else {
blocks = resp.blocks
}
}
if len(blocks) != blockBatchSize {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
}
testutil.AssertLogsContain(t, hook, "Received blocks")

var receivedBlockSlots []uint64
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots); len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
})
}

func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) {
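
Taken together, a hypothetical caller of this package's API (as it stands after this PR) constructs the fetcher, starts it, schedules a request, drains the response, and stops it. The error handling and batch parameters below are illustrative assumptions, not code from the repository:

// Hypothetical in-package usage sketch of the fetcher lifecycle.
func runFetcherOnce(ctx context.Context, headFetcher blockchain.HeadFetcher, p2pProvider p2p.P2P) error {
    fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
        headFetcher: headFetcher,
        p2p:         p2pProvider,
    })
    if err := fetcher.start(); err != nil {
        return err // errFetcherCtxIsDone when ctx is already cancelled
    }
    defer fetcher.stop()

    // Ask for a single batch of blocks starting at slot 1.
    if err := fetcher.scheduleRequest(ctx, 1 /* start */, blockBatchSize /* count */); err != nil {
        return err
    }

    // Wait for the corresponding response, or give up with the caller's context.
    select {
    case resp := <-fetcher.requestResponses():
        if resp.err != nil {
            return resp.err
        }
        log.WithField("count", len(resp.blocks)).Debug("Received blocks")
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}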