From 64fa4744340f67bfc2a272bdb0a9e32c9ae8009d Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 7 Jul 2020 12:16:12 +0800 Subject: [PATCH] Batch Verify Blocks (#6469) * add everything so far * checkpoint progress * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * fix * checkpoint * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * checkpoint again * checkpoint again * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * commenting * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * bls cleanup * revert this back * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * revert core changes * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * add flag * add test * add one more test * clean up * comment * lint * terence's review * Merge refs/heads/master into fastBLS * Merge refs/heads/master into fastBLS * Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * Merge refs/heads/master into fastBLS * remove additional method * Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS * fix * Merge refs/heads/master into fastBLS * copy * Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS --- beacon-chain/blockchain/BUILD.bazel | 1 + beacon-chain/blockchain/process_block.go | 149 ++++++++++++++++-- .../blockchain/process_block_helpers.go | 7 +- beacon-chain/blockchain/process_block_test.go | 65 +++++++- beacon-chain/blockchain/receive_block.go | 65 ++++++++ beacon-chain/blockchain/service_test.go | 4 +- beacon-chain/blockchain/testing/mock.go | 29 ++++ beacon-chain/state/stategen/hot.go | 11 ++ beacon-chain/sync/initial-sync/BUILD.bazel | 1 + beacon-chain/sync/initial-sync/round_robin.go | 77 +++++++++ .../sync/initial-sync/round_robin_test.go | 134 ++++++++++++++++ shared/featureconfig/config.go | 5 + shared/featureconfig/flags.go | 6 + 13 files changed, 530 insertions(+), 24 deletions(-) diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index f2db42357f1..07b84dd6639 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//beacon-chain/state/stateutil:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/attestationutil:go_default_library", + "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index fc783fcbdd3..6c60026954b 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/state" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" "github.com/prysmaticlabs/prysm/shared/attestationutil" + "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" @@ -84,7 +85,7 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot) } - if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { + if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot) } @@ -202,17 +203,74 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed if err != nil { return errors.Wrap(err, "could not execute state transition") } + return s.handlePostStateInSync(ctx, signed, blockRoot, postState) +} - s.saveInitSyncBlock(blockRoot, signed) +func (s *Service) onBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock, + blockRoots [][32]byte) (*stateTrie.BeaconState, []*ethpb.Checkpoint, []*ethpb.Checkpoint, error) { + ctx, span := trace.StartSpan(ctx, "blockchain.onBlock") + defer span.End() - if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { - return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot) + if len(blks) == 0 || len(blockRoots) == 0 { + return nil, nil, nil, errors.New("no blocks provided") + } + if blks[0] == nil || blks[0].Block == nil { + return nil, nil, nil, errors.New("nil block") + } + b := blks[0].Block + + // Retrieve incoming block's pre state. + preState, err := s.verifyBlkPreState(ctx, b) + if err != nil { + return nil, nil, nil, err } + // Perform a copy to preserve copy in cache. + preState = preState.Copy() + + jCheckpoints := make([]*ethpb.Checkpoint, len(blks)) + fCheckpoints := make([]*ethpb.Checkpoint, len(blks)) + sigSet := &bls.SignatureSet{ + Signatures: []bls.Signature{}, + PublicKeys: []bls.PublicKey{}, + Messages: [][32]byte{}, + } + set := new(bls.SignatureSet) + for i, b := range blks { + set, preState, err = state.ExecuteStateTransitionNoVerifyAnySig(ctx, preState, b) + if err != nil { + return nil, nil, nil, err + } + jCheckpoints[i] = preState.CurrentJustifiedCheckpoint() + fCheckpoints[i] = preState.FinalizedCheckpoint() + sigSet.Join(set) + } + verify, err := bls.VerifyMultipleSignatures(sigSet.Signatures, sigSet.Messages, sigSet.PublicKeys) + if err != nil { + return nil, nil, nil, err + } + if !verify { + return nil, nil, nil, errors.New("batch block signature verification failed") + } + return preState, fCheckpoints, jCheckpoints, nil +} + +// handles the state post transition and saves the appropriate checkpoints and forkchoice +// data. +func (s *Service) handlePostStateInSync(ctx context.Context, signed *ethpb.SignedBeaconBlock, + blockRoot [32]byte, postState *stateTrie.BeaconState) error { + + b := signed.Block + + s.saveInitSyncBlock(blockRoot, signed) if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil { return errors.Wrap(err, "could not save state") } + if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { + return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot) + } + // Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory. if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize { if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil { @@ -245,7 +303,57 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed } } - // Epoch boundary bookkeeping such as logging epoch summaries. + return s.handleEpochBoundary(postState) +} + +// handles a block after the block's batch has been verified, where we can save blocks +// their state summaries and split them off to relative hot/cold storage. +func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed *ethpb.SignedBeaconBlock, + blockRoot [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error { + b := signed.Block + + s.saveInitSyncBlock(blockRoot, signed) + if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, fCheckpoint, jCheckpoint); err != nil { + return err + } + s.stateGen.SaveStateSummary(ctx, signed, blockRoot) + + // Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory. + if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize { + if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil { + return err + } + s.clearInitSyncBlocks() + } + + // Update finalized check point. Prune the block cache and helper caches on every new finalized epoch. + if fCheckpoint.Epoch > s.finalizedCheckpt.Epoch { + if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil { + return err + } + s.clearInitSyncBlocks() + + if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, fCheckpoint); err != nil { + return errors.Wrap(err, "could not save finalized checkpoint") + } + + s.prevFinalizedCheckpt = s.finalizedCheckpt + s.finalizedCheckpt = fCheckpoint + + fRoot := bytesutil.ToBytes32(fCheckpoint.Root) + fBlock, err := s.beaconDB.Block(ctx, fRoot) + if err != nil { + return errors.Wrap(err, "could not get finalized block to migrate") + } + if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil { + return errors.Wrap(err, "could not migrate to cold") + } + } + return nil +} + +// Epoch boundary bookkeeping such as logging epoch summaries. +func (s *Service) handleEpochBoundary(postState *stateTrie.BeaconState) error { if postState.Slot() >= s.nextEpochBoundarySlot { reportEpochMetrics(postState) s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState)) @@ -258,25 +366,18 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed return err } } - return nil } // This feeds in the block and block's attestations to fork choice store. It's allows fork choice store // to gain information on the most current chain. -func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte, state *stateTrie.BeaconState) error { - if err := s.fillInForkChoiceMissingBlocks(ctx, blk, state); err != nil { +func (s *Service) insertBlockAndAttestationsToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte, + state *stateTrie.BeaconState) error { + fCheckpoint := state.FinalizedCheckpoint() + jCheckpoint := state.CurrentJustifiedCheckpoint() + if err := s.insertBlockToForkChoiceStore(ctx, blk, root, fCheckpoint, jCheckpoint); err != nil { return err } - - // Feed in block to fork choice store. - if err := s.forkChoiceStore.ProcessBlock(ctx, - blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti), - state.CurrentJustifiedCheckpoint().Epoch, - state.FinalizedCheckpointEpoch()); err != nil { - return errors.Wrap(err, "could not process block for proto array fork choice") - } - // Feed in block's attestations to fork choice store. for _, a := range blk.Body.Attestations { committee, err := helpers.BeaconCommitteeFromState(state, a.Data.Slot, a.Data.CommitteeIndex) @@ -286,6 +387,20 @@ func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.B indices := attestationutil.AttestingIndices(a.AggregationBits, committee) s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch) } + return nil +} +func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, + root [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error { + if err := s.fillInForkChoiceMissingBlocks(ctx, blk, fCheckpoint, jCheckpoint); err != nil { + return err + } + // Feed in block to fork choice store. + if err := s.forkChoiceStore.ProcessBlock(ctx, + blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti), + jCheckpoint.Epoch, + fCheckpoint.Epoch); err != nil { + return errors.Wrap(err, "could not process block for proto array fork choice") + } return nil } diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index 12220d60719..9899b404f1f 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -295,7 +295,8 @@ func (s *Service) finalizedImpliesNewJustified(ctx context.Context, state *state // This retrieves missing blocks from DB (ie. the blocks that couldn't be received over sync) and inserts them to fork choice store. // This is useful for block tree visualizer and additional vote accounting. -func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock, state *stateTrie.BeaconState) error { +func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock, + fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error { pendingNodes := make([]*ethpb.BeaconBlock, 0) parentRoot := bytesutil.ToBytes32(blk.ParentRoot) @@ -326,8 +327,8 @@ func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb. if err := s.forkChoiceStore.ProcessBlock(ctx, b.Slot, r, bytesutil.ToBytes32(b.ParentRoot), bytesutil.ToBytes32(b.Body.Graffiti), - state.CurrentJustifiedCheckpoint().Epoch, - state.FinalizedCheckpointEpoch()); err != nil { + jCheckpoint.Epoch, + fCheckpoint.Epoch); err != nil { return errors.Wrap(err, "could not process block for proto array fork choice") } } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 786fa200970..b2d03624513 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -10,6 +10,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" + "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray" @@ -129,6 +130,65 @@ func TestStore_OnBlock(t *testing.T) { } } +func TestStore_OnBlockBatch(t *testing.T) { + ctx := context.Background() + db, sc := testDB.SetupDB(t) + + cfg := &Config{ + BeaconDB: db, + StateGen: stategen.New(db, sc), + } + service, err := NewService(ctx, cfg) + if err != nil { + t.Fatal(err) + } + + genesisStateRoot := [32]byte{} + genesis := blocks.NewGenesisBlock(genesisStateRoot[:]) + if err := db.SaveBlock(ctx, genesis); err != nil { + t.Error(err) + } + + st, keys := testutil.DeterministicGenesisState(t, 64) + + bState := st.Copy() + + blks := []*ethpb.SignedBeaconBlock{} + blkRoots := [][32]byte{} + var firstState *stateTrie.BeaconState + for i := 1; i < 10; i++ { + b, err := testutil.GenerateFullBlock(bState, keys, testutil.DefaultBlockGenConfig(), uint64(i)) + if err != nil { + t.Fatal(err) + } + bState, err = state.ExecuteStateTransition(ctx, bState, b) + if err != nil { + t.Fatal(err) + } + if i == 1 { + firstState = bState.Copy() + } + root, err := stateutil.BlockRoot(b.Block) + if err != nil { + t.Fatal(err) + } + blks = append(blks, b) + blkRoots = append(blkRoots, root) + } + err = db.SaveBlock(context.Background(), blks[0]) + if err != nil { + t.Fatal(err) + } + err = service.stateGen.SaveState(ctx, blkRoots[0], firstState) + if err != nil { + t.Fatal(err) + } + _, _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:]) + if err != nil { + t.Fatal(err) + } +} + func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) { ctx := context.Background() db, _ := testDB.SetupDB(t) @@ -391,7 +451,8 @@ func TestFillForkChoiceMissingBlocks_CanSave(t *testing.T) { beaconState, _ := testutil.DeterministicGenesisState(t, 32) block := ðpb.BeaconBlock{Slot: 9, ParentRoot: roots[8], Body: ðpb.BeaconBlockBody{Graffiti: []byte{}}} - if err := service.fillInForkChoiceMissingBlocks(context.Background(), block, beaconState); err != nil { + if err := service.fillInForkChoiceMissingBlocks(context.Background(), block, + beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil { t.Fatal(err) } @@ -462,7 +523,7 @@ func TestFillForkChoiceMissingBlocks_FilterFinalized(t *testing.T) { } beaconState, _ := testutil.DeterministicGenesisState(t, 32) - if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState); err != nil { + if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil { t.Fatal(err) } diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 1ea2ce9eb6a..5a7f39aa9aa 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -21,6 +21,7 @@ type BlockReceiver interface { ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error ReceiveBlockInitialSync(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error + ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error HasInitSyncBlock(root [32]byte) bool } @@ -163,6 +164,70 @@ func (s *Service) ReceiveBlockInitialSync(ctx context.Context, block *ethpb.Sign return nil } +// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning +// the state, performing batch verification of all collected signatures and then performing the appropriate +// actions for a block post-transition. +func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedBeaconBlock, blkRoots [][32]byte) error { + ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockBatch") + defer span.End() + + // Apply state transition on the incoming newly received blockCopy without verifying its BLS contents. + postState, fCheckpoints, jCheckpoints, err := s.onBlockBatch(ctx, blocks, blkRoots) + if err != nil { + err := errors.Wrap(err, "could not process block") + traceutil.AnnotateError(span, err) + return err + } + + for i, b := range blocks { + blockCopy := stateTrie.CopySignedBeaconBlock(b) + if err = s.handleBlockAfterBatchVerify(ctx, blockCopy, blkRoots[i], fCheckpoints[i], jCheckpoints[i]); err != nil { + traceutil.AnnotateError(span, err) + return err + } + // Send notification of the processed block to the state feed. + s.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.BlockProcessed, + Data: &statefeed.BlockProcessedData{ + Slot: blockCopy.Block.Slot, + BlockRoot: blkRoots[i], + Verified: false, + }, + }) + + // Reports on blockCopy and fork choice metrics. + reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt) + + // Log state transition data. + log.WithFields(logrus.Fields{ + "slot": blockCopy.Block.Slot, + "attestations": len(blockCopy.Block.Body.Attestations), + "deposits": len(blockCopy.Block.Body.Deposits), + }).Debug("Finished applying state transition") + } + lastBlk := blocks[len(blocks)-1] + lastRoot := blkRoots[len(blkRoots)-1] + + if err := s.stateGen.SaveState(ctx, lastRoot, postState); err != nil { + return errors.Wrap(err, "could not save state") + } + + cachedHeadRoot, err := s.HeadRoot(ctx) + if err != nil { + return errors.Wrap(err, "could not get head root from cache") + } + + if !bytes.Equal(lastRoot[:], cachedHeadRoot) { + if err := s.saveHeadNoDB(ctx, lastBlk, lastRoot); err != nil { + err := errors.Wrap(err, "could not save head") + traceutil.AnnotateError(span, err) + return err + } + } + + return s.handleEpochBoundary(postState) +} + // HasInitSyncBlock returns true if the block of the input root exists in initial sync blocks cache. func (s *Service) HasInitSyncBlock(root [32]byte) bool { return s.hasInitSyncBlock(root) diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index eedb81dcbf1..efb29e05e96 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -410,7 +410,7 @@ func TestHasBlock_ForkChoiceAndDB(t *testing.T) { if err != nil { t.Fatal(err) } - if err := s.insertBlockToForkChoiceStore(ctx, block.Block, r, state); err != nil { + if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, block.Block, r, state); err != nil { t.Fatal(err) } @@ -464,7 +464,7 @@ func BenchmarkHasBlockForkChoiceStore(b *testing.B) { if err != nil { b.Fatal(err) } - if err := s.insertBlockToForkChoiceStore(ctx, block.Block, r, state); err != nil { + if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, block.Block, r, state); err != nil { b.Fatal(err) } diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 86e54529253..bf1d681cb3c 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -171,6 +171,35 @@ func (ms *ChainService) ReceiveBlockInitialSync(ctx context.Context, block *ethp return nil } +// ReceiveBlockBatch processes blocks in batches from initial-sync. +func (ms *ChainService) ReceiveBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock, roots [][32]byte) error { + if ms.State == nil { + ms.State = &stateTrie.BeaconState{} + } + for _, block := range blks { + if !bytes.Equal(ms.Root, block.Block.ParentRoot) { + return errors.Errorf("wanted %#x but got %#x", ms.Root, block.Block.ParentRoot) + } + if err := ms.State.SetSlot(block.Block.Slot); err != nil { + return err + } + ms.BlocksReceived = append(ms.BlocksReceived, block) + signingRoot, err := stateutil.BlockRoot(block.Block) + if err != nil { + return err + } + if ms.DB != nil { + if err := ms.DB.SaveBlock(ctx, block); err != nil { + return err + } + logrus.Infof("Saved block with root: %#x at slot %d", signingRoot, block.Block.Slot) + } + ms.Root = signingRoot[:] + ms.Block = block + } + return nil +} + // ReceiveBlockNoPubsub mocks ReceiveBlockNoPubsub method in chain service. func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { if ms.State == nil { diff --git a/beacon-chain/state/stategen/hot.go b/beacon-chain/state/stategen/hot.go index 1f3908798c2..7b2ff25cc79 100644 --- a/beacon-chain/state/stategen/hot.go +++ b/beacon-chain/state/stategen/hot.go @@ -4,6 +4,7 @@ import ( "context" "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/state" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -21,6 +22,16 @@ func (s *State) HasState(ctx context.Context, blockRoot [32]byte) bool { return s.beaconDB.HasState(ctx, blockRoot) } +// SaveStateSummary saves the relevant state summary for a block and its corresponding state slot in the +// state summary cache. +func (s *State) SaveStateSummary(ctx context.Context, blk *ethpb.SignedBeaconBlock, blockRoot [32]byte) { + // Save State summary + s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{ + Slot: blk.Block.Slot, + Root: blockRoot[:], + }) +} + // This saves a post finalized beacon state in the hot section of the DB. On the epoch boundary, // it saves a full state. On an intermediate slot, it saves a back pointer to the // nearest epoch boundary state. diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index dafb635fe23..c8043cd8b05 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//proto/beacon/p2p/v1:go_default_library", "//shared:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", "//shared/mathutil:go_default_library", "//shared/params:go_default_library", "//shared/rand:go_default_library", diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 1e3e8d4cb6f..9622bd93a73 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -1,8 +1,10 @@ package initialsync import ( + "bytes" "context" "encoding/hex" + "errors" "fmt" "time" @@ -13,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/mathutil" "github.com/sirupsen/logrus" ) @@ -27,6 +30,8 @@ const ( // blockReceiverFn defines block receiving function. type blockReceiverFn func(ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error +type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBlock, roots [][32]byte) error + // Round Robin sync looks at the latest peer statuses and syncs with the highest // finalized peer. // @@ -55,10 +60,19 @@ func (s *Service) roundRobinSync(genesis time.Time) error { if err := queue.start(); err != nil { return err } + blockReceiver := s.chain.ReceiveBlockInitialSync + batchReceiver := s.chain.ReceiveBlockBatch // Step 1 - Sync to end of finalized epoch. for fetchedBlocks := range queue.fetchedBlocks { + // Use Batch Block Verify to process and verify batches directly. + if featureconfig.Get().BatchBlockVerify { + if err := s.processBatchedBlocks(ctx, genesis, fetchedBlocks, batchReceiver); err != nil { + log.WithError(err).Info("Batch is not processed") + } + continue + } for _, blk := range fetchedBlocks { if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil { log.WithError(err).Info("Block is not processed") @@ -158,6 +172,25 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, blkRoot ) } +// logBatchSyncStatus and increments the block processing counter. +func (s *Service) logBatchSyncStatus(genesis time.Time, blks []*eth.SignedBeaconBlock, blkRoot [32]byte) { + s.counter.Incr(int64(len(blks))) + rate := float64(s.counter.Rate()) / counterSeconds + if rate == 0 { + rate = 1 + } + firstBlk := blks[0] + timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-firstBlk.Block.Slot)/rate) * time.Second + log.WithFields(logrus.Fields{ + "peers": len(s.p2p.Peers().Connected()), + "blocksPerSecond": fmt.Sprintf("%.1f", rate), + }).Infof( + "Processing block batch of size %d starting from %s %d/%d - estimated time remaining %s", + len(blks), fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]), + firstBlk.Block.Slot, helpers.SlotsSince(genesis), timeRemaining, + ) +} + // processBlock performs basic checks on incoming block, and triggers receiver function. func (s *Service) processBlock( ctx context.Context, @@ -183,3 +216,47 @@ func (s *Service) processBlock( s.lastProcessedSlot = blk.Block.Slot return nil } + +func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time, + blks []*eth.SignedBeaconBlock, bFunc batchBlockReceiverFn) error { + if len(blks) == 0 { + return errors.New("0 blocks provided into method") + } + firstBlock := blks[0] + for s.lastProcessedSlot >= firstBlock.Block.Slot { + if len(blks) == 1 { + return errors.New("no good blocks in batch") + } + blks = blks[1:] + firstBlock = blks[0] + } + blkRoot, err := stateutil.BlockRoot(firstBlock.Block) + if err != nil { + return err + } + s.logBatchSyncStatus(genesis, blks, blkRoot) + parentRoot := bytesutil.ToBytes32(firstBlock.Block.ParentRoot) + if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) { + return fmt.Errorf("beacon node doesn't have a block in db with root %#x", firstBlock.Block.ParentRoot) + } + blockRoots := make([][32]byte, len(blks)) + blockRoots[0] = blkRoot + for i := 1; i < len(blks); i++ { + b := blks[i] + if !bytes.Equal(b.Block.ParentRoot, blockRoots[i-1][:]) { + return fmt.Errorf("expected linear block list with parent root of %#x but received %#x", + blockRoots[i-1][:], b.Block.ParentRoot) + } + blkRoot, err := stateutil.BlockRoot(b.Block) + if err != nil { + return err + } + blockRoots[i] = blkRoot + } + if err := bFunc(ctx, blks, blockRoots); err != nil { + return err + } + lastBlk := blks[len(blks)-1] + s.lastProcessedSlot = lastBlk.Block.Slot + return nil +} diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index fe39c30ee89..11325d43521 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "strings" "testing" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -368,3 +369,136 @@ func TestService_processBlock(t *testing.T) { } }) } + +func TestService_processBlockBatch(t *testing.T) { + beaconDB, _ := dbtest.SetupDB(t) + genesisBlk := ð.BeaconBlock{ + Slot: 0, + } + genesisBlkRoot, err := stateutil.BlockRoot(genesisBlk) + if err != nil { + t.Fatal(err) + } + err = beaconDB.SaveBlock(context.Background(), ð.SignedBeaconBlock{Block: genesisBlk}) + if err != nil { + t.Fatal(err) + } + st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{}) + if err != nil { + t.Fatal(err) + } + s := NewInitialSync(&Config{ + P2P: p2pt.NewTestP2P(t), + DB: beaconDB, + Chain: &mock.ChainService{ + State: st, + Root: genesisBlkRoot[:], + DB: beaconDB, + }, + }) + ctx := context.Background() + genesis := makeGenesisTime(32) + + t.Run("process non-linear batch", func(t *testing.T) { + batch := []*eth.SignedBeaconBlock{} + currBlockRoot := genesisBlkRoot + for i := 1; i < 10; i++ { + parentRoot := currBlockRoot + blk1 := ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: uint64(i), + ParentRoot: parentRoot[:], + }, + } + blk1Root, err := stateutil.BlockRoot(blk1.Block) + if err != nil { + t.Fatal(err) + } + err = beaconDB.SaveBlock(context.Background(), blk1) + if err != nil { + t.Fatal(err) + } + batch = append(batch, blk1) + currBlockRoot = blk1Root + } + + batch2 := []*eth.SignedBeaconBlock{} + for i := 10; i < 20; i++ { + parentRoot := currBlockRoot + blk1 := ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: uint64(i), + ParentRoot: parentRoot[:], + }, + } + blk1Root, err := stateutil.BlockRoot(blk1.Block) + if err != nil { + t.Fatal(err) + } + err = beaconDB.SaveBlock(context.Background(), blk1) + if err != nil { + t.Fatal(err) + } + batch2 = append(batch2, blk1) + currBlockRoot = blk1Root + } + + // Process block normally. + err = s.processBatchedBlocks(ctx, genesis, batch, func( + ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error { + if err := s.chain.ReceiveBlockBatch(ctx, blks, blockRoots); err != nil { + t.Error(err) + } + return nil + }) + if err != nil { + t.Error(err) + } + + // Duplicate processing should trigger error. + err = s.processBatchedBlocks(ctx, genesis, batch, func( + ctx context.Context, blocks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error { + return nil + }) + expectedErr := fmt.Errorf("no good blocks in batch") + if err == nil || err.Error() != expectedErr.Error() { + t.Errorf("Expected error not thrown, want: %v, got: %v", expectedErr, err) + } + + badBatch2 := []*eth.SignedBeaconBlock{} + + for i, b := range batch2 { + // create a non-linear batch + if i%3 == 0 && i != 0 { + continue + } + badBatch2 = append(badBatch2, b) + } + + // Bad batch should fail because it is non linear + err = s.processBatchedBlocks(ctx, genesis, badBatch2, func( + ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error { + return nil + }) + expectedSubErr := "expected linear block list" + if err == nil || !strings.Contains(err.Error(), expectedSubErr) { + t.Errorf("Expected error not thrown, wanted error to include: %v, got: %v", expectedSubErr, err) + } + + // Continue normal processing, should proceed w/o errors. + err = s.processBatchedBlocks(ctx, genesis, batch2, func( + ctx context.Context, blks []*eth.SignedBeaconBlock, blockRoots [][32]byte) error { + if err := s.chain.ReceiveBlockBatch(ctx, blks, blockRoots); err != nil { + t.Error(err) + } + return nil + }) + if err != nil { + t.Error(err) + } + + if s.chain.HeadSlot() != 19 { + t.Errorf("Unexpected head slot, want: %d, got: %d", 2, s.chain.HeadSlot()) + } + }) +} diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 5abe535a982..3fe36d6c8ae 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -55,6 +55,7 @@ type Flags struct { WaitForSynced bool // WaitForSynced uses WaitForSynced in validator startup to ensure it can communicate with the beacon node as soon as possible. SkipRegenHistoricalStates bool // SkipRegenHistoricalState skips regenerating historical states from genesis to last finalized. This enables a quick switch over to using new-state-mgmt. ReduceAttesterStateCopy bool // ReduceAttesterStateCopy reduces head state copies for attester rpc. + BatchBlockVerify bool // BatchBlockVerify performs batched verification of block batches that we receive when syncing. // DisableForkChoice disables using LMD-GHOST fork choice to update // the head of the chain based on attestations and instead accepts any valid received block // as the chain head. UNSAFE, use with caution. @@ -226,6 +227,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Forcing max_cover strategy on attestation aggregation") cfg.AttestationAggregationStrategy = "max_cover" } + if ctx.Bool(batchBlockVerify.Name) { + log.Warn("Performing batch block verification when syncing.") + cfg.BatchBlockVerify = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 1c1f89df90e..834457c1029 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -148,6 +148,10 @@ var ( Name: "altona", Usage: "This defines the flag through which we can run on the Altona Multiclient Testnet", } + batchBlockVerify = &cli.BoolFlag{ + Name: "batch-block-verify", + Usage: "When enabled we will perform full signature verification of blocks in batches instead of singularly.", + } ) // devModeFlags holds list of flags that are set when development mode is on. @@ -155,6 +159,7 @@ var devModeFlags = []cli.Flag{ initSyncVerifyEverythingFlag, forceMaxCoverAttestationAggregation, newBeaconStateLocks, + batchBlockVerify, } // Deprecated flags list. @@ -589,6 +594,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ newBeaconStateLocks, forceMaxCoverAttestationAggregation, altonaTestnet, + batchBlockVerify, }...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E.