Skip to content

Commit

Permalink
Revert "Sync optimistically candidate blocks (#10193)"
Browse files Browse the repository at this point in the history
This reverts commit f99a041.
  • Loading branch information
potuz committed Feb 7, 2022
1 parent f99a041 commit 08a5155
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 96 deletions.
110 changes: 35 additions & 75 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/config/features"
"github.com/prysmaticlabs/prysm/config/params"
Expand Down Expand Up @@ -115,7 +114,6 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
return err
}

fullyValidated := false
if postState.Version() == version.Bellatrix {
executionEnabled, err := blocks.ExecutionEnabled(postState, body)
if err != nil {
Expand All @@ -128,25 +126,10 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
}
// This is not the earliest we can call `ExecutePayload`, see above to do as the soonest we can call is after per_slot processing.
_, err = s.cfg.ExecutionEngineCaller.ExecutePayload(ctx, executionPayloadToExecutableData(payload))
switch err {
case powchain.ErrInvalidPayload:
// TODO_MERGE walk up the parent chain removing
// invalid blocks
return errors.Wrap(err, "could not sync block with invalid execution payload")
case powchain.ErrSyncing:
candidate, err := s.optimisticCandidateBlock(ctx, b)
if err != nil {
return errors.Wrap(err, "could not check if block is optimistic candidate")
}
if candidate {
break
}
return errors.Wrap(err, "could not optimistically sync block")
case nil:
fullyValidated = true
default:
if err != nil {
return errors.Wrap(err, "could not execute payload")
}

mergeBlock, err := blocks.MergeTransitionBlock(postState, body)
if err != nil {
return errors.Wrap(err, "could not check if merge block is terminal")
Expand All @@ -171,20 +154,6 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
return err
}

// update forkchoice synced tips if the block is not optimistic
if fullyValidated {
root, err := b.HashTreeRoot()
if err != nil {
return err
}
if err := s.cfg.ForkChoiceStore.UpdateSyncedTipsWithValidRoot(ctx, root); err != nil {
return err
}
if err := s.saveSyncedTipsDB(ctx); err != nil {
return err
}
}

// If slasher is configured, forward the attestations in the block via
// an event feed for processing.
if features.Get().EnableSlasher {
Expand Down Expand Up @@ -244,6 +213,9 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
if err := s.updateHead(ctx, balances); err != nil {
log.WithError(err).Warn("Could not update head")
}
if err := s.saveSyncedTipsDB(ctx); err != nil {
return err
}

// Notify execution layer with fork choice head update if this is post merge block.
if postState.Version() == version.Bellatrix {
Expand Down Expand Up @@ -287,6 +259,7 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
}()
}

return errors.Wrap(err, "could not save synced tips")
}

if err := s.pruneCanonicalAttsFromPool(ctx, blockRoot, signed); err != nil {
Expand Down Expand Up @@ -361,33 +334,32 @@ func (s *Service) onBlock(ctx context.Context, signed block.SignedBeaconBlock, b
}

func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlock,
blockRoots [][32]byte) ([]*ethpb.Checkpoint, []*ethpb.Checkpoint, []bool, error) {
blockRoots [][32]byte) ([]*ethpb.Checkpoint, []*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
defer span.End()

if len(blks) == 0 || len(blockRoots) == 0 {
return nil, nil, nil, errors.New("no blocks provided")
return nil, nil, errors.New("no blocks provided")
}
if err := helpers.BeaconBlockIsNil(blks[0]); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
b := blks[0].Block()

// Retrieve incoming block's pre state.
if err := s.verifyBlkPreState(ctx, b); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
preState, err := s.cfg.StateGen.StateByRootInitialSync(ctx, bytesutil.ToBytes32(b.ParentRoot()))
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if preState == nil || preState.IsNil() {
return nil, nil, nil, fmt.Errorf("nil pre state for slot %d", b.Slot())
return nil, nil, fmt.Errorf("nil pre state for slot %d", b.Slot())
}

jCheckpoints := make([]*ethpb.Checkpoint, len(blks))
fCheckpoints := make([]*ethpb.Checkpoint, len(blks))
optimistic := make([]bool, len(blks))
sigSet := &bls.SignatureBatch{
Signatures: [][]byte{},
PublicKeys: []bls.PublicKey{},
Expand All @@ -398,65 +370,49 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo
for i, b := range blks {
set, preState, err = transition.ExecuteStateTransitionNoVerifyAnySig(ctx, preState, b)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}

optimistic[i] = true
if preState.Version() == version.Bellatrix {
executionEnabled, err := blocks.ExecutionEnabled(preState, b.Block().Body())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not check if execution is enabled")
return nil, nil, errors.Wrap(err, "could not check if execution is enabled")
}
if executionEnabled {
payload, err := b.Block().Body().ExecutionPayload()
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not get body execution payload")
return nil, nil, errors.Wrap(err, "could not get body execution payload")
}
_, err = s.cfg.ExecutionEngineCaller.ExecutePayload(ctx, executionPayloadToExecutableData(payload))
switch err {
case powchain.ErrInvalidPayload:
// TODO_MERGE walk up the parent chain removing
// invalid blocks
return nil, nil, nil, errors.Wrap(err, "could not sync block with invalid execution payload")
case powchain.ErrSyncing:
candidate, err := s.optimisticCandidateBlock(ctx, b.Block())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not check if block is optimistic candidate")
}
if candidate {
break
}
return nil, nil, nil, errors.Wrap(err, "could not optimistically sync block")
case nil:
optimistic[i] = false
default:
return nil, nil, nil, errors.Wrap(err, "could not execute payload")
if err != nil {
return nil, nil, errors.Wrap(err, "could not execute payload")
}

mergeBlock, err := blocks.MergeTransitionBlock(preState, b.Block().Body())
if err != nil {
return nil, nil, nil, errors.Wrap(err, "could not check if merge block is terminal")
return nil, nil, errors.Wrap(err, "could not check if merge block is terminal")
}
if mergeBlock {
if err := s.validateTerminalBlock(b); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}
headPayload, err := s.headBlock().Block().Body().ExecutionPayload()
if err != nil {
return nil, nil, nil, err
return nil, nil, err

}
// TODO_MERGE: Loading the finalized block from DB on per block is not ideal. Finalized block should be cached here
finalizedBlock, err := s.cfg.BeaconDB.Block(ctx, bytesutil.ToBytes32(preState.FinalizedCheckpoint().Root))
if err != nil {
return nil, nil, nil, err
return nil, nil, err

}
finalizedBlockHash := params.BeaconConfig().ZeroHash[:]
if finalizedBlock != nil && finalizedBlock.Version() == version.Bellatrix {
finalizedPayload, err := finalizedBlock.Block().Body().ExecutionPayload()
if err != nil {
return nil, nil, nil, err
return nil, nil, err

}
finalizedBlockHash = finalizedPayload.BlockHash
Expand All @@ -468,7 +424,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo
FinalizedBlockHash: common.BytesToHash(finalizedBlockHash),
}
if err := s.cfg.ExecutionEngineCaller.NotifyForkChoiceValidated(ctx, f); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}
}
Expand All @@ -477,7 +433,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo
if slots.IsEpochStart(preState.Slot()) {
boundaries[blockRoots[i]] = preState.Copy()
if err := s.handleEpochBoundary(ctx, preState); err != nil {
return nil, nil, nil, errors.Wrap(err, "could not handle epoch boundary state")
return nil, nil, errors.Wrap(err, "could not handle epoch boundary state")
}
}
jCheckpoints[i] = preState.CurrentJustifiedCheckpoint()
Expand All @@ -487,26 +443,26 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []block.SignedBeaconBlo
}
verify, err := sigSet.Verify()
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if !verify {
return nil, nil, nil, errors.New("batch block signature verification failed")
return nil, nil, errors.New("batch block signature verification failed")
}
for r, st := range boundaries {
if err := s.cfg.StateGen.SaveState(ctx, r, st); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}
// Also saves the last post state which to be used as pre state for the next batch.
lastB := blks[len(blks)-1]
lastBR := blockRoots[len(blockRoots)-1]
if err := s.cfg.StateGen.SaveState(ctx, lastBR, preState); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if err := s.saveHeadNoDB(ctx, lastB, lastBR, preState); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return fCheckpoints, jCheckpoints, optimistic, nil
return fCheckpoints, jCheckpoints, nil
}

