Skip to content

Commit

Permalink
Allow dynamic key reloading when having inactive keys (imported & der…
Browse files Browse the repository at this point in the history
…ived) (#8119)

* restart waiting for activation on key change

* test fixes

* wiat for activation comments

* regression test

* log fatal when validator cast fails

* derived keymanager

* review comments

* add buffer to channel

* simplify key refetch logic

* reload keys into empty wallet

* removed warning on wallet creation

* add empty line

* export AccountsKeystoreRepresentation type

* unit test for handleAccountsChanged

* test ctx cancellation

* add missing mockRemoteKeymanager interface function

* gazelle

* gzl

* fix panic inside goroutine during runner tests

* rename error message variables

* Update validator/accounts/accounts_list_test.go

* reorder imports

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
  • Loading branch information
3 people committed Jan 22, 2021
1 parent 229abed commit 8ffb95b
Show file tree
Hide file tree
Showing 23 changed files with 367 additions and 74 deletions.
1 change: 1 addition & 0 deletions validator/accounts/BUILD.bazel
Expand Up @@ -75,6 +75,7 @@ go_test(
"//proto/validator/accounts/v2:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/fileutil:go_default_library",
"//shared/mock:go_default_library",
"//shared/params:go_default_library",
Expand Down
5 changes: 2 additions & 3 deletions validator/accounts/accounts.go
Expand Up @@ -5,10 +5,9 @@ import (
"github.com/prysmaticlabs/prysm/validator/keymanager"
)

var msgKeymanagerNotSupported = "keymanager kind not supported: %s"

var (
// ErrCouldNotInitializeKeymanager informs about failed keymanager initialization
errKeymanagerNotSupported = "keymanager kind not supported: %s"
// MsgCouldNotInitializeKeymanager informs about failed keymanager initialization
ErrCouldNotInitializeKeymanager = "could not initialize keymanager"
)

Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/accounts_backup.go
Expand Up @@ -114,7 +114,7 @@ func BackupAccountsCli(cliCtx *cli.Context) error {
case keymanager.Remote:
return errors.New("backing up keys is not supported for a remote keymanager")
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind())
}
return zipKeystoresToOutputDir(keystoresToBackup, backupDir)
}
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/accounts_delete.go
Expand Up @@ -131,7 +131,7 @@ func DeleteAccount(ctx context.Context, cfg *AccountsConfig) error {
return errors.Wrap(err, "could not delete accounts")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, cfg.Wallet.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, cfg.Wallet.KeymanagerKind())
}
return nil
}
2 changes: 1 addition & 1 deletion validator/accounts/accounts_list.go
Expand Up @@ -62,7 +62,7 @@ func ListAccountsCli(cliCtx *cli.Context) error {
return errors.Wrap(err, "could not list validator accounts with remote keymanager")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind().String())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind().String())
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions validator/accounts/accounts_list_test.go
Expand Up @@ -13,6 +13,7 @@ import (
validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/petnames"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
Expand Down Expand Up @@ -41,6 +42,10 @@ func (m *mockRemoteKeymanager) Sign(context.Context, *validatorpb.SignRequest) (
return nil, nil
}

func (m *mockRemoteKeymanager) SubscribeAccountChanges(_ chan [][48]byte) event.Subscription {
return nil
}

func createRandomKeystore(t testing.TB, password string) *keymanager.Keystore {
encryptor := keystorev4.New()
id, err := uuid.NewRandom()
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/wallet_create.go
Expand Up @@ -116,7 +116,7 @@ func CreateWalletWithKeymanager(ctx context.Context, cfg *CreateWalletConfig) (*
"Successfully created wallet with remote keymanager configuration",
)
default:
return nil, errors.Wrapf(err, msgKeymanagerNotSupported, w.KeymanagerKind())
return nil, errors.Wrapf(err, errKeymanagerNotSupported, w.KeymanagerKind())
}
return w, nil
}
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/wallet_edit.go
Expand Up @@ -50,7 +50,7 @@ func EditWalletConfigurationCli(cliCtx *cli.Context) error {
return errors.Wrap(err, "could not write config to disk")
}
default:
return fmt.Errorf(msgKeymanagerNotSupported, w.KeymanagerKind())
return fmt.Errorf(errKeymanagerNotSupported, w.KeymanagerKind())
}
return nil
}
4 changes: 4 additions & 0 deletions validator/client/BUILD.bazel
Expand Up @@ -103,8 +103,10 @@ go_test(
"//shared/testutil/assert:go_default_library",
"//shared/testutil/require:go_default_library",
"//shared/timeutils:go_default_library",
"//validator/accounts/testing:go_default_library",
"//validator/db/testing:go_default_library",
"//validator/graffiti:go_default_library",
"//validator/keymanager/derived:go_default_library",
"//validator/testing:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
Expand All @@ -114,6 +116,8 @@ go_test(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_tyler_smith_go_bip39//:go_default_library",
"@com_github_wealdtech_go_eth2_util//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],
Expand Down
9 changes: 8 additions & 1 deletion validator/client/mock_validator.go
Expand Up @@ -6,6 +6,7 @@ import (

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/timeutils"
"github.com/prysmaticlabs/prysm/validator/keymanager"
)

var _ Validator = (*FakeValidator)(nil)
Expand Down Expand Up @@ -41,6 +42,7 @@ type FakeValidator struct {
IndexToPubkeyMap map[uint64][48]byte
PubkeyToIndexMap map[[48]byte]uint64
PubkeysToStatusesMap map[[48]byte]ethpb.ValidatorStatus
Keymanager keymanager.IKeymanager
}

type ctxKey string
Expand All @@ -65,7 +67,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
}

// WaitForActivation for mocking.
func (fv *FakeValidator) WaitForActivation(_ context.Context) error {
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error {
fv.WaitForActivationCalled = true
return nil
}
Expand Down Expand Up @@ -187,5 +189,10 @@ func (fv *FakeValidator) AllValidatorsAreExited(ctx context.Context) (bool, erro
return ctx.Value(allValidatorsAreExitedCtxKey).(bool), nil
}

// GetKeymanager for mocking
func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager {
return fv.Keymanager
}

// ReceiveBlocks for mocking
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {}
31 changes: 29 additions & 2 deletions validator/client/runner.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -20,7 +21,7 @@ type Validator interface {
Done()
WaitForChainStart(ctx context.Context) error
WaitForSync(ctx context.Context) error
WaitForActivation(ctx context.Context) error
WaitForActivation(ctx context.Context, accountsChangedChan chan struct{}) error
SlasherReady(ctx context.Context) error
CanonicalHeadSlot(ctx context.Context) (uint64, error)
NextSlot() <-chan uint64
Expand All @@ -36,6 +37,7 @@ type Validator interface {
UpdateDomainDataCaches(ctx context.Context, slot uint64)
WaitForWalletInitialization(ctx context.Context) error
AllValidatorsAreExited(ctx context.Context) (bool, error)
GetKeymanager() keymanager.IKeymanager
ReceiveBlocks(ctx context.Context)
}

Expand Down Expand Up @@ -68,9 +70,13 @@ func run(ctx context.Context, v Validator) {
if err := v.WaitForSync(ctx); err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
if err := v.WaitForActivation(ctx); err != nil {

accountsChangedChan := make(chan struct{}, 1)
go handleAccountsChanged(ctx, v, accountsChangedChan)
if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}

go v.ReceiveBlocks(ctx)

headSlot, err := v.CanonicalHeadSlot(ctx)
Expand Down Expand Up @@ -175,3 +181,24 @@ func handleAssignmentError(err error, slot uint64) {
log.WithField("error", err).Error("Failed to update assignments")
}
}

func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan struct{}) {
validatingPubKeysChan := make(chan [][48]byte, 1)
var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan)
defer func() {
sub.Unsubscribe()
close(validatingPubKeysChan)
}()

for {
select {
case <-validatingPubKeysChan:
accountsChangedChan <- struct{}{}
case err := <-sub.Err():
log.WithError(err).Error("accounts changed subscription failed")
return
case <-ctx.Done():
return
}
}
}
77 changes: 66 additions & 11 deletions validator/client/runner_test.go
Expand Up @@ -6,38 +6,41 @@ import (
"testing"
"time"

"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)

var walletPassword = "OhWOWthisisatest42!$"

func cancelledContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}

func TestCancelledContext_CleansUpValidator(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
}

func TestCancelledContext_WaitsForChainStart(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
}

func TestCancelledContext_WaitsForActivation(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}

func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
cfg := &featureconfig.Flags{
SlasherProtection: true,
}
Expand All @@ -48,7 +51,7 @@ func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
}

