Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.11 run time fixes to use interop config #5324

Merged
merged 11 commits into from
Apr 7, 2020
3 changes: 1 addition & 2 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ func (s *Service) Start() {
attestationProcessorSubscribed := make(chan struct{}, 1)

// If the chain has already been initialized, simply start the block processing routine.
// A node should still initialize rest of the chain service if the finalized state's slot is 0.
if beaconState != nil && beaconState.Slot() != 0 {
if beaconState != nil {
log.Info("Blockchain data already exists in DB, initializing...")
s.genesisTime = time.Unix(int64(beaconState.GenesisTime()), 0)
s.opsService.SetGenesisTime(beaconState.GenesisTime())
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/interop-cold-start/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ func (s *Service) saveGenesisState(ctx context.Context, genesisState *stateTrie.
if err := s.beaconDB.SaveBlock(ctx, genesisBlk); err != nil {
return errors.Wrap(err, "could not save genesis block")
}
if err := s.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{
Slot: 0,
Root: genesisBlkRoot[:],
}); err != nil {
return err
}
if err := s.beaconDB.SaveState(ctx, genesisState, genesisBlkRoot); err != nil {
return errors.Wrap(err, "could not save genesis state")
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
ExitPool: b.exitPool,
SlashingPool: b.slashingsPool,
StateSummaryCache: b.stateSummaryCache,
StateGen: b.stateGen,
})

return b.services.RegisterService(rs)
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_library(
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared:go_default_library",
"//shared/attestationutil:go_default_library",
Expand Down Expand Up @@ -127,6 +128,7 @@ go_test(
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync/initial-sync/testing:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/testing:go_default_library",
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/runutil"
)
Expand Down Expand Up @@ -47,6 +48,7 @@ type Config struct {
BlockNotifier blockfeed.Notifier
AttestationNotifier operation.Notifier
StateSummaryCache *cache.StateSummaryCache
StateGen *stategen.State
}

// This defines the interface for interacting with block chain service
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewRegularSync(cfg *Config) *Service {
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
stateGen: cfg.StateGen,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}

Expand Down Expand Up @@ -122,6 +125,7 @@ type Service struct {
seenAttesterSlashingLock sync.RWMutex
seenAttesterSlashingCache *lru.Cache
stateSummaryCache *cache.StateSummaryCache
stateGen *stategen.State
}

// Start the regular sync service.
Expand Down
19 changes: 1 addition & 18 deletions beacon-chain/sync/subscriber_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (

"github.com/gogo/protobuf/proto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)

func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error {
Expand All @@ -25,21 +23,6 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)

block := signed.Block

blockRoot, err := ssz.HashTreeRoot(block)
if err != nil {
log.Errorf("Could not sign root block: %v", err)
return nil
}

// Handle block when the parent is unknown.
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(block.ParentRoot)) {
r.pendingQueueLock.Lock()
r.slotToPendingBlocks[block.Slot] = signed
r.seenPendingBlocks[blockRoot] = true
r.pendingQueueLock.Unlock()
return nil
}

// Broadcast the block on a feed to notify other services in the beacon node
// of a received block (even if it does not process correctly through a state transition).
r.blockNotifier.BlockFeed().Send(&feed.Event{
Expand All @@ -49,7 +32,7 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message)
},
})

err = r.chain.ReceiveBlockNoPubsub(ctx, signed)
err := r.chain.ReceiveBlockNoPubsub(ctx, signed)
if err != nil {
interop.WriteBlockToDisk(signed, true /*failed*/)
}
Expand Down
21 changes: 16 additions & 5 deletions beacon-chain/sync/validate_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,24 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms
return false
}

parentState, err := r.db.State(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
if err != nil {
log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Could not get parent state")
// Handle block when the parent is unknown.
if !r.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is needed to be put back, if validation fails the block isn't gossiped onwards to other peers.

r.pendingQueueLock.Lock()
r.slotToPendingBlocks[blk.Block.Slot] = blk
r.seenPendingBlocks[blockRoot] = true
r.pendingQueueLock.Unlock()
return false
}
if parentState == nil {
log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Parent state is nil")

hasStateSummaryDB := r.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
hasStateSummaryCache := r.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot))
if !hasStateSummaryDB && !hasStateSummaryCache {
log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("No access to parent state")
return false
}
parentState, err := r.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot))
if err != nil {
log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Could not get parent state")
return false
}

