Skip to content

Commit

Permalink
Batch Verify Blocks (#6469)
Browse files Browse the repository at this point in the history
* add everything so far
* checkpoint progress
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* fix
* checkpoint
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* checkpoint again
* checkpoint again
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* commenting
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* bls cleanup
* revert this back
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* revert core changes
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* add flag
* add test
* add one more test
* clean up
* comment
* lint
* terence's review
* Merge refs/heads/master into fastBLS
* Merge refs/heads/master into fastBLS
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* Merge refs/heads/master into fastBLS
* remove additional method
* Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
* fix
* Merge refs/heads/master into fastBLS
* copy
* Merge branch 'fastBLS' of https://github.com/prysmaticlabs/geth-sharding into fastBLS
  • Loading branch information
nisdas committed Jul 7, 2020
1 parent 8ddfde4 commit 64fa474
Show file tree
Hide file tree
Showing 13 changed files with 530 additions and 24 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
Expand Up @@ -43,6 +43,7 @@ go_library(
"//beacon-chain/state/stateutil:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/attestationutil:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
Expand Down
149 changes: 132 additions & 17 deletions beacon-chain/blockchain/process_block.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock,
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}

if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

Expand Down Expand Up @@ -202,17 +203,74 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
if err != nil {
return errors.Wrap(err, "could not execute state transition")
}
return s.handlePostStateInSync(ctx, signed, blockRoot, postState)
}

s.saveInitSyncBlock(blockRoot, signed)
func (s *Service) onBlockBatch(ctx context.Context, blks []*ethpb.SignedBeaconBlock,
blockRoots [][32]byte) (*stateTrie.BeaconState, []*ethpb.Checkpoint, []*ethpb.Checkpoint, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
defer span.End()

if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
if len(blks) == 0 || len(blockRoots) == 0 {
return nil, nil, nil, errors.New("no blocks provided")
}
if blks[0] == nil || blks[0].Block == nil {
return nil, nil, nil, errors.New("nil block")
}
b := blks[0].Block

// Retrieve incoming block's pre state.
preState, err := s.verifyBlkPreState(ctx, b)
if err != nil {
return nil, nil, nil, err
}
// Perform a copy to preserve copy in cache.
preState = preState.Copy()

jCheckpoints := make([]*ethpb.Checkpoint, len(blks))
fCheckpoints := make([]*ethpb.Checkpoint, len(blks))
sigSet := &bls.SignatureSet{
Signatures: []bls.Signature{},
PublicKeys: []bls.PublicKey{},
Messages: [][32]byte{},
}
set := new(bls.SignatureSet)
for i, b := range blks {
set, preState, err = state.ExecuteStateTransitionNoVerifyAnySig(ctx, preState, b)
if err != nil {
return nil, nil, nil, err
}
jCheckpoints[i] = preState.CurrentJustifiedCheckpoint()
fCheckpoints[i] = preState.FinalizedCheckpoint()
sigSet.Join(set)
}
verify, err := bls.VerifyMultipleSignatures(sigSet.Signatures, sigSet.Messages, sigSet.PublicKeys)
if err != nil {
return nil, nil, nil, err
}
if !verify {
return nil, nil, nil, errors.New("batch block signature verification failed")
}
return preState, fCheckpoints, jCheckpoints, nil
}

// handles the state post transition and saves the appropriate checkpoints and forkchoice
// data.
func (s *Service) handlePostStateInSync(ctx context.Context, signed *ethpb.SignedBeaconBlock,
blockRoot [32]byte, postState *stateTrie.BeaconState) error {

b := signed.Block

s.saveInitSyncBlock(blockRoot, signed)

if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil {
return errors.Wrap(err, "could not save state")
}

if err := s.insertBlockAndAttestationsToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
Expand Down Expand Up @@ -245,7 +303,57 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}
}

// Epoch boundary bookkeeping such as logging epoch summaries.
return s.handleEpochBoundary(postState)
}

