diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go
index 52d7b07e942..f45a5d13fe0 100644
--- a/beacon-chain/sync/initial-sync/blocks_fetcher.go
+++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -28,7 +28,7 @@ const fetchRequestsBuffer = 8 // number of pending fetch requests
 
 var (
 	errNoPeersAvailable   = errors.New("no peers available, waiting for reconnect")
-	errCtxIsDone          = errors.New("fetcher's context is done, reinitialize")
+	errFetcherCtxIsDone   = errors.New("fetcher's context is done, reinitialize")
 	errStartSlotIsTooHigh = errors.New("start slot is bigger than highest finalized slot")
 )
 
@@ -42,14 +42,14 @@ type blocksFetcherConfig struct {
 // On an incoming requests, requested block range is evenly divided
 // among available peers (for fair network load distribution).
 type blocksFetcher struct {
-	ctx                    context.Context
-	cancel                 context.CancelFunc
-	headFetcher            blockchain.HeadFetcher
-	p2p                    p2p.P2P
-	rateLimiter            *leakybucket.Collector
-	requests               chan *fetchRequestParams   // incoming fetch requests from downstream clients
-	receivedFetchResponses chan *fetchRequestResponse // responses from peers are forwarded to downstream clients
-	quit                   chan struct{}              // termination notifier
+	ctx            context.Context
+	cancel         context.CancelFunc
+	headFetcher    blockchain.HeadFetcher
+	p2p            p2p.P2P
+	rateLimiter    *leakybucket.Collector
+	fetchRequests  chan *fetchRequestParams
+	fetchResponses chan *fetchRequestResponse
+	quit           chan struct{} // termination notifier
 }
 
 // fetchRequestParams holds parameters necessary to schedule a fetch request.
@@ -74,17 +74,17 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
 	rateLimiter := leakybucket.NewCollector(
 		allowedBlocksPerSecond, /* rate */
 		allowedBlocksPerSecond, /* capacity */
-		false /* deleteEmptyBuckets */)
+		false /* deleteEmptyBuckets */)
 
 	return &blocksFetcher{
-		ctx:                    ctx,
-		cancel:                 cancel,
-		headFetcher:            cfg.headFetcher,
-		p2p:                    cfg.p2p,
-		rateLimiter:            rateLimiter,
-		requests:               make(chan *fetchRequestParams, fetchRequestsBuffer),
-		receivedFetchResponses: make(chan *fetchRequestResponse),
-		quit:                   make(chan struct{}),
+		ctx:            ctx,
+		cancel:         cancel,
+		headFetcher:    cfg.headFetcher,
+		p2p:            cfg.p2p,
+		rateLimiter:    rateLimiter,
+		fetchRequests:  make(chan *fetchRequestParams, fetchRequestsBuffer),
+		fetchResponses: make(chan *fetchRequestResponse),
+		quit:           make(chan struct{}),
 	}
 }
 
@@ -92,7 +92,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
 func (f *blocksFetcher) start() error {
 	select {
 	case <-f.ctx.Done():
-		return errCtxIsDone
+		return errFetcherCtxIsDone
 	default:
 		go f.loop()
 		return nil
@@ -107,21 +107,32 @@ func (f *blocksFetcher) stop() {
 
 // requestResponses exposes a channel into which fetcher pushes generated request responses.
 func (f *blocksFetcher) requestResponses() <-chan *fetchRequestResponse {
-	return f.receivedFetchResponses
+	return f.fetchResponses
 }
 
 // loop is a main fetcher loop, listens for incoming requests/cancellations, forwards outgoing responses.
 func (f *blocksFetcher) loop() {
-	defer close(f.receivedFetchResponses)
 	defer close(f.quit)
 
+	// Wait for all loop's goroutines to finish, and safely release resources.
+	wg := &sync.WaitGroup{}
+	defer func() {
+		wg.Wait()
+		close(f.fetchResponses)
+	}()
+
 	for {
 		select {
-		case req := <-f.requests:
-			go f.handleRequest(req.ctx, req.start, req.count)
 		case <-f.ctx.Done():
-			log.Debug("Context closed, exiting goroutine")
+			log.Debug("Context closed, exiting goroutine (blocks fetcher)")
 			return
+		case req := <-f.fetchRequests:
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+
+				f.handleRequest(req.ctx, req.start, req.count)
+			}()
 		}
 	}
 }
@@ -130,9 +141,9 @@ func (f *blocksFetcher) loop() {
 func (f *blocksFetcher) scheduleRequest(ctx context.Context, start, count uint64) error {
 	select {
 	case <-f.ctx.Done():
-		return errCtxIsDone
+		return errFetcherCtxIsDone
 	default:
-		f.requests <- &fetchRequestParams{
+		f.fetchRequests <- &fetchRequestParams{
 			ctx:   ctx,
 			start: start,
 			count: count,
@@ -146,12 +157,18 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
 	ctx, span := trace.StartSpan(ctx, "initialsync.handleRequest")
 	defer span.End()
 
-	if ctx.Err() != nil {
-		f.receivedFetchResponses <- &fetchRequestResponse{
-			start: start,
-			count: count,
-			err:   ctx.Err(),
+	// sendResponse ensures that response is not sent to a closed channel (when context is done).
+	sendResponse := func(ctx context.Context, response *fetchRequestResponse) {
+		if ctx.Err() != nil {
+			log.WithError(ctx.Err()).Debug("Can not send fetch request response")
+			return
 		}
+
+		f.fetchResponses <- response
+	}
+
+	if ctx.Err() != nil {
+		sendResponse(ctx, nil)
 		return
 	}
 
@@ -173,31 +190,31 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
 	highestFinalizedSlot := helpers.StartSlot(finalizedEpoch + 1)
 	if start > highestFinalizedSlot {
 		log.WithError(errStartSlotIsTooHigh).Debug("Block fetch request failed")
-		f.receivedFetchResponses <- &fetchRequestResponse{
+		sendResponse(ctx, &fetchRequestResponse{
 			start: start,
 			count: count,
 			err:   errStartSlotIsTooHigh,
-		}
+		})
 		return
 	}
 
 	resp, err := f.collectPeerResponses(ctx, root, finalizedEpoch, start, 1, count, peers)
 	if err != nil {
 		log.WithError(err).Debug("Block fetch request failed")
-		f.receivedFetchResponses <- &fetchRequestResponse{
+		sendResponse(ctx, &fetchRequestResponse{
 			start: start,
 			count: count,
 			err:   err,
-		}
+		})
 		return
 	}
 
-	f.receivedFetchResponses <- &fetchRequestResponse{
+	sendResponse(ctx, &fetchRequestResponse{
 		start:  start,
 		count:  count,
 		blocks: resp,
 		peers:  peers,
-	}
+	})
 }
 
 // collectPeerResponses orchestrates block fetching from the available peers.
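Note on the shutdown ordering introduced above: `loop()` now closes `fetchResponses` only after `wg.Wait()` returns, so every in-flight `handleRequest` goroutine has finished before the channel is closed, and `sendResponse` drops the response once the request context is already done. The snippet below is a minimal, self-contained sketch of that pattern, not Prysm code; the `fetcher`, `loop`, and `handle` names here are illustrative stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fetcher is an illustrative stand-in for blocksFetcher: it owns a response
// channel that must only be closed after every worker goroutine has finished.
type fetcher struct {
	ctx       context.Context
	requests  chan int
	responses chan string
}

// loop mirrors the pattern used in blocksFetcher.loop: a WaitGroup tracks every
// spawned handler, and the response channel is closed only after wg.Wait(),
// so in-flight handlers can never send on a closed channel.
func (f *fetcher) loop() {
	wg := &sync.WaitGroup{}
	defer func() {
		wg.Wait()
		close(f.responses)
	}()

	for {
		select {
		case <-f.ctx.Done():
			return
		case req := <-f.requests:
			wg.Add(1)
			go func(req int) {
				defer wg.Done()
				f.handle(req)
			}(req)
		}
	}
}

// handle mirrors handleRequest's sendResponse guard: once the context is done,
// the response is dropped instead of being pushed downstream.
func (f *fetcher) handle(req int) {
	if f.ctx.Err() != nil {
		return
	}
	f.responses <- fmt.Sprintf("response for request %d", req)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	f := &fetcher{
		ctx:       ctx,
		requests:  make(chan int, 8),
		responses: make(chan string),
	}
	go f.loop()

	f.requests <- 1
	fmt.Println(<-f.responses)

	// Cancelling the context stops the loop; responses is closed only after all
	// handlers have returned, so this range terminates without panicking.
	cancel()
	for resp := range f.responses {
		fmt.Println(resp)
	}
}
```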
diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go
index f0e5578d431..03310850275 100644
--- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go
+++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -44,13 +44,13 @@ func TestBlocksFetcherInitStartStop(t *testing.T) {
 		select {
 		case <-fetcher.requestResponses():
 		default:
-			t.Error("receivedFetchResponses channel is leaked")
+			t.Error("fetchResponses channel is leaked")
 		}
 	})
 
 	t.Run("re-starting of stopped fetcher", func(t *testing.T) {
 		if err := fetcher.start(); err == nil {
-			t.Errorf("expected error not returned: %v", errCtxIsDone)
+			t.Errorf("expected error not returned: %v", errFetcherCtxIsDone)
 		}
 	})
 
@@ -405,6 +405,20 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
 	}
 }
 
+func TestBlocksFetcherScheduleRequest(t *testing.T) {
+	t.Run("context cancellation", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
+			headFetcher: nil,
+			p2p:         nil,
+		})
+		cancel()
+		if err := fetcher.scheduleRequest(ctx, 1, blockBatchSize); err == nil {
+			t.Errorf("expected error: %v", errFetcherCtxIsDone)
+		}
+	})
+}
+
 func TestBlocksFetcherHandleRequest(t *testing.T) {
 	chainConfig := struct {
 		expectedBlockSlots []uint64
@@ -427,43 +441,58 @@ func TestBlocksFetcherHandleRequest(t *testing.T) {
 
 	hook := logTest.NewGlobal()
 	mc, p2p, beaconDB := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	fetcher := newBlocksFetcher(
-		ctx,
-		&blocksFetcherConfig{
+	defer dbtest.TeardownDB(t, beaconDB)
+
+	t.Run("context cancellation", func(t *testing.T) {
+		hook.Reset()
+		ctx, cancel := context.WithCancel(context.Background())
+		fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
 			headFetcher: mc,
 			p2p:         p2p,
 		})
 
-	requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
-	go fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)
-
-	var blocks []*eth.SignedBeaconBlock
-	select {
-	case <-ctx.Done():
-		t.Error(ctx.Err())
-	case resp := <-fetcher.requestResponses():
-		if resp.err != nil {
-			t.Error(resp.err)
-		} else {
-			blocks = resp.blocks
-		}
-	}
-	if len(blocks) != blockBatchSize {
-		t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
-	}
-	testutil.AssertLogsContain(t, hook, "Received blocks")
+		cancel()
+		fetcher.handleRequest(ctx, 1, blockBatchSize)
+		testutil.AssertLogsContain(t, hook, "Can not send fetch request response")
+		testutil.AssertLogsContain(t, hook, "context canceled")
+	})
 
-	var receivedBlockSlots []uint64
-	for _, blk := range blocks {
-		receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
-	}
-	if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots); len(missing) > 0 {
-		t.Errorf("Missing blocks at slots %v", missing)
-	}
+	t.Run("receive blocks", func(t *testing.T) {
+		hook.Reset()
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
+			headFetcher: mc,
+			p2p:         p2p,
+		})
 
-	dbtest.TeardownDB(t, beaconDB)
+		requestCtx, _ := context.WithTimeout(context.Background(), 2*time.Second)
+		go fetcher.handleRequest(requestCtx, 1 /* start */, blockBatchSize /* count */)
+
+		var blocks []*eth.SignedBeaconBlock
+		select {
+		case <-ctx.Done():
+			t.Error(ctx.Err())
+		case resp := <-fetcher.requestResponses():
+			if resp.err != nil {
+				t.Error(resp.err)
+			} else {
+				blocks = resp.blocks
+			}
+		}
+		if len(blocks) != blockBatchSize {
+			t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchSize, len(blocks))
+		}
+		testutil.AssertLogsContain(t, hook, "Received blocks")
+
+		var receivedBlockSlots []uint64
+		for _, blk := range blocks {
+			receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
+		}
+		if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots); len(missing) > 0 {
+			t.Errorf("Missing blocks at slots %v", missing)
+		}
+	})
 }
 
 func TestBlocksFetcherRequestBeaconBlocksByRangeRequest(t *testing.T) {
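The new `TestBlocksFetcherScheduleRequest` and the "context cancellation" subtest above both follow the same recipe: cancel the fetcher's context first, then assert that the error path is taken. Below is a minimal, self-contained sketch of that recipe; the `scheduler` type and `errSchedulerCtxIsDone` sentinel are hypothetical stand-ins, not the Prysm types.

```go
package main

import (
	"context"
	"errors"
	"testing"
)

// errSchedulerCtxIsDone is an illustrative sentinel, analogous to errFetcherCtxIsDone.
var errSchedulerCtxIsDone = errors.New("scheduler's context is done")

// scheduler stands in for blocksFetcher: it refuses new work once its context is cancelled.
type scheduler struct {
	ctx      context.Context
	requests chan int
}

// scheduleRequest mirrors the select-on-ctx.Done() guard in blocksFetcher.scheduleRequest.
func (s *scheduler) scheduleRequest(req int) error {
	select {
	case <-s.ctx.Done():
		return errSchedulerCtxIsDone
	default:
		s.requests <- req
		return nil
	}
}

// TestScheduleRequestContextCancellation mirrors the cancel-then-assert subtest style:
// cancel the owning context first, then expect scheduling to fail.
func TestScheduleRequestContextCancellation(t *testing.T) {
	t.Run("context cancellation", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		s := &scheduler{ctx: ctx, requests: make(chan int, 1)}
		cancel()
		if err := s.scheduleRequest(1); !errors.Is(err, errSchedulerCtxIsDone) {
			t.Errorf("expected error: %v, got: %v", errSchedulerCtxIsDone, err)
		}
	})
}
```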