Batch save blocks for initial sync. 80% faster BPS #5215

Merged
merged 28 commits into from Mar 30, 2020
Changes from 24 commits
512ef28
Starting a quick PoC
terencechain Mar 25, 2020
7a370b0
Rate limit to one epoch worth of blocks in memory
terencechain Mar 26, 2020
5ef3f0b
Proof of concept working
terencechain Mar 26, 2020
96d78ea
Quick comment out
terencechain Mar 26, 2020
4197c1d
Save previous finalized checkpoint
terencechain Mar 26, 2020
15d629e
Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save
terencechain Mar 26, 2020
cf92a43
Test
terencechain Mar 26, 2020
b6013af
Merge branch 'prev-finalized-getter' into batch-save
terencechain Mar 26, 2020
7e8c0b9
Minor fixes
terencechain Mar 26, 2020
14e6379
Use a map
terencechain Mar 28, 2020
7e7273a
More run time fixes
terencechain Mar 28, 2020
9ba5b11
Remove panic
terencechain Mar 28, 2020
0281a5f
Feature flag
terencechain Mar 28, 2020
8df754c
Removed unused methods
terencechain Mar 28, 2020
1b6f64c
Fixed tests
terencechain Mar 28, 2020
09729aa
E2e test
terencechain Mar 28, 2020
fc2589e
Merge branch 'master' into batch-save
terencechain Mar 28, 2020
1b7f614
comment
terencechain Mar 28, 2020
3b52bd1
Merge branch 'master' into batch-save
terencechain Mar 28, 2020
9e88161
Compatible with current initial sync
terencechain Mar 28, 2020
280ea97
Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batc…
terencechain Mar 28, 2020
e7ee4d5
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
a0fad8a
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
271b489
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 29, 2020
add721e
Merge branch 'master' of github.com:prysmaticlabs/prysm into batch-save
terencechain Mar 30, 2020
acfba22
Feedback
terencechain Mar 30, 2020
ef3263f
Merge branch 'batch-save' of github.com:prysmaticlabs/prysm into batc…
terencechain Mar 30, 2020
f3b415f
Merge refs/heads/master into batch-save
prylabs-bulldozer[bot] Mar 30, 2020
66 changes: 63 additions & 3 deletions beacon-chain/blockchain/init_sync_process_block.go
@@ -5,9 +5,11 @@ import (
"sort"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
)

@@ -170,10 +172,23 @@ func (s *Service) generateState(ctx context.Context, startRoot [32]byte, endRoot
if preState == nil {
return nil, errors.New("finalized state does not exist in db")
}
endBlock, err := s.beaconDB.Block(ctx, endRoot)
if err != nil {
return nil, err

var endBlock *ethpb.SignedBeaconBlock
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(endRoot) {
endBlock = s.getInitSyncBlock(endRoot)
if featureconfig.Get().InitSyncBatchSaveBlocks {
Contributor

Why is this condition here? Wouldn't it already be true?

Member Author

Oops, thanks!

Member

Why check again? This will always be true.

Member Author

Fixed!

if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return nil, err
}
s.clearInitSyncBlocks()
}
} else {
endBlock, err = s.beaconDB.Block(ctx, endRoot)
if err != nil {
return nil, err
}
}

if endBlock == nil {
return nil, errors.New("provided block root does not have block saved in the db")
}
@@ -189,3 +204,48 @@ func (s *Service) generateState(ctx context.Context, startRoot [32]byte, endRoot
}
return postState, nil
}

// This saves a beacon block to the initial sync blocks cache.
func (s *Service) saveInitSyncBlock(r [32]byte, b *ethpb.SignedBeaconBlock) {
s.initSyncBlocksLock.Lock()
defer s.initSyncBlocksLock.Unlock()
Member

Do we need a lock for this? Initial sync is single-threaded, so there isn't a risk of concurrent writes.

Member Author

I tried without the lock and the performance difference is negligible. I figured better safe than sorry in case initial sync becomes multi-threaded later.

s.initSyncBlocks[r] = b
}

// This checks if a beacon block exists in the initial sync blocks cache using the root
// of the block.
func (s *Service) hasInitSyncBlock(r [32]byte) bool {
s.initSyncBlocksLock.RLock()
defer s.initSyncBlocksLock.RUnlock()
_, ok := s.initSyncBlocks[r]
return ok
}

// This retrieves a beacon block from the initial sync blocks cache using the root of
// the block.
func (s *Service) getInitSyncBlock(r [32]byte) *ethpb.SignedBeaconBlock {
s.initSyncBlocksLock.RLock()
defer s.initSyncBlocksLock.RUnlock()
b := s.initSyncBlocks[r]
return b
}

// This retrieves all the beacon blocks from the initial sync blocks cache, the returned
// blocks are unordered.
func (s *Service) getInitSyncBlocks() []*ethpb.SignedBeaconBlock {
s.initSyncBlocksLock.RLock()
defer s.initSyncBlocksLock.RUnlock()

blks := make([]*ethpb.SignedBeaconBlock, 0, len(s.initSyncBlocks))
for _, b := range s.initSyncBlocks {
blks = append(blks, b)
}
return blks
}

// This clears out the initial sync blocks cache.
func (s *Service) clearInitSyncBlocks() {
s.initSyncBlocksLock.Lock()
defer s.initSyncBlocksLock.Unlock()
s.initSyncBlocks = make(map[[32]byte]*ethpb.SignedBeaconBlock)
}
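
The helpers above amount to a root-keyed map guarded by a read-write lock. The following is a minimal, self-contained sketch of that pattern, not the PR's code: `Block`, `blockCache`, and `drain` are hypothetical names, and `Block` stands in for `*ethpb.SignedBeaconBlock`. The `drain` method is an assumption added for illustration; it combines "get all" and "clear" under one write lock, so that no block could be added between the two steps if initial sync ever became multi-threaded.

```go
package main

import (
	"fmt"
	"sync"
)

// Block is a hypothetical stand-in for *ethpb.SignedBeaconBlock.
type Block struct {
	Slot uint64
}

// blockCache is a hypothetical root-keyed cache guarded by a read-write lock.
type blockCache struct {
	mu     sync.RWMutex
	blocks map[[32]byte]*Block
}

func newBlockCache() *blockCache {
	return &blockCache{blocks: make(map[[32]byte]*Block)}
}

// save stores a block under its root. It takes the write lock.
func (c *blockCache) save(root [32]byte, b *Block) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.blocks[root] = b
}

// has reports whether a block with the given root is cached. It takes the read lock.
func (c *blockCache) has(root [32]byte) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	_, ok := c.blocks[root]
	return ok
}

// drain returns every cached block (unordered) and resets the cache
// inside a single critical section.
func (c *blockCache) drain() []*Block {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]*Block, 0, len(c.blocks))
	for _, b := range c.blocks {
		out = append(out, b)
	}
	c.blocks = make(map[[32]byte]*Block)
	return out
}

func main() {
	c := newBlockCache()
	c.save([32]byte{1}, &Block{Slot: 1})
	c.save([32]byte{2}, &Block{Slot: 2})
	fmt.Println("has root 1:", c.has([32]byte{1}))
	drained := c.drain()
	fmt.Println("drained:", len(drained), "still cached:", c.has([32]byte{1}))
}
```

With a single-threaded caller, as the review thread notes, separate get and clear calls behave the same way; the combined method only matters under concurrency.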
26 changes: 23 additions & 3 deletions beacon-chain/blockchain/process_block.go
@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -210,13 +211,17 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
return errors.Wrap(err, "could not execute state transition")
}

if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
root, err := stateutil.BlockRoot(b)
if err != nil {
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
if featureconfig.Get().InitSyncBatchSaveBlocks {
s.saveInitSyncBlock(root, signed)
} else {
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
}

if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
@@ -247,6 +252,14 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}
}

// Rate limit how many blocks (2 epochs worth of blocks) a node keeps in the memory.
if len(s.getInitSyncBlocks()) > 2*int(params.BeaconConfig().SlotsPerEpoch) {
Member

Can we make this value a configurable constant at the start of the file?

Member Author

Yep, done!

if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()
}

// Update finalized check point. Prune the block cache and helper caches on every new finalized epoch.
if postState.FinalizedCheckpointEpoch() > s.finalizedCheckpt.Epoch {
if !featureconfig.Get().NewStateMgmt {
@@ -264,6 +277,13 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}
}

if featureconfig.Get().InitSyncBatchSaveBlocks {
if err := s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks()); err != nil {
return err
}
s.clearInitSyncBlocks()
}

if err := s.beaconDB.SaveFinalizedCheckpoint(ctx, postState.FinalizedCheckpoint()); err != nil {
return errors.Wrap(err, "could not save finalized checkpoint")
}
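
The flush policy in this file can be read as: buffer each processed block, then write the whole buffer in one batch when it grows past two epochs of blocks or when the finalized checkpoint advances. Below is a hedged sketch of that policy under stated assumptions. `syncer`, `memDB`, `maxCachedBlocks`, and `slotsPerEpoch` are hypothetical names; the value 32 for slots per epoch is assumed for illustration; and `memDB` only counts writes rather than persisting anything.

```go
package main

import "fmt"

// slotsPerEpoch is an assumed value used only for this illustration.
const slotsPerEpoch = 32

// maxCachedBlocks is a hypothetical named constant for the cache limit
// (two epochs' worth of blocks), as suggested in the review thread.
const maxCachedBlocks = 2 * slotsPerEpoch

// Block is a hypothetical stand-in for a signed beacon block.
type Block struct{ Slot uint64 }

// memDB is a toy database that only counts batch writes and saved blocks.
type memDB struct {
	writes int
	saved  int
}

// SaveBlocks records one batch write covering all given blocks.
func (d *memDB) SaveBlocks(blocks []Block) error {
	d.writes++
	d.saved += len(blocks)
	return nil
}

// syncer buffers blocks in memory and flushes them in batches.
type syncer struct {
	db      *memDB
	pending []Block
}

// flush writes all pending blocks in one batch and empties the buffer.
func (s *syncer) flush() error {
	if len(s.pending) == 0 {
		return nil
	}
	if err := s.db.SaveBlocks(s.pending); err != nil {
		return err
	}
	s.pending = nil
	return nil
}

// onBlock buffers a block, then flushes if the buffer exceeds the limit
// or the caller reports that the finalized checkpoint advanced.
func (s *syncer) onBlock(b Block, finalizedAdvanced bool) error {
	s.pending = append(s.pending, b)
	if len(s.pending) > maxCachedBlocks || finalizedAdvanced {
		return s.flush()
	}
	return nil
}

func main() {
	s := &syncer{db: &memDB{}}
	for slot := uint64(1); slot <= 100; slot++ {
		if err := s.onBlock(Block{Slot: slot}, false); err != nil {
			panic(err)
		}
	}
	fmt.Println("batch writes:", s.db.writes, "saved:", s.db.saved, "pending:", len(s.pending))
}
```

In this toy run, 100 blocks lead to a single batch write instead of 100 individual ones, which is the kind of saving the PR title refers to; the actual speedup depends on the real database.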
34 changes: 28 additions & 6 deletions beacon-chain/blockchain/process_block_helpers.go
@@ -229,21 +229,36 @@ func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustified
if helpers.SlotsSinceEpochStarts(s.CurrentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
return true, nil
}
newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
if err != nil {
return false, err
var newJustifiedBlockSigned *ethpb.SignedBeaconBlock
justifiedRoot := bytesutil.ToBytes32(newJustifiedCheckpt.Root)
var err error
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(justifiedRoot) {
newJustifiedBlockSigned = s.getInitSyncBlock(justifiedRoot)
} else {
newJustifiedBlockSigned, err = s.beaconDB.Block(ctx, justifiedRoot)
if err != nil {
return false, err
}
}
if newJustifiedBlockSigned == nil || newJustifiedBlockSigned.Block == nil {
return false, errors.New("nil new justified block")
}

newJustifiedBlock := newJustifiedBlockSigned.Block
if newJustifiedBlock.Slot <= helpers.StartSlot(s.justifiedCheckpt.Epoch) {
return false, nil
}
justifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root))
if err != nil {
return false, err
var justifiedBlockSigned *ethpb.SignedBeaconBlock
cachedJustifiedRoot := bytesutil.ToBytes32(s.justifiedCheckpt.Root)
if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(cachedJustifiedRoot) {
justifiedBlockSigned = s.getInitSyncBlock(cachedJustifiedRoot)
} else {
justifiedBlockSigned, err = s.beaconDB.Block(ctx, cachedJustifiedRoot)
if err != nil {
return false, err
}
}

if justifiedBlockSigned == nil || justifiedBlockSigned.Block == nil {
return false, errors.New("nil justified block")
}
@@ -267,6 +282,7 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
if err != nil {
return err
}

if canUpdate {
s.prevJustifiedCheckpt = s.justifiedCheckpt
s.justifiedCheckpt = cpt
@@ -278,6 +294,7 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
justifiedState := s.initSyncState[justifiedRoot]
// If justified state is nil, resume back to normal syncing process and save
// justified check point.
var err error
if justifiedState == nil {
if s.beaconDB.HasState(ctx, justifiedRoot) {
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
@@ -376,6 +393,11 @@ func (s *Service) ancestor(ctx context.Context, root []byte, slot uint64) ([]byt
if err != nil {
return nil, errors.Wrap(err, "could not get ancestor block")
}

if featureconfig.Get().InitSyncBatchSaveBlocks && s.hasInitSyncBlock(bytesutil.ToBytes32(root)) {
signed = s.getInitSyncBlock(bytesutil.ToBytes32(root))
}

if signed == nil || signed.Block == nil {
return nil, errors.New("nil block")
}
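
Several hunks in this file repeat the same lookup: check the initial sync cache first, fall back to the database, and treat a nil result as an error. A small helper could express that once. The sketch below is an illustration only, not part of the PR: `lookupBlock`, `blockSource`, `mapSource`, and `errNilBlock` are hypothetical names, and the cache is a plain map without a lock to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// Block is a hypothetical stand-in for a signed beacon block.
type Block struct{ Slot uint64 }

// blockSource is a hypothetical interface for a database block lookup.
type blockSource interface {
	Block(root [32]byte) (*Block, error)
}

// mapSource is a toy blockSource backed by a map.
type mapSource map[[32]byte]*Block

// Block returns the stored block, or nil if the root is unknown.
func (m mapSource) Block(root [32]byte) (*Block, error) {
	return m[root], nil
}

var errNilBlock = errors.New("nil block")

// lookupBlock returns the block for root, preferring the in-memory cache
// and falling back to the database. A missing block is reported as an error.
func lookupBlock(cache map[[32]byte]*Block, db blockSource, root [32]byte) (*Block, error) {
	if b, ok := cache[root]; ok && b != nil {
		return b, nil
	}
	b, err := db.Block(root)
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, errNilBlock
	}
	return b, nil
}

func main() {
	cached, stored, unknown := [32]byte{1}, [32]byte{2}, [32]byte{3}
	cache := map[[32]byte]*Block{cached: {Slot: 5}}
	db := mapSource{stored: {Slot: 9}}

	for _, root := range [][32]byte{cached, stored, unknown} {
		b, err := lookupBlock(cache, db, root)
		if err != nil {
			fmt.Println("lookup failed:", err)
			continue
		}
		fmt.Println("found block at slot", b.Slot)
	}
}
```

Checking the cache before the database also avoids a disk read for blocks that have not been flushed yet.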
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/receive_block.go
@@ -26,6 +26,7 @@ type BlockReceiver interface {
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error
HasInitSyncBlock(root [32]byte) bool
}

// ReceiveBlock is a function that defines the operations that are performed on
@@ -236,3 +237,8 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB

return nil
}

// HasInitSyncBlock returns true if the block of the input root exists in initial sync blocks cache.
func (s *Service) HasInitSyncBlock(root [32]byte) bool {
return s.hasInitSyncBlock(root)
}
3 changes: 3 additions & 0 deletions beacon-chain/blockchain/service.go
@@ -75,6 +75,8 @@ type Service struct {
checkpointStateLock sync.Mutex
stateGen *stategen.State
opsService *attestations.Service
initSyncBlocks map[[32]byte]*ethpb.SignedBeaconBlock
initSyncBlocksLock sync.RWMutex
}

// Config options for the service.
@@ -117,6 +119,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
checkpointState: cache.NewCheckpointStateCache(),
opsService: cfg.OpsService,
stateGen: cfg.StateGen,
initSyncBlocks: make(map[[32]byte]*ethpb.SignedBeaconBlock),
}, nil
}

5 changes: 5 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
@@ -234,3 +234,8 @@ func (ms *ChainService) IsValidAttestation(ctx context.Context, att *ethpb.Attes

// ClearCachedStates does nothing.
func (ms *ChainService) ClearCachedStates() {}

// HasInitSyncBlock mocks the same method in the chain service.
func (ms *ChainService) HasInitSyncBlock(root [32]byte) bool {
return false
}
8 changes: 4 additions & 4 deletions beacon-chain/db/kv/blocks.go
@@ -11,7 +11,6 @@ import (

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
@@ -227,16 +226,16 @@ func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBloc
defer span.End()

return k.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(blocksBucket)
for _, block := range blocks {
if err := k.setBlockSlotBitField(ctx, tx, block.Block.Slot); err != nil {
return err
}

blockRoot, err := ssz.HashTreeRoot(block.Block)
blockRoot, err := stateutil.BlockRoot(block.Block)
Contributor

Nice

if err != nil {
return err
}
bkt := tx.Bucket(blocksBucket)

if existingBlock := bkt.Get(blockRoot[:]); existingBlock != nil {
continue
}
@@ -249,6 +248,7 @@ func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBloc
return errors.Wrap(err, "could not update DB indices")
}
k.blockCache.Set(string(blockRoot[:]), block, int64(len(enc)))

if err := bkt.Put(blockRoot[:], enc); err != nil {
return err
}
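
`SaveBlocks` writes every block of a batch inside a single `Update` transaction and skips roots that are already stored. The toy model below illustrates those two properties without a real database. `toyStore`, `update`, and the `Root` field are hypothetical, and unlike a real bolt transaction this stand-in has no rollback on error.

```go
package main

import "fmt"

// Block is a hypothetical stand-in; Root is precomputed here instead of hashed.
type Block struct {
	Root [32]byte
	Slot uint64
}

// toyStore is an in-memory stand-in for a transactional key-value store.
type toyStore struct {
	blocks  map[[32]byte]Block
	txCount int // number of "transactions" opened
}

func newToyStore() *toyStore {
	return &toyStore{blocks: make(map[[32]byte]Block)}
}

// update runs fn as one "transaction". There is no rollback in this toy model.
func (s *toyStore) update(fn func() error) error {
	s.txCount++
	return fn()
}

// SaveBlocks stores all blocks in a single transaction, skipping any
// block whose root is already present.
func (s *toyStore) SaveBlocks(blocks []Block) error {
	return s.update(func() error {
		for _, b := range blocks {
			if _, exists := s.blocks[b.Root]; exists {
				continue
			}
			s.blocks[b.Root] = b
		}
		return nil
	})
}

func main() {
	s := newToyStore()
	batch := []Block{
		{Root: [32]byte{1}, Slot: 1},
		{Root: [32]byte{2}, Slot: 2},
		{Root: [32]byte{1}, Slot: 1}, // duplicate root, skipped
	}
	if err := s.SaveBlocks(batch); err != nil {
		panic(err)
	}
	fmt.Println("stored:", len(s.blocks), "transactions:", s.txCount)
}
```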
1 change: 1 addition & 0 deletions beacon-chain/db/kv/checkpoint.go
@@ -109,6 +109,7 @@ func (k *Store) SaveFinalizedCheckpoint(ctx context.Context, checkpoint *ethpb.C
if err := bucket.Put(finalizedCheckpointKey, enc); err != nil {
return err
}

return k.updateFinalizedBlockRoots(ctx, tx, checkpoint)
})
}
1 change: 1 addition & 0 deletions beacon-chain/db/kv/finalized_block_roots.go
@@ -55,6 +55,7 @@ func (k *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, chec
return err
}
}

blockRoots, err := k.BlockRoots(ctx, filters.NewFilter().
SetStartEpoch(previousFinalizedCheckpoint.Epoch).
SetEndEpoch(checkpoint.Epoch+1),
20 changes: 16 additions & 4 deletions beacon-chain/sync/initial-sync-old/round_robin.go
@@ -181,7 +181,17 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
}
}
}
startBlock := s.chain.HeadSlot() + 1
var startBlock uint64
if featureconfig.Get().InitSyncBatchSaveBlocks {
lastFinalizedEpoch := s.chain.FinalizedCheckpt().Epoch
lastFinalizedState, err := s.db.HighestSlotStatesBelow(ctx, helpers.StartSlot(lastFinalizedEpoch))
if err != nil {
return err
}
startBlock = lastFinalizedState[0].Slot() + 1
} else {
startBlock = s.chain.HeadSlot() + 1
}
skippedBlocks := blockBatchSize * uint64(lastEmptyRequests*len(peers))
if startBlock+skippedBlocks > helpers.StartSlot(finalizedEpoch+1) {
log.WithField("finalizedEpoch", finalizedEpoch).Debug("Requested block range is greater than the finalized epoch")
@@ -193,7 +203,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
1, // step
blockBatchSize, // count
peers, // peers
0, // remainder
0, // reminder
Contributor

Reminder? What does that mean in this context?

Member Author

Sorry, it should have been remainder.

)
if err != nil {
log.WithError(err).Error("Round robing sync request failed")
@@ -209,10 +219,12 @@ func (s *Service) roundRobinSync(genesis time.Time) error {

for _, blk := range blocks {
s.logSyncStatus(genesis, blk.Block, peers, counter)
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
log.Debugf("Beacon node doesn't have a block in db or cache with root %#x", parentRoot)
Contributor

Should this be changed to WithField() for better log tracking?

Contributor

Also, no feature flag needed here?

Member Author

Updated to WithField(). A feature flag is not needed in this case.

continue
}

s.blockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
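
With batch saving enabled, the start slot for the next request in this file is derived from the highest saved state below the last finalized epoch rather than from the head slot, and the diff indexes `lastFinalizedState[0]` directly. Whether `HighestSlotStatesBelow` can return an empty slice is not stated in the diff, so the sketch below guards against it as an assumption. `nextStartSlot`, `stateSummary`, and `errNoState` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// stateSummary is a hypothetical stand-in for a beacon state that exposes its slot.
type stateSummary struct{ slot uint64 }

// Slot returns the slot of the state.
func (s stateSummary) Slot() uint64 { return s.slot }

var errNoState = errors.New("no state found below finalized slot")

// nextStartSlot picks the first slot to request next. With batch saving
// enabled it resumes after the highest saved state below the finalized
// slot; otherwise it resumes after the current head slot.
func nextStartSlot(batchSave bool, headSlot uint64, statesBelowFinalized []stateSummary) (uint64, error) {
	if !batchSave {
		return headSlot + 1, nil
	}
	// Guard added for illustration: avoid indexing an empty slice.
	if len(statesBelowFinalized) == 0 {
		return 0, errNoState
	}
	return statesBelowFinalized[0].Slot() + 1, nil
}

func main() {
	start, err := nextStartSlot(true, 100, []stateSummary{{slot: 64}})
	fmt.Println(start, err)

	start, err = nextStartSlot(false, 100, nil)
	fmt.Println(start, err)

	_, err = nextStartSlot(true, 100, nil)
	fmt.Println(err)
}
```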
3 changes: 2 additions & 1 deletion beacon-chain/sync/initial-sync/round_robin.go
@@ -198,7 +198,8 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter
}

func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock) error {
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
}
s.blockNotifier.BlockFeed().Send(&feed.Event{
5 changes: 5 additions & 0 deletions shared/featureconfig/config.go
@@ -53,6 +53,7 @@ type Flags struct {
EnableInitSyncQueue bool // EnableInitSyncQueue enables the new initial sync implementation.
EnableFieldTrie bool // EnableFieldTrie enables the state from using field specific tries when computing the root.
EnableBlockHTR bool // EnableBlockHTR enables custom hashing of our beacon blocks.
InitSyncBatchSaveBlocks bool // InitSyncBatchSaveBlocks enables batch save blocks mode during initial syncing.
// DisableForkChoice disables using LMD-GHOST fork choice to update
// the head of the chain based on attestations and instead accepts any valid received block
// as the chain head. UNSAFE, use with caution.
@@ -190,6 +191,10 @@ func ConfigureBeaconChain(ctx *cli.Context) {
log.Warn("Enabling custom block hashing")
cfg.EnableBlockHTR = true
}
if ctx.Bool(initSyncBatchSaveBlocks.Name) {
log.Warn("Enabling init sync batch save blocks mode")
cfg.InitSyncBatchSaveBlocks = true
}
Init(cfg)
}
