diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go index 18a483b1c7c..0b8cfef0303 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go @@ -141,8 +141,11 @@ func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*exp encoding := s.cfg.p2p.Encoding() ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot()) require.NoError(t, err) + vf := func(sidecar *ethpb.BlobSidecar) error { + return nil + } return func(stream network.Stream) { - scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap) + scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf) require.NoError(t, err) require.Equal(t, len(expect), len(scs)) for i, sc := range scs { diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index be6dd78f5e9..0739664786b 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -154,13 +154,29 @@ func SendBlobSidecarByRoot( } defer closeStream(stream, log) - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap) + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req)) } var ErrBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs") var ErrBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob") +var ErrUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested") -func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions) ([]*pb.BlobSidecar, error) { +type blobResponseValidation func(*pb.BlobSidecar) error + +func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation { + roots := make(map[[32]byte]bool) + for _, sc := range *req { + roots[bytesutil.ToBytes32(sc.BlockRoot)] = true + } + return func(sc *pb.BlobSidecar) error { + if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested { + return errors.Wrapf(ErrUnrequestedRoot, "root=%#x", sc.BlockRoot) + } + return nil + } +} + +func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) ([]*pb.BlobSidecar, error) { decode := encoding.DecodeWithMaxLength max := int(params.BeaconNetworkConfig().MaxRequestBlobsSidecars) var ( @@ -193,6 +209,9 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi if err := decode(stream, sc); err != nil { return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream") } + if err := vf(sc); err != nil { + return nil, errors.Wrap(err, "validation failure decoding blob RPC response") + } sidecars = append(sidecars, sc) } if !errors.Is(err, io.EOF) { diff --git a/beacon-chain/sync/rpc_send_request_test.go b/beacon-chain/sync/rpc_send_request_test.go index c0865145362..59366436c0c 100644 --- a/beacon-chain/sync/rpc_send_request_test.go +++ b/beacon-chain/sync/rpc_send_request_test.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" @@ -475,3 +476,40 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) { assert.Equal(t, 3, len(blocks)) }) } + +func TestBlobValidatorFromRootReq(t *testing.T) { + validRoot := bytesutil.PadTo([]byte("valid"), 32) + invalidRoot := bytesutil.PadTo([]byte("invalid"), 32) + cases := []struct { + name string + ids []*ethpb.BlobIdentifier + response []*ethpb.BlobSidecar + err error + }{ + { + name: "valid", + ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}}, + response: []*ethpb.BlobSidecar{{BlockRoot: validRoot}}, + }, + { + name: "invalid", + ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}}, + response: []*ethpb.BlobSidecar{{BlockRoot: invalidRoot}}, + err: ErrUnrequestedRoot, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + r := p2pTypes.BlobSidecarsByRootReq(c.ids) + vf := blobValidatorFromRootReq(&r) + for _, sc := range c.response { + err := vf(sc) + if c.err != nil { + require.ErrorIs(t, err, c.err) + return + } + require.NoError(t, err) + } + }) + } +}