Skip to content

Commit

Permalink
Insert block to fork choice after saving the block to DB (#4728)
Browse files Browse the repository at this point in the history
* Let's try this
* Merge branch 'master' of git+ssh://github.com/prysmaticlabs/prysm
* Better place to insert block to fork choice store
* Fmt
* Revert a few changes
* Revert a few changes
* Comments
* Merge branch 'master' of git+ssh://github.com/prysmaticlabs/prysm into insert-blk-forkchoice
* Merge refs/heads/master into insert-blk-forkchoice
  • Loading branch information
terencechain committed Feb 3, 2020
1 parent a1dc4dd commit cdfa969
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 62 deletions.
1 change: 0 additions & 1 deletion beacon-chain/blockchain/forkchoice/process_attestation.go
Expand Up @@ -166,7 +166,6 @@ func (s *Store) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*state
return nil, fmt.Errorf("pre state of target block %d does not exist", helpers.StartSlot(c.Epoch))
}


if helpers.StartSlot(c.Epoch) > baseState.Slot() {
baseState, err = state.ProcessSlots(ctx, baseState, helpers.StartSlot(c.Epoch))
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/blockchain/process_attestation_helpers.go
Expand Up @@ -36,7 +36,6 @@ func (s *Service) getAttPreState(ctx context.Context, c *ethpb.Checkpoint) (*sta
return nil, fmt.Errorf("pre state of target block %d does not exist", helpers.StartSlot(c.Epoch))
}


if helpers.StartSlot(c.Epoch) > baseState.Slot() {
baseState, err = state.ProcessSlots(ctx, baseState, helpers.StartSlot(c.Epoch))
if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions beacon-chain/blockchain/process_block.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -82,6 +83,11 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}

if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
return nil, errors.Wrap(err, "could not save state")
}
Expand Down Expand Up @@ -176,6 +182,10 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}

if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

if featureconfig.Get().InitSyncCacheState {
s.initSyncState[root] = postState
} else {
Expand Down Expand Up @@ -235,3 +245,34 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed

return postState, nil
}

// This feeds in the block and block's attestations to fork choice store. It's allows fork choice store
// to gain information on the most current chain.
func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte, state *stateTrie.BeaconState) error {
if !featureconfig.Get().ProtoArrayForkChoice {
return nil
}

// Feed in block to fork choice store.
if err := s.forkChoiceStore.ProcessBlock(ctx,
blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot),
state.CurrentJustifiedCheckpoint().Epoch,
state.FinalizedCheckpoint().Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}

// Feed in block's attestations to fork choice store.
for _, a := range blk.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(state, a.Data.Slot, a.Data.CommitteeIndex)
if err != nil {
return err
}
indices, err := attestationutil.AttestingIndices(a.AggregationBits, committee)
if err != nil {
return err
}
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}

return nil
}
63 changes: 4 additions & 59 deletions beacon-chain/blockchain/receive_block.go
Expand Up @@ -14,7 +14,6 @@ import (
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
Expand Down Expand Up @@ -132,22 +131,6 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
} else {
headRoot := make([]byte, 0)
if featureconfig.Get().ProtoArrayForkChoice {
if err := s.forkChoiceStore.ProcessBlock(ctx, blockCopy.Block.Slot, root, bytesutil.ToBytes32(blockCopy.Block.ParentRoot), postState.CurrentJustifiedCheckpoint().Epoch, postState.FinalizedCheckpoint().Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}

for _, a := range blockCopy.Block.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(postState, a.Data.Slot, a.Data.CommitteeIndex)
if err != nil {
return err
}
indices, err := attestationutil.AttestingIndices(a.AggregationBits, committee)
if err != nil {
return err
}
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}

f := s.finalizedCheckpt
j := s.justifiedCheckpt
headRootProtoArray, err := s.forkChoiceStore.Head(
Expand Down Expand Up @@ -201,17 +184,16 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock)

// Apply state transition on the new block.
var postState *stateTrie.BeaconState
var err error
if featureconfig.Get().ProtoArrayForkChoice {
postState, err = s.onBlock(ctx, blockCopy)
_, err = s.onBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
} else {
postState, err = s.forkChoiceStoreOld.OnBlock(ctx, blockCopy)
_, err = s.forkChoiceStoreOld.OnBlock(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block from fork choice service")
traceutil.AnnotateError(span, err)
Expand All @@ -233,24 +215,6 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
}
}

if featureconfig.Get().ProtoArrayForkChoice {
if err := s.forkChoiceStore.ProcessBlock(ctx, blockCopy.Block.Slot, root, bytesutil.ToBytes32(blockCopy.Block.ParentRoot), postState.CurrentJustifiedCheckpoint().Epoch, postState.FinalizedCheckpoint().Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}

for _, a := range blockCopy.Block.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(postState, a.Data.Slot, a.Data.CommitteeIndex)
if err != nil {
return err
}
indices, err := attestationutil.AttestingIndices(a.AggregationBits, committee)
if err != nil {
return err
}
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}
}

// Send notification of the processed block to the state feed.
s.stateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Expand Down Expand Up @@ -283,17 +247,16 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
blockCopy := proto.Clone(block).(*ethpb.SignedBeaconBlock)

// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
var postState *stateTrie.BeaconState
var err error
if featureconfig.Get().ProtoArrayForkChoice {
postState, err = s.onBlockInitialSyncStateTransition(ctx, blockCopy)
_, err = s.onBlockInitialSyncStateTransition(ctx, blockCopy)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}
} else {
postState, err = s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy)
_, err = s.forkChoiceStoreOld.OnBlockInitialSyncStateTransition(ctx, blockCopy)
if err != nil {
return errors.Wrap(err, "could not process blockCopy from fork choice service")
}
Expand All @@ -309,24 +272,6 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
return errors.Wrap(err, "could not get head root from cache")
}

if featureconfig.Get().ProtoArrayForkChoice {
if err := s.forkChoiceStore.ProcessBlock(ctx, blockCopy.Block.Slot, root, bytesutil.ToBytes32(blockCopy.Block.ParentRoot), postState.CurrentJustifiedCheckpoint().Epoch, postState.FinalizedCheckpoint().Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}

for _, a := range blockCopy.Block.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(postState, a.Data.Slot, a.Data.CommitteeIndex)
if err != nil {
return err
}
indices, err := attestationutil.AttestingIndices(a.AggregationBits, committee)
if err != nil {
return err
}
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}
}

if featureconfig.Get().InitSyncCacheState {
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/state/types.go
Expand Up @@ -86,7 +86,7 @@ func (b *BeaconState) Copy() *BeaconState {
dirtyFields: make(map[fieldIndex]interface{}, 20),

// Copy on write validator index map.
valIdxMap: b.valIdxMap,
valIdxMap: b.valIdxMap,
}

for i := range b.dirtyFields {
Expand Down

0 comments on commit cdfa969

Please sign in to comment.