func TestUpdateDuties_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -68,7 +71,7 @@ func TestUpdateDuties_NextSlot(t *testing.T) {

func TestUpdateDuties_HandlesError(t *testing.T) {
hook := logTest.NewGlobal()
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -87,7 +90,7 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
}

func TestRoleAt_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -106,7 +109,7 @@ func TestRoleAt_NextSlot(t *testing.T) {
}

func TestAttests_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -126,7 +129,7 @@ func TestAttests_NextSlot(t *testing.T) {
}

func TestProposes_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -146,7 +149,7 @@ func TestProposes_NextSlot(t *testing.T) {
}

func TestBothProposesAndAttests_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.Background())

slot := uint64(55)
Expand All @@ -168,7 +171,7 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) {
}

func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
v := &FakeValidator{}
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
ctx, cancel := context.WithCancel(context.WithValue(context.Background(), allValidatorsAreExitedCtxKey, true))
hook := logTest.NewGlobal()

Expand All @@ -183,3 +186,55 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
run(ctx, v)
assert.LogsContain(t, hook, "All validators are exited")
}

func TestHandleAccountsChanged_Ok(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{})
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

select {
case _, ok := <-channel:
if !ok {
t.Error("Account changed channel is closed")
}
default:
t.Error("Accounts changed channel is empty")
}
}

