Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init-sync queue assert order and uniqueness of blocks emitted #6146

Merged
merged 17 commits into from
Jun 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 41 additions & 49 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (

"github.com/paulbellamy/ratecounter"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
Expand All @@ -27,6 +25,9 @@ const (
refreshTime = 6 * time.Second
)

// blockReceiverFn defines block receiving function.
type blockReceiverFn func(ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error

// Round Robin sync looks at the latest peer statuses and syncs with the highest
// finalized peer.
//
Expand All @@ -44,7 +45,8 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
state.SkipSlotCache.Disable()
defer state.SkipSlotCache.Enable()

counter := ratecounter.NewRateCounter(counterSeconds * time.Second)
s.counter = ratecounter.NewRateCounter(counterSeconds * time.Second)
s.lastProcessedSlot = s.chain.HeadSlot()
highestFinalizedSlot := helpers.StartSlot(s.highestFinalizedEpoch() + 1)
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.p2p,
Expand All @@ -54,17 +56,17 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
if err := queue.start(); err != nil {
return err
}
var blockReceiver blockReceiverFn
if featureconfig.Get().InitSyncNoVerify {
blockReceiver = s.chain.ReceiveBlockNoVerify
} else {
blockReceiver = s.chain.ReceiveBlockNoPubsubForkchoice
}

// Step 1 - Sync to end of finalized epoch.
for blk := range queue.fetchedBlocks {
s.logSyncStatus(genesis, blk.Block, counter)
root, err := stateutil.BlockRoot(blk.Block)
if err != nil {
log.WithError(err).Info("Cannot determine root of block")
continue
}
if err := s.processBlock(ctx, blk, root); err != nil {
log.WithError(err).Info("Block is invalid")
if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil {
log.WithError(err).Info("Block is not processed")
continue
}
}
Expand Down Expand Up @@ -109,14 +111,9 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
log.WithError(err).Error("Failed to receive blocks, exiting init sync")
return nil
}

for _, blk := range resp {
s.logSyncStatus(genesis, blk.Block, counter)
root, err := stateutil.BlockRoot(blk.Block)
err := s.processBlock(ctx, genesis, blk, s.chain.ReceiveBlockNoPubsubForkchoice)
if err != nil {
return err
}
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, root); err != nil {
log.WithError(err).Error("Failed to process block, exiting init sync")
return nil
}
Expand Down Expand Up @@ -144,50 +141,45 @@ func (s *Service) highestFinalizedEpoch() uint64 {
}

// logSyncStatus and increment block processing counter.
func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter *ratecounter.RateCounter) {
counter.Incr(1)
rate := float64(counter.Rate()) / counterSeconds
func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, blkRoot [32]byte) {
s.counter.Incr(1)
rate := float64(s.counter.Rate()) / counterSeconds
if rate == 0 {
rate = 1
}
timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second
blockRoot := "unknown"
root, err := stateutil.BlockRoot(blk)
if err == nil {
blockRoot = fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8])
}
log.WithField(
"peers",
len(s.p2p.Peers().Connected()),
).WithField(
"blocksPerSecond",
fmt.Sprintf("%.1f", rate),
).Infof(
log.WithFields(logrus.Fields{
"peers": len(s.p2p.Peers().Connected()),
"blocksPerSecond": fmt.Sprintf("%.1f", rate),
}).Infof(
"Processing block %s %d/%d - estimated time remaining %s",
blockRoot,
blk.Slot,
helpers.SlotsSince(genesis),
timeRemaining,
fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]),
blk.Slot, helpers.SlotsSince(genesis), timeRemaining,
)
}

func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock, blockRoot [32]byte) error {
// processBlock performs basic checks on incoming block, and triggers receiver function.
func (s *Service) processBlock(
ctx context.Context,
genesis time.Time,
blk *eth.SignedBeaconBlock,
blockReceiver blockReceiverFn,
) error {
if blk.Block.Slot <= s.lastProcessedSlot {
return fmt.Errorf("slot %d already processed", blk.Block.Slot)
}
blkRoot, err := stateutil.BlockRoot(blk.Block)
if err != nil {
return err
}
s.logSyncStatus(genesis, blk.Block, blkRoot)
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) {
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
}
s.blockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
})
if featureconfig.Get().InitSyncNoVerify {
if err := s.chain.ReceiveBlockNoVerify(ctx, blk, blockRoot); err != nil {
return err
}
} else {
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, blockRoot); err != nil {
return err
}
if err := blockReceiver(ctx, blk, blkRoot); err != nil {
return err
}
s.lastProcessedSlot = blk.Block.Slot
return nil
}
102 changes: 95 additions & 7 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package initialsync

