diff --git a/validator/accounts/BUILD.bazel b/validator/accounts/BUILD.bazel index 40602d79df1..ef4a60eff61 100644 --- a/validator/accounts/BUILD.bazel +++ b/validator/accounts/BUILD.bazel @@ -75,6 +75,7 @@ go_test( "//proto/validator/accounts/v2:go_default_library", "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "//shared/fileutil:go_default_library", "//shared/mock:go_default_library", "//shared/params:go_default_library", diff --git a/validator/accounts/accounts.go b/validator/accounts/accounts.go index caa1bad314d..4ccce2e308a 100644 --- a/validator/accounts/accounts.go +++ b/validator/accounts/accounts.go @@ -5,10 +5,9 @@ import ( "github.com/prysmaticlabs/prysm/validator/keymanager" ) -var msgKeymanagerNotSupported = "keymanager kind not supported: %s" - var ( - // ErrCouldNotInitializeKeymanager informs about failed keymanager initialization + errKeymanagerNotSupported = "keymanager kind not supported: %s" + // MsgCouldNotInitializeKeymanager informs about failed keymanager initialization ErrCouldNotInitializeKeymanager = "could not initialize keymanager" ) diff --git a/validator/accounts/accounts_backup.go b/validator/accounts/accounts_backup.go index 77ec6219826..037922ed8e9 100644 --- a/validator/accounts/accounts_backup.go +++ b/validator/accounts/accounts_backup.go @@ -114,7 +114,7 @@ func BackupAccountsCli(cliCtx *cli.Context) error { case keymanager.Remote: return errors.New("backing up keys is not supported for a remote keymanager") default: - return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind()) + return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind()) } return zipKeystoresToOutputDir(keystoresToBackup, backupDir) } diff --git a/validator/accounts/accounts_delete.go b/validator/accounts/accounts_delete.go index aeb2c30a7ac..4c3226d5821 100644 --- a/validator/accounts/accounts_delete.go +++ b/validator/accounts/accounts_delete.go @@ -131,7 +131,7 @@ func DeleteAccount(ctx context.Context, cfg *AccountsConfig) error { return errors.Wrap(err, "could not delete accounts") } default: - return fmt.Errorf(msgKeymanagerNotSupported, cfg.Wallet.KeymanagerKind()) + return fmt.Errorf(errKeymanagerNotSupported, cfg.Wallet.KeymanagerKind()) } return nil } diff --git a/validator/accounts/accounts_list.go b/validator/accounts/accounts_list.go index 52c74bdd0da..87d4f2163b5 100644 --- a/validator/accounts/accounts_list.go +++ b/validator/accounts/accounts_list.go @@ -62,7 +62,7 @@ func ListAccountsCli(cliCtx *cli.Context) error { return errors.Wrap(err, "could not list validator accounts with remote keymanager") } default: - return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind().String()) + return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind().String()) } return nil } diff --git a/validator/accounts/accounts_list_test.go b/validator/accounts/accounts_list_test.go index dc3a35c575d..141cf585811 100644 --- a/validator/accounts/accounts_list_test.go +++ b/validator/accounts/accounts_list_test.go @@ -13,6 +13,7 @@ import ( validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/petnames" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" @@ -41,6 +42,10 @@ func (m *mockRemoteKeymanager) Sign(context.Context, *validatorpb.SignRequest) ( return nil, nil } +func (m *mockRemoteKeymanager) SubscribeAccountChanges(_ chan [][48]byte) event.Subscription { + return nil +} + func createRandomKeystore(t testing.TB, password string) *keymanager.Keystore { encryptor := keystorev4.New() id, err := uuid.NewRandom() diff --git a/validator/accounts/wallet_create.go b/validator/accounts/wallet_create.go index 57e1d54b199..c801e062c76 100644 --- a/validator/accounts/wallet_create.go +++ b/validator/accounts/wallet_create.go @@ -116,7 +116,7 @@ func CreateWalletWithKeymanager(ctx context.Context, cfg *CreateWalletConfig) (* "Successfully created wallet with remote keymanager configuration", ) default: - return nil, errors.Wrapf(err, msgKeymanagerNotSupported, w.KeymanagerKind()) + return nil, errors.Wrapf(err, errKeymanagerNotSupported, w.KeymanagerKind()) } return w, nil } diff --git a/validator/accounts/wallet_edit.go b/validator/accounts/wallet_edit.go index c2b1e051131..7f3835ead90 100644 --- a/validator/accounts/wallet_edit.go +++ b/validator/accounts/wallet_edit.go @@ -50,7 +50,7 @@ func EditWalletConfigurationCli(cliCtx *cli.Context) error { return errors.Wrap(err, "could not write config to disk") } default: - return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind()) + return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind()) } return nil } diff --git a/validator/client/BUILD.bazel b/validator/client/BUILD.bazel index 462284a28f1..a7cd796ed27 100644 --- a/validator/client/BUILD.bazel +++ b/validator/client/BUILD.bazel @@ -103,8 +103,10 @@ go_test( "//shared/testutil/assert:go_default_library", "//shared/testutil/require:go_default_library", "//shared/timeutils:go_default_library", + "//validator/accounts/testing:go_default_library", "//validator/db/testing:go_default_library", "//validator/graffiti:go_default_library", + "//validator/keymanager/derived:go_default_library", "//validator/testing:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_golang_mock//gomock:go_default_library", @@ -114,6 +116,8 @@ go_test( "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", + "@com_github_tyler_smith_go_bip39//:go_default_library", + "@com_github_wealdtech_go_eth2_util//:go_default_library", "@in_gopkg_d4l3k_messagediff_v1//:go_default_library", "@org_golang_google_grpc//metadata:go_default_library", ], diff --git a/validator/client/mock_validator.go b/validator/client/mock_validator.go index 4be9ce2fd40..ea72ec62af1 100644 --- a/validator/client/mock_validator.go +++ b/validator/client/mock_validator.go @@ -6,6 +6,7 @@ import ( ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/shared/timeutils" + "github.com/prysmaticlabs/prysm/validator/keymanager" ) var _ Validator = (*FakeValidator)(nil) @@ -41,6 +42,7 @@ type FakeValidator struct { IndexToPubkeyMap map[uint64][48]byte PubkeyToIndexMap map[[48]byte]uint64 PubkeysToStatusesMap map[[48]byte]ethpb.ValidatorStatus + Keymanager keymanager.IKeymanager } type ctxKey string @@ -65,7 +67,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error { } // WaitForActivation for mocking. -func (fv *FakeValidator) WaitForActivation(_ context.Context) error { +func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error { fv.WaitForActivationCalled = true return nil } @@ -187,5 +189,10 @@ func (fv *FakeValidator) AllValidatorsAreExited(ctx context.Context) (bool, erro return ctx.Value(allValidatorsAreExitedCtxKey).(bool), nil } +// GetKeymanager for mocking +func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager { + return fv.Keymanager +} + // ReceiveBlocks for mocking func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {} diff --git a/validator/client/runner.go b/validator/client/runner.go index aedc349da0c..93c552cf1bd 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/validator/keymanager" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -20,7 +21,7 @@ type Validator interface { Done() WaitForChainStart(ctx context.Context) error WaitForSync(ctx context.Context) error - WaitForActivation(ctx context.Context) error + WaitForActivation(ctx context.Context, accountsChangedChan chan struct{}) error SlasherReady(ctx context.Context) error CanonicalHeadSlot(ctx context.Context) (uint64, error) NextSlot() <-chan uint64 @@ -36,6 +37,7 @@ type Validator interface { UpdateDomainDataCaches(ctx context.Context, slot uint64) WaitForWalletInitialization(ctx context.Context) error AllValidatorsAreExited(ctx context.Context) (bool, error) + GetKeymanager() keymanager.IKeymanager ReceiveBlocks(ctx context.Context) } @@ -68,9 +70,13 @@ func run(ctx context.Context, v Validator) { if err := v.WaitForSync(ctx); err != nil { log.Fatalf("Could not determine if beacon node synced: %v", err) } - if err := v.WaitForActivation(ctx); err != nil { + + accountsChangedChan := make(chan struct{}, 1) + go handleAccountsChanged(ctx, v, accountsChangedChan) + if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil { log.Fatalf("Could not wait for validator activation: %v", err) } + go v.ReceiveBlocks(ctx) headSlot, err := v.CanonicalHeadSlot(ctx) @@ -175,3 +181,24 @@ func handleAssignmentError(err error, slot uint64) { log.WithField("error", err).Error("Failed to update assignments") } } + +func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan struct{}) { + validatingPubKeysChan := make(chan [][48]byte, 1) + var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan) + defer func() { + sub.Unsubscribe() + close(validatingPubKeysChan) + }() + + for { + select { + case <-validatingPubKeysChan: + accountsChangedChan <- struct{}{} + case err := <-sub.Err(): + log.WithError(err).Error("accounts changed subscription failed") + return + case <-ctx.Done(): + return + } + } +} diff --git a/validator/client/runner_test.go b/validator/client/runner_test.go index 31703e4a571..15efc2ead4a 100644 --- a/validator/client/runner_test.go +++ b/validator/client/runner_test.go @@ -6,12 +6,15 @@ import ( "testing" "time" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" logTest "github.com/sirupsen/logrus/hooks/test" ) +var walletPassword = "OhWOWthisisatest42!$" + func cancelledContext() context.Context { ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -19,25 +22,25 @@ func cancelledContext() context.Context { } func TestCancelledContext_CleansUpValidator(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} run(cancelledContext(), v) assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called") } func TestCancelledContext_WaitsForChainStart(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} run(cancelledContext(), v) assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called") } func TestCancelledContext_WaitsForActivation(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} run(cancelledContext(), v) assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called") } func TestCancelledContext_ChecksSlasherReady(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} cfg := &featureconfig.Flags{ SlasherProtection: true, } @@ -48,7 +51,7 @@ func TestCancelledContext_ChecksSlasherReady(t *testing.T) { } func TestUpdateDuties_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -68,7 +71,7 @@ func TestUpdateDuties_NextSlot(t *testing.T) { func TestUpdateDuties_HandlesError(t *testing.T) { hook := logTest.NewGlobal() - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -87,7 +90,7 @@ func TestUpdateDuties_HandlesError(t *testing.T) { } func TestRoleAt_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -106,7 +109,7 @@ func TestRoleAt_NextSlot(t *testing.T) { } func TestAttests_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -126,7 +129,7 @@ func TestAttests_NextSlot(t *testing.T) { } func TestProposes_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -146,7 +149,7 @@ func TestProposes_NextSlot(t *testing.T) { } func TestBothProposesAndAttests_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.Background()) slot := uint64(55) @@ -168,7 +171,7 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) { } func TestAllValidatorsAreExited_NextSlot(t *testing.T) { - v := &FakeValidator{} + v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}} ctx, cancel := context.WithCancel(context.WithValue(context.Background(), allValidatorsAreExitedCtxKey, true)) hook := logTest.NewGlobal() @@ -183,3 +186,55 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) { run(ctx, v) assert.LogsContain(t, hook, "All validators are exited") } + +func TestHandleAccountsChanged_Ok(t *testing.T) { + ctx := context.Background() + defer ctx.Done() + + km := &mockKeymanager{accountsChangedFeed: &event.Feed{}} + v := &FakeValidator{Keymanager: km} + channel := make(chan struct{}) + go handleAccountsChanged(ctx, v, channel) + time.Sleep(time.Second) // Allow time for subscribing to changes. + km.SimulateAccountChanges() + time.Sleep(time.Second) // Allow time for handling subscribed changes. + + select { + case _, ok := <-channel: + if !ok { + t.Error("Account changed channel is closed") + } + default: + t.Error("Accounts changed channel is empty") + } +} + +func TestHandleAccountsChanged_CtxCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + km := &mockKeymanager{accountsChangedFeed: &event.Feed{}} + v := &FakeValidator{Keymanager: km} + channel := make(chan struct{}, 2) + go handleAccountsChanged(ctx, v, channel) + time.Sleep(time.Second) // Allow time for subscribing to changes. + km.SimulateAccountChanges() + time.Sleep(time.Second) // Allow time for handling subscribed changes. + + cancel() + time.Sleep(time.Second) // Allow time for handling cancellation. + km.SimulateAccountChanges() + time.Sleep(time.Second) // Allow time for handling subscribed changes. + + var values int + for loop := true; loop == true; { + select { + case _, ok := <-channel: + if ok { + values++ + } + default: + loop = false + } + } + assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel") +} diff --git a/validator/client/service.go b/validator/client/service.go index 933d412f232..9b026024408 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -330,7 +330,10 @@ func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km keym } validatingPubKeysChan := make(chan [][48]byte, 1) sub := importedKeymanager.SubscribeAccountChanges(validatingPubKeysChan) - defer sub.Unsubscribe() + defer func() { + sub.Unsubscribe() + close(validatingPubKeysChan) + }() for { select { case keys := <-validatingPubKeysChan: diff --git a/validator/client/validator.go b/validator/client/validator.go index 26ad2f2dee7..5e57dd3f145 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -491,6 +491,11 @@ func (v *validator) RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]Va return rolesAt, nil } +// GetKeymanager returns the underlying validator's keymanager. +func (v *validator) GetKeymanager() keymanager.IKeymanager { + return v.keyManager +} + // isAggregator checks if a validator is an aggregator of a given slot, it uses the selection algorithm outlined in: // https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot uint64, pubKey [48]byte) (bool, error) { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index cd6376e6e22..616c6cafe52 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -47,9 +47,10 @@ func genMockKeymanger(numKeys int) *mockKeymanager { } type mockKeymanager struct { - lock sync.RWMutex - keysMap map[[48]byte]bls.SecretKey - fetchNoKeys bool + lock sync.RWMutex + keysMap map[[48]byte]bls.SecretKey + fetchNoKeys bool + accountsChangedFeed *event.Feed } func (m *mockKeymanager) FetchValidatingPublicKeys(ctx context.Context) ([][48]byte, error) { @@ -89,6 +90,14 @@ func (m *mockKeymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) return sig, nil } +func (m *mockKeymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription { + return m.accountsChangedFeed.Subscribe(pubKeysChan) +} + +func (m *mockKeymanager) SimulateAccountChanges() { + m.accountsChangedFeed.Send(make([][48]byte, 0)) +} + func generateMockStatusResponse(pubkeys [][]byte) *ethpb.ValidatorActivationResponse { multipleStatus := make([]*ethpb.ValidatorActivationResponse_Status, len(pubkeys)) for i, key := range pubkeys { @@ -351,7 +360,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { resp, nil, ) - require.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") + require.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") require.LogsContain(t, hook, "Validator activated") } @@ -389,7 +398,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") } func TestWaitSync_ContextCanceled(t *testing.T) { diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index 377430c3bbc..393fa0747b2 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -20,7 +20,7 @@ import ( // WaitForActivation checks whether the validator pubkey is in the active // validator set. If not, this operation will block until an activation message is // received. -func (v *validator) WaitForActivation(ctx context.Context) error { +func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan chan struct{}) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation") defer span.End() @@ -36,17 +36,14 @@ func (v *validator) WaitForActivation(ctx context.Context) error { for { select { case <-ticker.C: - keys, err := v.keyManager.FetchValidatingPublicKeys(ctx) + validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx) if err != nil { return errors.Wrap(err, msgCouldNotFetchKeys) } - if len(keys) == 0 { + if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) continue } - // after this statement we jump out of `select` and hit `break`, - // thus jumping out of `for` into the rest of the function - validatingKeys = keys case <-ctx.Done(): log.Debug("Context closed, exiting fetching validating keys") return ctx.Err() @@ -66,44 +63,52 @@ func (v *validator) WaitForActivation(ctx context.Context) error { Error("Stream broken while waiting for activation. Reconnecting...") // Reconnection attempt backoff, up to 60s. time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) - return v.WaitForActivation(incrementRetries(ctx)) + return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan) } for { - res, err := stream.Recv() - // If the stream is closed, we stop the loop. - if errors.Is(err, io.EOF) { - break - } - // If context is canceled we stop the loop. - if ctx.Err() == context.Canceled { - return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") - } - if err != nil { - traceutil.AnnotateError(span, err) - attempts := getStreamAttempts(ctx) - log.WithError(err).WithField("attempts", attempts). - Error("Stream broken while waiting for activation. Reconnecting...") - // Reconnection attempt backoff, up to 60s. - time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) - return v.WaitForActivation(incrementRetries(ctx)) - } - valActivated := v.checkAndLogValidatorStatus(res.Statuses) + select { + case <-accountsChangedChan: + // Accounts (keys) changed, restart the process. + return v.WaitForActivation(ctx, accountsChangedChan) + default: + res, err := stream.Recv() + // If the stream is closed, we stop the loop. + if errors.Is(err, io.EOF) { + break + } + // If context is canceled we return from the function. + if ctx.Err() == context.Canceled { + return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop") + } + if err != nil { + traceutil.AnnotateError(span, err) + attempts := getStreamAttempts(ctx) + log.WithError(err).WithField("attempts", attempts). + Error("Stream broken while waiting for activation. Reconnecting...") + // Reconnection attempt backoff, up to 60s. + time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60))) + return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan) + } + valActivated := v.checkAndLogValidatorStatus(res.Statuses) - if valActivated { - for _, statusResp := range res.Statuses { - if statusResp.Status.Status != ethpb.ValidatorStatus_ACTIVE { - continue + if valActivated { + for _, statusResp := range res.Statuses { + if statusResp.Status.Status != ethpb.ValidatorStatus_ACTIVE { + continue + } + log.WithFields(logrus.Fields{ + "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(statusResp.PublicKey)), + "index": statusResp.Index, + }).Info("Validator activated") } - log.WithFields(logrus.Fields{ - "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(statusResp.PublicKey)), - "index": statusResp.Index, - }).Info("Validator activated") + } else { + continue } - break } + break } - v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) + v.ticker = slotutil.GetSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) return nil } diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index 70147a1a502..6d37e39e9a7 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -2,6 +2,7 @@ package client import ( "context" + "fmt" "testing" "time" @@ -12,7 +13,11 @@ import ( "github.com/prysmaticlabs/prysm/shared/mock" "github.com/prysmaticlabs/prysm/shared/testutil/assert" "github.com/prysmaticlabs/prysm/shared/testutil/require" + walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing" + "github.com/prysmaticlabs/prysm/validator/keymanager/derived" logTest "github.com/sirupsen/logrus/hooks/test" + "github.com/tyler-smith/go-bip39" + util "github.com/wealdtech/go-eth2-util" ) func TestWaitActivation_ContextCanceled(t *testing.T) { @@ -46,7 +51,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { ) ctx, cancel := context.WithCancel(context.Background()) cancel() - assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx)) + assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, make(chan struct{}))) } func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) { @@ -77,7 +82,7 @@ func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) { resp := generateMockStatusResponse([][]byte{pubKey[:]}) resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE clientStream.EXPECT().Recv().Return(resp, nil) - assert.NoError(t, v.WaitForActivation(context.Background())) + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) } func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testing.T) { @@ -112,7 +117,7 @@ func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testin nil, errors.New("fails"), ).Return(resp, nil) - assert.NoError(t, v.WaitForActivation(context.Background())) + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) } func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { @@ -147,8 +152,8 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") - require.LogsContain(t, hook, "Validator activated") + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") + assert.LogsContain(t, hook, "Validator activated") } func TestWaitForActivation_Exiting(t *testing.T) { @@ -182,7 +187,7 @@ func TestWaitForActivation_Exiting(t *testing.T) { resp, nil, ) - require.NoError(t, v.WaitForActivation(context.Background())) + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{}))) } func TestWaitForActivation_RefetchKeys(t *testing.T) { @@ -190,7 +195,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { defer func() { keyRefetchPeriod = originalPeriod }() - keyRefetchPeriod = 5 * time.Second + keyRefetchPeriod = 1 * time.Second hook := logTest.NewGlobal() ctrl := gomock.NewController(t) @@ -224,7 +229,152 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { resp, nil, ) - assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation") + assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation") assert.LogsContain(t, hook, msgNoKeysFetched) assert.LogsContain(t, hook, "Validator activated") } + +// Regression test for a scenario where you start with an inactive key and then import an active key. +func TestWaitForActivation_AccountsChanged(t *testing.T) { + hook := logTest.NewGlobal() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + t.Run("Imported keymanager", func(t *testing.T) { + inactivePrivKey, err := bls.RandKey() + require.NoError(t, err) + inactivePubKey := [48]byte{} + copy(inactivePubKey[:], inactivePrivKey.PublicKey().Marshal()) + activePrivKey, err := bls.RandKey() + require.NoError(t, err) + activePubKey := [48]byte{} + copy(activePubKey[:], activePrivKey.PublicKey().Marshal()) + km := &mockKeymanager{ + keysMap: map[[48]byte]bls.SecretKey{ + inactivePubKey: inactivePrivKey, + }, + } + client := mock.NewMockBeaconNodeValidatorClient(ctrl) + v := validator{ + validatorClient: client, + keyManager: km, + genesisTime: 1, + } + + inactiveResp := generateMockStatusResponse([][]byte{inactivePubKey[:]}) + inactiveResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + inactiveClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + client.EXPECT().WaitForActivation( + gomock.Any(), + ðpb.ValidatorActivationRequest{ + PublicKeys: [][]byte{inactivePubKey[:]}, + }, + ).Return(inactiveClientStream, nil) + inactiveClientStream.EXPECT().Recv().Return( + inactiveResp, + nil, + ).AnyTimes() + + activeResp := generateMockStatusResponse([][]byte{inactivePubKey[:], activePubKey[:]}) + activeResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + activeResp.Statuses[1].Status.Status = ethpb.ValidatorStatus_ACTIVE + activeClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + client.EXPECT().WaitForActivation( + gomock.Any(), + ðpb.ValidatorActivationRequest{ + PublicKeys: [][]byte{inactivePubKey[:], activePubKey[:]}, + }, + ).Return(activeClientStream, nil) + activeClientStream.EXPECT().Recv().Return( + activeResp, + nil, + ) + + channel := make(chan struct{}) + go func() { + // We add the active key into the keymanager and simulate a key refresh. + time.Sleep(time.Second * 1) + km.keysMap[activePubKey] = activePrivKey + channel <- struct{}{} + }() + + assert.NoError(t, v.WaitForActivation(context.Background(), channel)) + assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") + assert.LogsContain(t, hook, "Validator activated") + }) + + t.Run("Derived keymanager", func(t *testing.T) { + mnemonic := "tumble turn jewel sudden social great water general cabin jacket bounce dry flip monster advance problem social half flee inform century chicken hard reason" + seed := bip39.NewSeed(mnemonic, "") + inactivePrivKey, err := + util.PrivateKeyFromSeedAndPath(seed, fmt.Sprintf(derived.ValidatingKeyDerivationPathTemplate, 0)) + require.NoError(t, err) + inactivePubKey := [48]byte{} + copy(inactivePubKey[:], inactivePrivKey.PublicKey().Marshal()) + activePrivKey, err := + util.PrivateKeyFromSeedAndPath(seed, fmt.Sprintf(derived.ValidatingKeyDerivationPathTemplate, 1)) + require.NoError(t, err) + activePubKey := [48]byte{} + copy(activePubKey[:], activePrivKey.PublicKey().Marshal()) + wallet := &walletMock.Wallet{ + Files: make(map[string]map[string][]byte), + AccountPasswords: make(map[string]string), + WalletPassword: "secretPassw0rd$1999", + } + ctx := context.Background() + km, err := derived.NewKeymanager(ctx, &derived.SetupConfig{ + Wallet: wallet, + }) + require.NoError(t, err) + err = km.RecoverAccountsFromMnemonic(ctx, mnemonic, "", 1) + require.NoError(t, err) + client := mock.NewMockBeaconNodeValidatorClient(ctrl) + v := validator{ + validatorClient: client, + keyManager: km, + genesisTime: 1, + } + + inactiveResp := generateMockStatusResponse([][]byte{inactivePubKey[:]}) + inactiveResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + inactiveClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + client.EXPECT().WaitForActivation( + gomock.Any(), + ðpb.ValidatorActivationRequest{ + PublicKeys: [][]byte{inactivePubKey[:]}, + }, + ).Return(inactiveClientStream, nil) + inactiveClientStream.EXPECT().Recv().Return( + inactiveResp, + nil, + ).AnyTimes() + + activeResp := generateMockStatusResponse([][]byte{inactivePubKey[:], activePubKey[:]}) + activeResp.Statuses[0].Status.Status = ethpb.ValidatorStatus_UNKNOWN_STATUS + activeResp.Statuses[1].Status.Status = ethpb.ValidatorStatus_ACTIVE + activeClientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + client.EXPECT().WaitForActivation( + gomock.Any(), + ðpb.ValidatorActivationRequest{ + PublicKeys: [][]byte{inactivePubKey[:], activePubKey[:]}, + }, + ).Return(activeClientStream, nil) + activeClientStream.EXPECT().Recv().Return( + activeResp, + nil, + ) + + channel := make(chan struct{}) + go func() { + // We add the active key into the keymanager and simulate a key refresh. + time.Sleep(time.Second * 1) + err = km.RecoverAccountsFromMnemonic(ctx, mnemonic, "", 2) + require.NoError(t, err) + channel <- struct{}{} + }() + + assert.NoError(t, v.WaitForActivation(context.Background(), channel)) + assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node") + assert.LogsContain(t, hook, "Validator activated") + }) +} diff --git a/validator/keymanager/BUILD.bazel b/validator/keymanager/BUILD.bazel index 0de04b6aa1f..fcc8461609b 100644 --- a/validator/keymanager/BUILD.bazel +++ b/validator/keymanager/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//proto/validator/accounts/v2:go_default_library", "//shared/bls:go_default_library", + "//shared/event:go_default_library", ], ) diff --git a/validator/keymanager/derived/BUILD.bazel b/validator/keymanager/derived/BUILD.bazel index 438311e4412..323a5b33f95 100644 --- a/validator/keymanager/derived/BUILD.bazel +++ b/validator/keymanager/derived/BUILD.bazel @@ -13,6 +13,7 @@ go_library( deps = [ "//proto/validator/accounts/v2:go_default_library", "//shared/bls:go_default_library", + "//shared/event:go_default_library", "//shared/promptutil:go_default_library", "//shared/rand:go_default_library", "//validator/accounts/iface:go_default_library", diff --git a/validator/keymanager/derived/keymanager.go b/validator/keymanager/derived/keymanager.go index 391ab2f974d..f1b489393c1 100644 --- a/validator/keymanager/derived/keymanager.go +++ b/validator/keymanager/derived/keymanager.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2" "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/validator/accounts/iface" "github.com/prysmaticlabs/prysm/validator/keymanager" "github.com/prysmaticlabs/prysm/validator/keymanager/imported" @@ -112,3 +113,10 @@ func (km *Keymanager) FetchValidatingPrivateKeys(ctx context.Context) ([][32]byt func (km *Keymanager) DeleteAccounts(ctx context.Context, publicKeys [][]byte) error { return km.importedKM.DeleteAccounts(ctx, publicKeys) } + +// SubscribeAccountChanges creates an event subscription for a channel +// to listen for public key changes at runtime, such as when new validator accounts +// are imported into the keymanager while the validator process is running. +func (dr *Keymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription { + return dr.importedKM.SubscribeAccountChanges(pubKeysChan) +} diff --git a/validator/keymanager/remote/BUILD.bazel b/validator/keymanager/remote/BUILD.bazel index bb8cd0a463a..5e8ff388b96 100644 --- a/validator/keymanager/remote/BUILD.bazel +++ b/validator/keymanager/remote/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//proto/validator/accounts/v2:go_default_library", "//shared/bls:go_default_library", "//shared/bytesutil:go_default_library", + "//shared/event:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", "@com_github_logrusorgru_aurora//:go_default_library", "@com_github_pkg_errors//:go_default_library", diff --git a/validator/keymanager/remote/keymanager.go b/validator/keymanager/remote/keymanager.go index 47d7643fd2c..41b0a99663b 100644 --- a/validator/keymanager/remote/keymanager.go +++ b/validator/keymanager/remote/keymanager.go @@ -16,6 +16,7 @@ import ( validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2" "github.com/prysmaticlabs/prysm/shared/bls" "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/event" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) @@ -227,3 +228,11 @@ func (km *Keymanager) Sign(ctx context.Context, req *validatorpb.SignRequest) (b } return bls.SignatureFromBytes(resp.Signature) } + +// SubscribeAccountChanges is currently NOT IMPLEMENTED for the remote keymanager. +// INVOKING THIS FUNCTION HAS NO EFFECT! +func (k *Keymanager) SubscribeAccountChanges(_ chan [][48]byte) event.Subscription { + return event.NewSubscription(func(i <-chan struct{}) error { + return nil + }) +} diff --git a/validator/keymanager/types.go b/validator/keymanager/types.go index f1778137bae..a03ae551608 100644 --- a/validator/keymanager/types.go +++ b/validator/keymanager/types.go @@ -6,16 +6,19 @@ import ( validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2" "github.com/prysmaticlabs/prysm/shared/bls" + "github.com/prysmaticlabs/prysm/shared/event" ) // IKeymanager defines a general keymanager interface for Prysm wallets. type IKeymanager interface { - // FetchValidatingKeys fetches the list of active public keys that should be used to validate with. + // FetchValidatingPublicKeys fetches the list of active public keys that should be used to validate with. FetchValidatingPublicKeys(ctx context.Context) ([][48]byte, error) - // FetchAllValidatingKeys fetches the list of all public keys, including disabled ones. + // FetchAllValidatingPublicKeys fetches the list of all public keys, including disabled ones. FetchAllValidatingPublicKeys(ctx context.Context) ([][48]byte, error) // Sign signs a message using a validator key. Sign(context.Context, *validatorpb.SignRequest) (bls.Signature, error) + // SubscribeAccountChanges subscribes to changes made to the underlying keys. + SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription } // Keystore json file representation as a Go struct.