diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 3e348cb53977..c0f77a32437f 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -8,8 +8,6 @@ import ( "github.com/paulbellamy/ratecounter" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" - blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" @@ -27,6 +25,9 @@ const ( refreshTime = 6 * time.Second ) +// blockReceiverFn defines block receiving function. +type blockReceiverFn func(ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error + // Round Robin sync looks at the latest peer statuses and syncs with the highest // finalized peer. // @@ -44,7 +45,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error { state.SkipSlotCache.Disable() defer state.SkipSlotCache.Enable() - counter := ratecounter.NewRateCounter(counterSeconds * time.Second) + s.counter = ratecounter.NewRateCounter(counterSeconds * time.Second) + s.lastProcessedSlot = s.chain.HeadSlot() highestFinalizedSlot := helpers.StartSlot(s.highestFinalizedEpoch() + 1) queue := newBlocksQueue(ctx, &blocksQueueConfig{ p2p: s.p2p, @@ -54,17 +56,17 @@ func (s *Service) roundRobinSync(genesis time.Time) error { if err := queue.start(); err != nil { return err } + var blockReceiver blockReceiverFn + if featureconfig.Get().InitSyncNoVerify { + blockReceiver = s.chain.ReceiveBlockNoVerify + } else { + blockReceiver = s.chain.ReceiveBlockNoPubsubForkchoice + } // Step 1 - Sync to end of finalized epoch. for blk := range queue.fetchedBlocks { - s.logSyncStatus(genesis, blk.Block, counter) - root, err := stateutil.BlockRoot(blk.Block) - if err != nil { - log.WithError(err).Info("Cannot determine root of block") - continue - } - if err := s.processBlock(ctx, blk, root); err != nil { - log.WithError(err).Info("Block is invalid") + if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil { + log.WithError(err).Info("Block is not processed") continue } } @@ -109,14 +111,9 @@ func (s *Service) roundRobinSync(genesis time.Time) error { log.WithError(err).Error("Failed to receive blocks, exiting init sync") return nil } - for _, blk := range resp { - s.logSyncStatus(genesis, blk.Block, counter) - root, err := stateutil.BlockRoot(blk.Block) + err := s.processBlock(ctx, genesis, blk, s.chain.ReceiveBlockNoPubsubForkchoice) if err != nil { - return err - } - if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, root); err != nil { log.WithError(err).Error("Failed to process block, exiting init sync") return nil } @@ -144,50 +141,45 @@ func (s *Service) highestFinalizedEpoch() uint64 { } // logSyncStatus and increment block processing counter. -func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter *ratecounter.RateCounter) { - counter.Incr(1) - rate := float64(counter.Rate()) / counterSeconds +func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, blkRoot [32]byte) { + s.counter.Incr(1) + rate := float64(s.counter.Rate()) / counterSeconds if rate == 0 { rate = 1 } timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second - blockRoot := "unknown" - root, err := stateutil.BlockRoot(blk) - if err == nil { - blockRoot = fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]) - } - log.WithField( - "peers", - len(s.p2p.Peers().Connected()), - ).WithField( - "blocksPerSecond", - fmt.Sprintf("%.1f", rate), - ).Infof( + log.WithFields(logrus.Fields{ + "peers": len(s.p2p.Peers().Connected()), + "blocksPerSecond": fmt.Sprintf("%.1f", rate), + }).Infof( "Processing block %s %d/%d - estimated time remaining %s", - blockRoot, - blk.Slot, - helpers.SlotsSince(genesis), - timeRemaining, + fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]), + blk.Slot, helpers.SlotsSince(genesis), timeRemaining, ) } -func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock, blockRoot [32]byte) error { +// processBlock performs basic checks on incoming block, and triggers receiver function. +func (s *Service) processBlock( + ctx context.Context, + genesis time.Time, + blk *eth.SignedBeaconBlock, + blockReceiver blockReceiverFn, +) error { + if blk.Block.Slot <= s.lastProcessedSlot { + return fmt.Errorf("slot %d already processed", blk.Block.Slot) + } + blkRoot, err := stateutil.BlockRoot(blk.Block) + if err != nil { + return err + } + s.logSyncStatus(genesis, blk.Block, blkRoot) parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot) if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) { return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) } - s.blockNotifier.BlockFeed().Send(&feed.Event{ - Type: blockfeed.ReceivedBlock, - Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, - }) - if featureconfig.Get().InitSyncNoVerify { - if err := s.chain.ReceiveBlockNoVerify(ctx, blk, blockRoot); err != nil { - return err - } - } else { - if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, blockRoot); err != nil { - return err - } + if err := blockReceiver(ctx, blk, blkRoot); err != nil { + return err } + s.lastProcessedSlot = blk.Block.Slot return nil } diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index f9f0627b153f..e72b7e8c95e4 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -2,6 +2,7 @@ package initialsync import ( "context" + "fmt" "testing" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -10,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/flags" p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/sliceutil" @@ -21,7 +23,7 @@ func TestConstants(t *testing.T) { } } -func TestRoundRobinSync(t *testing.T) { +func TestService_roundRobinSync(t *testing.T) { tests := []struct { name string currentSlot uint64 @@ -254,12 +256,11 @@ func TestRoundRobinSync(t *testing.T) { DB: beaconDB, } // no-op mock s := &Service{ - chain: mc, - blockNotifier: mc.BlockNotifier(), - p2p: p, - db: beaconDB, - synced: false, - chainStarted: true, + chain: mc, + p2p: p, + db: beaconDB, + synced: false, + chainStarted: true, } if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil { t.Error(err) @@ -280,3 +281,90 @@ func TestRoundRobinSync(t *testing.T) { }) } } + +func TestService_processBlock(t *testing.T) { + beaconDB := dbtest.SetupDB(t) + genesisBlk := ð.BeaconBlock{ + Slot: 0, + } + genesisBlkRoot, err := stateutil.BlockRoot(genesisBlk) + if err != nil { + t.Fatal(err) + } + err = beaconDB.SaveBlock(context.Background(), ð.SignedBeaconBlock{Block: genesisBlk}) + if err != nil { + t.Fatal(err) + } + st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{}) + if err != nil { + t.Fatal(err) + } + s := NewInitialSync(&Config{ + P2P: p2pt.NewTestP2P(t), + DB: beaconDB, + Chain: &mock.ChainService{ + State: st, + Root: genesisBlkRoot[:], + DB: beaconDB, + }, + }) + ctx := context.Background() + genesis := makeGenesisTime(32) + + t.Run("process duplicate block", func(t *testing.T) { + blk1 := ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: 1, + ParentRoot: genesisBlkRoot[:], + }, + } + blk1Root, err := stateutil.BlockRoot(blk1.Block) + if err != nil { + t.Fatal(err) + } + blk2 := ð.SignedBeaconBlock{ + Block: ð.BeaconBlock{ + Slot: 2, + ParentRoot: blk1Root[:], + }, + } + + // Process block normally. + err = s.processBlock(ctx, genesis, blk1, func( + ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error { + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, block, blockRoot); err != nil { + t.Error(err) + } + return nil + }) + if err != nil { + t.Error(err) + } + + // Duplicate processing should trigger error. + err = s.processBlock(ctx, genesis, blk1, func( + ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error { + return nil + }) + expectedErr := fmt.Errorf("slot %d already processed", blk1.Block.Slot) + if err == nil || err.Error() != expectedErr.Error() { + t.Errorf("Expected error not thrown, want: %v, got: %v", expectedErr, err) + } + + // Continue normal processing, should proceed w/o errors. + err = s.processBlock(ctx, genesis, blk2, func( + ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error { + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, block, blockRoot); err != nil { + t.Error(err) + } + return nil + }) + if err != nil { + t.Error(err) + } + + if s.chain.HeadSlot() != 2 { + t.Errorf("Unexpected head slot, want: %d, got: %d", 2, s.chain.HeadSlot()) + } + }) +} diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 04d45f601977..6f8877db22fb 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -7,6 +7,7 @@ import ( "context" "time" + "github.com/paulbellamy/ratecounter" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" @@ -43,15 +44,16 @@ type Config struct { // Service service. type Service struct { - ctx context.Context - cancel context.CancelFunc - chain blockchainService - p2p p2p.P2P - db db.ReadOnlyDatabase - synced bool - chainStarted bool - stateNotifier statefeed.Notifier - blockNotifier blockfeed.Notifier + ctx context.Context + cancel context.CancelFunc + chain blockchainService + p2p p2p.P2P + db db.ReadOnlyDatabase + synced bool + chainStarted bool + stateNotifier statefeed.Notifier + counter *ratecounter.RateCounter + lastProcessedSlot uint64 } // NewInitialSync configures the initial sync service responsible for bringing the node up to the @@ -65,7 +67,7 @@ func NewInitialSync(cfg *Config) *Service { p2p: cfg.P2P, db: cfg.DB, stateNotifier: cfg.StateNotifier, - blockNotifier: cfg.BlockNotifier, + counter: ratecounter.NewRateCounter(counterSeconds * time.Second), } }