func TestHandleAccountsChanged_CtxCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{}, 2)
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

cancel()
time.Sleep(time.Second) // Allow time for handling cancellation.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

var values int
for loop := true; loop == true; {
select {
case _, ok := <-channel:
if ok {
values++
}
default:
loop = false
}
}
assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel")
}
5 changes: 4 additions & 1 deletion validator/client/service.go
Expand Up @@ -330,7 +330,10 @@ func recheckValidatingKeysBucket(ctx context.Context, valDB db.Database, km keym
}
validatingPubKeysChan := make(chan [][48]byte, 1)
sub := importedKeymanager.SubscribeAccountChanges(validatingPubKeysChan)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
close(validatingPubKeysChan)
}()
for {
select {
case keys := <-validatingPubKeysChan:
Expand Down
5 changes: 5 additions & 0 deletions validator/client/validator.go
Expand Up @@ -491,6 +491,11 @@ func (v *validator) RolesAt(ctx context.Context, slot uint64) (map[[48]byte][]Va
return rolesAt, nil
}

// GetKeymanager returns the underlying validator's keymanager.
func (v *validator) GetKeymanager() keymanager.IKeymanager {
return v.keyManager
}

// isAggregator checks if a validator is an aggregator of a given slot, it uses the selection algorithm outlined in:
// https://github.com/ethereum/eth2.0-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#aggregation-selection
func (v *validator) isAggregator(ctx context.Context, committee []uint64, slot uint64, pubKey [48]byte) (bool, error) {
Expand Down

0 comments on commit 8ffb95b

Please sign in to comment.