Skip to content

Commit

Permalink
BlobSidecarsByRange rpc handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Jun 7, 2023
1 parent 58bd57a commit d7e8b59
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 1 deletion.
10 changes: 10 additions & 0 deletions beacon-chain/p2p/rpc_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const PingMessageName = "/ping"
// MetadataMessageName specifies the name for the metadata message topic.
const MetadataMessageName = "/metadata"

// BlobSidecarsByRangeName is the name for the BlobSidecarsByRange v1 message topic.
const BlobSidecarsByRangeName = "/blob_sidecars_by_range"

// BlobSidecarsByRootName is the name for the BlobSidecarsByRoot v1 message topic.
const BlobSidecarsByRootName = "/blob_sidecars_by_root"

Expand All @@ -55,6 +58,10 @@ const (
// RPCMetaDataTopicV1 defines the v1 topic for the metadata rpc method.
RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1

// RPCBlobSidecarsByRangeTopicV1 is a topic for requesting blob sidecars
// in the slot range [start_slot, start_slot + count), leading up to the current head block as selected by fork choice.
// Protocol ID: /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb.
RPCBlobSidecarsByRangeTopicV1 = protocolPrefix + BlobSidecarsByRangeName + SchemaVersionV1
// RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb.
// /eth2/beacon_chain/req/blob_sidecars_by_root/1/
RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1
Expand Down Expand Up @@ -90,6 +97,8 @@ var RPCTopicMappings = map[string]interface{}{
// RPC Metadata Message
RPCMetaDataTopicV1: new(interface{}),
RPCMetaDataTopicV2: new(interface{}),
// BlobSidecarsByRange v1 Message
RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest),
// BlobSidecarsByRoot v1 Message
RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq),
}
Expand All @@ -108,6 +117,7 @@ var messageMapping = map[string]bool{
BeaconBlocksByRootsMessageName: true,
PingMessageName: true,
MetadataMessageName: true,
BlobSidecarsByRangeName: true,
BlobSidecarsByRootName: true,
}

Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"rpc.go",
"rpc_beacon_blocks_by_range.go",
"rpc_beacon_blocks_by_root.go",
"rpc_blob_sidecars_by_range.go",
"rpc_blob_sidecars_by_root.go",
"rpc_chunked_response.go",
"rpc_goodbye.go",
Expand Down Expand Up @@ -149,6 +150,7 @@ go_test(
"rate_limiter_test.go",
"rpc_beacon_blocks_by_range_test.go",
"rpc_beacon_blocks_by_root_test.go",
"rpc_blob_sidecars_by_range_test.go",
"rpc_blob_sidecars_by_root_test.go",
"rpc_chunked_response_test.go",
"rpc_goodbye_test.go",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/blobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (c *blobsTestCase) setup(t *testing.T) (*Service, []*ethpb.BlobSidecar, fun
}

byRootRate := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * fieldparams.MaxBlobsPerBlock
byRangeRate := params.BeaconNetworkConfig().MaxRequestBlobsSidecars * params.BeaconConfig().MaxBlobsPerBlock
s.setRateCollector(p2p.RPCBlobSidecarsByRootTopicV1, leakybucket.NewCollector(0.000001, int64(byRootRate), time.Second, false))
s.setRateCollector(p2p.RPCBlobSidecarsByRangeTopicV1, leakybucket.NewCollector(0.000001, int64(byRangeRate), time.Second, false))

return s, sidecars, cleanup
}
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/sync/block_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func newBlockBatch(start, reqEnd primitives.Slot, size uint64) (blockBatch, bool
if start > reqEnd {
return blockBatch{}, false
}
if size == 0 {
return blockBatch{}, false
}
nb := blockBatch{start: start, end: start.Add(size - 1)}
if nb.end > reqEnd {
nb.end = reqEnd
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/block_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,8 @@ func TestBlockBatchNext(t *testing.T) {
})
}
}

func TestZeroSizeNoOp(t *testing.T) {
_, more := newBlockBatch(12345, 12345, 0)
require.Equal(t, false, more)
}
7 changes: 7 additions & 0 deletions beacon-chain/sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ var (
Buckets: []float64{5, 10, 50, 100, 150, 250, 500, 1000, 2000},
},
)
rpcBlobsByRangeResponseLatency = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "rpc_blobs_by_range_response_latency_milliseconds",
Help: "Captures total time to respond to rpc BlobsByRange requests in a milliseconds distribution",
Buckets: []float64{5, 10, 50, 100, 150, 250, 500, 1000, 2000},
},
)
arrivalBlockPropagationHistogram = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "block_arrival_latency_milliseconds",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {

// BlobSidecarsByRootV1
topicMap[addEncoding(p2p.RPCBlobSidecarsByRootTopicV1)] = blobCollector
// BlobSidecarsByRangeV1
topicMap[addEncoding(p2p.RPCBlobSidecarsByRangeTopicV1)] = blobCollector

// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestNewRateLimiter(t *testing.T) {
rlimiter := newRateLimiter(mockp2p.NewTestP2P(t))
assert.Equal(t, len(rlimiter.limiterMap), 11, "correct number of topics not registered")
assert.Equal(t, len(rlimiter.limiterMap), 12, "correct number of topics not registered")
}

