Skip to content

Commit

Permalink
Fix initial sync cache state (#4677)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Jan 28, 2020
1 parent ad01bfb commit 07ba594
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 66 deletions.
51 changes: 22 additions & 29 deletions beacon-chain/blockchain/forkchoice/process_block.go
Expand Up @@ -186,7 +186,7 @@ func (s *Store) OnBlockInitialSyncStateTransition(ctx context.Context, signed *e
defer s.initSyncStateLock.Unlock()

// Retrieve incoming block's pre state.
preState, err := s.cachedPreState(ctx, b)
preState, err := s.verifyBlkPreState(ctx, b)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -301,6 +301,21 @@ func (s *Store) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb

// verifyBlkPreState validates input block has a valid pre-state.
func (s *Store) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
if featureconfig.Get().InitSyncCacheState {
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
var err error
if preState == nil {
preState, err = s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
}
return proto.Clone(preState).(*pb.BeaconState), nil
}

preState, err := s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
Expand Down Expand Up @@ -493,7 +508,13 @@ func (s *Store) updateJustified(ctx context.Context, state *pb.BeaconState) erro

if featureconfig.Get().InitSyncCacheState {
justifiedRoot := bytesutil.ToBytes32(state.CurrentJustifiedCheckpoint.Root)

justifiedState := s.initSyncState[justifiedRoot]
// If justified state is nil, resume back to normal syncing process and save
// justified check point.
if justifiedState == nil {
return s.db.SaveJustifiedCheckpoint(ctx, state.CurrentJustifiedCheckpoint)
}
if err := s.db.SaveState(ctx, justifiedState, justifiedRoot); err != nil {
return errors.Wrap(err, "could not save justified state")
}
Expand All @@ -518,34 +539,6 @@ func (s *Store) updateJustifiedCheckpoint() {
}
}

// This receives cached state in memory for initial sync only during initial sync.
func (s *Store) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
if featureconfig.Get().InitSyncCacheState {
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
var err error
if preState == nil {
preState, err = s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
}
return proto.Clone(preState).(*pb.BeaconState), nil
}

preState, err := s.db.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}

return preState, nil
}

