diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range.go b/beacon-chain/sync/rpc_blob_sidecars_by_range.go
index 16fcd964319d..fdae419beb47 100644
--- a/beacon-chain/sync/rpc_blob_sidecars_by_range.go
+++ b/beacon-chain/sync/rpc_blob_sidecars_by_range.go
@@ -19,10 +19,13 @@ import (
 	"go.opencensus.io/trace"
 )
 
-func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) (uint64, error) {
+func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, wQuota uint64, stream libp2pcore.Stream) (uint64, error) {
+	// Defensive check to guard against underflow.
+	if wQuota == 0 {
+		return 0, nil
+	}
 	ctx, span := trace.StartSpan(ctx, "sync.streamBlobBatch")
 	defer span.End()
-	var writes uint64
 	for _, b := range batch.canonical() {
 		root := b.Root()
 		scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, b.Root())
@@ -31,7 +34,7 @@ func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream
 		}
 		if err != nil {
 			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
-			return writes, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root)
+			return wQuota, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root)
 		}
 		for _, sc := range scs {
 			SetStreamWriteDeadline(stream, defaultWriteDuration)
@@ -39,17 +42,22 @@ func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream
 				log.WithError(chunkErr).Debug("Could not send a chunked response")
 				s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
 				tracing.AnnotateError(span, chunkErr)
-				return writes, chunkErr
+				return wQuota, chunkErr
 			}
 			s.rateLimiter.add(stream, 1)
-			writes += 1
+			wQuota -= 1
+			// Stop streaming results once the quota of writes for the request is consumed.
+			if wQuota == 0 {
+				return 0, nil
+			}
 		}
 	}
-	return writes, nil
+	return wQuota, nil
 }
 
 // blobsSidecarsByRangeRPCHandler looks up the request blobs from the database from a given start slot index
 func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
+	var err error
 	ctx, span := trace.StartSpan(ctx, "sync.BlobsSidecarsByRangeHandler")
 	defer span.End()
 	ctx, cancel := context.WithTimeout(ctx, respTimeout)
@@ -78,17 +86,16 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
 	batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
 
 	var batch blockBatch
-	var totalWrites uint64
+	wQuota := params.BeaconNetworkConfig().MaxRequestBlobsSidecars
 	for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) {
 		batchStart := time.Now()
-		writes, err := s.streamBlobBatch(ctx, batch, stream)
+		wQuota, err = s.streamBlobBatch(ctx, batch, wQuota, stream)
 		rpcBlobsByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
 		if err != nil {
 			return err
 		}
-		totalWrites += writes
 		// once we have written MAX_REQUEST_BLOB_SIDECARS, we're done serving the request
-		if totalWrites >= params.BeaconNetworkConfig().MaxRequestBlobsSidecars {
+		if wQuota == 0 {
 			break
 		}
 	}
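
A minimal, self-contained sketch of the write-quota pattern the patch introduces: a per-request budget is threaded through each batch call, decremented per chunk written, and streaming stops once it reaches zero. This is an illustration only, not code from the repository; the sidecar type, sendChunk callback, and streamWithQuota helper are hypothetical stand-ins for the real stream plumbing.

// Illustrative sketch only: mirrors the quota contract of streamBlobBatch,
// where the remaining budget is returned so the caller can pass it straight
// into the next batch. All names here are hypothetical.
package main

import "fmt"

type sidecar struct{ index int }

// streamWithQuota writes sidecars until the quota is exhausted and returns
// the remaining quota.
func streamWithQuota(scs []sidecar, wQuota uint64, sendChunk func(sidecar) error) (uint64, error) {
	// Guard against underflow when the caller has no budget left.
	if wQuota == 0 {
		return 0, nil
	}
	for _, sc := range scs {
		if err := sendChunk(sc); err != nil {
			return wQuota, err
		}
		wQuota -= 1
		// Stop as soon as the per-request budget is consumed.
		if wQuota == 0 {
			return 0, nil
		}
	}
	return wQuota, nil
}

func main() {
	quota := uint64(3)
	batches := [][]sidecar{{{index: 0}, {index: 1}}, {{index: 0}, {index: 1}}}
	for _, batch := range batches {
		var err error
		quota, err = streamWithQuota(batch, quota, func(sc sidecar) error {
			fmt.Println("sent sidecar", sc.index)
			return nil
		})
		if err != nil || quota == 0 {
			break
		}
	}
}

Returning the remaining quota rather than a running count lets the handler feed the value directly into the next call, and the early return at zero avoids the uint64 underflow that decrementing past zero would otherwise cause.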