From 0c978507bc9686133bd03be5dde9f77e40b02d1e Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 09:37:07 -0700 Subject: [PATCH 1/9] Started testing --- beacon-chain/interop-cold-start/service.go | 10 +++++++--- beacon-chain/powchain/service.go | 2 +- beacon-chain/sync/service.go | 2 +- shared/params/config.go | 2 +- validator/client/runner.go | 6 +++--- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/beacon-chain/interop-cold-start/service.go b/beacon-chain/interop-cold-start/service.go index 295dc3aeb58..f962e88abe4 100644 --- a/beacon-chain/interop-cold-start/service.go +++ b/beacon-chain/interop-cold-start/service.go @@ -183,16 +183,20 @@ func (s *Service) saveGenesisState(ctx context.Context, genesisState *stateTrie. return errors.Wrap(err, "could save finalized checkpoint") } + pubKeys := make([][48]byte, 0) + indices := make([]uint64, 0) for i := uint64(0); i < uint64(genesisState.NumValidators()); i++ { pk := genesisState.PubkeyAtIndex(i) - if err := s.beaconDB.SaveValidatorIndex(ctx, pk[:], i); err != nil { - return errors.Wrapf(err, "could not save validator index: %d", i) - } + pubKeys = append(pubKeys, pk) + indices = append(indices, i) s.chainStartDeposits[i] = ðpb.Deposit{ Data: ðpb.Deposit_Data{ PublicKey: pk[:], }, } } + if err := s.beaconDB.SaveValidatorIndices(ctx, pubKeys, indices); err != nil { + return errors.Wrap(err, "could not save validator index") + } return nil } diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index 1fe6cedc79a..de775356b71 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -399,7 +399,7 @@ func (s *Service) waitForConnection() { ticker.Stop() return } - log.WithError(err).Error("Could not connect to powchain endpoint") + //log.WithError(err).Error("Could not connect to powchain endpoint") case <-s.ctx.Done(): ticker.Stop() log.Debug("Received cancelled context,closing existing powchain service") diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index fcf6c2cbed9..1d511fa9049 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -135,7 +135,7 @@ func (r *Service) Start() { r.processPendingBlocksQueue() r.processPendingAttsQueue() r.maintainPeerStatuses() - r.resyncIfBehind() + //r.resyncIfBehind() // Update sync metrics. runutil.RunEvery(r.ctx, time.Second*10, r.updateMetrics) diff --git a/shared/params/config.go b/shared/params/config.go index cde379749a6..20f45a5d614 100644 --- a/shared/params/config.go +++ b/shared/params/config.go @@ -144,7 +144,7 @@ var defaultBeaconConfig = &BeaconChainConfig{ // Time parameter constants. MinAttestationInclusionDelay: 1, - SecondsPerSlot: 12, + SecondsPerSlot: 6, SlotsPerEpoch: 32, MinSeedLookahead: 1, MaxSeedLookahead: 4, diff --git a/validator/client/runner.go b/validator/client/runner.go index 5a90579aee1..3c87fb21466 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -51,9 +51,9 @@ func run(ctx context.Context, v Validator) { if err := v.WaitForSync(ctx); err != nil { log.Fatalf("Could not determine if beacon node synced: %v", err) } - if err := v.WaitForActivation(ctx); err != nil { - log.Fatalf("Could not wait for validator activation: %v", err) - } + //if err := v.WaitForActivation(ctx); err != nil { + // log.Fatalf("Could not wait for validator activation: %v", err) + //} headSlot, err := v.CanonicalHeadSlot(ctx) if err != nil { log.Fatalf("Could not get current canonical head slot: %v", err) From 3d7b6c39ff55b9ee8bc43f8a8f0c1de6fd6df92a Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 13:28:32 -0700 Subject: [PATCH 2/9] Bunch of fixes --- .../blockchain/process_block_helpers.go | 2 + beacon-chain/blockchain/service.go | 3 + validator/client/validator.go | 68 +++++++++---------- 3 files changed, 39 insertions(+), 34 deletions(-) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index ac01c4044ff..86cf6962e28 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -3,6 +3,7 @@ package blockchain import ( "bytes" "context" + "encoding/hex" "fmt" "github.com/pkg/errors" @@ -64,6 +65,7 @@ func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) ( if !featureconfig.Get().DisableNewStateMgmt { parentRoot := bytesutil.ToBytes32(b.ParentRoot) if !s.stateGen.StateSummaryExists(ctx, parentRoot) { + fmt.Println("Missing state summary root ", hex.EncodeToString(parentRoot[:])) return nil, errors.New("provided block root does not have block saved in the db") } preState, err := s.stateGen.StateByRoot(ctx, parentRoot) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 9019426a705..48101107f7a 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -5,6 +5,7 @@ package blockchain import ( "context" + "encoding/hex" "fmt" "runtime" "sync" @@ -202,6 +203,7 @@ func (s *Service) Start() { log.Fatal("Not configured web3Service for POW chain") return // return need for TestStartUninitializedChainWithoutConfigPOWChain. } + log.Error("HELLO1") go func() { stateChannel := make(chan *feed.Event, 1) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) @@ -345,6 +347,7 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.B }); err != nil { return err } + fmt.Println("Saving genesis block root ", hex.EncodeToString(genesisBlkRoot[:])) } else { if err := s.beaconDB.SaveState(ctx, genesisState, genesisBlkRoot); err != nil { return errors.Wrap(err, "could not save genesis state") diff --git a/validator/client/validator.go b/validator/client/validator.go index 7d7cb5a121b..e8b84dc350e 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -312,17 +312,17 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } lFields["attesterSlot"] = duty.AttesterSlot - aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) - if err != nil { - return errors.Wrap(err, "could not check if a validator is an aggregator") - } - if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - Slot: duty.AttesterSlot, - CommitteeId: duty.CommitteeIndex, - IsAggregator: aggregator, - }); err != nil { - return err - } + //aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + //if err != nil { + // return errors.Wrap(err, "could not check if a validator is an aggregator") + //} + //if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ + // Slot: duty.AttesterSlot, + // CommitteeId: duty.CommitteeIndex, + // IsAggregator: aggregator, + //}); err != nil { + // return err + //} } log.WithFields(lFields).Info("New assignment") @@ -330,29 +330,29 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } // Notify beacon node to subscribe to the attester and aggregator subnets for the next epoch. - req.Epoch++ - dutiesNextEpoch, err := v.validatorClient.GetDuties(ctx, req) - if err != nil { - log.Error(err) - return err - } - if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { - for _, duty := range dutiesNextEpoch.Duties { - if duty.Status == ethpb.ValidatorStatus_ACTIVE { - aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) - if err != nil { - return errors.Wrap(err, "could not check if a validator is an aggregator") - } - if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - Slot: duty.AttesterSlot, - CommitteeId: duty.CommitteeIndex, - IsAggregator: aggregator, - }); err != nil { - return err - } - } - } - } + //req.Epoch++ + //dutiesNextEpoch, err := v.validatorClient.GetDuties(ctx, req) + //if err != nil { + // log.Error(err) + // return err + //} + //if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { + // for _, duty := range dutiesNextEpoch.Duties { + // if duty.Status == ethpb.ValidatorStatus_ACTIVE { + // aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + // if err != nil { + // return errors.Wrap(err, "could not check if a validator is an aggregator") + // } + // if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ + // Slot: duty.AttesterSlot, + // CommitteeId: duty.CommitteeIndex, + // IsAggregator: aggregator, + // }); err != nil { + // return err + // } + // } + // } + //} return nil } From 4090792b9c9e4690cc4a82a29f36d2d1e9d41971 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 14:44:00 -0700 Subject: [PATCH 3/9] use-interop --- beacon-chain/interop-cold-start/service.go | 6 ++++++ beacon-chain/node/node.go | 1 + beacon-chain/state/stategen/hot.go | 3 ++- beacon-chain/sync/BUILD.bazel | 1 + beacon-chain/sync/service.go | 4 ++++ beacon-chain/sync/validate_beacon_blocks.go | 21 ++++++++++++++++----- 6 files changed, 30 insertions(+), 6 deletions(-) diff --git a/beacon-chain/interop-cold-start/service.go b/beacon-chain/interop-cold-start/service.go index f962e88abe4..7b19ac89155 100644 --- a/beacon-chain/interop-cold-start/service.go +++ b/beacon-chain/interop-cold-start/service.go @@ -166,6 +166,12 @@ func (s *Service) saveGenesisState(ctx context.Context, genesisState *stateTrie. if err := s.beaconDB.SaveBlock(ctx, genesisBlk); err != nil { return errors.Wrap(err, "could not save genesis block") } + if err := s.beaconDB.SaveStateSummary(ctx, &pb.StateSummary{ + Slot: 0, + Root: genesisBlkRoot[:], + }); err != nil { + return err + } if err := s.beaconDB.SaveState(ctx, genesisState, genesisBlkRoot); err != nil { return errors.Wrap(err, "could not save genesis state") } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index c59fe6f31c4..bf92bec3817 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -443,6 +443,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { ExitPool: b.exitPool, SlashingPool: b.slashingsPool, StateSummaryCache: b.stateSummaryCache, + StateGen: b.stateGen, }) return b.services.RegisterService(rs) diff --git a/beacon-chain/state/stategen/hot.go b/beacon-chain/state/stategen/hot.go index 4760769d60d..9c44b1334ec 100644 --- a/beacon-chain/state/stategen/hot.go +++ b/beacon-chain/state/stategen/hot.go @@ -3,6 +3,7 @@ package stategen import ( "context" "encoding/hex" + "fmt" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -34,7 +35,7 @@ func (s *State) saveHotState(ctx context.Context, blockRoot [32]byte, state *sta "slot": state.Slot(), "blockRoot": hex.EncodeToString(bytesutil.Trunc(blockRoot[:]))}).Info("Saved full state on epoch boundary") } - + fmt.Println("Putting ", state.Slot(), hex.EncodeToString(blockRoot[:])) // On an intermediate slots, save the hot state summary. s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{ Slot: state.Slot(), diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 9aa3b1143ee..bc6eb658305 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -53,6 +53,7 @@ go_library( "//beacon-chain/p2p:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/state:go_default_library", + "//beacon-chain/state/stategen:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared:go_default_library", "//shared/attestationutil:go_default_library", diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 1d511fa9049..6ac16f1114c 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -20,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/runutil" ) @@ -47,6 +48,7 @@ type Config struct { BlockNotifier blockfeed.Notifier AttestationNotifier operation.Notifier StateSummaryCache *cache.StateSummaryCache + StateGen *stategen.State } // This defines the interface for interacting with block chain service @@ -79,6 +81,7 @@ func NewRegularSync(cfg *Config) *Service { stateNotifier: cfg.StateNotifier, blockNotifier: cfg.BlockNotifier, stateSummaryCache: cfg.StateSummaryCache, + stateGen: cfg.StateGen, blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), } @@ -122,6 +125,7 @@ type Service struct { seenAttesterSlashingLock sync.RWMutex seenAttesterSlashingCache *lru.Cache stateSummaryCache *cache.StateSummaryCache + stateGen *stategen.State } // Start the regular sync service. diff --git a/beacon-chain/sync/validate_beacon_blocks.go b/beacon-chain/sync/validate_beacon_blocks.go index 4f551b4c5e8..339e92d2446 100644 --- a/beacon-chain/sync/validate_beacon_blocks.go +++ b/beacon-chain/sync/validate_beacon_blocks.go @@ -80,13 +80,24 @@ func (r *Service) validateBeaconBlockPubSub(ctx context.Context, pid peer.ID, ms return false } - parentState, err := r.db.State(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) - if err != nil { - log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Could not get parent state") + // Handle block when the parent is unknown. + if !r.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { + r.pendingQueueLock.Lock() + r.slotToPendingBlocks[blk.Block.Slot] = blk + r.seenPendingBlocks[blockRoot] = true + r.pendingQueueLock.Unlock() return false } - if parentState == nil { - log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Parent state is nil") + + hasStateSummaryDB := r.db.HasStateSummary(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) + hasStateSummaryCache := r.stateSummaryCache.Has(bytesutil.ToBytes32(blk.Block.ParentRoot)) + if !hasStateSummaryDB && !hasStateSummaryCache { + log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("No access to parent state") + return false + } + parentState, err := r.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) + if err != nil { + log.WithError(err).WithField("blockSlot", blk.Block.Slot).Warn("Could not get parent state") return false } From b86ee25947a1c791e7a491f942a262bdb273eacd Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 14:50:27 -0700 Subject: [PATCH 4/9] Sync with v0.11 --- .../blockchain/process_block_helpers.go | 2 - beacon-chain/blockchain/service.go | 3 - beacon-chain/powchain/service.go | 2 +- beacon-chain/state/stategen/hot.go | 3 +- beacon-chain/sync/service.go | 2 +- shared/params/config.go | 2 +- validator/client/validator.go | 68 +++++++++---------- 7 files changed, 38 insertions(+), 44 deletions(-) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index 86cf6962e28..ac01c4044ff 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -3,7 +3,6 @@ package blockchain import ( "bytes" "context" - "encoding/hex" "fmt" "github.com/pkg/errors" @@ -65,7 +64,6 @@ func (s *Service) verifyBlkPreState(ctx context.Context, b *ethpb.BeaconBlock) ( if !featureconfig.Get().DisableNewStateMgmt { parentRoot := bytesutil.ToBytes32(b.ParentRoot) if !s.stateGen.StateSummaryExists(ctx, parentRoot) { - fmt.Println("Missing state summary root ", hex.EncodeToString(parentRoot[:])) return nil, errors.New("provided block root does not have block saved in the db") } preState, err := s.stateGen.StateByRoot(ctx, parentRoot) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 48101107f7a..9019426a705 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -5,7 +5,6 @@ package blockchain import ( "context" - "encoding/hex" "fmt" "runtime" "sync" @@ -203,7 +202,6 @@ func (s *Service) Start() { log.Fatal("Not configured web3Service for POW chain") return // return need for TestStartUninitializedChainWithoutConfigPOWChain. } - log.Error("HELLO1") go func() { stateChannel := make(chan *feed.Event, 1) stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) @@ -347,7 +345,6 @@ func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.B }); err != nil { return err } - fmt.Println("Saving genesis block root ", hex.EncodeToString(genesisBlkRoot[:])) } else { if err := s.beaconDB.SaveState(ctx, genesisState, genesisBlkRoot); err != nil { return errors.Wrap(err, "could not save genesis state") diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index de775356b71..1fe6cedc79a 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -399,7 +399,7 @@ func (s *Service) waitForConnection() { ticker.Stop() return } - //log.WithError(err).Error("Could not connect to powchain endpoint") + log.WithError(err).Error("Could not connect to powchain endpoint") case <-s.ctx.Done(): ticker.Stop() log.Debug("Received cancelled context,closing existing powchain service") diff --git a/beacon-chain/state/stategen/hot.go b/beacon-chain/state/stategen/hot.go index 9c44b1334ec..4760769d60d 100644 --- a/beacon-chain/state/stategen/hot.go +++ b/beacon-chain/state/stategen/hot.go @@ -3,7 +3,6 @@ package stategen import ( "context" "encoding/hex" - "fmt" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" @@ -35,7 +34,7 @@ func (s *State) saveHotState(ctx context.Context, blockRoot [32]byte, state *sta "slot": state.Slot(), "blockRoot": hex.EncodeToString(bytesutil.Trunc(blockRoot[:]))}).Info("Saved full state on epoch boundary") } - fmt.Println("Putting ", state.Slot(), hex.EncodeToString(blockRoot[:])) + // On an intermediate slots, save the hot state summary. s.stateSummaryCache.Put(blockRoot, &pb.StateSummary{ Slot: state.Slot(), diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 6ac16f1114c..d513b0429ca 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -139,7 +139,7 @@ func (r *Service) Start() { r.processPendingBlocksQueue() r.processPendingAttsQueue() r.maintainPeerStatuses() - //r.resyncIfBehind() + r.resyncIfBehind() // Update sync metrics. runutil.RunEvery(r.ctx, time.Second*10, r.updateMetrics) diff --git a/shared/params/config.go b/shared/params/config.go index 20f45a5d614..cde379749a6 100644 --- a/shared/params/config.go +++ b/shared/params/config.go @@ -144,7 +144,7 @@ var defaultBeaconConfig = &BeaconChainConfig{ // Time parameter constants. MinAttestationInclusionDelay: 1, - SecondsPerSlot: 6, + SecondsPerSlot: 12, SlotsPerEpoch: 32, MinSeedLookahead: 1, MaxSeedLookahead: 4, diff --git a/validator/client/validator.go b/validator/client/validator.go index e8b84dc350e..7d7cb5a121b 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -312,17 +312,17 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } lFields["attesterSlot"] = duty.AttesterSlot - //aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) - //if err != nil { - // return errors.Wrap(err, "could not check if a validator is an aggregator") - //} - //if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - // Slot: duty.AttesterSlot, - // CommitteeId: duty.CommitteeIndex, - // IsAggregator: aggregator, - //}); err != nil { - // return err - //} + aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + if err != nil { + return errors.Wrap(err, "could not check if a validator is an aggregator") + } + if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ + Slot: duty.AttesterSlot, + CommitteeId: duty.CommitteeIndex, + IsAggregator: aggregator, + }); err != nil { + return err + } } log.WithFields(lFields).Info("New assignment") @@ -330,29 +330,29 @@ func (v *validator) UpdateDuties(ctx context.Context, slot uint64) error { } // Notify beacon node to subscribe to the attester and aggregator subnets for the next epoch. - //req.Epoch++ - //dutiesNextEpoch, err := v.validatorClient.GetDuties(ctx, req) - //if err != nil { - // log.Error(err) - // return err - //} - //if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { - // for _, duty := range dutiesNextEpoch.Duties { - // if duty.Status == ethpb.ValidatorStatus_ACTIVE { - // aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) - // if err != nil { - // return errors.Wrap(err, "could not check if a validator is an aggregator") - // } - // if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ - // Slot: duty.AttesterSlot, - // CommitteeId: duty.CommitteeIndex, - // IsAggregator: aggregator, - // }); err != nil { - // return err - // } - // } - // } - //} + req.Epoch++ + dutiesNextEpoch, err := v.validatorClient.GetDuties(ctx, req) + if err != nil { + log.Error(err) + return err + } + if slot%params.BeaconConfig().SlotsPerEpoch == 0 || firstDutiesReceived { + for _, duty := range dutiesNextEpoch.Duties { + if duty.Status == ethpb.ValidatorStatus_ACTIVE { + aggregator, err := v.isAggregator(ctx, duty.Committee, duty.AttesterSlot, bytesutil.ToBytes48(duty.PublicKey)) + if err != nil { + return errors.Wrap(err, "could not check if a validator is an aggregator") + } + if _, err := v.validatorClient.SubscribeCommitteeSubnet(ctx, ðpb.CommitteeSubnetSubscribeRequest{ + Slot: duty.AttesterSlot, + CommitteeId: duty.CommitteeIndex, + IsAggregator: aggregator, + }); err != nil { + return err + } + } + } + } return nil } From cd3f445c02be054ef3e4856a55e30e98e76d0524 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 14:54:23 -0700 Subject: [PATCH 5/9] Uncomment wait for activation --- validator/client/runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/validator/client/runner.go b/validator/client/runner.go index 3c87fb21466..5a90579aee1 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -51,9 +51,9 @@ func run(ctx context.Context, v Validator) { if err := v.WaitForSync(ctx); err != nil { log.Fatalf("Could not determine if beacon node synced: %v", err) } - //if err := v.WaitForActivation(ctx); err != nil { - // log.Fatalf("Could not wait for validator activation: %v", err) - //} + if err := v.WaitForActivation(ctx); err != nil { + log.Fatalf("Could not wait for validator activation: %v", err) + } headSlot, err := v.CanonicalHeadSlot(ctx) if err != nil { log.Fatalf("Could not get current canonical head slot: %v", err) From 87692389ca4302b5ffc56c30a4983f22cebbb0c0 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Mon, 6 Apr 2020 14:57:15 -0700 Subject: [PATCH 6/9] Move pending block queue from subscriber to validator pipeline --- beacon-chain/sync/subscriber_beacon_blocks.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index ebf08efe2e0..1141f062aef 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -6,12 +6,10 @@ import ( "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop" - "github.com/prysmaticlabs/prysm/shared/bytesutil" ) func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) error { @@ -25,21 +23,6 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) block := signed.Block - blockRoot, err := ssz.HashTreeRoot(block) - if err != nil { - log.Errorf("Could not sign root block: %v", err) - return nil - } - - // Handle block when the parent is unknown. - if !r.db.HasBlock(ctx, bytesutil.ToBytes32(block.ParentRoot)) { - r.pendingQueueLock.Lock() - r.slotToPendingBlocks[block.Slot] = signed - r.seenPendingBlocks[blockRoot] = true - r.pendingQueueLock.Unlock() - return nil - } - // Broadcast the block on a feed to notify other services in the beacon node // of a received block (even if it does not process correctly through a state transition). r.blockNotifier.BlockFeed().Send(&feed.Event{ @@ -49,7 +32,7 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) }, }) - err = r.chain.ReceiveBlockNoPubsub(ctx, signed) + err := r.chain.ReceiveBlockNoPubsub(ctx, signed) if err != nil { interop.WriteBlockToDisk(signed, true /*failed*/) } From b50408bc03291c765240e9c342d4cb00b7d02832 Mon Sep 17 00:00:00 2001 From: rauljordan Date: Mon, 6 Apr 2020 18:25:54 -0500 Subject: [PATCH 7/9] passing tests --- beacon-chain/sync/BUILD.bazel | 1 + .../sync/validate_beacon_blocks_test.go | 76 ++++++++++++++----- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index bc6eb658305..1cde9f745b3 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -128,6 +128,7 @@ go_test( "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/state:go_default_library", + "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/sync/initial-sync/testing:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//proto/testing:go_default_library", diff --git a/beacon-chain/sync/validate_beacon_blocks_test.go b/beacon-chain/sync/validate_beacon_blocks_test.go index 354e882fbc4..5fcc0602038 100644 --- a/beacon-chain/sync/validate_beacon_blocks_test.go +++ b/beacon-chain/sync/validate_beacon_blocks_test.go @@ -13,12 +13,15 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-ssz" mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/params" @@ -93,11 +96,12 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { c, _ := lru.New(10) r := &Service{ - db: db, - p2p: p, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: &mock.ChainService{Genesis: time.Now()}, - seenBlockCache: c, + db: db, + p2p: p, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: &mock.ChainService{Genesis: time.Now()}, + seenBlockCache: c, + stateSummaryCache: cache.NewStateSummaryCache(), } buf := new(bytes.Buffer) @@ -114,7 +118,6 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) { }, } result := r.validateBeaconBlockPubSub(ctx, "", m) - if result { t.Error("Expected false result, got true") } @@ -126,10 +129,24 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { p := p2ptest.NewTestP2P(t) ctx := context.Background() beaconState, privKeys := testutil.DeterministicGenesisState(t, 100) - bRoot := [32]byte{'a'} + parentBlock := ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + ProposerIndex: 0, + Slot: 0, + }, + } + if err := db.SaveBlock(ctx, parentBlock); err != nil { + t.Fatal(err) + } + bRoot, err := ssz.HashTreeRoot(parentBlock.Block) if err := db.SaveState(ctx, beaconState, bRoot); err != nil { t.Fatal(err) } + if err := db.SaveStateSummary(ctx, &pb.StateSummary{ + Root: bRoot[:], + }); err != nil { + t.Fatal(err) + } proposerIdx, err := helpers.BeaconProposerIndex(beaconState) if err != nil { t.Fatal(err) @@ -154,6 +171,8 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { msg.Signature = blockSig[:] c, _ := lru.New(10) + stateSummaryCache := cache.NewStateSummaryCache() + stateGen := stategen.New(db, stateSummaryCache) r := &Service{ db: db, p2p: p, @@ -163,7 +182,11 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) { FinalizedCheckPoint: ðpb.Checkpoint{ Epoch: 0, }}, - seenBlockCache: c, + seenBlockCache: c, + slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), + stateSummaryCache: stateSummaryCache, + stateGen: stateGen, } buf := new(bytes.Buffer) @@ -256,11 +279,13 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) { c, _ := lru.New(10) r := &Service{ - p2p: p, - db: db, - initialSync: &mockSync.Sync{IsSyncing: false}, - chain: &mock.ChainService{Genesis: time.Now()}, - seenBlockCache: c, + p2p: p, + db: db, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: &mock.ChainService{Genesis: time.Now()}, + seenBlockCache: c, + slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), } buf := new(bytes.Buffer) @@ -339,10 +364,22 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { p := p2ptest.NewTestP2P(t) ctx := context.Background() beaconState, privKeys := testutil.DeterministicGenesisState(t, 100) - bRoot := [32]byte{'a'} + parentBlock := ðpb.SignedBeaconBlock{ + Block: ðpb.BeaconBlock{ + ProposerIndex: 0, + Slot: 0, + }, + } + if err := db.SaveBlock(ctx, parentBlock); err != nil { + t.Fatal(err) + } + bRoot, err := ssz.HashTreeRoot(parentBlock.Block) if err := db.SaveState(ctx, beaconState, bRoot); err != nil { t.Fatal(err) } + if err != nil { + t.Fatal(err) + } proposerIdx, err := helpers.BeaconProposerIndex(beaconState) if err != nil { t.Fatal(err) @@ -377,7 +414,10 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { FinalizedCheckPoint: ðpb.Checkpoint{ Epoch: 0, }}, - seenBlockCache: c, + seenBlockCache: c, + slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), + stateSummaryCache: cache.NewStateSummaryCache(), } buf := new(bytes.Buffer) @@ -392,13 +432,9 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) { }, }, } - result := r.validateBeaconBlockPubSub(ctx, "", m) - if !result { - t.Error("Expected true result, got false") - } r.setSeenBlockIndexSlot(msg.Block.Slot, msg.Block.ProposerIndex) time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers. - result = r.validateBeaconBlockPubSub(ctx, "", m) + result := r.validateBeaconBlockPubSub(ctx, "", m) if result { t.Error("Expected false result, got true") } From 944448f7fc8fb1aab46e2e849e1eca311149e714 Mon Sep 17 00:00:00 2001 From: rauljordan Date: Mon, 6 Apr 2020 22:48:18 -0500 Subject: [PATCH 8/9] nil checks to prevent panics --- beacon-chain/blockchain/process_attestation.go | 10 ++++++++++ .../blockchain/process_attestation_helpers.go | 16 ++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/beacon-chain/blockchain/process_attestation.go b/beacon-chain/blockchain/process_attestation.go index c20d6dc6623..6aa0d593c2a 100644 --- a/beacon-chain/blockchain/process_attestation.go +++ b/beacon-chain/blockchain/process_attestation.go @@ -125,6 +125,16 @@ func (s *Service) onAttestation(ctx context.Context, a *ethpb.Attestation) ([]ui } } + if indexedAtt.AttestingIndices == nil { + return nil, errors.New("nil attesting indices") + } + if a.Data == nil { + return nil, errors.New("nil att data") + } + if a.Data.Target == nil { + return nil, errors.New("nil att target") + } + // Update forkchoice store with the new attestation for updating weight. s.forkChoiceStore.ProcessAttestation(ctx, indexedAtt.AttestingIndices, bytesutil.ToBytes32(a.Data.BeaconBlockRoot), a.Data.Target.Epoch) diff --git a/beacon-chain/blockchain/process_attestation_helpers.go b/beacon-chain/blockchain/process_attestation_helpers.go index 3805b99555b..92be8925715 100644 --- a/beacon-chain/blockchain/process_attestation_helpers.go +++ b/beacon-chain/blockchain/process_attestation_helpers.go @@ -130,14 +130,18 @@ func (s *Service) verifyAttestation(ctx context.Context, baseState *stateTrie.Be var err error if !featureconfig.Get().DisableNewStateMgmt { aState, err = s.stateGen.StateByRoot(ctx, bytesutil.ToBytes32(a.Data.BeaconBlockRoot)) - return nil, err + if err != nil { + return nil, err + } + } else { + aState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(a.Data.BeaconBlockRoot)) + if err != nil { + return nil, err + } } - - aState, err = s.beaconDB.State(ctx, bytesutil.ToBytes32(a.Data.BeaconBlockRoot)) - if err != nil { - return nil, err + if aState == nil { + return nil, fmt.Errorf("nil state for block root %#x\n", a.Data.BeaconBlockRoot) } - epoch := helpers.SlotToEpoch(a.Data.Slot) origSeed, err := helpers.Seed(baseState, epoch, params.BeaconConfig().DomainBeaconAttester) if err != nil { From bdbf59d1d73b8ea765abef712a5667204c7c61b6 Mon Sep 17 00:00:00 2001 From: rauljordan Date: Mon, 6 Apr 2020 22:50:43 -0500 Subject: [PATCH 9/9] lint --- beacon-chain/blockchain/process_attestation_helpers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/blockchain/process_attestation_helpers.go b/beacon-chain/blockchain/process_attestation_helpers.go index 92be8925715..6f497a28f7e 100644 --- a/beacon-chain/blockchain/process_attestation_helpers.go +++ b/beacon-chain/blockchain/process_attestation_helpers.go @@ -140,7 +140,7 @@ func (s *Service) verifyAttestation(ctx context.Context, baseState *stateTrie.Be } } if aState == nil { - return nil, fmt.Errorf("nil state for block root %#x\n", a.Data.BeaconBlockRoot) + return nil, fmt.Errorf("nil state for block root %#x", a.Data.BeaconBlockRoot) } epoch := helpers.SlotToEpoch(a.Data.Slot) origSeed, err := helpers.Seed(baseState, epoch, params.BeaconConfig().DomainBeaconAttester)