// This saves every finalized state in DB during initial sync, needed as part of optimization to
// use cache state during initial sync in case of restart.
func (s *Store) saveInitState(ctx context.Context, state *pb.BeaconState) error {
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/blockchain/forkchoice/process_block_test.go
Expand Up @@ -429,7 +429,7 @@ func TestCachedPreState_CanGetFromCache(t *testing.T) {
store.initSyncState[r] = s

wanted := "pre state of slot 1 does not exist"
if _, err := store.cachedPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
if _, err := store.verifyBlkPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
t.Fatal("Not expected error")
}
}
Expand All @@ -449,7 +449,7 @@ func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) {
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
store.initSyncState[r] = s

received, err := store.cachedPreState(ctx, b)
received, err := store.verifyBlkPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
Expand All @@ -467,7 +467,7 @@ func TestCachedPreState_CanGetFromDB(t *testing.T) {
r := [32]byte{'A'}
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}

_, err := store.cachedPreState(ctx, b)
_, err := store.verifyBlkPreState(ctx, b)
wanted := "pre state of slot 1 does not exist"
if err.Error() != wanted {
t.Error("Did not get wanted error")
Expand All @@ -476,7 +476,7 @@ func TestCachedPreState_CanGetFromDB(t *testing.T) {
s := &pb.BeaconState{Slot: 1}
store.db.SaveState(ctx, s, r)

received, err := store.cachedPreState(ctx, b)
received, err := store.verifyBlkPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block.go
Expand Up @@ -152,7 +152,7 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
defer s.initSyncStateLock.Unlock()

// Retrieve incoming block's pre state.
preState, err := s.cachedPreState(ctx, b)
preState, err := s.verifyBlkPreState(ctx, b)
if err != nil {
return nil, err
}
Expand Down
49 changes: 21 additions & 28 deletions beacon-chain/blockchain/process_block_helpers.go
Expand Up @@ -54,6 +54,21 @@ func (s *Service) getBlockPreState(ctx context.Context, b *ethpb.BeaconBlock) (*

// verifyBlkPreState validates input block has a valid pre-state.
func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
if featureconfig.Get().InitSyncCacheState {
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
var err error
if preState == nil {
preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
}
return proto.Clone(preState).(*pb.BeaconState), nil
}

preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
Expand Down Expand Up @@ -230,7 +245,13 @@ func (s *Service) updateJustified(ctx context.Context, state *pb.BeaconState) er

if featureconfig.Get().InitSyncCacheState {
justifiedRoot := bytesutil.ToBytes32(state.CurrentJustifiedCheckpoint.Root)

justifiedState := s.initSyncState[justifiedRoot]
// If justified state is nil, resume back to normal syncing process and save
// justified check point.
if justifiedState == nil {
return s.beaconDB.SaveJustifiedCheckpoint(ctx, state.CurrentJustifiedCheckpoint)
}
if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil {
return errors.Wrap(err, "could not save justified state")
}
Expand All @@ -244,34 +265,6 @@ func (s *Service) currentSlot() uint64 {
return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
}

// This receives cached state in memory for initial sync only during initial sync.
func (s *Service) cachedPreState(ctx context.Context, b *ethpb.BeaconBlock) (*pb.BeaconState, error) {
if featureconfig.Get().InitSyncCacheState {
preState := s.initSyncState[bytesutil.ToBytes32(b.ParentRoot)]
var err error
if preState == nil {
preState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}
}
return proto.Clone(preState).(*pb.BeaconState), nil
}

preState, err := s.beaconDB.State(ctx, bytesutil.ToBytes32(b.ParentRoot))
if err != nil {
return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot)
}
if preState == nil {
return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot)
}

return preState, nil
}

// This saves every finalized state in DB during initial sync, needed as part of optimization to
// use cache state during initial sync in case of restart.
func (s *Service) saveInitState(ctx context.Context, state *pb.BeaconState) error {
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/blockchain/process_block_test.go
Expand Up @@ -321,7 +321,7 @@ func TestCachedPreState_CanGetFromCache(t *testing.T) {
service.initSyncState[r] = s

wanted := "pre state of slot 1 does not exist"
if _, err := service.cachedPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
if _, err := service.verifyBlkPreState(ctx, b); !strings.Contains(err.Error(), wanted) {
t.Fatal("Not expected error")
}
}
Expand All @@ -346,7 +346,7 @@ func TestCachedPreState_CanGetFromCacheWithFeature(t *testing.T) {
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}
service.initSyncState[r] = s

received, err := service.cachedPreState(ctx, b)
received, err := service.verifyBlkPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
Expand All @@ -369,7 +369,7 @@ func TestCachedPreState_CanGetFromDB(t *testing.T) {
r := [32]byte{'A'}
b := &ethpb.BeaconBlock{Slot: 1, ParentRoot: r[:]}

_, err = service.cachedPreState(ctx, b)
_, err = service.verifyBlkPreState(ctx, b)
wanted := "pre state of slot 1 does not exist"
if err.Error() != wanted {
t.Error("Did not get wanted error")
Expand All @@ -378,7 +378,7 @@ func TestCachedPreState_CanGetFromDB(t *testing.T) {
s := &pb.BeaconState{Slot: 1}
service.beaconDB.SaveState(ctx, s, r)

received, err := service.cachedPreState(ctx, b)
received, err := service.verifyBlkPreState(ctx, b)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/service.go
Expand Up @@ -313,6 +313,10 @@ func (s *Service) saveHeadNoDB(ctx context.Context, b *ethpb.SignedBeaconBlock,
s.headLock.Lock()
defer s.headLock.Unlock()

if b == nil || b.Block == nil {
return errors.New("cannot save nil head block")
}

s.headSlot = b.Block.Slot

s.canonicalRoots[b.Block.Slot] = r[:]
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/round_robin.go
Expand Up @@ -240,6 +240,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
best = s.bestPeer()
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot()))
}

for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
Expand Down
1 change: 1 addition & 0 deletions shared/featureconfig/flags.go
Expand Up @@ -247,5 +247,6 @@ var E2EBeaconChainFlags = []string{
"--cache-filtered-block-tree",
"--enable-skip-slots-cache",
"--enable-eth1-data-vote-cache",
"--initial-sync-cache-state",
"--proto-array-forkchoice",
}

0 comments on commit 07ba594

Please sign in to comment.