Skip to content

Commit

Permalink
feat(deneb): proposer rpc to handle builder flow (#12554)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored and james-prysm committed Aug 4, 2023
1 parent 147a0d7 commit cf71546
Show file tree
Hide file tree
Showing 11 changed files with 537 additions and 98 deletions.
1 change: 1 addition & 0 deletions beacon-chain/builder/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//consensus-types/primitives:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
Expand Down
23 changes: 13 additions & 10 deletions beacon-chain/builder/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
v1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
)

Expand Down Expand Up @@ -44,27 +45,29 @@ func (s *MockBuilderService) Configured() bool {
}

// SubmitBlindedBlock for mocking.
func (s *MockBuilderService) SubmitBlindedBlock(_ context.Context, _ interfaces.ReadOnlySignedBeaconBlock, _ []*ethpb.SignedBlindedBlobSidecar) (interfaces.ExecutionData, *v1.BlobsBundle, error) {
if s.Payload != nil {
func (s *MockBuilderService) SubmitBlindedBlock(_ context.Context, b interfaces.ReadOnlySignedBeaconBlock, _ []*ethpb.SignedBlindedBlobSidecar) (interfaces.ExecutionData, *v1.BlobsBundle, error) {
switch b.Version() {
case version.Bellatrix:
w, err := blocks.WrappedExecutionPayload(s.Payload)
if err != nil {
return nil, nil, errors.Wrap(err, "could not wrap payload")
}
return w, nil, s.ErrSubmitBlindedBlock
}
if s.PayloadCapella != nil {
case version.Capella:
w, err := blocks.WrappedExecutionPayloadCapella(s.PayloadCapella, 0)
if err != nil {
return nil, nil, errors.Wrap(err, "could not wrap capella payload")
}
return w, nil, s.ErrSubmitBlindedBlock
case version.Deneb:
w, err := blocks.WrappedExecutionPayloadDeneb(s.PayloadDeneb, 0)
if err != nil {
return nil, nil, errors.Wrap(err, "could not wrap deneb payload")
}
return w, s.BlobBundle, s.ErrSubmitBlindedBlock
default:
return nil, nil, errors.New("unknown block version for mocking")
}

w, err := blocks.WrappedExecutionPayloadDeneb(s.PayloadDeneb, 0)
if err != nil {
return nil, nil, errors.Wrap(err, "could not wrap deneb payload")
}
return w, s.BlobBundle, s.ErrSubmitBlindedBlock
}

// GetHeader for mocking.
Expand Down
67 changes: 50 additions & 17 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, status.Errorf(codes.Internal, "Could not get local payload: %v", err)
}

builderPayload, err := vs.getBuilderPayload(ctx, sBlk.Block().Slot(), sBlk.Block().ProposerIndex())
builderPayload, blindBlobsBundle, err := vs.getBuilderPayloadAndBlobs(ctx, sBlk.Block().Slot(), sBlk.Block().ProposerIndex())
if err != nil {
builderGetPayloadMissCount.Inc()
log.WithError(err).Error("Could not get builder payload")
Expand All @@ -158,7 +158,7 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, status.Errorf(codes.Internal, "Could not set execution data: %v", err)
}

if err := setKzgCommitments(sBlk, blobsBundle); err != nil {
if err := setKzgCommitments(sBlk, blobsBundle, blindBlobsBundle); err != nil {
return nil, status.Errorf(codes.Internal, "Could not set kzg commitment: %v", err)
}

Expand All @@ -181,7 +181,18 @@ func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (
return nil, status.Errorf(codes.Internal, "Could not convert block to proto: %v", err)
}
if slots.ToEpoch(req.Slot) >= params.BeaconConfig().DenebForkEpoch {
// TODO: Handle blind case
if sBlk.IsBlinded() {
scs, err := blindBlobsBundleToSidecars(blindBlobsBundle, sBlk.Block())
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not convert blind blobs bundle to sidecar: %v", err)
}
blockAndBlobs := &ethpb.BlindedBeaconBlockAndBlobsDeneb{
Block: pb.(*ethpb.BlindedBeaconBlockDeneb),
Blobs: scs,
}
return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_BlindedDeneb{BlindedDeneb: blockAndBlobs}}, nil
}

scs, err := blobsBundleToSidecars(blobsBundle, sBlk.Block())
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not convert blobs bundle to sidecar: %v", err)
Expand Down Expand Up @@ -222,11 +233,18 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", CouldNotDecodeBlock, err)
}

