Skip to content

Commit

Permalink
fixup! BlobSidecarsByRange rpc handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Jun 12, 2023
1 parent 9168637 commit a423d08
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions beacon-chain/sync/rpc_blob_sidecars_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
"go.opencensus.io/trace"
)

func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) (uint64, error) {
func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, wQuota uint64, stream libp2pcore.Stream) (uint64, error) {
// Defensive check to guard against underflow.
if wQuota == 0 {
return 0, nil
}
ctx, span := trace.StartSpan(ctx, "sync.streamBlobBatch")
defer span.End()
var writes uint64
for _, b := range batch.canonical() {
root := b.Root()
scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, b.Root())
Expand All @@ -31,25 +34,30 @@ func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream
}
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
return writes, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root)
return wQuota, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root)
}
for _, sc := range scs {
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return writes, chunkErr
return wQuota, chunkErr
}
s.rateLimiter.add(stream, 1)
writes += 1
wQuota -= 1
// Stop streaming results once the quota of writes for the request is consumed.
if wQuota == 0 {
return 0, nil
}
}
}
return writes, nil
return wQuota, nil
}

// blobsSidecarsByRangeRPCHandler looks up the request blobs from the database from a given start slot index
func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
var err error
ctx, span := trace.StartSpan(ctx, "sync.BlobsSidecarsByRangeHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
Expand Down Expand Up @@ -78,17 +86,16 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)

var batch blockBatch
var totalWrites uint64
wQuota := params.BeaconNetworkConfig().MaxRequestBlobsSidecars
for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) {
batchStart := time.Now()
writes, err := s.streamBlobBatch(ctx, batch, stream)
wQuota, err = s.streamBlobBatch(ctx, batch, wQuota, stream)
rpcBlobsByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
if err != nil {
return err
}
totalWrites += writes
// once we have written MAX_REQUEST_BLOB_SIDECARS, we're done serving the request
if totalWrites >= params.BeaconNetworkConfig().MaxRequestBlobsSidecars {
if wQuota == 0 {
break
}
}
Expand Down

0 comments on commit a423d08

Please sign in to comment.