// handles a block after the block's batch has been verified, where we can save blocks
Expand All @@ -519,6 +475,10 @@ func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed block.
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, fCheckpoint, jCheckpoint); err != nil {
return err
}
if err := s.saveSyncedTipsDB(ctx); err != nil {
return errors.Wrap(err, "could not save synced tips")
}

if err := s.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{
Slot: signed.Block().Slot(),
Root: blockRoot[:],
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestStore_OnBlockBatch(t *testing.T) {
rBlock.Block.ParentRoot = gRoot[:]
require.NoError(t, beaconDB.SaveBlock(context.Background(), blks[0]))
require.NoError(t, service.cfg.StateGen.SaveState(ctx, blkRoots[0], firstState))
_, _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:])
_, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:])
require.NoError(t, err)
}

Expand Down
15 changes: 1 addition & 14 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []block.SignedBe
defer span.End()

// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
fCheckpoints, jCheckpoints, optimistic, err := s.onBlockBatch(ctx, blocks, blkRoots)
fCheckpoints, jCheckpoints, err := s.onBlockBatch(ctx, blocks, blkRoots)
if err != nil {
err := errors.Wrap(err, "could not process block in batch")
tracing.AnnotateError(span, err)
Expand All @@ -90,19 +90,6 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []block.SignedBe
tracing.AnnotateError(span, err)
return err
}
if !optimistic[i] {
root, err := b.Block().HashTreeRoot()
if err != nil {
return err
}
if err := s.cfg.ForkChoiceStore.UpdateSyncedTipsWithValidRoot(ctx, root); err != nil {
return err
}
if err := s.saveSyncedTipsDB(ctx); err != nil {
return errors.Wrap(err, "could not save synced tips")
}
}