// handles a block after the block's batch has been verified, where we can save blocks
// their state summaries and split them off to relative hot/cold storage.
func (s *Service) handleBlockAfterBatchVerify(ctx context.Context, signed *ethpb.SignedBeaconBlock,
blockRoot [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
b := signed.Block

s.saveInitSyncBlock(blockRoot, signed)
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, fCheckpoint, jCheckpoint); err != nil {
return err
}
s.stateGen.SaveStateSummary(ctx, signed, blockRoot)

// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
if uint64(len(s.getInitSyncBlocks())) > initialSyncBlockCacheSize {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()
}

// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
if fCheckpoint.Epoch > s.finalizedCheckpt.Epoch {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()

if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, fCheckpoint); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
}

s.prevFinalizedCheckpt = s.finalizedCheckpt
s.finalizedCheckpt = fCheckpoint

fRoot := bytesutil.ToBytes32(fCheckpoint.Root)
fBlock, err := s.beaconDB.Block(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get finalized block to migrate")
}
if err := s.stateGen.MigrateToCold(ctx, fBlock.Block.Slot, fRoot); err != nil {
return errors.Wrap(err, "could not migrate to cold")
}
}
return nil
}

// Epoch boundary bookkeeping such as logging epoch summaries.
func (s *Service) handleEpochBoundary(postState *stateTrie.BeaconState) error {
if postState.Slot() >= s.nextEpochBoundarySlot {
reportEpochMetrics(postState)
s.nextEpochBoundarySlot = helpers.StartSlot(helpers.NextEpoch(postState))
Expand All @@ -258,25 +366,18 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
return err
}
}

return nil
}

// This feeds in the block and block's attestations to fork choice store. It's allows fork choice store
// to gain information on the most current chain.
func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte, state *stateTrie.BeaconState) error {
if err := s.fillInForkChoiceMissingBlocks(ctx, blk, state); err != nil {
func (s *Service) insertBlockAndAttestationsToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock, root [32]byte,
state *stateTrie.BeaconState) error {
fCheckpoint := state.FinalizedCheckpoint()
jCheckpoint := state.CurrentJustifiedCheckpoint()
if err := s.insertBlockToForkChoiceStore(ctx, blk, root, fCheckpoint, jCheckpoint); err != nil {
return err
}

// Feed in block to fork choice store.
if err := s.forkChoiceStore.ProcessBlock(ctx,
blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti),
state.CurrentJustifiedCheckpoint().Epoch,
state.FinalizedCheckpointEpoch()); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}

// Feed in block's attestations to fork choice store.
for _, a := range blk.Body.Attestations {
committee, err := helpers.BeaconCommitteeFromState(state, a.Data.Slot, a.Data.CommitteeIndex)
Expand All @@ -286,6 +387,20 @@ func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.B
indices := attestationutil.AttestingIndices(a.AggregationBits, committee)
s.forkChoiceStore.ProcessAttestation(ctx, indices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch)
}
return nil
}

func (s *Service) insertBlockToForkChoiceStore(ctx context.Context, blk *ethpb.BeaconBlock,
root [32]byte, fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
if err := s.fillInForkChoiceMissingBlocks(ctx, blk, fCheckpoint, jCheckpoint); err != nil {
return err
}
// Feed in block to fork choice store.
if err := s.forkChoiceStore.ProcessBlock(ctx,
blk.Slot, root, bytesutil.ToBytes32(blk.ParentRoot), bytesutil.ToBytes32(blk.Body.Graffiti),
jCheckpoint.Epoch,
fCheckpoint.Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}
return nil
}
7 changes: 4 additions & 3 deletions beacon-chain/blockchain/process_block_helpers.go
Expand Up @@ -295,7 +295,8 @@ func (s *Service) finalizedImpliesNewJustified(ctx context.Context, state *state

// This retrieves missing blocks from DB (ie. the blocks that couldn't be received over sync) and inserts them to fork choice store.
// This is useful for block tree visualizer and additional vote accounting.
func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock, state *stateTrie.BeaconState) error {
func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.BeaconBlock,
fCheckpoint *ethpb.Checkpoint, jCheckpoint *ethpb.Checkpoint) error {
pendingNodes := make([]*ethpb.BeaconBlock, 0)

parentRoot := bytesutil.ToBytes32(blk.ParentRoot)
Expand Down Expand Up @@ -326,8 +327,8 @@ func (s *Service) fillInForkChoiceMissingBlocks(ctx context.Context, blk *ethpb.

if err := s.forkChoiceStore.ProcessBlock(ctx,
b.Slot, r, bytesutil.ToBytes32(b.ParentRoot), bytesutil.ToBytes32(b.Body.Graffiti),
state.CurrentJustifiedCheckpoint().Epoch,
state.FinalizedCheckpointEpoch()); err != nil {
jCheckpoint.Epoch,
fCheckpoint.Epoch); err != nil {
return errors.Wrap(err, "could not process block for proto array fork choice")
}
}
Expand Down
65 changes: 63 additions & 2 deletions beacon-chain/blockchain/process_block_test.go
Expand Up @@ -10,6 +10,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/forkchoice/protoarray"
Expand Down Expand Up @@ -129,6 +130,65 @@ func TestStore_OnBlock(t *testing.T) {
}
}