import (
"context"
"fmt"
"testing"

eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
Expand All @@ -21,7 +23,7 @@ func TestConstants(t *testing.T) {
}
}

func TestRoundRobinSync(t *testing.T) {
func TestService_roundRobinSync(t *testing.T) {
tests := []struct {
name string
currentSlot uint64
Expand Down Expand Up @@ -254,12 +256,11 @@ func TestRoundRobinSync(t *testing.T) {
DB: beaconDB,
} // no-op mock
s := &Service{
chain: mc,
blockNotifier: mc.BlockNotifier(),
p2p: p,
db: beaconDB,
synced: false,
chainStarted: true,
chain: mc,
p2p: p,
db: beaconDB,
synced: false,
chainStarted: true,
}
if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil {
t.Error(err)
Expand All @@ -280,3 +281,90 @@ func TestRoundRobinSync(t *testing.T) {
})
}
}

func TestService_processBlock(t *testing.T) {
beaconDB := dbtest.SetupDB(t)
genesisBlk := &eth.BeaconBlock{
Slot: 0,
}
genesisBlkRoot, err := stateutil.BlockRoot(genesisBlk)
if err != nil {
t.Fatal(err)
}
err = beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{Block: genesisBlk})
if err != nil {
t.Fatal(err)
}
st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{})
if err != nil {
t.Fatal(err)
}
s := NewInitialSync(&Config{
P2P: p2pt.NewTestP2P(t),
DB: beaconDB,
Chain: &mock.ChainService{
State: st,
Root: genesisBlkRoot[:],
DB: beaconDB,
},
})
ctx := context.Background()
genesis := makeGenesisTime(32)

t.Run("process duplicate block", func(t *testing.T) {
blk1 := &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: 1,
ParentRoot: genesisBlkRoot[:],
},
}
blk1Root, err := stateutil.BlockRoot(blk1.Block)
if err != nil {
t.Fatal(err)
}
blk2 := &eth.SignedBeaconBlock{
Block: &eth.BeaconBlock{
Slot: 2,
ParentRoot: blk1Root[:],
},
}

// Process block normally.
err = s.processBlock(ctx, genesis, blk1, func(
ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error {
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, block, blockRoot); err != nil {
t.Error(err)
}
return nil
})
if err != nil {
t.Error(err)
}

// Duplicate processing should trigger error.
err = s.processBlock(ctx, genesis, blk1, func(
ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error {
return nil
})
expectedErr := fmt.Errorf("slot %d already processed", blk1.Block.Slot)
if err == nil || err.Error() != expectedErr.Error() {
t.Errorf("Expected error not thrown, want: %v, got: %v", expectedErr, err)
}

// Continue normal processing, should proceed w/o errors.
err = s.processBlock(ctx, genesis, blk2, func(
ctx context.Context, block *eth.SignedBeaconBlock, blockRoot [32]byte) error {
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, block, blockRoot); err != nil {
t.Error(err)
}
return nil
})
if err != nil {
t.Error(err)
}

if s.chain.HeadSlot() != 2 {
t.Errorf("Unexpected head slot, want: %d, got: %d", 2, s.chain.HeadSlot())
}
})
}
22 changes: 12 additions & 10 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"time"

"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
Expand Down Expand Up @@ -43,15 +44,16 @@ type Config struct {

// Service service.
type Service struct {
ctx context.Context
cancel context.CancelFunc
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
synced bool
chainStarted bool
stateNotifier statefeed.Notifier
blockNotifier blockfeed.Notifier
ctx context.Context
cancel context.CancelFunc
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
synced bool
chainStarted bool
stateNotifier statefeed.Notifier
counter *ratecounter.RateCounter
lastProcessedSlot uint64
}

// NewInitialSync configures the initial sync service responsible for bringing the node up to the
Expand All @@ -65,7 +67,7 @@ func NewInitialSync(cfg *Config) *Service {
p2p: cfg.P2P,
db: cfg.DB,
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
}
}

Expand Down