From 4f73f4f8e3729aed4b4690a71c55935724814bde Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Thu, 15 Jun 2023 14:38:08 -0500 Subject: [PATCH] BlobSidecarsByRoot (#12420) * BlobSidecarsByRoot RPC handler * BlobSidecarsByRange rpc handler (#12499) Co-authored-by: Kasey Kirkham --------- Co-authored-by: Kasey Kirkham --- beacon-chain/p2p/peers/peers_test.go | 2 + beacon-chain/p2p/rpc_topic_mappings.go | 33 ++ beacon-chain/p2p/types/BUILD.bazel | 2 + beacon-chain/p2p/types/rpc_errors.go | 2 + beacon-chain/p2p/types/types.go | 93 ++++- beacon-chain/p2p/types/types_test.go | 72 +++- beacon-chain/sync/BUILD.bazel | 6 + beacon-chain/sync/blobs_test.go | 344 ++++++++++++++++++ beacon-chain/sync/block_batcher.go | 3 + beacon-chain/sync/block_batcher_test.go | 5 + beacon-chain/sync/context.go | 33 +- beacon-chain/sync/error.go | 1 + beacon-chain/sync/metrics.go | 7 + beacon-chain/sync/rate_limiter.go | 12 + beacon-chain/sync/rate_limiter_test.go | 3 +- beacon-chain/sync/rpc.go | 14 + .../sync/rpc_blob_sidecars_by_range.go | 186 ++++++++++ .../sync/rpc_blob_sidecars_by_range_test.go | 286 +++++++++++++++ .../sync/rpc_blob_sidecars_by_root.go | 132 +++++++ .../sync/rpc_blob_sidecars_by_root_test.go | 306 ++++++++++++++++ beacon-chain/sync/rpc_chunked_response.go | 21 ++ beacon-chain/sync/rpc_send_request.go | 116 ++++++ beacon-chain/sync/rpc_send_request_test.go | 113 ++++++ beacon-chain/sync/service.go | 5 + beacon-chain/sync/sync_test.go | 2 + cmd/beacon-chain/flags/base.go | 12 + cmd/beacon-chain/flags/config.go | 4 + config/fieldparams/mainnet.go | 1 + config/fieldparams/minimal.go | 1 + config/params/BUILD.bazel | 1 + config/params/config.go | 22 +- config/params/configset.go | 10 +- config/params/interop.go | 1 + config/params/loader.go | 2 + config/params/mainnet_config.go | 3 + config/params/testnet_e2e_config.go | 2 + 36 files changed, 1842 insertions(+), 16 deletions(-) create mode 100644 beacon-chain/sync/blobs_test.go create mode 100644 beacon-chain/sync/rpc_blob_sidecars_by_range.go create mode 100644 beacon-chain/sync/rpc_blob_sidecars_by_range_test.go create mode 100644 beacon-chain/sync/rpc_blob_sidecars_by_root.go create mode 100644 beacon-chain/sync/rpc_blob_sidecars_by_root_test.go diff --git a/beacon-chain/p2p/peers/peers_test.go b/beacon-chain/p2p/peers/peers_test.go index 661e4896b9a0..dd568cf189e4 100644 --- a/beacon-chain/p2p/peers/peers_test.go +++ b/beacon-chain/p2p/peers/peers_test.go @@ -22,6 +22,8 @@ func TestMain(m *testing.M) { flags.Init(&flags.GlobalFlags{ BlockBatchLimit: 64, BlockBatchLimitBurstFactor: 10, + BlobBatchLimit: 8, + BlobBatchLimitBurstFactor: 2, }) defer func() { flags.Init(resetFlags) diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go index 346be8973bfb..f2ed7d3b8726 100644 --- a/beacon-chain/p2p/rpc_topic_mappings.go +++ b/beacon-chain/p2p/rpc_topic_mappings.go @@ -37,6 +37,12 @@ const PingMessageName = "/ping" // MetadataMessageName specifies the name for the metadata message topic. const MetadataMessageName = "/metadata" +// BlobSidecarsByRangeName is the name for the BlobSidecarsByRange v1 message topic. +const BlobSidecarsByRangeName = "/blob_sidecars_by_range" + +// BlobSidecarsByRootName is the name for the BlobSidecarsByRoot v1 message topic. +const BlobSidecarsByRootName = "/blob_sidecars_by_root" + const ( // V1 RPC Topics // RPCStatusTopicV1 defines the v1 topic for the status rpc method. 
@@ -52,6 +58,14 @@ const ( // RPCMetaDataTopicV1 defines the v1 topic for the metadata rpc method. RPCMetaDataTopicV1 = protocolPrefix + MetadataMessageName + SchemaVersionV1 + // RPCBlobSidecarsByRangeTopicV1 is a topic for requesting blob sidecars + // in the slot range [start_slot, start_slot + count), leading up to the current head block as selected by fork choice. + // Protocol ID: /eth2/beacon_chain/req/blob_sidecars_by_range/1/ - New in deneb. + RPCBlobSidecarsByRangeTopicV1 = protocolPrefix + BlobSidecarsByRangeName + SchemaVersionV1 + // RPCBlobSidecarsByRootTopicV1 is a topic for requesting blob sidecars by their block root. New in deneb. + // /eth2/beacon_chain/req/blob_sidecars_by_root/1/ + RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1 + // V2 RPC Topics // RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method. RPCBlocksByRangeTopicV2 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV2 @@ -83,6 +97,10 @@ var RPCTopicMappings = map[string]interface{}{ // RPC Metadata Message RPCMetaDataTopicV1: new(interface{}), RPCMetaDataTopicV2: new(interface{}), + // BlobSidecarsByRange v1 Message + RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest), + // BlobSidecarsByRoot v1 Message + RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq), } // Maps all registered protocol prefixes. @@ -99,6 +117,8 @@ var messageMapping = map[string]bool{ BeaconBlocksByRootsMessageName: true, PingMessageName: true, MetadataMessageName: true, + BlobSidecarsByRangeName: true, + BlobSidecarsByRootName: true, } // Maps all the RPC messages which are to updated in altair. @@ -113,6 +133,19 @@ var versionMapping = map[string]bool{ SchemaVersionV2: true, } +// OmitContextBytesV1 keeps track of which RPC methods do not write context bytes in their v1 incarnations. +// Phase0 did not have the notion of context bytes, which prefix wire-encoded values with a [4]byte identifier +// to convey the schema for the receiver to use. These RPCs had a version bump to V2 when the context byte encoding +// was introduced. For other RPC methods, context bytes are always required. +var OmitContextBytesV1 = map[string]bool{ + StatusMessageName: true, + GoodbyeMessageName: true, + BeaconBlocksByRangeMessageName: true, + BeaconBlocksByRootsMessageName: true, + PingMessageName: true, + MetadataMessageName: true, +} + // VerifyTopicMapping verifies that the topic and its accompanying // message type is correct. 
func VerifyTopicMapping(topic string, msg interface{}) error { diff --git a/beacon-chain/p2p/types/BUILD.bazel b/beacon-chain/p2p/types/BUILD.bazel index 719714ecc34a..c317507ee9f0 100644 --- a/beacon-chain/p2p/types/BUILD.bazel +++ b/beacon-chain/p2p/types/BUILD.bazel @@ -41,7 +41,9 @@ go_test( "//config/params:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", + "//proto/prysm/v1alpha1:go_default_library", "//testing/assert:go_default_library", "//testing/require:go_default_library", + "@com_github_prysmaticlabs_fastssz//:go_default_library", ], ) diff --git a/beacon-chain/p2p/types/rpc_errors.go b/beacon-chain/p2p/types/rpc_errors.go index 6a9764289bb1..9fb7d6e6b4de 100644 --- a/beacon-chain/p2p/types/rpc_errors.go +++ b/beacon-chain/p2p/types/rpc_errors.go @@ -12,4 +12,6 @@ var ( ErrRateLimited = errors.New("rate limited") ErrIODeadline = errors.New("i/o deadline exceeded") ErrInvalidRequest = errors.New("invalid range, step or count") + ErrBlobLTMinRequest = errors.New("blob slot < minimum_request_epoch") + ErrMaxBlobReqExceeded = errors.New("requested more than MAX_REQUEST_BLOB_SIDECARS") ) diff --git a/beacon-chain/p2p/types/types.go b/beacon-chain/p2p/types/types.go index 44def488a17b..b88325bf4790 100644 --- a/beacon-chain/p2p/types/types.go +++ b/beacon-chain/p2p/types/types.go @@ -4,9 +4,13 @@ package types import ( + "bytes" + "sort" + "github.com/pkg/errors" ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v4/config/params" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ) const rootLength = 32 @@ -64,7 +68,7 @@ func (r *BeaconBlockByRootsReq) UnmarshalSSZ(buf []byte) error { bufLen := len(buf) maxLength := int(params.BeaconNetworkConfig().MaxRequestBlocks * rootLength) if bufLen > maxLength { - return errors.Errorf("expected buffer with length of upto %d but received length %d", maxLength, bufLen) + return errors.Errorf("expected buffer with length of up to %d but received length %d", maxLength, bufLen) } if bufLen%rootLength != 0 { return ssz.ErrIncorrectByteSize @@ -120,3 +124,90 @@ func (m *ErrorMessage) UnmarshalSSZ(buf []byte) error { *m = errMsg return nil } + +// BlobSidecarsByRootReq is used to specify a list of blob targets (root+index) in a BlobSidecarsByRoot RPC request. +type BlobSidecarsByRootReq []*eth.BlobIdentifier + +// BlobIdentifier is a fixed size value, so we can compute its fixed size at start time (see init below) +var blobIdSize int + +// SizeSSZ returns the size of the serialized representation. +func (b *BlobSidecarsByRootReq) SizeSSZ() int { + return len(*b) * blobIdSize +} + +// MarshalSSZTo appends the serialized BlobSidecarsByRootReq value to the provided byte slice. +func (b *BlobSidecarsByRootReq) MarshalSSZTo(dst []byte) ([]byte, error) { + // A List without an enclosing container is marshaled exactly like a vector, no length offset required. + marshalledObj, err := b.MarshalSSZ() + if err != nil { + return nil, err + } + return append(dst, marshalledObj...), nil +} + +// MarshalSSZ serializes the BlobSidecarsByRootReq value to a byte slice. +func (b *BlobSidecarsByRootReq) MarshalSSZ() ([]byte, error) { + buf := make([]byte, len(*b)*blobIdSize) + for i, id := range *b { + by, err := id.MarshalSSZ() + if err != nil { + return nil, err + } + copy(buf[i*blobIdSize:(i+1)*blobIdSize], by) + } + return buf, nil +} + +// UnmarshalSSZ unmarshals the provided bytes buffer into the +// BlobSidecarsByRootReq value. 
+func (b *BlobSidecarsByRootReq) UnmarshalSSZ(buf []byte) error { + bufLen := len(buf) + maxLength := int(params.BeaconNetworkConfig().MaxRequestBlobSidecars) * blobIdSize + if bufLen > maxLength { + return errors.Errorf("expected buffer with length of up to %d but received length %d", maxLength, bufLen) + } + if bufLen%blobIdSize != 0 { + return errors.Wrapf(ssz.ErrIncorrectByteSize, "size=%d", bufLen) + } + count := bufLen / blobIdSize + *b = make([]*eth.BlobIdentifier, count) + for i := 0; i < count; i++ { + id := ð.BlobIdentifier{} + err := id.UnmarshalSSZ(buf[i*blobIdSize : (i+1)*blobIdSize]) + if err != nil { + return err + } + (*b)[i] = id + } + return nil +} + +var _ sort.Interface = BlobSidecarsByRootReq{} + +// Less reports whether the element with index i must sort before the element with index j. +// BlobIdentifier will be sorted in lexicographic order by root, with Blob Index as tiebreaker for a given root. +func (s BlobSidecarsByRootReq) Less(i, j int) bool { + rootCmp := bytes.Compare(s[i].BlockRoot, s[j].BlockRoot) + if rootCmp != 0 { + // They aren't equal; return true if i < j, false if i > j. + return rootCmp < 0 + } + // They are equal; blob index is the tie breaker. + return s[i].Index < s[j].Index +} + +// Swap swaps the elements with indexes i and j. +func (s BlobSidecarsByRootReq) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Len is the number of elements in the collection. +func (s BlobSidecarsByRootReq) Len() int { + return len(s) +} + +func init() { + sizer := ð.BlobIdentifier{} + blobIdSize = sizer.SizeSSZ() +} diff --git a/beacon-chain/p2p/types/types_test.go b/beacon-chain/p2p/types/types_test.go index 0e2ff8f40c50..0d9341062305 100644 --- a/beacon-chain/p2p/types/types_test.go +++ b/beacon-chain/p2p/types/types_test.go @@ -4,12 +4,82 @@ import ( "encoding/hex" "testing" + ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" ) +func generateBlobIdentifiers(n int) []*eth.BlobIdentifier { + r := make([]*eth.BlobIdentifier, n) + for i := 0; i < n; i++ { + r[i] = ð.BlobIdentifier{ + BlockRoot: bytesutil.PadTo([]byte{byte(i)}, 32), + Index: 0, + } + } + return r +} + +func TestBlobSidecarsByRootReq_MarshalSSZ(t *testing.T) { + cases := []struct { + name string + ids []*eth.BlobIdentifier + marshalErr error + unmarshalErr error + unmarshalMod func([]byte) []byte + }{ + { + name: "empty list", + }, + { + name: "single item list", + ids: generateBlobIdentifiers(1), + }, + { + name: "10 item list", + ids: generateBlobIdentifiers(10), + }, + { + name: "wonky unmarshal size", + ids: generateBlobIdentifiers(10), + unmarshalMod: func(in []byte) []byte { + in = append(in, byte(0)) + return in + }, + unmarshalErr: ssz.ErrIncorrectByteSize, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + r := BlobSidecarsByRootReq(c.ids) + by, err := r.MarshalSSZ() + if c.marshalErr != nil { + require.ErrorIs(t, err, c.marshalErr) + return + } + require.NoError(t, err) + if c.unmarshalMod != nil { + by = c.unmarshalMod(by) + } + got := &BlobSidecarsByRootReq{} + err = got.UnmarshalSSZ(by) + if c.unmarshalErr != nil { + require.ErrorIs(t, err, c.unmarshalErr) + return + } + require.NoError(t, err) + for i, gid := range *got { + 
require.DeepEqual(t, c.ids[i], gid) + } + }) + } +} + func TestBeaconBlockByRootsReq_Limit(t *testing.T) { fixedRoots := make([][32]byte, 0) for i := uint64(0); i < params.BeaconNetworkConfig().MaxRequestBlocks+100; i++ { @@ -25,7 +95,7 @@ func TestBeaconBlockByRootsReq_Limit(t *testing.T) { buf = append(buf, rt[:]...) } req2 := BeaconBlockByRootsReq(nil) - require.ErrorContains(t, "expected buffer with length of upto", req2.UnmarshalSSZ(buf)) + require.ErrorContains(t, "expected buffer with length of up to", req2.UnmarshalSSZ(buf)) } func TestErrorResponse_Limit(t *testing.T) { diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 7c5ac46b28ac..54db67110a74 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -22,6 +22,8 @@ go_library( "rpc.go", "rpc_beacon_blocks_by_range.go", "rpc_beacon_blocks_by_root.go", + "rpc_blob_sidecars_by_range.go", + "rpc_blob_sidecars_by_root.go", "rpc_chunked_response.go", "rpc_goodbye.go", "rpc_metadata.go", @@ -89,6 +91,7 @@ go_library( "//cache/lru:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", @@ -136,6 +139,7 @@ go_test( size = "small", srcs = [ "batch_verifier_test.go", + "blobs_test.go", "block_batcher_test.go", "broadcast_bls_changes_test.go", "context_test.go", @@ -147,6 +151,8 @@ go_test( "rate_limiter_test.go", "rpc_beacon_blocks_by_range_test.go", "rpc_beacon_blocks_by_root_test.go", + "rpc_blob_sidecars_by_range_test.go", + "rpc_blob_sidecars_by_root_test.go", "rpc_chunked_response_test.go", "rpc_goodbye_test.go", "rpc_handler_test.go", diff --git a/beacon-chain/sync/blobs_test.go b/beacon-chain/sync/blobs_test.go new file mode 100644 index 000000000000..07a8dba04ace --- /dev/null +++ b/beacon-chain/sync/blobs_test.go @@ -0,0 +1,344 @@ +package sync + +import ( + "context" + "encoding/binary" + "math" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" + mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" + db "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/config/params" + types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/network/forks" + enginev1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/testing/util" + "github.com/prysmaticlabs/prysm/v4/time/slots" +) + +type blobsTestCase struct { + name string + nblocks int // how many blocks to loop through in setting up test fixtures & requests + missing map[int]bool // skip this blob index, so that we can test different custody scenarios + expired map[int]bool // mark block 
expired to test scenarios where requests are outside retention window + chain *mock.ChainService + clock *startup.Clock // allow tests to control retention window via current slot and finalized checkpoint + total *int // allow a test to specify the total number of responses received + err error + serverHandle testHandler + defineExpected expectedDefiner + requestFromSidecars requestFromSidecars + topic protocol.ID + oldestSlot oldestSlotCallback + streamReader expectedRequirer +} + +type testHandler func(s *Service) rpcHandler +type expectedDefiner func(t *testing.T, scs []*ethpb.BlobSidecar, req interface{}) []*expectedBlobChunk +type requestFromSidecars func([]*ethpb.BlobSidecar) interface{} +type oldestSlotCallback func(t *testing.T) types.Slot +type expectedRequirer func(*testing.T, *Service, []*expectedBlobChunk) func(network.Stream) + +func generateTestBlockWithSidecars(t *testing.T, parent [32]byte, slot types.Slot, nblobs int) (*ethpb.SignedBeaconBlockDeneb, []*ethpb.BlobSidecar) { + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength) + receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength) + logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength) + parentHash := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength) + tx := gethTypes.NewTransaction( + 0, + common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), + big.NewInt(0), 0, big.NewInt(0), + nil, + ) + txs := []*gethTypes.Transaction{tx} + encodedBinaryTxs := make([][]byte, 1) + var err error + encodedBinaryTxs[0], err = txs[0].MarshalBinary() + require.NoError(t, err) + blockHash := bytesutil.ToBytes32([]byte("foo")) + payload := &enginev1.ExecutionPayloadDeneb{ + ParentHash: parentHash, + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: stateRoot, + ReceiptsRoot: receiptsRoot, + LogsBloom: logsBloom, + PrevRandao: blockHash[:], + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: make([]byte, 0), + BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength), + ExcessDataGas: 0, + BlockHash: blockHash[:], + Transactions: encodedBinaryTxs, + } + block := util.NewBeaconBlockDeneb() + block.Block.Body.ExecutionPayload = payload + block.Block.Slot = slot + block.Block.ParentRoot = parent[:] + commitments := make([][48]byte, nblobs) + block.Block.Body.BlobKzgCommitments = make([][]byte, nblobs) + for i := range commitments { + binary.LittleEndian.PutUint64(commitments[i][:], uint64(i)) + block.Block.Body.BlobKzgCommitments[i] = commitments[i][:] + } + + root, err := block.Block.HashTreeRoot() + require.NoError(t, err) + + sidecars := make([]*ethpb.BlobSidecar, len(commitments)) + for i, c := range block.Block.Body.BlobKzgCommitments { + sidecars[i] = generateTestSidecar(root, block, i, c) + } + return block, sidecars +} + +func generateTestSidecar(root [32]byte, block *ethpb.SignedBeaconBlockDeneb, index int, commitment []byte) *ethpb.BlobSidecar { + blob := make([]byte, fieldparams.BlobSize) + binary.LittleEndian.PutUint64(blob, uint64(index)) + sc := ðpb.BlobSidecar{ + BlockRoot: root[:], + Index: uint64(index), + Slot: block.Block.Slot, + BlockParentRoot: block.Block.ParentRoot, + ProposerIndex: block.Block.ProposerIndex, + Blob: blob, + KzgCommitment: commitment, + KzgProof: commitment, + } + return sc +} + +type expectedBlobChunk struct { + code uint8 + sidecar *ethpb.BlobSidecar + 
message string +} + +func (r *expectedBlobChunk) requireExpected(t *testing.T, s *Service, stream network.Stream) { + encoding := s.cfg.p2p.Encoding() + + code, _, err := ReadStatusCode(stream, encoding) + require.NoError(t, err) + require.Equal(t, r.code, code, "unexpected response code") + if code != responseCodeSuccess { + return + } + + c, err := readContextFromStream(stream) + require.NoError(t, err) + + valRoot := s.cfg.chain.GenesisValidatorsRoot() + ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(r.sidecar.GetSlot()), valRoot[:]) + require.NoError(t, err) + require.Equal(t, ctxBytes, bytesutil.ToBytes4(c)) + + sc := ðpb.BlobSidecar{} + require.NoError(t, encoding.DecodeWithMaxLength(stream, sc)) + require.Equal(t, bytesutil.ToBytes32(sc.BlockRoot), bytesutil.ToBytes32(r.sidecar.BlockRoot)) + require.Equal(t, sc.Index, r.sidecar.Index) +} + +func (c *blobsTestCase) setup(t *testing.T) (*Service, []*ethpb.BlobSidecar, func()) { + cfg := params.BeaconConfig() + repositionFutureEpochs(cfg) + undo, err := params.SetActiveWithUndo(cfg) + require.NoError(t, err) + cleanup := func() { + require.NoError(t, undo()) + } + maxBlobs := fieldparams.MaxBlobsPerBlock + chain, clock := defaultMockChain(t) + if c.chain == nil { + c.chain = chain + } + if c.clock == nil { + c.clock = clock + } + d := db.SetupDB(t) + + sidecars := make([]*ethpb.BlobSidecar, 0) + oldest := c.oldestSlot(t) + var parentRoot [32]byte + for i := 0; i < c.nblocks; i++ { + // check if there is a slot override for this index + // ie to create a block outside the minimum_request_epoch + var bs types.Slot + if c.expired[i] { + // the lowest possible bound of the retention period is the deneb epoch, so make sure + // the slot of an expired block is at least one slot less than the deneb epoch. + bs = oldest - 1 - types.Slot(i) + } else { + bs = oldest + types.Slot(i) + } + block, bsc := generateTestBlockWithSidecars(t, parentRoot, bs, maxBlobs) + root, err := block.Block.HashTreeRoot() + require.NoError(t, err) + sidecars = append(sidecars, bsc...) + util.SaveBlock(t, context.Background(), d, block) + parentRoot = root + } + + client := p2ptest.NewTestP2P(t) + s := &Service{ + cfg: &config{p2p: client, chain: c.chain, clock: clock, beaconDB: d}, + rateLimiter: newRateLimiter(client), + } + + byRootRate := params.BeaconNetworkConfig().MaxRequestBlobSidecars * fieldparams.MaxBlobsPerBlock + byRangeRate := params.BeaconNetworkConfig().MaxRequestBlobSidecars * fieldparams.MaxBlobsPerBlock + s.setRateCollector(p2p.RPCBlobSidecarsByRootTopicV1, leakybucket.NewCollector(0.000001, int64(byRootRate), time.Second, false)) + s.setRateCollector(p2p.RPCBlobSidecarsByRangeTopicV1, leakybucket.NewCollector(0.000001, int64(byRangeRate), time.Second, false)) + + return s, sidecars, cleanup +} + +func defaultExpectedRequirer(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) { + return func(stream network.Stream) { + for _, ex := range expect { + ex.requireExpected(t, s, stream) + } + } +} + +func (c *blobsTestCase) run(t *testing.T) { + s, sidecars, cleanup := c.setup(t) + defer cleanup() + req := c.requestFromSidecars(sidecars) + expect := c.defineExpected(t, sidecars, req) + m := map[types.Slot][]*ethpb.BlobSidecar{} + for _, sc := range expect { + // If define expected omits a sidecar from an expected result, we don't need to save it. + // This can happen in particular when there are no expected results, because the nth part of the + // response is an error (or none at all when the whole request is invalid). 
+ if sc.sidecar != nil { + m[sc.sidecar.Slot] = append(m[sc.sidecar.Slot], sc.sidecar) + } + } + for _, blobSidecars := range m { + require.NoError(t, s.cfg.beaconDB.SaveBlobSidecar(context.Background(), blobSidecars)) + } + if c.total != nil { + require.Equal(t, *c.total, len(expect)) + } + rht := &rpcHandlerTest{ + t: t, + topic: c.topic, + timeout: time.Second * 10, + err: c.err, + s: s, + } + rht.testHandler(c.streamReader(t, s, expect), c.serverHandle(s), req) +} + +// we use max uints for future forks, but this causes overflows when computing slots +// so it is helpful in tests to temporarily reposition the epochs to give room for some math. +func repositionFutureEpochs(cfg *params.BeaconChainConfig) { + if cfg.CapellaForkEpoch == math.MaxUint64 { + cfg.CapellaForkEpoch = cfg.BellatrixForkEpoch + 100 + } + if cfg.DenebForkEpoch == math.MaxUint64 { + cfg.DenebForkEpoch = cfg.CapellaForkEpoch + 100 + } +} + +func defaultMockChain(t *testing.T) (*mock.ChainService, *startup.Clock) { + de := params.BeaconConfig().DenebForkEpoch + df, err := forks.Fork(de) + require.NoError(t, err) + denebBuffer := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + 1000 + ce := de + denebBuffer + fe := ce - 2 + cs, err := slots.EpochStart(ce) + require.NoError(t, err) + now := time.Now() + genOffset := types.Slot(params.BeaconConfig().SecondsPerSlot) * cs + genesis := now.Add(-1 * time.Second * time.Duration(int64(genOffset))) + clock := startup.NewClock(genesis, [32]byte{}) + chain := &mock.ChainService{ + FinalizedCheckPoint: ðpb.Checkpoint{Epoch: fe}, + Fork: df, + } + + return chain, clock +} + +func TestTestcaseSetup_BlocksAndBlobs(t *testing.T) { + ctx := context.Background() + nblocks := 10 + c := &blobsTestCase{nblocks: nblocks} + c.oldestSlot = c.defaultOldestSlotByRoot + s, sidecars, cleanup := c.setup(t) + req := blobRootRequestFromSidecars(sidecars) + expect := c.filterExpectedByRoot(t, sidecars, req) + defer cleanup() + maxed := nblocks * fieldparams.MaxBlobsPerBlock + require.Equal(t, maxed, len(sidecars)) + require.Equal(t, maxed, len(expect)) + for _, sc := range sidecars { + blk, err := s.cfg.beaconDB.Block(ctx, bytesutil.ToBytes32(sc.BlockRoot)) + require.NoError(t, err) + var found *int + comms, err := blk.Block().Body().BlobKzgCommitments() + require.NoError(t, err) + for i, cm := range comms { + if bytesutil.ToBytes48(sc.KzgCommitment) == bytesutil.ToBytes48(cm) { + found = &i + } + } + require.Equal(t, true, found != nil) + } +} + +func TestRoundTripDenebSave(t *testing.T) { + ctx := context.Background() + cfg := params.BeaconConfig() + repositionFutureEpochs(cfg) + undo, err := params.SetActiveWithUndo(cfg) + require.NoError(t, err) + defer func() { + require.NoError(t, undo()) + }() + parentRoot := [32]byte{} + c := blobsTestCase{nblocks: 10} + chain, clock := defaultMockChain(t) + c.chain = chain + c.clock = clock + oldest, err := slots.EpochStart(blobMinReqEpoch(c.chain.FinalizedCheckPoint.Epoch, slots.ToEpoch(c.clock.CurrentSlot()))) + require.NoError(t, err) + maxBlobs := fieldparams.MaxBlobsPerBlock + block, bsc := generateTestBlockWithSidecars(t, parentRoot, oldest, maxBlobs) + require.Equal(t, len(block.Block.Body.BlobKzgCommitments), len(bsc)) + require.Equal(t, maxBlobs, len(bsc)) + for i := range bsc { + require.DeepEqual(t, block.Block.Body.BlobKzgCommitments[i], bsc[i].KzgCommitment) + } + d := db.SetupDB(t) + util.SaveBlock(t, ctx, d, block) + root, err := block.Block.HashTreeRoot() + require.NoError(t, err) + dbBlock, err := d.Block(ctx, root) + 
require.NoError(t, err) + comms, err := dbBlock.Block().Body().BlobKzgCommitments() + require.NoError(t, err) + require.Equal(t, maxBlobs, len(comms)) + for i := range bsc { + require.DeepEqual(t, comms[i], bsc[i].KzgCommitment) + } +} diff --git a/beacon-chain/sync/block_batcher.go b/beacon-chain/sync/block_batcher.go index 040dcd71e4bc..9ab42d18bc03 100644 --- a/beacon-chain/sync/block_batcher.go +++ b/beacon-chain/sync/block_batcher.go @@ -141,6 +141,9 @@ func newBlockBatch(start, reqEnd primitives.Slot, size uint64) (blockBatch, bool if start > reqEnd { return blockBatch{}, false } + if size == 0 { + return blockBatch{}, false + } nb := blockBatch{start: start, end: start.Add(size - 1)} if nb.end > reqEnd { nb.end = reqEnd diff --git a/beacon-chain/sync/block_batcher_test.go b/beacon-chain/sync/block_batcher_test.go index c185ebd967b8..0c2cfec7848e 100644 --- a/beacon-chain/sync/block_batcher_test.go +++ b/beacon-chain/sync/block_batcher_test.go @@ -132,3 +132,8 @@ func TestBlockBatchNext(t *testing.T) { }) } } + +func TestZeroSizeNoOp(t *testing.T) { + _, more := newBlockBatch(12345, 12345, 0) + require.Equal(t, false, more) +} diff --git a/beacon-chain/sync/context.go b/beacon-chain/sync/context.go index 875af143bce3..6f1aead89d31 100644 --- a/beacon-chain/sync/context.go +++ b/beacon-chain/sync/context.go @@ -4,7 +4,9 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/config/params" ) // Specifies the fixed size context length. @@ -45,18 +47,16 @@ func readContextFromStream(stream network.Stream) ([]byte, error) { } func expectRpcContext(stream network.Stream) (bool, error) { - _, _, version, err := p2p.TopicDeconstructor(string(stream.Protocol())) + _, message, version, err := p2p.TopicDeconstructor(string(stream.Protocol())) if err != nil { return false, err } - switch version { - case p2p.SchemaVersionV1: + // For backwards compatibility, we want to omit context bytes for certain v1 methods that were defined before + // context bytes were introduced into the protocol. + if version == p2p.SchemaVersionV1 && p2p.OmitContextBytesV1[message] { return false, nil - case p2p.SchemaVersionV2: - return true, nil - default: - return false, errors.New("invalid version of %s registered for topic: %s") } + return true, nil } // Minimal interface for a stream with a protocol. @@ -75,3 +75,22 @@ func validateVersion(version string, stream withProtocol) error { } return nil } + +// ContextByteVersions is a mapping between expected values for context bytes +// and the runtime/version identifier they correspond to. This can be used to look up the type +// needed to unmarshal a wire-encoded value. +type ContextByteVersions map[[4]byte]int + +// ContextByteVersionsForValRoot computes a mapping between all possible context bytes values +// and the runtime/version identifier for the corresponding fork. 
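// For example, under an assumed fork schedule the Deneb fork version, combined with the genesis
// validators root, produces a 4-byte fork digest; mapping that digest to the Deneb runtime version is
// what lets a reader such as readChunkEncodedBlobs pick the correct SSZ type for a chunked response.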
+func ContextByteVersionsForValRoot(valRoot [32]byte) (ContextByteVersions, error) { + m := make(ContextByteVersions) + for fv, v := range params.ConfigForkVersions(params.BeaconConfig()) { + digest, err := signing.ComputeForkDigest(fv[:], valRoot[:]) + if err != nil { + return nil, errors.Wrapf(err, "unable to compute fork digest for fork version %#x", fv) + } + m[digest] = v + } + return m, nil +} diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 650afdeccb9e..100bab890bf5 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -19,6 +19,7 @@ var ErrUnrecognizedVersion = errors.New("cannot determine context bytes for unre var responseCodeSuccess = byte(0x00) var responseCodeInvalidRequest = byte(0x01) var responseCodeServerError = byte(0x02) +var responseCodeResourceUnavailable = byte(0x03) func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error) { return createErrorResponse(code, reason, s.cfg.p2p) diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 0dcd3872fd24..17ecd9045414 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -82,6 +82,13 @@ var ( Buckets: []float64{5, 10, 50, 100, 150, 250, 500, 1000, 2000}, }, ) + rpcBlobsByRangeResponseLatency = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "rpc_blobs_by_range_response_latency_milliseconds", + Help: "Captures total time to respond to rpc BlobsByRange requests in a milliseconds distribution", + Buckets: []float64{5, 10, 50, 100, 150, 250, 500, 1000, 2000}, + }, + ) arrivalBlockPropagationHistogram = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "block_arrival_latency_milliseconds", diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index b6099ea0c3fc..9fdb9d56622f 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -42,6 +42,10 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { allowedBlocksPerSecond := float64(flags.Get().BlockBatchLimit) allowedBlocksBurst := int64(flags.Get().BlockBatchLimitBurstFactor * flags.Get().BlockBatchLimit) + // Initialize blob limits. + allowedBlobsPerSecond := float64(flags.Get().BlobBatchLimit) + allowedBlobsBurst := int64(flags.Get().BlobBatchLimitBurstFactor * flags.Get().BlobBatchLimit) + // Set topic map for all rpc topics. topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings)) // Goodbye Message @@ -59,6 +63,9 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { // Collector for V2 blockCollectorV2 := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, blockBucketPeriod, false /* deleteEmptyBuckets */) + // for BlobSidecarsByRoot and BlobSidecarsByRange + blobCollector := leakybucket.NewCollector(allowedBlobsPerSecond, allowedBlobsBurst, blockBucketPeriod, false) + // BlocksByRoots requests topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector topicMap[addEncoding(p2p.RPCBlocksByRootTopicV2)] = blockCollectorV2 @@ -67,6 +74,11 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV1)] = blockCollector topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV2)] = blockCollectorV2 + // BlobSidecarsByRootV1 + topicMap[addEncoding(p2p.RPCBlobSidecarsByRootTopicV1)] = blobCollector + // BlobSidecarsByRangeV1 + topicMap[addEncoding(p2p.RPCBlobSidecarsByRangeTopicV1)] = blobCollector + // General topic for all rpc requests. 
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */) diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index dfa5aac656c3..a4d0835bbc43 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -18,14 +18,13 @@ import ( func TestNewRateLimiter(t *testing.T) { rlimiter := newRateLimiter(mockp2p.NewTestP2P(t)) - assert.Equal(t, len(rlimiter.limiterMap), 10, "correct number of topics not registered") + assert.Equal(t, len(rlimiter.limiterMap), 12, "correct number of topics not registered") } func TestNewRateLimiter_FreeCorrectly(t *testing.T) { rlimiter := newRateLimiter(mockp2p.NewTestP2P(t)) rlimiter.free() assert.Equal(t, len(rlimiter.limiterMap), 0, "rate limiter not freed correctly") - } func TestRateLimiter_ExceedCapacity(t *testing.T) { diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index cf95452afecf..3b3a355ed7d6 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -49,6 +49,9 @@ func (s *Service) registerRPCHandlers() { s.pingHandler, ) s.registerRPCHandlersAltair() + if currEpoch >= params.BeaconConfig().DenebForkEpoch { + s.registerRPCHandlersDeneb() + } return } s.registerRPC( @@ -93,6 +96,17 @@ func (s *Service) registerRPCHandlersAltair() { ) } +func (s *Service) registerRPCHandlersDeneb() { + s.registerRPC( + p2p.RPCBlobSidecarsByRangeTopicV1, + s.blobSidecarsByRangeRPCHandler, + ) + s.registerRPC( + p2p.RPCBlobSidecarsByRootTopicV1, + s.blobSidecarByRootRPCHandler, + ) +} + // Remove all v1 Stream handlers that are no longer supported // from altair onwards. func (s *Service) unregisterPhase0Handlers() { diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range.go b/beacon-chain/sync/rpc_blob_sidecars_by_range.go new file mode 100644 index 000000000000..6c98967a7c51 --- /dev/null +++ b/beacon-chain/sync/rpc_blob_sidecars_by_range.go @@ -0,0 +1,186 @@ +package sync + +import ( + "context" + "time" + + libp2pcore "github.com/libp2p/go-libp2p/core" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" + pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/time/slots" + "go.opencensus.io/trace" +) + +func (s *Service) streamBlobBatch(ctx context.Context, batch blockBatch, wQuota uint64, stream libp2pcore.Stream) (uint64, error) { + // Defensive check to guard against underflow. 
+ if wQuota == 0 { + return 0, nil + } + ctx, span := trace.StartSpan(ctx, "sync.streamBlobBatch") + defer span.End() + for _, b := range batch.canonical() { + root := b.Root() + scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, b.Root()) + if errors.Is(err, db.ErrNotFound) { + continue + } + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + return wQuota, errors.Wrapf(err, "could not retrieve sidecars for block root %#x", root) + } + for _, sc := range scs { + SetStreamWriteDeadline(stream, defaultWriteDuration) + if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { + log.WithError(chunkErr).Debug("Could not send a chunked response") + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, chunkErr) + return wQuota, chunkErr + } + s.rateLimiter.add(stream, 1) + wQuota -= 1 + // Stop streaming results once the quota of writes for the request is consumed. + if wQuota == 0 { + return 0, nil + } + } + } + return wQuota, nil +} + +// blobsSidecarsByRangeRPCHandler looks up the request blobs from the database from a given start slot index +func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + var err error + ctx, span := trace.StartSpan(ctx, "sync.BlobsSidecarsByRangeHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, respTimeout) + defer cancel() + SetRPCStreamDeadlines(stream) + log := log.WithField("handler", p2p.BlobSidecarsByRangeName[1:]) // slice the leading slash off the name var + + r, ok := msg.(*pb.BlobSidecarsByRangeRequest) + if !ok { + return errors.New("message is not type *pb.BlobsSidecarsByRangeRequest") + } + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + return err + } + rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot()) + if err != nil { + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + tracing.AnnotateError(span, err) + return err + } + + // Ticker to stagger out large requests. 
+ ticker := time.NewTicker(time.Second) + defer ticker.Stop() + batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker) + + var batch blockBatch + wQuota := params.BeaconNetworkConfig().MaxRequestBlobSidecars + for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) { + batchStart := time.Now() + wQuota, err = s.streamBlobBatch(ctx, batch, wQuota, stream) + rpcBlobsByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds())) + if err != nil { + return err + } + // once we have written MAX_REQUEST_BLOB_SIDECARS, we're done serving the request + if wQuota == 0 { + break + } + } + if err := batch.error(); err != nil { + log.WithError(err).Debug("error in BlocksByRange batch") + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + + closeStream(stream, log) + return nil +} + +func blobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) { + minReqEpochs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + currEpoch := slots.ToEpoch(current) + minStart := params.BeaconConfig().DenebForkEpoch + if currEpoch > minReqEpochs && currEpoch-minReqEpochs > minStart { + minStart = currEpoch - minReqEpochs + } + return slots.EpochStart(minStart) +} + +func blobBatchLimit() uint64 { + return uint64(flags.Get().BlockBatchLimit / fieldparams.MaxBlobsPerBlock) +} + +func validateBlobsByRange(r *pb.BlobSidecarsByRangeRequest, current primitives.Slot) (rangeParams, error) { + if r.Count == 0 { + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "invalid request Count parameter") + } + rp := rangeParams{ + start: r.StartSlot, + size: r.Count, + } + // Peers may overshoot the current slot when in initial sync, so we don't want to penalize them by treating the + // request as an error. So instead we return a set of params that acts as a noop. + if rp.start > current { + return rangeParams{start: current, end: current, size: 0}, nil + } + + var err error + rp.end, err = rp.start.SafeAdd((rp.size - 1)) + if err != nil { + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "overflow start + count -1") + } + + maxRequest := params.BeaconNetworkConfig().MaxRequestBlocksDeneb + // Allow some wiggle room, up to double the MaxRequestBlocks past the current slot, + // to give nodes syncing close to the head of the chain some margin for error. + maxStart, err := current.SafeAdd(maxRequest * 2) + if err != nil { + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "current + maxRequest * 2 > max uint") + } + + // Clients MUST keep a record of signed blobs sidecars seen on the epoch range + // [max(current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch] + // where current_epoch is defined by the current wall-clock time, + // and clients MUST support serving requests of blobs on this range. 
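	// Worked example with illustrative values (SLOTS_PER_EPOCH = 32, an assumed
	// MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS of 4096 and an assumed DENEB_FORK_EPOCH of 100):
	// at current_epoch = 200 the lower bound is clamped to the fork epoch, i.e. slot 100 * 32 = 3200,
	// while at current_epoch = 5000 it trails the head at epoch 5000 - 4096 = 904, i.e. slot 904 * 32 = 28928;
	// a requested start slot below that bound is bumped up to it rather than rejected.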
+ minStartSlot, err := blobsByRangeMinStartSlot(current) + if err != nil { + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "blobsByRangeMinStartSlot error") + } + if rp.start > maxStart { + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart") + } + if rp.start < minStartSlot { + rp.start = minStartSlot + } + + if rp.end > current { + rp.end = current + } + if rp.end < rp.start { + rp.end = rp.start + } + + limit := blobBatchLimit() + if limit > maxRequest { + limit = maxRequest + } + if rp.size > limit { + rp.size = limit + } + + return rp, nil +} diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go new file mode 100644 index 000000000000..8a4f4386dd09 --- /dev/null +++ b/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go @@ -0,0 +1,286 @@ +package sync + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/config/params" + types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/time/slots" +) + +func (c *blobsTestCase) defaultOldestSlotByRange(t *testing.T) types.Slot { + currentEpoch := slots.ToEpoch(c.chain.CurrentSlot()) + oldestEpoch := currentEpoch - params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + if oldestEpoch < params.BeaconConfig().DenebForkEpoch { + oldestEpoch = params.BeaconConfig().DenebForkEpoch + } + oldestSlot, err := slots.EpochStart(oldestEpoch) + require.NoError(t, err) + return oldestSlot +} + +func blobRangeRequestFromSidecars(scs []*ethpb.BlobSidecar) interface{} { + maxBlobs := fieldparams.MaxBlobsPerBlock + count := uint64(len(scs) / maxBlobs) + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: scs[0].Slot, + Count: count, + } +} + +func (c *blobsTestCase) filterExpectedByRange(t *testing.T, scs []*ethpb.BlobSidecar, req interface{}) []*expectedBlobChunk { + var expect []*expectedBlobChunk + blockOffset := 0 + lastRoot := bytesutil.ToBytes32(scs[0].BlockRoot) + rreq, ok := req.(*ethpb.BlobSidecarsByRangeRequest) + require.Equal(t, true, ok) + var writes uint64 + for _, sc := range scs { + root := bytesutil.ToBytes32(sc.BlockRoot) + if root != lastRoot { + blockOffset += 1 + } + lastRoot = root + + if sc.Slot < c.oldestSlot(t) { + continue + } + if sc.Slot < rreq.StartSlot || sc.Slot > rreq.StartSlot+types.Slot(rreq.Count)-1 { + continue + } + if writes == params.BeaconNetworkConfig().MaxRequestBlobSidecars { + continue + } + expect = append(expect, &expectedBlobChunk{ + sidecar: sc, + code: responseCodeSuccess, + message: "", + }) + writes += 1 + } + return expect +} + +func (c *blobsTestCase) runTestBlobSidecarsByRange(t *testing.T) { + if c.serverHandle == nil { + c.serverHandle = func(s *Service) rpcHandler { return s.blobSidecarsByRangeRPCHandler } + } + if c.defineExpected == nil { + c.defineExpected = c.filterExpectedByRange + } + if c.requestFromSidecars == nil { + c.requestFromSidecars = blobRangeRequestFromSidecars + } + if c.topic == "" { + c.topic = p2p.RPCBlobSidecarsByRangeTopicV1 + } + if c.oldestSlot == nil { + c.oldestSlot = c.defaultOldestSlotByRange + } + if c.streamReader == nil { + c.streamReader = defaultExpectedRequirer + } + c.run(t) +} + +func 
TestBlobByRangeOK(t *testing.T) { + origNC := params.BeaconNetworkConfig() + // restore network config after test completes + defer func() { + params.OverrideBeaconNetworkConfig(origNC) + }() + // set MaxRequestBlobSidecars to a low-ish value so the test doesn't timeout. + nc := params.BeaconNetworkConfig().Copy() + nc.MaxRequestBlobSidecars = 100 + params.OverrideBeaconNetworkConfig(nc) + + cases := []*blobsTestCase{ + { + name: "beginning of window + 10", + nblocks: 10, + }, + { + name: "10 slots before window, 10 slots after, count = 20", + nblocks: 10, + requestFromSidecars: func(scs []*ethpb.BlobSidecar) interface{} { + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: scs[0].Slot - 10, + Count: 20, + } + }, + }, + { + name: "request before window, empty response", + nblocks: 10, + requestFromSidecars: func(scs []*ethpb.BlobSidecar) interface{} { + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: scs[0].Slot - 10, + Count: 10, + } + }, + total: func() *int { x := 0; return &x }(), + }, + { + name: "10 blocks * 4 blobs = 40", + nblocks: 10, + requestFromSidecars: func(scs []*ethpb.BlobSidecar) interface{} { + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: scs[0].Slot - 10, + Count: 20, + } + }, + total: func() *int { x := fieldparams.MaxBlobsPerBlock * 10; return &x }(), // 10 blocks * 4 blobs = 40 + }, + { + name: "when request count > MAX_REQUEST_BLOCKS_DENEB, MAX_REQUEST_BLOBS_SIDECARS sidecars in response", + nblocks: int(params.BeaconNetworkConfig().MaxRequestBlocksDeneb) + 10, + requestFromSidecars: func(scs []*ethpb.BlobSidecar) interface{} { + return ðpb.BlobSidecarsByRangeRequest{ + StartSlot: scs[0].Slot, + Count: params.BeaconNetworkConfig().MaxRequestBlocksDeneb + 1, + } + }, + total: func() *int { x := int(params.BeaconNetworkConfig().MaxRequestBlobSidecars); return &x }(), + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + c.runTestBlobSidecarsByRange(t) + }) + } +} + +func TestBlobsByRangeValidation(t *testing.T) { + cfg := params.BeaconConfig() + repositionFutureEpochs(cfg) + undo, err := params.SetActiveWithUndo(cfg) + require.NoError(t, err) + defer func() { + require.NoError(t, undo()) + }() + denebSlot, err := slots.EpochStart(params.BeaconConfig().DenebForkEpoch) + require.NoError(t, err) + + minReqEpochs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + minReqSlots, err := slots.EpochStart(minReqEpochs) + require.NoError(t, err) + // spec criteria for mix,max bound checking + /* + Clients MUST keep a record of signed blobs sidecars seen on the epoch range + [max(current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch] + where current_epoch is defined by the current wall-clock time, + and clients MUST support serving requests of blobs on this range. 
+ */ + defaultCurrent := denebSlot + 100 + minReqSlots + defaultMinStart, err := blobsByRangeMinStartSlot(defaultCurrent) + require.NoError(t, err) + cases := []struct { + name string + current types.Slot + req *ethpb.BlobSidecarsByRangeRequest + // chain := defaultMockChain(t) + + start types.Slot + end types.Slot + batch uint64 + err error + }{ + { + name: "start at current", + current: denebSlot + 100, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: denebSlot + 100, + Count: 10, + }, + start: denebSlot + 100, + end: denebSlot + 100, + batch: 10, + }, + { + name: "start after current", + current: denebSlot, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: denebSlot + 100, + Count: 10, + }, + start: denebSlot, + end: denebSlot, + batch: 0, + }, + { + name: "start before current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS", + current: defaultCurrent, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: defaultMinStart - 100, + Count: 10, + }, + start: defaultMinStart, + end: defaultMinStart, + batch: 10, + }, + { + name: "start before current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - end still valid", + current: defaultCurrent, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: defaultMinStart - 10, + Count: 20, + }, + start: defaultMinStart, + end: defaultMinStart + 9, + batch: blobBatchLimit(), + }, + { + name: "count > MAX_REQUEST_BLOB_SIDECARS", + current: defaultCurrent, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: defaultMinStart - 10, + Count: 1000, + }, + start: defaultMinStart, + end: defaultMinStart - 10 + 999, + // a large count is ok, we just limit the amount of actual responses + batch: blobBatchLimit(), + }, + { + name: "start + count > current", + current: defaultCurrent, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: defaultCurrent + 100, + Count: 100, + }, + start: defaultCurrent, + end: defaultCurrent, + batch: 0, + }, + { + name: "start before deneb", + current: defaultCurrent - minReqSlots + 100, + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: denebSlot - 10, + Count: 100, + }, + start: denebSlot, + end: denebSlot + 89, + batch: blobBatchLimit(), + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + rp, err := validateBlobsByRange(c.req, c.current) + if c.err != nil { + require.ErrorIs(t, err, c.err) + return + } else { + require.NoError(t, err) + } + require.Equal(t, c.start, rp.start) + require.Equal(t, c.end, rp.end) + require.Equal(t, c.batch, rp.size) + }) + } +} diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root.go b/beacon-chain/sync/rpc_blob_sidecars_by_root.go new file mode 100644 index 000000000000..aa5b2d603b17 --- /dev/null +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root.go @@ -0,0 +1,132 @@ +package sync + +import ( + "context" + "sort" + "time" + + libp2pcore "github.com/libp2p/go-libp2p/core" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/time/slots" + "go.opencensus.io/trace" +) + +func blobMinReqEpoch(finalized, current primitives.Epoch) primitives.Epoch { + // 
max(finalized_epoch, current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH) + denebFork := params.BeaconConfig().DenebForkEpoch + var reqWindow primitives.Epoch + if current > params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest { + reqWindow = current - params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + } + if finalized >= reqWindow && finalized > denebFork { + return finalized + } + if reqWindow >= finalized && reqWindow > denebFork { + return reqWindow + } + return denebFork +} + +// blobSidecarByRootRPCHandler handles the /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPC request. +// spec: https://github.com/ethereum/consensus-specs/blob/a7e45db9ac2b60a33e144444969ad3ac0aae3d4c/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 +func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.blobSidecarByRootRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + SetRPCStreamDeadlines(stream) + log := log.WithField("handler", p2p.BlobSidecarsByRootName[1:]) // slice the leading slash off the name var + ref, ok := msg.(*types.BlobSidecarsByRootReq) + if !ok { + return errors.New("message is not type BlobSidecarsByRootReq") + } + + blobIdents := *ref + if err := validateBlobByRootRequest(blobIdents); err != nil { + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + return err + } + // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups. + sort.Sort(blobIdents) + + batchSize := flags.Get().BlobBatchLimit + var ticker *time.Ticker + if len(blobIdents) > batchSize { + ticker = time.NewTicker(time.Second) + } + minReqEpoch := blobMinReqEpoch(s.cfg.chain.FinalizedCheckpt().Epoch, slots.ToEpoch(s.cfg.clock.CurrentSlot())) + + buff := struct { + root [32]byte + scs []*eth.BlobSidecar + }{} + for i := range blobIdents { + if err := ctx.Err(); err != nil { + closeStream(stream, log) + return err + } + + // Throttle request processing to no more than batchSize/sec. + if i != 0 && i%batchSize == 0 && ticker != nil { + <-ticker.C + } + s.rateLimiter.add(stream, 1) + root, idx := bytesutil.ToBytes32(blobIdents[i].BlockRoot), blobIdents[i].Index + if root != buff.root { + scs, err := s.cfg.beaconDB.BlobSidecarsByRoot(ctx, root) + buff.root, buff.scs = root, scs + if err != nil { + if errors.Is(err, db.ErrNotFound) { + // In case db error path gave us a non-nil value, make sure that other indices for the problem root + // are not processed when we reenter the outer loop. + buff.scs = nil + log.WithError(err).Debugf("BlobSidecar not found in db, root=%x, index=%d", root, idx) + continue + } + log.WithError(err).Errorf("unexpected db error retrieving BlobSidecar, root=%x, index=%d", root, idx) + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return err + } + } + + if idx >= uint64(len(buff.scs)) { + continue + } + sc := buff.scs[idx] + + // If any root in the request content references a block earlier than minimum_request_epoch, + // peers MAY respond with error code 3: ResourceUnavailable or not include the blob in the response. 
+ if slots.ToEpoch(sc.Slot) < minReqEpoch { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrBlobLTMinRequest.Error(), stream) + log.WithError(types.ErrBlobLTMinRequest). + Debugf("requested blob for block %#x before minimum_request_epoch", blobIdents[i].BlockRoot) + return types.ErrBlobLTMinRequest + } + SetStreamWriteDeadline(stream, defaultWriteDuration) + if chunkErr := WriteBlobSidecarChunk(stream, s.cfg.chain, s.cfg.p2p.Encoding(), sc); chunkErr != nil { + log.WithError(chunkErr).Debug("Could not send a chunked response") + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, chunkErr) + return chunkErr + } + } + closeStream(stream, log) + return nil +} + +func validateBlobByRootRequest(blobIdents types.BlobSidecarsByRootReq) error { + if uint64(len(blobIdents)) > params.BeaconNetworkConfig().MaxRequestBlobSidecars { + return types.ErrMaxBlobReqExceeded + } + return nil +} diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go new file mode 100644 index 000000000000..93b9d5854e41 --- /dev/null +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go @@ -0,0 +1,306 @@ +package sync + +import ( + "fmt" + "sort" + "testing" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/config/params" + types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/time/slots" +) + +func (c *blobsTestCase) defaultOldestSlotByRoot(t *testing.T) types.Slot { + oldest, err := slots.EpochStart(blobMinReqEpoch(c.chain.FinalizedCheckPoint.Epoch, slots.ToEpoch(c.clock.CurrentSlot()))) + require.NoError(t, err) + return oldest +} + +func blobRootRequestFromSidecars(scs []*ethpb.BlobSidecar) interface{} { + req := make(p2pTypes.BlobSidecarsByRootReq, 0) + for _, sc := range scs { + req = append(req, ðpb.BlobIdentifier{BlockRoot: sc.BlockRoot, Index: sc.Index}) + } + return &req +} + +func (c *blobsTestCase) filterExpectedByRoot(t *testing.T, scs []*ethpb.BlobSidecar, r interface{}) []*expectedBlobChunk { + rp, ok := r.(*p2pTypes.BlobSidecarsByRootReq) + if !ok { + panic("unexpected request type in filterExpectedByRoot") + } + req := *rp + if uint64(len(req)) > params.BeaconNetworkConfig().MaxRequestBlobSidecars { + return []*expectedBlobChunk{{ + code: responseCodeInvalidRequest, + message: p2pTypes.ErrBlobLTMinRequest.Error(), + }} + } + sort.Sort(req) + var expect []*expectedBlobChunk + blockOffset := 0 + if len(scs) == 0 { + return expect + } + lastRoot := bytesutil.ToBytes32(scs[0].BlockRoot) + rootToOffset := make(map[[32]byte]int) + rootToOffset[lastRoot] = 0 + scMap := make(map[[32]byte]map[uint64]*ethpb.BlobSidecar) + for _, sc := range scs { + root := bytesutil.ToBytes32(sc.BlockRoot) + if root != lastRoot { + blockOffset += 1 + rootToOffset[root] = blockOffset + } + lastRoot = root + _, ok := scMap[root] + if !ok { + scMap[root] = make(map[uint64]*ethpb.BlobSidecar) + } + scMap[root][sc.Index] = sc + } + for _, scid := range req { + rootMap, ok := scMap[bytesutil.ToBytes32(scid.BlockRoot)] + if !ok { + 
panic(fmt.Sprintf("test setup failure, no fixture with root %#x", scid.BlockRoot)) + } + sc, idxOk := rootMap[scid.Index] + if !idxOk { + panic(fmt.Sprintf("test setup failure, no fixture at index %d with root %#x", scid.Index, scid.BlockRoot)) + } + // Skip sidecars that are supposed to be missing. + root := bytesutil.ToBytes32(sc.BlockRoot) + if c.missing[rootToOffset[root]] { + continue + } + // If a sidecar is expired, we'll expect an error for the *first* index, and after that + // we'll expect no further chunks in the stream, so filter out any further expected responses. + // We don't need to check what index this is because we work through them in order and the first one + // will set streamTerminated = true and skip everything else in the test case. + if c.expired[rootToOffset[root]] { + return append(expect, &expectedBlobChunk{ + sidecar: sc, + code: responseCodeResourceUnavailable, + message: p2pTypes.ErrBlobLTMinRequest.Error(), + }) + } + + expect = append(expect, &expectedBlobChunk{ + sidecar: sc, + code: responseCodeSuccess, + message: "", + }) + } + return expect +} + +func (c *blobsTestCase) runTestBlobSidecarsByRoot(t *testing.T) { + if c.serverHandle == nil { + c.serverHandle = func(s *Service) rpcHandler { return s.blobSidecarByRootRPCHandler } + } + if c.defineExpected == nil { + c.defineExpected = c.filterExpectedByRoot + } + if c.requestFromSidecars == nil { + c.requestFromSidecars = blobRootRequestFromSidecars + } + if c.topic == "" { + c.topic = p2p.RPCBlobSidecarsByRootTopicV1 + } + if c.oldestSlot == nil { + c.oldestSlot = c.defaultOldestSlotByRoot + } + if c.streamReader == nil { + c.streamReader = defaultExpectedRequirer + } + c.run(t) +} + +func TestReadChunkEncodedBlobs(t *testing.T) { + cases := []*blobsTestCase{ + { + name: "test successful read via requester", + nblocks: 1, + streamReader: readChunkEncodedBlobsAsStreamReader, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + c.runTestBlobSidecarsByRoot(t) + }) + } +} + +func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) { + encoding := s.cfg.p2p.Encoding() + ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot()) + require.NoError(t, err) + vf := func(sidecar *ethpb.BlobSidecar) error { + return nil + } + return func(stream network.Stream) { + scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf) + require.NoError(t, err) + require.Equal(t, len(expect), len(scs)) + for i, sc := range scs { + esc := expect[i].sidecar + require.Equal(t, esc.Slot, sc.Slot) + require.Equal(t, esc.Index, sc.Index) + require.Equal(t, bytesutil.ToBytes32(esc.BlockRoot), bytesutil.ToBytes32(sc.BlockRoot)) + } + } +} + +func TestBlobsByRootValidation(t *testing.T) { + cfg := params.BeaconConfig() + repositionFutureEpochs(cfg) + undo, err := params.SetActiveWithUndo(cfg) + require.NoError(t, err) + defer func() { + require.NoError(t, undo()) + }() + capellaSlot, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch) + require.NoError(t, err) + dmc, clock := defaultMockChain(t) + dmc.Slot = &capellaSlot + dmc.FinalizedCheckPoint = ðpb.Checkpoint{Epoch: params.BeaconConfig().CapellaForkEpoch} + cases := []*blobsTestCase{ + { + name: "block before minimum_request_epoch", + nblocks: 1, + expired: map[int]bool{0: true}, + chain: dmc, + clock: clock, + err: p2pTypes.ErrBlobLTMinRequest, + }, + { + name: "blocks before and after minimum_request_epoch", + nblocks: 2, + expired: map[int]bool{0: true}, + 
chain: dmc, + clock: clock, + err: p2pTypes.ErrBlobLTMinRequest, + }, + { + name: "one after minimum_request_epoch then one before", + nblocks: 2, + expired: map[int]bool{1: true}, + chain: dmc, + clock: clock, + err: p2pTypes.ErrBlobLTMinRequest, + }, + { + name: "block with all indices missing between 2 full blocks", + nblocks: 3, + missing: map[int]bool{1: true}, + total: func(i int) *int { return &i }(2 * fieldparams.MaxBlobsPerBlock), + }, + { + name: "exceeds req max", + nblocks: int(params.BeaconNetworkConfig().MaxRequestBlobSidecars) + 1, + err: p2pTypes.ErrMaxBlobReqExceeded, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + c.runTestBlobSidecarsByRoot(t) + }) + } +} + +func TestBlobsByRootOK(t *testing.T) { + cases := []*blobsTestCase{ + { + name: "0 blob", + nblocks: 0, + }, + { + name: "1 blob", + nblocks: 1, + }, + { + name: "2 blob", + nblocks: 2, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + c.runTestBlobSidecarsByRoot(t) + }) + } +} + +func TestBlobsByRootMinReqEpoch(t *testing.T) { + winMin := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest + cases := []struct { + name string + finalized types.Epoch + current types.Epoch + deneb types.Epoch + expected types.Epoch + }{ + { + name: "testnet genesis", + deneb: 100, + current: 0, + finalized: 0, + expected: 100, + }, + { + name: "underflow averted", + deneb: 100, + current: winMin - 1, + finalized: 0, + expected: 100, + }, + { + name: "underflow averted - finalized is higher", + deneb: 100, + current: winMin - 1, + finalized: winMin - 2, + expected: winMin - 2, + }, + { + name: "underflow averted - genesis at deneb", + deneb: 0, + current: winMin - 1, + finalized: 0, + expected: 0, + }, + { + name: "max is finalized", + deneb: 100, + current: 99 + winMin, + finalized: 101, + expected: 101, + }, + { + name: "reqWindow > finalized, reqWindow < deneb", + deneb: 100, + current: 99 + winMin, + finalized: 98, + expected: 100, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + cfg := params.BeaconConfig() + repositionFutureEpochs(cfg) + cfg.DenebForkEpoch = c.deneb + undo, err := params.SetActiveWithUndo(cfg) + require.NoError(t, err) + defer func() { + require.NoError(t, undo()) + }() + ep := blobMinReqEpoch(c.finalized, c.current) + require.Equal(t, c.expected, ep) + }) + } +} diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index ed18224a5916..14a0679e1cb1 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -12,7 +12,9 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/network/forks" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime/version" + "github.com/prysmaticlabs/prysm/v4/time/slots" ) // chunkBlockWriter writes the given message as a chunked response to the given network @@ -148,3 +150,22 @@ func extractBlockDataType(digest []byte, tor blockchain.TemporalOracle) (interfa } return nil, errors.Wrapf(ErrNoValidDigest, "could not extract block data type, saw digest=%#x, genesis=%v, vr=%#x", digest, tor.GenesisTime(), tor.GenesisValidatorsRoot()) } + +// WriteBlobSidecarChunk writes blob chunk object to stream. 
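+// Each chunk is written as a result byte, then context bytes (the fork digest for the
+// sidecar's epoch), then the length-prefixed ssz_snappy-encoded BlobSidecar: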
+// response_chunk ::= | | | +func WriteBlobSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, sidecar *ethpb.BlobSidecar) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + valRoot := tor.GenesisValidatorsRoot() + ctxBytes, err := forks.ForkDigestFromEpoch(slots.ToEpoch(sidecar.GetSlot()), valRoot[:]) + if err != nil { + return err + } + + if err := writeContextToStream(ctxBytes[:], stream); err != nil { + return err + } + _, err = encoding.EncodeWithMaxLength(stream, sidecar) + return err +} diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index df77bea9ffa9..b56c184bbc40 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -2,17 +2,22 @@ package sync import ( "context" + "fmt" "io" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/encoder" p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/time/slots" ) @@ -129,3 +134,114 @@ func SendBeaconBlocksByRootRequest( } return blocks, nil } + +func SendBlobsByRangeRequest(ctx context.Context, ci blockchain.ForkFetcher, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.BlobSidecar, error) { + topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(ci.CurrentSlot())) + if err != nil { + return nil, err + } + log.WithField("topic", topic).Debug("Sending blob by range request") + stream, err := p2pApi.Send(ctx, req, topic, pid) + if err != nil { + return nil, err + } + defer closeStream(stream, log) + + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req)) +} + +func SendBlobSidecarByRoot( + ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.P2P, pid peer.ID, + ctxMap ContextByteVersions, req *p2ptypes.BlobSidecarsByRootReq, +) ([]*pb.BlobSidecar, error) { + if uint64(len(*req)) > params.BeaconNetworkConfig().MaxRequestBlobSidecars { + return nil, errors.Wrapf(p2ptypes.ErrMaxBlobReqExceeded, "length=%d", len(*req)) + } + + topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRootName, slots.ToEpoch(tor.CurrentSlot())) + if err != nil { + return nil, err + } + log.WithField("topic", topic).Debug("Sending blob sidecar request") + stream, err := p2pApi.Send(ctx, req, topic, pid) + if err != nil { + return nil, err + } + defer closeStream(stream, log) + + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req)) +} + +var ErrBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs") +var ErrBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob") +var ErrUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested") +var ErrBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest 
bounds") + +type blobResponseValidation func(*pb.BlobSidecar) error + +func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation { + roots := make(map[[32]byte]bool) + for _, sc := range *req { + roots[bytesutil.ToBytes32(sc.BlockRoot)] = true + } + return func(sc *pb.BlobSidecar) error { + if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested { + return errors.Wrapf(ErrUnrequestedRoot, "root=%#x", sc.BlockRoot) + } + return nil + } +} + +func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseValidation { + end := req.StartSlot + primitives.Slot(req.Count) + return func(sc *pb.BlobSidecar) error { + if sc.Slot < req.StartSlot || sc.Slot >= end { + return errors.Wrapf(ErrBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot) + } + return nil + } +} + +func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) ([]*pb.BlobSidecar, error) { + decode := encoding.DecodeWithMaxLength + max := int(params.BeaconNetworkConfig().MaxRequestBlobSidecars) + var ( + code uint8 + msg string + err error + ) + sidecars := make([]*pb.BlobSidecar, 0) + for i := 0; i < max; i++ { + code, msg, err = ReadStatusCode(stream, encoding) + if err != nil { + break + } + if code != 0 { + return nil, errors.Wrap(ErrBlobChunkedReadFailure, msg) + } + ctxb, err := readContextFromStream(stream) + if err != nil { + return nil, errors.Wrap(err, "error reading chunk context bytes from stream") + } + + v, found := ctxMap[bytesutil.ToBytes4(ctxb)] + if !found { + return nil, errors.Wrapf(ErrBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb)) + } + if v != version.Deneb { + return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v)) + } + sc := &pb.BlobSidecar{} + if err := decode(stream, sc); err != nil { + return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream") + } + if err := vf(sc); err != nil { + return nil, errors.Wrap(err, "validation failure decoding blob RPC response") + } + sidecars = append(sidecars, sc) + } + if !errors.Is(err, io.EOF) { + return nil, err + } + return sidecars, nil +} diff --git a/beacon-chain/sync/rpc_send_request_test.go b/beacon-chain/sync/rpc_send_request_test.go index c08651453620..ca4aadb6f371 100644 --- a/beacon-chain/sync/rpc_send_request_test.go +++ b/beacon-chain/sync/rpc_send_request_test.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" @@ -475,3 +476,115 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { assert.Equal(t, 3, len(blocks)) }) } + +func TestBlobValidatorFromRootReq(t *testing.T) { + validRoot := bytesutil.PadTo([]byte("valid"), 32) + invalidRoot := bytesutil.PadTo([]byte("invalid"), 32) + cases := []struct { + name string + ids []*ethpb.BlobIdentifier + response []*ethpb.BlobSidecar + err error + }{ + { + name: "valid", + ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}}, + response: []*ethpb.BlobSidecar{{BlockRoot: validRoot}}, + }, + { + name: "invalid", + ids: 
[]*ethpb.BlobIdentifier{{BlockRoot: validRoot}}, + response: []*ethpb.BlobSidecar{{BlockRoot: invalidRoot}}, + err: ErrUnrequestedRoot, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + r := p2pTypes.BlobSidecarsByRootReq(c.ids) + vf := blobValidatorFromRootReq(&r) + for _, sc := range c.response { + err := vf(sc) + if c.err != nil { + require.ErrorIs(t, err, c.err) + return + } + require.NoError(t, err) + } + }) + } +} + +func TestBlobValidatorFromRangeReq(t *testing.T) { + cases := []struct { + name string + req *ethpb.BlobSidecarsByRangeRequest + response []*ethpb.BlobSidecar + err error + }{ + { + name: "valid - count multi", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 10, + }, + response: []*ethpb.BlobSidecar{{Slot: 14}}, + }, + { + name: "valid - count 1", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 1, + }, + response: []*ethpb.BlobSidecar{{Slot: 10}}, + }, + { + name: "invalid - before", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 1, + }, + response: []*ethpb.BlobSidecar{{Slot: 9}}, + err: ErrBlobResponseOutOfBounds, + }, + { + name: "invalid - after, count 1", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 1, + }, + response: []*ethpb.BlobSidecar{{Slot: 11}}, + err: ErrBlobResponseOutOfBounds, + }, + { + name: "invalid - after, multi", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 10, + }, + response: []*ethpb.BlobSidecar{{Slot: 23}}, + err: ErrBlobResponseOutOfBounds, + }, + { + name: "invalid - after, at boundary, multi", + req: ðpb.BlobSidecarsByRangeRequest{ + StartSlot: 10, + Count: 10, + }, + response: []*ethpb.BlobSidecar{{Slot: 20}}, + err: ErrBlobResponseOutOfBounds, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + vf := blobValidatorFromRangeReq(c.req) + for _, sc := range c.response { + err := vf(sc) + if c.err != nil { + require.ErrorIs(t, err, c.err) + return + } + require.NoError(t, err) + } + }) + } +} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index f212e2f85d3f..06b88178ad18 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -34,6 +34,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru" "github.com/prysmaticlabs/prysm/v4/config/params" + leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime" prysmTime "github.com/prysmaticlabs/prysm/v4/time" @@ -284,6 +285,10 @@ func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, s writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p) } +func (s *Service) setRateCollector(topic string, c *leakybucket.Collector) { + s.rateLimiter.limiterMap[topic] = c +} + // marks the chain as having started. 
func (s *Service) markForChainStart() { s.chainStarted.Set() diff --git a/beacon-chain/sync/sync_test.go b/beacon-chain/sync/sync_test.go index 982b7a7eb6f1..4306a7baca1d 100644 --- a/beacon-chain/sync/sync_test.go +++ b/beacon-chain/sync/sync_test.go @@ -16,6 +16,8 @@ func TestMain(m *testing.M) { flags.Init(&flags.GlobalFlags{ BlockBatchLimit: 64, BlockBatchLimitBurstFactor: 10, + BlobBatchLimit: 8, + BlobBatchLimitBurstFactor: 2, }) defer func() { flags.Init(resetFlags) diff --git a/cmd/beacon-chain/flags/base.go b/cmd/beacon-chain/flags/base.go index 43bff866aba7..51283ce8227d 100644 --- a/cmd/beacon-chain/flags/base.go +++ b/cmd/beacon-chain/flags/base.go @@ -165,6 +165,18 @@ var ( Usage: "The factor by which block batch limit may increase on burst.", Value: 2, } + // BlockBatchLimit specifies the requested block batch size. + BlobBatchLimit = &cli.IntFlag{ + Name: "blob-batch-limit", + Usage: "The amount of blobs the local peer is bounded to request and respond to in a batch.", + Value: 8, + } + // BlobBatchLimitBurstFactor specifies the factor by which blob batch size may increase. + BlobBatchLimitBurstFactor = &cli.IntFlag{ + Name: "blob-batch-limit-burst-factor", + Usage: "The factor by which blob batch limit may increase on burst.", + Value: 2, + } // EnableDebugRPCEndpoints as /v1/beacon/state. EnableDebugRPCEndpoints = &cli.BoolFlag{ Name: "enable-debug-rpc-endpoints", diff --git a/cmd/beacon-chain/flags/config.go b/cmd/beacon-chain/flags/config.go index d0e2fdef348d..6ebc80a9052b 100644 --- a/cmd/beacon-chain/flags/config.go +++ b/cmd/beacon-chain/flags/config.go @@ -13,6 +13,8 @@ type GlobalFlags struct { MinimumPeersPerSubnet int BlockBatchLimit int BlockBatchLimitBurstFactor int + BlobBatchLimit int + BlobBatchLimitBurstFactor int } var globalConfig *GlobalFlags @@ -40,6 +42,8 @@ func ConfigureGlobalFlags(ctx *cli.Context) { } cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name) cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name) + cfg.BlobBatchLimit = ctx.Int(BlobBatchLimit.Name) + cfg.BlobBatchLimitBurstFactor = ctx.Int(BlobBatchLimitBurstFactor.Name) cfg.MinimumPeersPerSubnet = ctx.Int(MinPeersPerSubnet.Name) configureMinimumPeers(ctx, cfg) diff --git a/config/fieldparams/mainnet.go b/config/fieldparams/mainnet.go index 03c43223aed7..5f17b92cbab6 100644 --- a/config/fieldparams/mainnet.go +++ b/config/fieldparams/mainnet.go @@ -29,4 +29,5 @@ const ( MaxBlobsPerBlock = 6 // MaxBlobsPerBlock defines the maximum number of blobs with respect to consensus rule can be included in a block. MaxBlobCommitmentsPerBlock = 4096 // MaxBlobCommitmentsPerBlock defines the theoretical limit of blobs can be included in a block. BlobLength = 131072 // BlobLength defines the byte length of a blob. + BlobSize = 131072 // defined to match blob.size in bazel ssz codegen ) diff --git a/config/fieldparams/minimal.go b/config/fieldparams/minimal.go index cf76dfd23d94..9efa6b458d81 100644 --- a/config/fieldparams/minimal.go +++ b/config/fieldparams/minimal.go @@ -29,4 +29,5 @@ const ( MaxBlobsPerBlock = 6 // MaxBlobsPerBlock defines the maximum number of blobs with respect to consensus rule can be included in a block. MaxBlobCommitmentsPerBlock = 16 // MaxBlobCommitmentsPerBlock defines the theoretical limit of blobs can be included in a block. BlobLength = 4 // BlobLength defines the byte length of a blob. 
+ BlobSize = 128 // defined to match blob.size in bazel ssz codegen ) diff --git a/config/params/BUILD.bazel b/config/params/BUILD.bazel index 064c16c6a868..861d04ad9dd1 100644 --- a/config/params/BUILD.bazel +++ b/config/params/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", "//math:go_default_library", + "//runtime/version:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//params:go_default_library", "@com_github_mohae_deepcopy//:go_default_library", diff --git a/config/params/config.go b/config/params/config.go index e1107df74397..925d62ffdd2e 100644 --- a/config/params/config.go +++ b/config/params/config.go @@ -8,6 +8,7 @@ import ( fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/runtime/version" ) // BeaconChainConfig contains constant configs for node to participate in beacon chain. @@ -230,18 +231,31 @@ func configForkSchedule(b *BeaconChainConfig) map[[fieldparams.VersionLength]byt fvs[bytesutil.ToBytes4(b.AltairForkVersion)] = b.AltairForkEpoch fvs[bytesutil.ToBytes4(b.BellatrixForkVersion)] = b.BellatrixForkEpoch fvs[bytesutil.ToBytes4(b.CapellaForkVersion)] = b.CapellaForkEpoch + fvs[bytesutil.ToBytes4(b.DenebForkVersion)] = b.DenebForkEpoch return fvs } func configForkNames(b *BeaconChainConfig) map[[fieldparams.VersionLength]byte]string { + cfv := ConfigForkVersions(b) fvn := map[[fieldparams.VersionLength]byte]string{} - fvn[bytesutil.ToBytes4(b.GenesisForkVersion)] = "phase0" - fvn[bytesutil.ToBytes4(b.AltairForkVersion)] = "altair" - fvn[bytesutil.ToBytes4(b.BellatrixForkVersion)] = "bellatrix" - fvn[bytesutil.ToBytes4(b.CapellaForkVersion)] = "capella" + for k, v := range cfv { + fvn[k] = version.String(v) + } return fvn } +// ConfigForkVersions returns a mapping between a fork version param and the version identifier +// from the runtime/version package. +func ConfigForkVersions(b *BeaconChainConfig) map[[fieldparams.VersionLength]byte]int { + return map[[fieldparams.VersionLength]byte]int{ + bytesutil.ToBytes4(b.GenesisForkVersion): version.Phase0, + bytesutil.ToBytes4(b.AltairForkVersion): version.Altair, + bytesutil.ToBytes4(b.BellatrixForkVersion): version.Bellatrix, + bytesutil.ToBytes4(b.CapellaForkVersion): version.Capella, + bytesutil.ToBytes4(b.DenebForkVersion): version.Deneb, + } +} + // Eth1DataVotesLength returns the maximum length of the votes on the Eth1 data, // computed from the parameters in BeaconChainConfig. 
func (b *BeaconChainConfig) Eth1DataVotesLength() uint64 { diff --git a/config/params/configset.go b/config/params/configset.go index 85e68b30f4f2..7245f49a0314 100644 --- a/config/params/configset.go +++ b/config/params/configset.go @@ -1,8 +1,11 @@ package params import ( + "fmt" + "github.com/pkg/errors" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/runtime/version" ) var configs *configset @@ -69,7 +72,12 @@ func (r *configset) add(c *BeaconChainConfig) error { c.InitializeForkSchedule() for v := range c.ForkVersionSchedule { if n, exists := r.versionToName[v]; exists { - return errors.Wrapf(errCollisionFork, "config name=%s conflicts with existing config named=%s", name, n) + // determine the fork name for the colliding version + cfv := ConfigForkVersions(c) + versionId := cfv[v] + msg := fmt.Sprintf("version %#x for fork %s in config %s conflicts with existing config named=%s", + v, version.String(versionId), name, n) + return errors.Wrap(errCollisionFork, msg) } r.versionToName[v] = name } diff --git a/config/params/interop.go b/config/params/interop.go index 16114449b4bf..f1a9e42452f8 100644 --- a/config/params/interop.go +++ b/config/params/interop.go @@ -10,6 +10,7 @@ func InteropConfig() *BeaconChainConfig { c.AltairForkVersion = []byte{1, 0, 0, 235} c.BellatrixForkVersion = []byte{2, 0, 0, 235} c.CapellaForkVersion = []byte{3, 0, 0, 235} + c.DenebForkVersion = []byte{4, 0, 0, 235} c.InitializeForkSchedule() return c diff --git a/config/params/loader.go b/config/params/loader.go index eccd426130e2..61b438dcaaf5 100644 --- a/config/params/loader.go +++ b/config/params/loader.go @@ -207,6 +207,8 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte { fmt.Sprintf("TERMINAL_BLOCK_HASH: %#x", cfg.TerminalBlockHash), fmt.Sprintf("TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH: %d", cfg.TerminalBlockHashActivationEpoch), fmt.Sprintf("DEPOSIT_CONTRACT_ADDRESS: %s", cfg.DepositContractAddress), + fmt.Sprintf("DENEB_FORK_EPOCH: %d", cfg.DenebForkEpoch), + fmt.Sprintf("DENEB_FORK_VERSION: %#x", cfg.DenebForkVersion), } yamlFile := []byte(strings.Join(lines, "\n")) diff --git a/config/params/mainnet_config.go b/config/params/mainnet_config.go index 9fe8eed1fc82..28fa634ab5e3 100644 --- a/config/params/mainnet_config.go +++ b/config/params/mainnet_config.go @@ -286,14 +286,17 @@ func FillTestVersions(c *BeaconChainConfig, b byte) { c.AltairForkVersion = make([]byte, fieldparams.VersionLength) c.BellatrixForkVersion = make([]byte, fieldparams.VersionLength) c.CapellaForkVersion = make([]byte, fieldparams.VersionLength) + c.DenebForkVersion = make([]byte, fieldparams.VersionLength) c.GenesisForkVersion[fieldparams.VersionLength-1] = b c.AltairForkVersion[fieldparams.VersionLength-1] = b c.BellatrixForkVersion[fieldparams.VersionLength-1] = b c.CapellaForkVersion[fieldparams.VersionLength-1] = b + c.DenebForkVersion[fieldparams.VersionLength-1] = b c.GenesisForkVersion[0] = 0 c.AltairForkVersion[0] = 1 c.BellatrixForkVersion[0] = 2 c.CapellaForkVersion[0] = 3 + c.DenebForkVersion[0] = 4 } diff --git a/config/params/testnet_e2e_config.go b/config/params/testnet_e2e_config.go index 584d999e1ada..e5535b46588f 100644 --- a/config/params/testnet_e2e_config.go +++ b/config/params/testnet_e2e_config.go @@ -48,6 +48,7 @@ func E2ETestConfig() *BeaconChainConfig { e2eConfig.AltairForkVersion = []byte{1, 0, 0, 253} e2eConfig.BellatrixForkVersion = []byte{2, 0, 0, 253} e2eConfig.CapellaForkVersion = []byte{3, 0, 0, 253} + e2eConfig.DenebForkVersion = []byte{4, 
0, 0, 253} e2eConfig.InitializeForkSchedule() return e2eConfig @@ -88,6 +89,7 @@ func E2EMainnetTestConfig() *BeaconChainConfig { e2eConfig.AltairForkVersion = []byte{1, 0, 0, 254} e2eConfig.BellatrixForkVersion = []byte{2, 0, 0, 254} e2eConfig.CapellaForkVersion = []byte{3, 0, 0, 254} + e2eConfig.DenebForkVersion = []byte{4, 0, 0, 254} e2eConfig.InitializeForkSchedule() return e2eConfig
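
Reviewer sketch (not part of the patch): a minimal, hypothetical caller of the new by-root request path, assumed to live in the sync package. The helper name fetchBlobsByRoot, its parameters, and the (roots, indicesPerRoot) shape are invented for illustration; SendBlobSidecarByRoot, ContextByteVersionsForValRoot and BlobSidecarsByRootReq are the APIs introduced above.

package sync

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
	p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
	pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)

// fetchBlobsByRoot is an illustrative helper showing how the new request path fits together:
// build one BlobIdentifier per wanted (root, index) pair, derive the context-byte table from
// the genesis validators root, and let SendBlobSidecarByRoot enforce MaxRequestBlobSidecars
// and reject any returned sidecar whose root was not requested.
func fetchBlobsByRoot(
	ctx context.Context,
	tor blockchain.TemporalOracle,
	p2pApi p2p.P2P,
	pid peer.ID,
	roots [][32]byte,
	indicesPerRoot uint64,
) ([]*pb.BlobSidecar, error) {
	req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(roots)*int(indicesPerRoot))
	for _, r := range roots {
		r := r // copy so the slices below do not alias the loop variable
		for i := uint64(0); i < indicesPerRoot; i++ {
			req = append(req, &pb.BlobIdentifier{BlockRoot: r[:], Index: i})
		}
	}
	ctxMap, err := ContextByteVersionsForValRoot(tor.GenesisValidatorsRoot())
	if err != nil {
		return nil, err
	}
	return SendBlobSidecarByRoot(ctx, tor, p2pApi, pid, ctxMap, &req)
}

Note that a caller would still need to keep len(req) at or below MaxRequestBlobSidecars, since SendBlobSidecarByRoot returns ErrMaxBlobReqExceeded rather than splitting the request into batches.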