unblinder, err := newUnblinder(blk, vs.BlockBuilder)
var blindSidecars []*ethpb.SignedBlindedBlobSidecar
if blk.Version() >= version.Deneb && blk.IsBlinded() {
blindSidecars = req.GetBlindedDeneb().Blobs
}

unblinder, err := newUnblinder(blk, blindSidecars, vs.BlockBuilder)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
blk, err = unblinder.unblindBuilderBlock(ctx)
blinded := unblinder.b.IsBlinded() //

blk, unblindedSidecars, err := unblinder.unblindBuilderBlock(ctx)
if err != nil {
return nil, errors.Wrap(err, "could not unblind builder block")
}
Expand All @@ -240,23 +258,25 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return nil, fmt.Errorf("could not broadcast block: %v", err)
}

var scs []*ethpb.SignedBlobSidecar
if blk.Version() >= version.Deneb {
b, ok := req.GetBlock().(*ethpb.GenericSignedBeaconBlock_Deneb)
if !ok {
return nil, status.Error(codes.Internal, "Could not cast block to Deneb")
}
if len(b.Deneb.Blobs) > fieldparams.MaxBlobsPerBlock {
return nil, status.Errorf(codes.InvalidArgument, "Too many blobs in block: %d", len(b.Deneb.Blobs))
if blinded {
scs = unblindedSidecars // Use sidecars from unblinder if the block was blinded.
} else {
scs, err = extraSidecars(req) // Use sidecars from the request if the block was not blinded.
if err != nil {
return nil, errors.Wrap(err, "could not extract blobs")
}
}
scs := make([]*ethpb.BlobSidecar, len(b.Deneb.Blobs))
for i, blob := range b.Deneb.Blobs {
if err := vs.P2P.BroadcastBlob(ctx, blob.Message.Index, blob); err != nil {
log.WithError(err).Errorf("Could not broadcast blob index %d / %d", i, len(b.Deneb.Blobs))
sidecars := make([]*ethpb.BlobSidecar, len(scs))
for i, sc := range scs {
if err := vs.P2P.BroadcastBlob(ctx, sc.Message.Index, sc); err != nil {
log.WithError(err).Errorf("Could not broadcast blob sidecar index %d / %d", i, len(scs))
}
scs[i] = blob.Message
sidecars[i] = sc.Message
}
if len(scs) > 0 {
if err := vs.BeaconDB.SaveBlobSidecar(ctx, scs); err != nil {
if err := vs.BeaconDB.SaveBlobSidecar(ctx, sidecars); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -286,6 +306,19 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
}, nil
}

// extraSidecars extracts the sidecars from the request.
// return error if there are too many sidecars.
func extraSidecars(req *ethpb.GenericSignedBeaconBlock) ([]*ethpb.SignedBlobSidecar, error) {
b, ok := req.GetBlock().(*ethpb.GenericSignedBeaconBlock_Deneb)
if !ok {
return nil, errors.New("Could not cast block to Deneb")
}
if len(b.Deneb.Blobs) > fieldparams.MaxBlobsPerBlock {
return nil, fmt.Errorf("too many blobs in block: %d", len(b.Deneb.Blobs))
}
return b.Deneb.Blobs, nil
}

// PrepareBeaconProposer caches and updates the fee recipient for the given proposer.
func (vs *Server) PrepareBeaconProposer(
ctx context.Context, request *ethpb.PrepareBeaconProposerRequest,
Expand Down
52 changes: 31 additions & 21 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/encoding/ssz"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v4/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -123,89 +124,98 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc

// This function retrieves the payload header given the slot number and the validator index.
// It's a no-op if the latest head block is not versioned bellatrix.
func (vs *Server) getPayloadHeaderFromBuilder(ctx context.Context, slot primitives.Slot, idx primitives.ValidatorIndex) (interfaces.ExecutionData, error) {
func (vs *Server) getPayloadHeaderFromBuilder(ctx context.Context, slot primitives.Slot, idx primitives.ValidatorIndex) (interfaces.ExecutionData, *enginev1.BlindedBlobsBundle, error) {
ctx, span := trace.StartSpan(ctx, "ProposerServer.getPayloadHeaderFromBuilder")
defer span.End()

if slots.ToEpoch(slot) < params.BeaconConfig().BellatrixForkEpoch {
return nil, errors.New("can't get payload header from builder before bellatrix epoch")
return nil, nil, errors.New("can't get payload header from builder before bellatrix epoch")
}

b, err := vs.HeadFetcher.HeadBlock(ctx)
if err != nil {
return nil, err
return nil, nil, err
}

h, err := b.Block().Body().Execution()
if err != nil {
return nil, errors.Wrap(err, "failed to get execution header")
return nil, nil, errors.Wrap(err, "failed to get execution header")
}
pk, err := vs.HeadFetcher.HeadValidatorIndexToPublicKey(ctx, idx)
if err != nil {
return nil, err
return nil, nil, err
}

ctx, cancel := context.WithTimeout(ctx, blockBuilderTimeout)
defer cancel()

signedBid, err := vs.BlockBuilder.GetHeader(ctx, slot, bytesutil.ToBytes32(h.BlockHash()), pk)
if err != nil {
return nil, err
return nil, nil, err
}
if signedBid.IsNil() {
return nil, errors.New("builder returned nil bid")
return nil, nil, errors.New("builder returned nil bid")
}
fork, err := forks.Fork(slots.ToEpoch(slot))
if err != nil {
return nil, errors.Wrap(err, "unable to get fork information")
return nil, nil, errors.Wrap(err, "unable to get fork information")
}
forkName, ok := params.BeaconConfig().ForkVersionNames[bytesutil.ToBytes4(fork.CurrentVersion)]
if !ok {
return nil, errors.New("unable to find current fork in schedule")
return nil, nil, errors.New("unable to find current fork in schedule")
}
if !strings.EqualFold(version.String(signedBid.Version()), forkName) {
return nil, fmt.Errorf("builder bid response version: %d is different from head block version: %d for epoch %d", signedBid.Version(), b.Version(), slots.ToEpoch(slot))
return nil, nil, fmt.Errorf("builder bid response version: %d is different from head block version: %d for epoch %d", signedBid.Version(), b.Version(), slots.ToEpoch(slot))
}

bid, err := signedBid.Message()
if err != nil {
return nil, errors.Wrap(err, "could not get bid")
return nil, nil, errors.Wrap(err, "could not get bid")
}
if bid.IsNil() {
return nil, errors.New("builder returned nil bid")
return nil, nil, errors.New("builder returned nil bid")
}

v := bytesutil.LittleEndianBytesToBigInt(bid.Value())
if v.String() == "0" {
return nil, errors.New("builder returned header with 0 bid amount")
return nil, nil, errors.New("builder returned header with 0 bid amount")
}

header, err := bid.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get bid header")
return nil, nil, errors.Wrap(err, "could not get bid header")
}
txRoot, err := header.TransactionsRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get transaction root")
return nil, nil, errors.Wrap(err, "could not get transaction root")
}
if bytesutil.ToBytes32(txRoot) == emptyTransactionsRoot {
return nil, errors.New("builder returned header with an empty tx root")
return nil, nil, errors.New("builder returned header with an empty tx root")
}

if !bytes.Equal(header.ParentHash(), h.BlockHash()) {
return nil, fmt.Errorf("incorrect parent hash %#x != %#x", header.ParentHash(), h.BlockHash())
return nil, nil, fmt.Errorf("incorrect parent hash %#x != %#x", header.ParentHash(), h.BlockHash())
}

t, err := slots.ToTime(uint64(vs.TimeFetcher.GenesisTime().Unix()), slot)
if err != nil {
return nil, err
return nil, nil, err
}
if header.Timestamp() != uint64(t.Unix()) {
return nil, fmt.Errorf("incorrect timestamp %d != %d", header.Timestamp(), uint64(t.Unix()))
return nil, nil, fmt.Errorf("incorrect timestamp %d != %d", header.Timestamp(), uint64(t.Unix()))
}

if err := validateBuilderSignature(signedBid); err != nil {
return nil, errors.Wrap(err, "could not validate builder signature")
return nil, nil, errors.Wrap(err, "could not validate builder signature")
}

var bundle *enginev1.BlindedBlobsBundle
if bid.Version() >= version.Deneb {
bundle, err = bid.BlindedBlobsBundle()
if err != nil {
return nil, nil, errors.Wrap(err, "could not get blinded blobs bundle")
}
log.WithField("blindBlobCount", len(bundle.BlobRoots))
}

log.WithFields(logrus.Fields{
Expand All @@ -223,7 +233,7 @@ func (vs *Server) getPayloadHeaderFromBuilder(ctx context.Context, slot primitiv
trace.StringAttribute("blockHash", fmt.Sprintf("%#x", header.BlockHash())),
)

return header, nil
return header, bundle, nil
}

// Validates builder signature and returns an error if the signature is invalid.
Expand Down
Loading

0 comments on commit cf71546

Please sign in to comment.