Expand Down
76 changes: 56 additions & 20 deletions beacon-chain/sync/validate_beacon_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -93,11 +96,12 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {

c, _ := lru.New(10)
r := &Service{
db: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now()},
seenBlockCache: c,
db: db,
p2p: p,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now()},
seenBlockCache: c,
stateSummaryCache: cache.NewStateSummaryCache(),
}

buf := new(bytes.Buffer)
Expand All @@ -114,7 +118,6 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m)

if result {
t.Error("Expected false result, got true")
}
Expand All @@ -126,10 +129,24 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
p := p2ptest.NewTestP2P(t)
ctx := context.Background()
beaconState, privKeys := testutil.DeterministicGenesisState(t, 100)
bRoot := [32]byte{'a'}
parentBlock := &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
ProposerIndex: 0,
Slot: 0,
},
}
if err := db.SaveBlock(ctx, parentBlock); err != nil {
t.Fatal(err)
}
bRoot, err := ssz.HashTreeRoot(parentBlock.Block)
if err := db.SaveState(ctx, beaconState, bRoot); err != nil {
t.Fatal(err)
}
if err := db.SaveStateSummary(ctx, &pb.StateSummary{
Root: bRoot[:],
}); err != nil {
t.Fatal(err)
}
proposerIdx, err := helpers.BeaconProposerIndex(beaconState)
if err != nil {
t.Fatal(err)
Expand All @@ -154,6 +171,8 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
msg.Signature = blockSig[:]

c, _ := lru.New(10)
stateSummaryCache := cache.NewStateSummaryCache()
stateGen := stategen.New(db, stateSummaryCache)
r := &Service{
db: db,
p2p: p,
Expand All @@ -163,7 +182,11 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
}},
seenBlockCache: c,
seenBlockCache: c,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
stateSummaryCache: stateSummaryCache,
stateGen: stateGen,
}

buf := new(bytes.Buffer)
Expand Down Expand Up @@ -256,11 +279,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {

c, _ := lru.New(10)
r := &Service{
p2p: p,
db: db,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now()},
seenBlockCache: c,
p2p: p,
db: db,
initialSync: &mockSync.Sync{IsSyncing: false},
chain: &mock.ChainService{Genesis: time.Now()},
seenBlockCache: c,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}

buf := new(bytes.Buffer)
Expand Down Expand Up @@ -339,10 +364,22 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
p := p2ptest.NewTestP2P(t)
ctx := context.Background()
beaconState, privKeys := testutil.DeterministicGenesisState(t, 100)
bRoot := [32]byte{'a'}
parentBlock := &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
ProposerIndex: 0,
Slot: 0,
},
}
if err := db.SaveBlock(ctx, parentBlock); err != nil {
t.Fatal(err)
}
bRoot, err := ssz.HashTreeRoot(parentBlock.Block)
if err := db.SaveState(ctx, beaconState, bRoot); err != nil {
t.Fatal(err)
}
if err != nil {
t.Fatal(err)
}
proposerIdx, err := helpers.BeaconProposerIndex(beaconState)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -377,7 +414,10 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 0,
}},
seenBlockCache: c,
seenBlockCache: c,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
stateSummaryCache: cache.NewStateSummaryCache(),
}

buf := new(bytes.Buffer)
Expand All @@ -392,13 +432,9 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
},
},
}
result := r.validateBeaconBlockPubSub(ctx, "", m)
if !result {
t.Error("Expected true result, got false")
}
r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex)
time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
result = r.validateBeaconBlockPubSub(ctx, "", m)
result := r.validateBeaconBlockPubSub(ctx, "", m)
if result {
t.Error("Expected false result, got true")
}
Expand Down