Skip to content

Commit

Permalink
Integrate proto array forkchoice to run time (#4649)
Browse files Browse the repository at this point in the history
* Run time

* Fixed pruning

* Fixed test

* Fixed test

* Process attestations during init sync

* Raul's feedback

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rauljordan committed Jan 25, 2020
1 parent 417480f commit 5eece9a
Show file tree
Hide file tree
Showing 22 changed files with 356 additions and 117 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/BUILD.bazel
Expand Up @@ -23,6 +23,8 @@ go_library(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/protoarray:go_default_library",
"//beacon-chain/operations/attestations:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
Expand Down
26 changes: 13 additions & 13 deletions beacon-chain/blockchain/forkchoice/process_attestation.go
Expand Up @@ -73,67 +73,67 @@ var ErrTargetRootNotInDB = errors.New("target root does not exist in db")
// for i in indexed_attestation.attesting_indices:
// if i not in store.latest_messages or target.epoch > store.latest_messages[i].epoch:
// store.latest_messages[i] = LatestMessage(epoch=target.epoch, root=attestation.data.beacon_block_root)
func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) ([]uint64, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.onAttestation")
defer span.End()

tgt := proto.Clone(a.Data.Target).(*ethpb.Checkpoint)
tgtSlot := helpers.StartSlot(tgt.Epoch)

if helpers.SlotToEpoch(a.Data.Slot) != a.Data.Target.Epoch {
return fmt.Errorf("data slot is not in the same epoch as target %d != %d", helpers.SlotToEpoch(a.Data.Slot), a.Data.Target.Epoch)
return nil, fmt.Errorf("data slot is not in the same epoch as target %d != %d", helpers.SlotToEpoch(a.Data.Slot), a.Data.Target.Epoch)
}

// Verify beacon node has seen the target block before.
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(tgt.Root)) {
return ErrTargetRootNotInDB
return nil, ErrTargetRootNotInDB
}

// Verify attestation target has had a valid pre state produced by the target block.
baseState, err := s.verifyAttPreState(ctx, tgt)
if err != nil {
return err
return nil, err
}

// Verify attestation target is from current epoch or previous epoch.
if err := s.verifyAttTargetEpoch(ctx, baseState.GenesisTime, uint64(time.Now().Unix()), tgt); err != nil {
return err
return nil, err
}

// Verify Attestations cannot be from future epochs.
if err := helpers.VerifySlotTime(baseState.GenesisTime, tgtSlot); err != nil {
return errors.Wrap(err, "could not verify attestation target slot")
return nil, errors.Wrap(err, "could not verify attestation target slot")
}

// Verify attestation beacon block is known and not from the future.
if err := s.verifyBeaconBlock(ctx, a.Data); err != nil {
return errors.Wrap(err, "could not verify attestation beacon block")
return nil, errors.Wrap(err, "could not verify attestation beacon block")
}

// Store target checkpoint state if not yet seen.
baseState, err = s.saveCheckpointState(ctx, baseState, tgt)
if err != nil {
return err
return nil, err
}

// Verify attestations can only affect the fork choice of subsequent slots.
if err := helpers.VerifySlotTime(baseState.GenesisTime, a.Data.Slot+1); err != nil {
return err
return nil, err
}

// Use the target state to to validate attestation and calculate the committees.
indexedAtt, err := s.verifyAttestation(ctx, baseState, a)
if err != nil {
return err
return nil, err
}

// Update every validator's latest vote.
if err := s.updateAttVotes(ctx, indexedAtt, tgt.Root, tgt.Epoch); err != nil {
return err
return nil, err
}

if err := s.db.SaveAttestation(ctx, a); err != nil {
return err
return nil, err
}

log := log.WithFields(logrus.Fields{
Expand All @@ -144,7 +144,7 @@ func (s *Store) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
})
log.Debug("Updated latest votes")

return nil
return indexedAtt.AttestingIndices, nil
}

// verifyAttPreState validates input attested check point has a valid pre-state.
Expand Down
Expand Up @@ -107,7 +107,7 @@ func TestStore_OnAttestation(t *testing.T) {
t.Fatal(err)
}