func TestStore_OnBlockBatch(t *testing.T) {
ctx := context.Background()
db, sc := testDB.SetupDB(t)

cfg := &Config{
BeaconDB: db,
StateGen: stategen.New(db, sc),
}
service, err := NewService(ctx, cfg)
if err != nil {
t.Fatal(err)
}

genesisStateRoot := [32]byte{}
genesis := blocks.NewGenesisBlock(genesisStateRoot[:])
if err := db.SaveBlock(ctx, genesis); err != nil {
t.Error(err)
}

st, keys := testutil.DeterministicGenesisState(t, 64)

bState := st.Copy()

blks := []*ethpb.SignedBeaconBlock{}
blkRoots := [][32]byte{}
var firstState *stateTrie.BeaconState
for i := 1; i < 10; i++ {
b, err := testutil.GenerateFullBlock(bState, keys, testutil.DefaultBlockGenConfig(), uint64(i))
if err != nil {
t.Fatal(err)
}
bState, err = state.ExecuteStateTransition(ctx, bState, b)
if err != nil {
t.Fatal(err)
}
if i == 1 {
firstState = bState.Copy()
}
root, err := stateutil.BlockRoot(b.Block)
if err != nil {
t.Fatal(err)
}
blks = append(blks, b)
blkRoots = append(blkRoots, root)
}
err = db.SaveBlock(context.Background(), blks[0])
if err != nil {
t.Fatal(err)
}
err = service.stateGen.SaveState(ctx, blkRoots[0], firstState)
if err != nil {
t.Fatal(err)
}
_, _, _, err = service.onBlockBatch(ctx, blks[1:], blkRoots[1:])
if err != nil {
t.Fatal(err)
}
}

func TestRemoveStateSinceLastFinalized_EmptyStartSlot(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
Expand Down Expand Up @@ -391,7 +451,8 @@ func TestFillForkChoiceMissingBlocks_CanSave(t *testing.T) {

beaconState, _ := testutil.DeterministicGenesisState(t, 32)
block := &ethpb.BeaconBlock{Slot: 9, ParentRoot: roots[8], Body: &ethpb.BeaconBlockBody{Graffiti: []byte{}}}
if err := service.fillInForkChoiceMissingBlocks(context.Background(), block, beaconState); err != nil {
if err := service.fillInForkChoiceMissingBlocks(context.Background(), block,
beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -462,7 +523,7 @@ func TestFillForkChoiceMissingBlocks_FilterFinalized(t *testing.T) {
}

beaconState, _ := testutil.DeterministicGenesisState(t, 32)
if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState); err != nil {
if err := service.fillInForkChoiceMissingBlocks(context.Background(), b65.Block, beaconState.FinalizedCheckpoint(), beaconState.CurrentJustifiedCheckpoint()); err != nil {
t.Fatal(err)
}

Expand Down

0 comments on commit 64fa474

Please sign in to comment.