From 655f57e3f2663345773336008ddfcfb7428bbe76 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Wed, 19 Feb 2020 05:10:54 +0800 Subject: [PATCH] Bound Initial Sync Cache Size (#4844) * bound initial sync * fix lint * Revert "Better block attestation inclusion (#4838)" This reverts commit 090d9627feed47281f9ab9660bb4d9854c491a7a. * add memory pool * more fixes * revert changes * add hack * revert hack * push halving * bring back hack * increase cache size * more fixes * more changes * new fixes * add test * add reverse test * more tests and clean up * add helper * more cleanup and tests * fix test * remove code * set gc percent flag * lint * lint * Fix comment formatting * Fix some formatting * inverse if statement * remove debug log * Apply suggestions from code review Co-Authored-By: Ivan Martinez * Update beacon-chain/state/getters.go Co-Authored-By: Ivan Martinez * Update beacon-chain/db/kv/state.go * integrate state generator * gaz * fixes * terence's review * reduce bound further * fix test * separate into new files * gaz * mod build file * add test * revert changes * fix test * Update beacon-chain/core/helpers/slot_epoch.go Co-Authored-By: terence tsao * handle edge case * add back test * fix test again * handle edge case * Update beacon-chain/blockchain/init_sync_process_block.go * Update beacon-chain/blockchain/init_sync_process_block.go * Update beacon-chain/stategen/service_test.go Co-Authored-By: Raul Jordan * Update beacon-chain/blockchain/init_sync_process_block.go Co-Authored-By: Raul Jordan * Update beacon-chain/stategen/service.go Co-Authored-By: Raul Jordan * Update beacon-chain/stategen/service.go Co-Authored-By: Raul Jordan * raul's review * raul's review * fix refs * terence's review * one more fix * Update beacon-chain/blockchain/init_sync_process_block.go Co-authored-by: Raul Jordan Co-authored-by: Ivan Martinez Co-authored-by: Preston Van Loon Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: terence tsao --- beacon-chain/blockchain/BUILD.bazel | 4 + .../blockchain/init_sync_process_block.go | 191 ++++++++++++ .../init_sync_process_block_test.go | 279 ++++++++++++++++++ beacon-chain/blockchain/process_block.go | 15 +- .../blockchain/process_block_helpers.go | 35 ++- beacon-chain/blockchain/process_block_test.go | 46 ++- beacon-chain/blockchain/service.go | 5 + beacon-chain/core/helpers/slot_epoch.go | 9 + beacon-chain/core/helpers/slot_epoch_test.go | 16 + beacon-chain/db/iface/interface.go | 1 + beacon-chain/db/kafka/passthrough.go | 5 + beacon-chain/db/kv/state.go | 28 ++ shared/bytesutil/bytes.go | 8 + shared/bytesutil/bytes_test.go | 20 ++ 14 files changed, 646 insertions(+), 16 deletions(-) create mode 100644 beacon-chain/blockchain/init_sync_process_block.go create mode 100644 beacon-chain/blockchain/init_sync_process_block_test.go diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index c780acb27b5..8a3d3290feb 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "chain_info.go", "head.go", "info.go", + "init_sync_process_block.go", "log.go", "process_attestation.go", "process_attestation_helpers.go", @@ -37,6 +38,7 @@ go_library( "//beacon-chain/p2p:go_default_library", "//beacon-chain/powchain:go_default_library", "//beacon-chain/state:go_default_library", + "//beacon-chain/stategen:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/attestationutil:go_default_library", "//shared/bytesutil:go_default_library", @@ -67,6 +69,7 @@ go_test( srcs = [ "chain_info_test.go", "head_test.go", + "init_sync_process_block_test.go", "process_attestation_test.go", "process_block_test.go", "receive_attestation_test.go", @@ -97,6 +100,7 @@ go_test( "@com_github_prysmaticlabs_go_ssz//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", + "@in_gopkg_d4l3k_messagediff_v1//:go_default_library", "@org_golang_x_net//context:go_default_library", ], ) diff --git a/beacon-chain/blockchain/init_sync_process_block.go b/beacon-chain/blockchain/init_sync_process_block.go new file mode 100644 index 00000000000..1aced02e890 --- /dev/null +++ b/beacon-chain/blockchain/init_sync_process_block.go @@ -0,0 +1,191 @@ +package blockchain + +import ( + "context" + "sort" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" +) + +const maxCacheSize = 70 +const initialSyncCacheSize = 45 +const minimumCacheSize = initialSyncCacheSize / 3 + +func (s *Service) persistCachedStates(ctx context.Context, numOfStates int) error { + oldStates := make([]*stateTrie.BeaconState, 0, numOfStates) + + // Add slots to the map and add epoch boundary states to the slice. + for _, rt := range s.boundaryRoots[:numOfStates-minimumCacheSize] { + oldStates = append(oldStates, s.initSyncState[rt]) + } + + err := s.beaconDB.SaveStates(ctx, oldStates, s.boundaryRoots[:numOfStates-minimumCacheSize]) + if err != nil { + return err + } + for _, rt := range s.boundaryRoots[:numOfStates-minimumCacheSize] { + delete(s.initSyncState, rt) + } + s.boundaryRoots = s.boundaryRoots[numOfStates-minimumCacheSize:] + return nil +} + +// filter out boundary candidates from our currently processed batch of states. +func (s *Service) filterBoundaryCandidates(ctx context.Context, root [32]byte, postState *stateTrie.BeaconState) { + // Only trigger on epoch start. + if !helpers.IsEpochStart(postState.Slot()) { + return + } + + stateSlice := make([][32]byte, 0, len(s.initSyncState)) + // Add epoch boundary roots to slice. + for rt := range s.initSyncState { + stateSlice = append(stateSlice, rt) + } + + sort.Slice(stateSlice, func(i int, j int) bool { + return s.initSyncState[stateSlice[i]].Slot() < s.initSyncState[stateSlice[j]].Slot() + }) + epochLength := params.BeaconConfig().SlotsPerEpoch + + if len(s.boundaryRoots) > 0 { + // Retrieve previous boundary root. + previousBoundaryRoot := s.boundaryRoots[len(s.boundaryRoots)-1] + previousState, ok := s.initSyncState[previousBoundaryRoot] + if !ok { + // Remove the non-existent root and exit filtering. + s.boundaryRoots = s.boundaryRoots[:len(s.boundaryRoots)-1] + return + } + previousSlot := previousState.Slot() + + // Round up slot number to account for skipped slots. + previousSlot = helpers.RoundUpToNearestEpoch(previousSlot) + if postState.Slot()-previousSlot >= epochLength { + targetSlot := postState.Slot() + tempRoots := s.loopThroughCandidates(stateSlice, previousBoundaryRoot, previousSlot, targetSlot) + s.boundaryRoots = append(s.boundaryRoots, tempRoots...) + } + } + s.boundaryRoots = append(s.boundaryRoots, root) + s.pruneOldStates() + s.pruneNonBoundaryStates() +} + +// loop-through the provided candidate roots to filter out which would be appropriate boundary roots. +func (s *Service) loopThroughCandidates(stateSlice [][32]byte, previousBoundaryRoot [32]byte, + previousSlot uint64, targetSlot uint64) [][32]byte { + tempRoots := [][32]byte{} + epochLength := params.BeaconConfig().SlotsPerEpoch + + // Loop through current states to filter for valid boundary states. + for i := len(stateSlice) - 1; stateSlice[i] != previousBoundaryRoot && i >= 0; i-- { + currentSlot := s.initSyncState[stateSlice[i]].Slot() + // Skip if the current slot is larger than the previous epoch + // boundary. + if currentSlot > targetSlot-epochLength { + continue + } + tempRoots = append(tempRoots, stateSlice[i]) + + // Switch target slot if the current slot is greater than + // 1 epoch boundary from the previously saved boundary slot. + if currentSlot > previousSlot+epochLength { + currentSlot = helpers.RoundUpToNearestEpoch(currentSlot) + targetSlot = currentSlot + continue + } + break + } + // Reverse to append the roots in ascending order corresponding + // to the respective slots. + tempRoots = bytesutil.ReverseBytes32Slice(tempRoots) + return tempRoots +} + +// prune for states past the current finalized checkpoint. +func (s *Service) pruneOldStates() { + prunedBoundaryRoots := [][32]byte{} + for _, rt := range s.boundaryRoots { + st, ok := s.initSyncState[rt] + // Skip non-existent roots. + if !ok { + continue + } + if st.Slot() < helpers.StartSlot(s.FinalizedCheckpt().Epoch) { + delete(s.initSyncState, rt) + continue + } + prunedBoundaryRoots = append(prunedBoundaryRoots, rt) + } + s.boundaryRoots = prunedBoundaryRoots +} + +// prune cache for non-boundary states. +func (s *Service) pruneNonBoundaryStates() { + boundaryMap := make(map[[32]byte]bool) + for i := range s.boundaryRoots { + boundaryMap[s.boundaryRoots[i]] = true + } + for rt := range s.initSyncState { + if !boundaryMap[rt] { + delete(s.initSyncState, rt) + } + } +} + +func (s *Service) pruneOldNonFinalizedStates() { + stateSlice := make([][32]byte, 0, len(s.initSyncState)) + // Add epoch boundary roots to slice. + for rt := range s.initSyncState { + stateSlice = append(stateSlice, rt) + } + + // Sort by slots. + sort.Slice(stateSlice, func(i int, j int) bool { + return s.initSyncState[stateSlice[i]].Slot() < s.initSyncState[stateSlice[j]].Slot() + }) + + boundaryMap := make(map[[32]byte]bool) + for i := range s.boundaryRoots { + boundaryMap[s.boundaryRoots[i]] = true + } + for _, rt := range stateSlice[:initialSyncCacheSize] { + if boundaryMap[rt] { + continue + } + delete(s.initSyncState, rt) + } +} + +func (s *Service) generateState(ctx context.Context, startRoot [32]byte, endRoot [32]byte) (*stateTrie.BeaconState, error) { + preState, err := s.beaconDB.State(ctx, startRoot) + if err != nil { + return nil, err + } + if preState == nil { + return nil, errors.New("finalized state does not exist in db") + } + endBlock, err := s.beaconDB.Block(ctx, endRoot) + if err != nil { + return nil, err + } + if endBlock == nil { + return nil, errors.New("provided block root does not have block saved in the db") + } + log.Warnf("Generating missing state of slot %d and root %#x", endBlock.Block.Slot, endRoot) + + blocks, err := s.stateGen.LoadBlocks(ctx, preState.Slot()+1, endBlock.Block.Slot, endRoot) + if err != nil { + return nil, errors.Wrap(err, "could not load the required blocks") + } + postState, err := s.stateGen.ReplayBlocks(ctx, preState, blocks, endBlock.Block.Slot) + if err != nil { + return nil, errors.Wrap(err, "could not replay the blocks to generate the resultant state") + } + return postState, nil +} diff --git a/beacon-chain/blockchain/init_sync_process_block_test.go b/beacon-chain/blockchain/init_sync_process_block_test.go new file mode 100644 index 00000000000..4228dadd088 --- /dev/null +++ b/beacon-chain/blockchain/init_sync_process_block_test.go @@ -0,0 +1,279 @@ +package blockchain + +import ( + "context" + "testing" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/core/state" + testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/testutil" + "gopkg.in/d4l3k/messagediff.v1" +) + +func TestFilterBoundaryCandidates_FilterCorrect(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + st, _ := stateTrie.InitializeFromProtoUnsafe(&pb.BeaconState{}) + + for i := uint64(0); i < 500; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + if helpers.IsEpochStart(i) { + service.boundaryRoots = append(service.boundaryRoots, root) + } + } + lastIndex := len(service.boundaryRoots) - 1 + for i := uint64(500); i < 2000; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + } + // Set current state. + latestSlot := helpers.RoundUpToNearestEpoch(2000) + st.SetSlot(latestSlot) + lastRoot := [32]byte{} + copy(lastRoot[:], bytesutil.Bytes32(latestSlot)) + + service.initSyncState[lastRoot] = st.Copy() + service.finalizedCheckpt = ðpb.Checkpoint{ + Epoch: 0, + Root: []byte{}, + } + service.filterBoundaryCandidates(context.Background(), lastRoot, st) + if len(service.boundaryRoots[lastIndex+1:]) == 0 { + t.Fatal("Wanted non zero added boundary roots") + } + for _, rt := range service.boundaryRoots[lastIndex+1:] { + st, ok := service.initSyncState[rt] + if !ok { + t.Error("Root doen't exist in cache map") + continue + } + if !(helpers.IsEpochStart(st.Slot()) || helpers.IsEpochStart(st.Slot()-1) || helpers.IsEpochStart(st.Slot()+1)) { + t.Errorf("boundary roots not validly stored. They have slot %d", st.Slot()) + } + } +} + +func TestFilterBoundaryCandidates_HandleSkippedSlots(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + st, _ := stateTrie.InitializeFromProtoUnsafe(&pb.BeaconState{}) + + for i := uint64(0); i < 500; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + if helpers.IsEpochStart(i) { + service.boundaryRoots = append(service.boundaryRoots, root) + } + } + lastIndex := len(service.boundaryRoots) - 1 + for i := uint64(500); i < 2000; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + // save only for offsetted slots + if helpers.IsEpochStart(i + 10) { + service.initSyncState[root] = st.Copy() + } + } + // Set current state. + latestSlot := helpers.RoundUpToNearestEpoch(2000) + st.SetSlot(latestSlot) + lastRoot := [32]byte{} + copy(lastRoot[:], bytesutil.Bytes32(latestSlot)) + + service.initSyncState[lastRoot] = st.Copy() + service.finalizedCheckpt = ðpb.Checkpoint{ + Epoch: 0, + Root: []byte{}, + } + service.filterBoundaryCandidates(context.Background(), lastRoot, st) + if len(service.boundaryRoots[lastIndex+1:]) == 0 { + t.Fatal("Wanted non zero added boundary roots") + } + for _, rt := range service.boundaryRoots[lastIndex+1:] { + st, ok := service.initSyncState[rt] + if !ok { + t.Error("Root doen't exist in cache map") + continue + } + if st.Slot() >= 500 { + // Ignore head boundary root. + if st.Slot() == 2016 { + continue + } + if !helpers.IsEpochStart(st.Slot() + 10) { + t.Errorf("boundary roots not validly stored. They have slot %d "+ + "instead of the offset from epoch start", st.Slot()) + } + } + } +} + +func TestPruneOldStates_AlreadyFinalized(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + st, _ := stateTrie.InitializeFromProtoUnsafe(&pb.BeaconState{}) + + for i := uint64(100); i < 200; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + service.boundaryRoots = append(service.boundaryRoots, root) + } + finalizedEpoch := uint64(5) + service.finalizedCheckpt = ðpb.Checkpoint{Epoch: finalizedEpoch} + service.pruneOldStates() + for _, rt := range service.boundaryRoots { + st, ok := service.initSyncState[rt] + if !ok { + t.Error("Root doen't exist in cache map") + continue + } + if st.Slot() < helpers.StartSlot(finalizedEpoch) { + t.Errorf("State with slot %d still exists and not pruned", st.Slot()) + } + } +} + +func TestPruneNonBoundary_CanPrune(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + st, _ := stateTrie.InitializeFromProtoUnsafe(&pb.BeaconState{}) + + for i := uint64(0); i < 2000; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + if helpers.IsEpochStart(i) { + service.boundaryRoots = append(service.boundaryRoots, root) + } + } + service.pruneNonBoundaryStates() + for _, rt := range service.boundaryRoots { + st, ok := service.initSyncState[rt] + if !ok { + t.Error("Root doesn't exist in cache map") + continue + } + if !helpers.IsEpochStart(st.Slot()) { + t.Errorf("Non boundary state with slot %d still exists and not pruned", st.Slot()) + } + } +} + +func TestGenerateState_CorrectlyGenerated(t *testing.T) { + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + cfg := &Config{BeaconDB: db} + service, err := NewService(context.Background(), cfg) + if err != nil { + t.Fatal(err) + } + + beaconState, privs := testutil.DeterministicGenesisState(t, 32) + genesisBlock := blocks.NewGenesisBlock([]byte{}) + bodyRoot, err := ssz.HashTreeRoot(genesisBlock.Block) + if err != nil { + t.Fatal(err) + } + beaconState.SetLatestBlockHeader(ðpb.BeaconBlockHeader{ + Slot: genesisBlock.Block.Slot, + ParentRoot: genesisBlock.Block.ParentRoot, + StateRoot: params.BeaconConfig().ZeroHash[:], + BodyRoot: bodyRoot[:], + }) + beaconState.SetSlashings(make([]uint64, params.BeaconConfig().EpochsPerSlashingsVector)) + cp := beaconState.CurrentJustifiedCheckpoint() + mockRoot := [32]byte{} + copy(mockRoot[:], "hello-world") + cp.Root = mockRoot[:] + beaconState.SetCurrentJustifiedCheckpoint(cp) + beaconState.SetCurrentEpochAttestations([]*pb.PendingAttestation{}) + err = db.SaveBlock(context.Background(), genesisBlock) + if err != nil { + t.Fatal(err) + } + genRoot, err := ssz.HashTreeRoot(genesisBlock) + if err != nil { + t.Fatal(err) + } + err = db.SaveState(context.Background(), beaconState, genRoot) + if err != nil { + t.Fatal(err) + } + + lastBlock := ðpb.SignedBeaconBlock{} + for i := uint64(1); i < 10; i++ { + block, err := testutil.GenerateFullBlock(beaconState, privs, testutil.DefaultBlockGenConfig(), i) + if err != nil { + t.Fatal(err) + } + beaconState, err = state.ExecuteStateTransition(context.Background(), beaconState, block) + if err != nil { + t.Fatal(err) + } + err = db.SaveBlock(context.Background(), block) + if err != nil { + t.Fatal(err) + } + lastBlock = block + } + root, err := ssz.HashTreeRoot(lastBlock.Block) + if err != nil { + t.Fatal(err) + } + + newState, err := service.generateState(context.Background(), genRoot, root) + if err != nil { + t.Fatal(err) + } + if !ssz.DeepEqual(newState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) { + diff, _ := messagediff.PrettyDiff(newState.InnerStateUnsafe(), beaconState.InnerStateUnsafe()) + t.Errorf("Generated state is different from what is expected: %s", diff) + } +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 58ca2ea96aa..b5f56108d6d 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -198,7 +198,8 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed } if featureconfig.Get().InitSyncCacheState { - s.initSyncState[root] = postState + s.initSyncState[root] = postState.Copy() + s.filterBoundaryCandidates(ctx, root, postState) } else { if err := s.beaconDB.SaveState(ctx, postState, root); err != nil { return errors.Wrap(err, "could not save state") @@ -251,6 +252,18 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed return errors.Wrap(err, "could not save finalized checkpoint") } + if featureconfig.Get().InitSyncCacheState { + numOfStates := len(s.boundaryRoots) + if numOfStates > initialSyncCacheSize { + if err = s.persistCachedStates(ctx, numOfStates); err != nil { + return err + } + } + if len(s.initSyncState) > maxCacheSize { + s.pruneOldNonFinalizedStates() + } + } + // Epoch boundary bookkeeping such as logging epoch summaries. if postState.Slot() >= s.nextEpochBoundarySlot { metrics.ReportEpochMetrics(postState) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index fc5901a9ff7..a857662a940 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -67,7 +67,13 @@ func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) ( return nil, errors.Wrapf(err, "could not get pre state for slot %d", b.Slot) } if preState == nil { - return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + if bytes.Equal(s.finalizedCheckpt.Root, b.ParentRoot) { + return nil, fmt.Errorf("pre state of slot %d does not exist", b.Slot) + } + preState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), bytesutil.ToBytes32(b.ParentRoot)) + if err != nil { + return nil, err + } } return preState, nil // No copy needed from newly hydrated DB object. } @@ -261,7 +267,11 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt // If justified state is nil, resume back to normal syncing process and save // justified check point. if justifiedState == nil { - return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt) + justifiedState, err = s.generateState(ctx, bytesutil.ToBytes32(s.finalizedCheckpt.Root), justifiedRoot) + if err != nil { + log.Error(err) + return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt) + } } if err := s.beaconDB.SaveState(ctx, justifiedState, justifiedRoot); err != nil { return errors.Wrap(err, "could not save justified state") @@ -281,19 +291,24 @@ func (s *Service) saveInitState(ctx context.Context, state *stateTrie.BeaconStat finalizedRoot := bytesutil.ToBytes32(cpt.Root) fs := s.initSyncState[finalizedRoot] if fs == nil { - // This might happen if the client was in sync and is now re-syncing for whatever reason. - log.Warn("Initial sync cache did not have finalized state root cached") - return nil + var err error + fs, err = s.beaconDB.State(ctx, finalizedRoot) + if err != nil { + return err + } + if fs == nil { + fs, err = s.generateState(ctx, bytesutil.ToBytes32(s.prevFinalizedCheckpt.Root), finalizedRoot) + if err != nil { + // This might happen if the client was in sync and is now re-syncing for whatever reason. + log.Warn("Initial sync cache did not have finalized state root cached") + return err + } + } } if err := s.beaconDB.SaveState(ctx, fs, finalizedRoot); err != nil { return errors.Wrap(err, "could not save state") } - for r, oldState := range s.initSyncState { - if oldState.Slot() < cpt.Epoch*params.BeaconConfig().SlotsPerEpoch { - delete(s.initSyncState, r) - } - } return nil } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index ac1ef547feb..8a0f6c9e536 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -373,6 +373,7 @@ func TestCachedPreState_CanGetFromDB(t *testing.T) { r := [32]byte{'A'} b := ðpb.BeaconBlock{Slot: 1, ParentRoot: r[:]} + service.finalizedCheckpt = ðpb.Checkpoint{Root: r[:]} _, err = service.verifyBlkPreState(ctx, b) wanted := "pre state of slot 1 does not exist" if err.Error() != wanted { @@ -431,11 +432,6 @@ func TestSaveInitState_CanSaveDelete(t *testing.T) { if finalizedState == nil { t.Error("finalized state can't be nil") } - - // Verify cached state is properly pruned - if len(service.initSyncState) != int(params.BeaconConfig().SlotsPerEpoch) { - t.Errorf("wanted: %d, got: %d", len(service.initSyncState), params.BeaconConfig().SlotsPerEpoch) - } } func TestUpdateJustified_CouldUpdateBest(t *testing.T) { @@ -537,6 +533,46 @@ func TestFilterBlockRoots_CanFilter(t *testing.T) { } } +func TestPersistCache_CanSave(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + cfg := &Config{BeaconDB: db} + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + st, _ := stateTrie.InitializeFromProtoUnsafe(&pb.BeaconState{}) + + for i := uint64(0); i < initialSyncCacheSize; i++ { + st.SetSlot(i) + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + service.initSyncState[root] = st.Copy() + service.boundaryRoots = append(service.boundaryRoots, root) + } + + if err = service.persistCachedStates(ctx, initialSyncCacheSize); err != nil { + t.Fatal(err) + } + + for i := uint64(0); i < initialSyncCacheSize-minimumCacheSize; i++ { + root := [32]byte{} + copy(root[:], bytesutil.Bytes32(i)) + state, err := db.State(context.Background(), root) + if err != nil { + t.Errorf("State with root of %#x , could not be retrieved: %v", root, err) + } + if state == nil { + t.Errorf("State with root of %#x , does not exist", root) + } + if state.Slot() != i { + t.Errorf("Incorrect slot retrieved. Wanted %d but got %d", i, state.Slot()) + } + } +} + func TestFillForkChoiceMissingBlocks_CanSave(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 822cca3f7c0..4730aac6a0b 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -30,6 +30,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/beacon-chain/stategen" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" @@ -64,9 +65,11 @@ type Service struct { nextEpochBoundarySlot uint64 voteLock sync.RWMutex initSyncState map[[32]byte]*stateTrie.BeaconState + boundaryRoots [][32]byte initSyncStateLock sync.RWMutex checkpointState *cache.CheckpointStateCache checkpointStateLock sync.Mutex + stateGen *stategen.State } // Config options for the service. @@ -101,7 +104,9 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { epochParticipation: make(map[uint64]*precompute.Balance), forkChoiceStore: cfg.ForkChoiceStore, initSyncState: make(map[[32]byte]*stateTrie.BeaconState), + boundaryRoots: [][32]byte{}, checkpointState: cache.NewCheckpointStateCache(), + stateGen: stategen.New(cfg.BeaconDB), }, nil } diff --git a/beacon-chain/core/helpers/slot_epoch.go b/beacon-chain/core/helpers/slot_epoch.go index fa94ec04e50..b6228ad9fee 100644 --- a/beacon-chain/core/helpers/slot_epoch.go +++ b/beacon-chain/core/helpers/slot_epoch.go @@ -106,3 +106,12 @@ func VerifySlotTime(genesisTime uint64, slot uint64) error { func SlotsSince(time time.Time) uint64 { return uint64(roughtime.Since(time).Seconds()) / params.BeaconConfig().SecondsPerSlot } + +// RoundUpToNearestEpoch rounds up the provided slot value to the nearest epoch. +func RoundUpToNearestEpoch(slot uint64) uint64 { + if slot%params.BeaconConfig().SlotsPerEpoch != 0 { + slot -= slot % params.BeaconConfig().SlotsPerEpoch + slot += params.BeaconConfig().SlotsPerEpoch + } + return slot +} diff --git a/beacon-chain/core/helpers/slot_epoch_test.go b/beacon-chain/core/helpers/slot_epoch_test.go index 1261533821d..8784535fc5f 100644 --- a/beacon-chain/core/helpers/slot_epoch_test.go +++ b/beacon-chain/core/helpers/slot_epoch_test.go @@ -175,3 +175,19 @@ func TestSlotsSinceEpochStarts(t *testing.T) { } } } + +func TestRoundUpToNearestEpoch_OK(t *testing.T) { + tests := []struct { + startSlot uint64 + roundedUpSlot uint64 + }{ + {startSlot: 0 * params.BeaconConfig().SlotsPerEpoch, roundedUpSlot: 0}, + {startSlot: 1*params.BeaconConfig().SlotsPerEpoch - 10, roundedUpSlot: 1 * params.BeaconConfig().SlotsPerEpoch}, + {startSlot: 10*params.BeaconConfig().SlotsPerEpoch - (params.BeaconConfig().SlotsPerEpoch - 1), roundedUpSlot: 10 * params.BeaconConfig().SlotsPerEpoch}, + } + for _, tt := range tests { + if tt.roundedUpSlot != RoundUpToNearestEpoch(tt.startSlot) { + t.Errorf("RoundUpToNearestEpoch(%d) = %d, wanted: %d", tt.startSlot, RoundUpToNearestEpoch(tt.startSlot), tt.roundedUpSlot) + } + } +} diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 3a90bb8d65f..9c20ccc86fc 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -77,6 +77,7 @@ type NoHeadAccessDatabase interface { SaveValidatorIndices(ctx context.Context, publicKeys [][48]byte, validatorIndices []uint64) error // State related methods. SaveState(ctx context.Context, state *state.BeaconState, blockRoot [32]byte) error + SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error DeleteState(ctx context.Context, blockRoot [32]byte) error DeleteStates(ctx context.Context, blockRoots [][32]byte) error // Slashing operations. diff --git a/beacon-chain/db/kafka/passthrough.go b/beacon-chain/db/kafka/passthrough.go index 77d40f4d01f..47b92ab817a 100644 --- a/beacon-chain/db/kafka/passthrough.go +++ b/beacon-chain/db/kafka/passthrough.go @@ -227,6 +227,11 @@ func (e Exporter) SaveState(ctx context.Context, state *state.BeaconState, block return e.db.SaveState(ctx, state, blockRoot) } +// SaveStates -- passthrough. +func (e Exporter) SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error { + return e.db.SaveStates(ctx, states, blockRoots) +} + // SaveProposerSlashing -- passthrough. func (e Exporter) SaveProposerSlashing(ctx context.Context, slashing *eth.ProposerSlashing) error { return e.db.SaveProposerSlashing(ctx, slashing) diff --git a/beacon-chain/db/kv/state.go b/beacon-chain/db/kv/state.go index 352fda25bf1..d9e83e77efa 100644 --- a/beacon-chain/db/kv/state.go +++ b/beacon-chain/db/kv/state.go @@ -120,6 +120,34 @@ func (k *Store) SaveState(ctx context.Context, state *state.BeaconState, blockRo }) } +// SaveStates stores multiple states to the db using the provided corresponding roots. +func (k *Store) SaveStates(ctx context.Context, states []*state.BeaconState, blockRoots [][32]byte) error { + ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveStates") + defer span.End() + if states == nil { + return errors.New("nil state") + } + var err error + multipleEncs := make([][]byte, len(states)) + for i, st := range states { + multipleEncs[i], err = encode(st.InnerStateUnsafe()) + if err != nil { + return err + } + } + + return k.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(stateBucket) + for i, rt := range blockRoots { + err = bucket.Put(rt[:], multipleEncs[i]) + if err != nil { + return err + } + } + return nil + }) +} + // HasState checks if a state by root exists in the db. func (k *Store) HasState(ctx context.Context, blockRoot [32]byte) bool { ctx, span := trace.StartSpan(ctx, "BeaconDB.HasState") diff --git a/shared/bytesutil/bytes.go b/shared/bytesutil/bytes.go index f685e9cdebc..908a7b3951b 100644 --- a/shared/bytesutil/bytes.go +++ b/shared/bytesutil/bytes.go @@ -194,3 +194,11 @@ func Copy2dBytes(ary [][]byte) [][]byte { } return nil } + +// ReverseBytes32Slice will reverse the provided slice's order. +func ReverseBytes32Slice(arr [][32]byte) [][32]byte { + for i, j := 0, len(arr)-1; i < j; i, j = i+1, j-1 { + arr[i], arr[j] = arr[j], arr[i] + } + return arr +} diff --git a/shared/bytesutil/bytes_test.go b/shared/bytesutil/bytes_test.go index edad06ad9e5..44228566f89 100644 --- a/shared/bytesutil/bytes_test.go +++ b/shared/bytesutil/bytes_test.go @@ -2,6 +2,7 @@ package bytesutil_test import ( "bytes" + "reflect" "testing" "github.com/prysmaticlabs/prysm/shared/bytesutil" @@ -207,3 +208,22 @@ func TestTruncate(t *testing.T) { } } } + +func TestReverse(t *testing.T) { + tests := []struct { + input [][32]byte + output [][32]byte + }{ + {[][32]byte{[32]byte{'A'}, [32]byte{'B'}, [32]byte{'C'}, [32]byte{'D'}, [32]byte{'E'}, [32]byte{'F'}, [32]byte{'G'}, [32]byte{'H'}}, + [][32]byte{[32]byte{'H'}, [32]byte{'G'}, [32]byte{'F'}, [32]byte{'E'}, [32]byte{'D'}, [32]byte{'C'}, [32]byte{'B'}, [32]byte{'A'}}}, + {[][32]byte{[32]byte{1}, [32]byte{2}, [32]byte{3}, [32]byte{4}}, + [][32]byte{[32]byte{4}, [32]byte{3}, [32]byte{2}, [32]byte{1}}}, + {[][32]byte{}, [][32]byte{}}, + } + for _, tt := range tests { + b := bytesutil.ReverseBytes32Slice(tt.input) + if !reflect.DeepEqual(b, tt.output) { + t.Errorf("Reverse(%d) = %v, want = %d", tt.input, b, tt.output) + } + } +}