err := store.OnAttestation(ctx, tt.a)
_, err := store.OnAttestation(ctx, tt.a)
if tt.wantErr {
if !strings.Contains(err.Error(), tt.wantErrString) {
t.Errorf("Store.OnAttestation() error = %v, wantErr = %v", err, tt.wantErrString)
Expand Down
72 changes: 36 additions & 36 deletions beacon-chain/blockchain/forkchoice/process_block.go
Expand Up @@ -56,62 +56,62 @@ import (
// # Update finalized checkpoint
// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch:
// store.finalized_checkpoint = state.finalized_checkpoint
func (s *Store) OnBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) error {
func (s *Store) OnBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock")
defer span.End()

if signed == nil || signed.Block == nil {
return errors.New("nil block")
return nil, errors.New("nil block")
}

b := signed.Block

// Retrieve incoming block's pre state.
preState, err := s.getBlockPreState(ctx, b)
if err != nil {
return err
return nil, err
}
preStateValidatorCount := len(preState.Validators)

root, err := ssz.HashTreeRoot(b)
if err != nil {
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
log.WithFields(logrus.Fields{
"slot": b.Slot,
"root": fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]),
}).Info("Executing state transition on block")
postState, err := state.ExecuteStateTransition(ctx, preState, signed)
if err != nil {
return errors.Wrap(err, "could not execute state transition")
return nil, errors.Wrap(err, "could not execute state transition")
}

if err := s.db.SaveBlock(ctx, signed); err != nil {
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
if err := s.db.SaveState(ctx, postState, root); err != nil {
return errors.Wrap(err, "could not save state")
return nil, errors.Wrap(err, "could not save state")
}

// Update justified check point.
if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
return err
return nil, err
}
}

// Update finalized check point.
// Prune the block cache and helper caches on every new finalized epoch.
if postState.FinalizedCheckpoint.Epoch > s.finalizedCheckpt.Epoch {
if err := s.db.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}

startSlot := helpers.StartSlot(s.prevFinalizedCheckpt.Epoch)
endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch)
if endSlot > startSlot {
if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil {
return errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
startSlot, endSlot)
}
}
Expand All @@ -122,11 +122,11 @@ func (s *Store) OnBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) er

// Update validator indices in database as needed.
if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}
// Save the unseen attestations from block to db.
if err := s.saveNewBlockAttestations(ctx, b.Body.Attestations); err != nil {
return errors.Wrap(err, "could not save attestations")
return nil, errors.Wrap(err, "could not save attestations")
}

// Epoch boundary bookkeeping such as logging epoch summaries.
Expand All @@ -136,48 +136,48 @@ func (s *Store) OnBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) er

// Update committees cache at epoch boundary slot.
if err := helpers.UpdateCommitteeCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return err
return nil, err
}
if err := helpers.UpdateProposerIndicesInCache(postState, helpers.CurrentEpoch(postState)); err != nil {
return err
return nil, err
}

s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
}

return nil
return postState, nil
}

// OnBlockCacheFilteredTree calls OnBlock with additional of caching of filtered block tree
// for efficient fork choice processing.
func (s *Store) OnBlockCacheFilteredTree(ctx context.Context, signed *ethpb.SignedBeaconBlock) error {
if err := s.OnBlock(ctx, signed); err != nil {
return err
func (s *Store) OnBlockCacheFilteredTree(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) {
state, err := s.OnBlock(ctx, signed)
if err != nil {
return nil, err
}

if featureconfig.Get().EnableBlockTreeCache {
if featureconfig.Get().EnableBlockTreeCache && !featureconfig.Get().ProtoArrayForkChoice {
tree, err := s.getFilterBlockTree(ctx)
if err != nil {
return errors.Wrap(err, "could not calculate filtered block tree")
return nil, errors.Wrap(err, "could not calculate filtered block tree")
}
s.filteredBlockTreeLock.Lock()
s.filteredBlockTree = tree
s.filteredBlockTreeLock.Unlock()
}

return nil
return state, nil
}

// OnBlockInitialSyncStateTransition is called when an initial sync block is received.
// It runs state transition on the block and without any BLS verification. The BLS verification
// includes proposer signature, randao and attestation's aggregated signature. It also does not save
// attestations.
func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) error {
func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*pb.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "forkchoice.onBlock")
defer span.End()

if signed == nil || signed.Block == nil {
return errors.New("nil block")
return nil, errors.New("nil block")
}

b := signed.Block
Expand All @@ -188,37 +188,37 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *e
// Retrieve incoming block's pre state.
preState, err := s.cachedPreState(ctx, b)
if err != nil {
return err
return nil, err
}
preStateValidatorCount := len(preState.Validators)

log.WithField("slot", b.Slot).Debug("Executing state transition on block")

postState, err := state.ExecuteStateTransitionNoVerifyAttSigs(ctx, preState, signed)
if err != nil {
return errors.Wrap(err, "could not execute state transition")
return nil, errors.Wrap(err, "could not execute state transition")
}

if err := s.db.SaveBlock(ctx, signed); err != nil {
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
root, err := ssz.HashTreeRoot(b)
if err != nil {
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}

if featureconfig.Get().InitSyncCacheState {
s.initSyncState[root] = postState
} else {
if err := s.db.SaveState(ctx, postState, root); err != nil {
return errors.Wrap(err, "could not save state")
return nil, errors.Wrap(err, "could not save state")
}
}

// Update justified check point.
if postState.CurrentJustifiedCheckpoint.Epoch > s.justifiedCheckpt.Epoch {
if err := s.updateJustified(ctx, postState); err != nil {
return err
return nil, err
}
}

Expand All @@ -229,17 +229,17 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *e
endSlot := helpers.StartSlot(s.finalizedCheckpt.Epoch)
if endSlot > startSlot {
if err := s.rmStatesOlderThanLastFinalized(ctx, startSlot, endSlot); err != nil {
return errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
return nil, errors.Wrapf(err, "could not delete states prior to finalized check point, range: %d, %d",
startSlot, endSlot)
}
}

if err := s.saveInitState(ctx, postState); err != nil {
return errors.Wrap(err, "could not save init sync finalized state")
return nil, errors.Wrap(err, "could not save init sync finalized state")
}

if err := s.db.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}

s.prevFinalizedCheckpt = s.finalizedCheckpt
Expand All @@ -248,13 +248,13 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *e

// Update validator indices in database as needed.
if err := s.saveNewValidators(ctx, preStateValidatorCount, postState); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
return nil, errors.Wrap(err, "could not save finalized checkpoint")
}

if flags.Get().EnableArchive {
// Save the unseen attestations from block to db.
if err := s.saveNewBlockAttestations(ctx, b.Body.Attestations); err != nil {
return errors.Wrap(err, "could not save attestations")
return nil, errors.Wrap(err, "could not save attestations")
}
}

Expand All @@ -265,7 +265,7 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *e
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
}

