Skip to content

Commit

Permalink
fixup! BlobSidecarsByRoot RPC handler
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Jun 12, 2023
1 parent 05b1f24 commit 424730f
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
5 changes: 4 additions & 1 deletion beacon-chain/sync/rpc_blob_sidecars_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,11 @@ func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*exp
encoding := s.cfg.p2p.Encoding()
ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot())
require.NoError(t, err)
vf := func(sidecar *ethpb.BlobSidecar) error {
return nil
}
return func(stream network.Stream) {
scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap)
scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf)
require.NoError(t, err)
require.Equal(t, len(expect), len(scs))
for i, sc := range scs {
Expand Down
23 changes: 21 additions & 2 deletions beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,29 @@ func SendBlobSidecarByRoot(
}
defer closeStream(stream, log)

return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap)
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req))
}

var ErrBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs")
var ErrBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var ErrUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested")

func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions) ([]*pb.BlobSidecar, error) {
type blobResponseValidation func(*pb.BlobSidecar) error

func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
roots := make(map[[32]byte]bool)
for _, sc := range *req {
roots[bytesutil.ToBytes32(sc.BlockRoot)] = true
}
return func(sc *pb.BlobSidecar) error {
if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested {
return errors.Wrapf(ErrUnrequestedRoot, "root=%#x", sc.BlockRoot)
}
return nil
}
}

func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) ([]*pb.BlobSidecar, error) {
decode := encoding.DecodeWithMaxLength
max := int(params.BeaconNetworkConfig().MaxRequestBlobsSidecars)
var (
Expand Down Expand Up @@ -193,6 +209,9 @@ func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncodi
if err := decode(stream, sc); err != nil {
return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
if err := vf(sc); err != nil {
return nil, errors.Wrap(err, "validation failure decoding blob RPC response")
}
sidecars = append(sidecars, sc)
}
if !errors.Is(err, io.EOF) {
Expand Down
38 changes: 38 additions & 0 deletions beacon-chain/sync/rpc_send_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
Expand Down Expand Up @@ -475,3 +476,40 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
assert.Equal(t, 3, len(blocks))
})
}

func TestBlobValidatorFromRootReq(t *testing.T) {
validRoot := bytesutil.PadTo([]byte("valid"), 32)
invalidRoot := bytesutil.PadTo([]byte("invalid"), 32)
cases := []struct {
name string
ids []*ethpb.BlobIdentifier
response []*ethpb.BlobSidecar
err error
}{
{
name: "valid",
ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}},
response: []*ethpb.BlobSidecar{{BlockRoot: validRoot}},
},
{
name: "invalid",
ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}},
response: []*ethpb.BlobSidecar{{BlockRoot: invalidRoot}},
err: ErrUnrequestedRoot,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
r := p2pTypes.BlobSidecarsByRootReq(c.ids)
vf := blobValidatorFromRootReq(&r)
for _, sc := range c.response {
err := vf(sc)
if c.err != nil {
require.ErrorIs(t, err, c.err)
return
}
require.NoError(t, err)
}
})
}
}

0 comments on commit 424730f

Please sign in to comment.