func TestNewRateLimiter_FreeCorrectly(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func (s *Service) registerRPCHandlersAltair() {
}

func (s *Service) registerRPCHandlersDeneb() {
s.registerRPC(
p2p.RPCBlobSidecarsByRangeTopicV1,
s.blobSidecarsByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlobSidecarsByRootTopicV1,
s.blobSidecarByRootRPCHandler,
Expand Down
177 changes: 177 additions & 0 deletions beacon-chain/sync/rpc_blob_sidecars_by_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package sync

import (
"context"
"time"

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"go.opencensus.io/trace"
)

type BlobSidecarProcessor func(sidecar *pb.BlobSidecar) error

func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "sync.streamBlobBatch")
defer span.End()
var writes uint64
for _, b := range batch.canonical() {
root := b.Root()
scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, b.Root())
if errors.Is(err, db.ErrNotFound) {
continue
}
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
return writes, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root)
}
for _, sc := range scs {
SetStreamWriteDeadline(stream, defaultWriteDuration)
if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, chunkErr)
return writes, chunkErr
}
s.rateLimiter.add(stream, 1)
writes += 1
}
}
return writes, nil
}

// blobsSidecarsByRangeRPCHandler looks up the request blobs from the database from a given start slot index
func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {
ctx, span := trace.StartSpan(ctx, "sync.BlobsSidecarsByRangeHandler")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
SetRPCStreamDeadlines(stream)
log := log.WithField("handler", p2p.BlobSidecarsByRangeName[1:]) // slice the leading slash off the name var

r, ok := msg.(*pb.BlobSidecarsByRangeRequest)
if !ok {
return errors.New("message is not type *pb.BlobsSidecarsByRangeRequest")
}
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
return err
}
rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
tracing.AnnotateError(span, err)
return err
}

// Ticker to stagger out large requests.
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)

var batch blockBatch
var totalWrites uint64
for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) {
batchStart := time.Now()
writes, err := s.streamBlobBatch(ctx, batch, stream)
rpcBlobsByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
if err != nil {
return err
}
totalWrites += writes
// once we have written MAX_REQUEST_BLOB_SIDECARS, we're done serving the request
if totalWrites >= params.BeaconNetworkConfig().MaxRequestBlobsSidecars {
break
}
}
if err := batch.error(); err != nil {
log.WithError(err).Debug("error in BlocksByRange batch")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
tracing.AnnotateError(span, err)
return err
}

closeStream(stream, log)
return nil
}

func blobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) {
minReqEpochs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
currEpoch := slots.ToEpoch(current)
minStart := params.BeaconConfig().DenebForkEpoch
if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart {
minStart = currEpoch - minReqEpochs
}
return slots.EpochStart(minStart)
}

func blobBatchLimit() uint64 {
return uint64(flags.Get().BlockBatchLimit) / params.BeaconConfig().MaxBlobsPerBlock
}

func validateBlobsByRange(r *pb.BlobSidecarsByRangeRequest, current primitives.Slot) (rangeParams, error) {
rp := rangeParams{
start: r.StartSlot,
size: r.Count,
}
// Peers may overshoot the current slot when in initial sync, so we don't want to penalize them by treating the
// request as an error. So instead we return a set of params that acts as a noop.
if rp.start > current {
return rangeParams{start: current, end: current, size: 0}, nil
}

var err error
rp.end, err = rp.start.SafeAdd((rp.size - 1))
if err != nil {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "overflow start + count -1")
}

maxRequest := params.BeaconNetworkConfig().MaxRequestBlocksDeneb
// Allow some wiggle room, up to double the MaxRequestBlocks past the current slot,
// to give nodes syncing close to the head of the chain some margin for error.
maxStart, err := current.SafeAdd(maxRequest * 2)
if err != nil {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "current + maxRequest * 2 > max uint")
}

// Clients MUST keep a record of signed blobs sidecars seen on the epoch range
// [max(current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch]
// where current_epoch is defined by the current wall-clock time,
// and clients MUST support serving requests of blobs on this range.
minStartSlot, err := blobsByRangeMinStartSlot(current)
if err != nil {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "blobsByRangeMinStartSlot error")
}
if rp.start > maxStart {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart")
}
if rp.start < minStartSlot {
rp.start = minStartSlot
}

if rp.end > current {
rp.end = current
}
if rp.end < rp.start {
rp.end = rp.start
}

limit := blobBatchLimit()
if limit > maxRequest {
limit = maxRequest
}
if rp.size > limit {
rp.size = limit
}

return rp, nil
}
Loading

0 comments on commit d7e8b59

Please sign in to comment.