return nil
return postState, nil
}

// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/forkchoice/process_block_test.go
Expand Up @@ -103,7 +103,7 @@ func TestStore_OnBlock(t *testing.T) {
}
store.finalizedCheckpt.Root = roots[0]

err := store.OnBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk})
_, err := store.OnBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk})
if !strings.Contains(err.Error(), tt.wantErrString) {
t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString)
}
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/blockchain/forkchoice/service.go
Expand Up @@ -27,12 +27,13 @@ import (
// to beacon blocks to compute head.
type ForkChoicer interface {
Head(ctx context.Context) ([]byte, error)
OnBlock(ctx context.Context, b *ethpb.SignedBeaconBlock) error
OnBlockCacheFilteredTree(ctx context.Context, b *ethpb.SignedBeaconBlock) error
OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb.SignedBeaconBlock) error
OnAttestation(ctx context.Context, a *ethpb.Attestation) error
OnBlock(ctx context.Context, b *ethpb.SignedBeaconBlock) (*pb.BeaconState, error)
OnBlockCacheFilteredTree(ctx context.Context, b *ethpb.SignedBeaconBlock) (*pb.BeaconState, error)
OnBlockInitialSyncStateTransition(ctx context.Context, b *ethpb.SignedBeaconBlock) (*pb.BeaconState, error)
OnAttestation(ctx context.Context, a *ethpb.Attestation) ([]uint64, error)
GenesisStore(ctx context.Context, justifiedCheckpoint *ethpb.Checkpoint, finalizedCheckpoint *ethpb.Checkpoint) error
FinalizedCheckpt() *ethpb.Checkpoint
JustifiedCheckpt() *ethpb.Checkpoint
}

// Store represents a service struct that handles the forkchoice
Expand Down
11 changes: 8 additions & 3 deletions beacon-chain/blockchain/receive_attestation.go
Expand Up @@ -33,13 +33,18 @@ func (s *Service) ReceiveAttestationNoPubsub(ctx context.Context, att *ethpb.Att
defer span.End()

// Update forkchoice store for the new attestation
if err := s.forkChoiceStore.OnAttestation(ctx, att); err != nil {
indices, err := s.forkChoiceStoreOld.OnAttestation(ctx, att)
if err != nil {
return errors.Wrap(err, "could not process attestation from fork choice service")
}

if featureconfig.Get().ProtoArrayForkChoice {
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(att.Data.BeaconBlockRoot), att.Data.Target.Epoch)
}

// Run fork choice for head block after updating fork choice store.
if !featureconfig.Get().DisableForkChoice {
headRoot, err := s.forkChoiceStore.Head(ctx)
if !featureconfig.Get().DisableForkChoice && !featureconfig.Get().ProtoArrayForkChoice {
headRoot, err := s.forkChoiceStoreOld.Head(ctx)
if err != nil {
return errors.Wrap(err, "could not get head from fork choice service")
}
Expand Down

0 comments on commit 5eece9a

Please sign in to comment.