// Send notification of the processed block to the state feed.
s.cfg.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/powchain/execution_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
)

var errNoExecutionEngineConnection = errors.New("can't connect to execution engine")
var ErrInvalidPayload = errors.New("invalid payload")
var ErrSyncing = errors.New("syncing")
var errInvalidPayload = errors.New("invalid payload")
var errSyncing = errors.New("syncing")

// ExecutionEngineCaller defines methods that wraps around execution engine API calls to enable other prysm services to interact with.
type ExecutionEngineCaller interface {
Expand Down Expand Up @@ -207,10 +207,10 @@ func (s *Service) ExecutePayload(ctx context.Context, data *catalyst.ExecutableD
}

if respond.Result.Status == catalyst.INVALID.Status {
return common.FromHex(respond.Result.LatestValidHash), ErrInvalidPayload
return common.FromHex(respond.Result.LatestValidHash), errInvalidPayload
}
if respond.Result.Status == catalyst.SYNCING.Status {
return common.FromHex(respond.Result.LatestValidHash), ErrSyncing
return common.FromHex(respond.Result.LatestValidHash), errSyncing
}

return common.FromHex(respond.Result.LatestValidHash), nil
Expand Down Expand Up @@ -257,7 +257,7 @@ func (s *Service) NotifyForkChoiceValidated(ctx context.Context, forkchoiceState
return fmt.Errorf("could not call engine_forkchoiceUpdatedV1, code: %d, message: %s", respond.Error.Code, respond.Error.Message)
}
if respond.Result.Status == catalyst.SYNCING.Status {
return ErrSyncing
return errSyncing
}

return nil
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s *Service) PreparePayload(ctx context.Context, forkchoiceState catalyst.F
return "", fmt.Errorf("could not call engine_forkchoiceUpdatedV1, code: %d, message: %s", respond.Error.Code, respond.Error.Message)
}
if respond.Result.Status == catalyst.SYNCING.Status {
return "", ErrSyncing
return "", errSyncing
}

return respond.Result.PayloadID, nil
Expand Down

0 comments on commit 08a5155

Please sign in to comment.