Skip to content

Commit

Permalink
Fix validator activation monitoring with inactive keys (#8558)
Browse files Browse the repository at this point in the history
* refactor / move waiting for activation updates

* Commentary

* Update test to follow the full code path

* gofmt and goimports

* manual imports fixes

* Apply suggestions from code review

typo fixes

* Remove redundant handleAccountsChanged and chan. Thanks @nisdas

* var sub = to sub :=

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
  • Loading branch information
prestonvanloon and rkapka committed Mar 5, 2021
1 parent edd86fd commit f074c5e
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 101 deletions.
2 changes: 1 addition & 1 deletion validator/accounts/accounts_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"archive/zip"
"encoding/hex"
"encoding/json"
constant "github.com/prysmaticlabs/prysm/validator/testing"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -21,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/validator/accounts/wallet"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
constant "github.com/prysmaticlabs/prysm/validator/testing"
)

func TestBackupAccounts_Noninteractive_Derived(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion validator/accounts/accounts_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package accounts
import (
"context"
"fmt"
constant "github.com/prysmaticlabs/prysm/validator/testing"
"io/ioutil"
"math"
"os"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
"github.com/prysmaticlabs/prysm/validator/keymanager/imported"
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
constant "github.com/prysmaticlabs/prysm/validator/testing"
keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4"
)

Expand Down
2 changes: 1 addition & 1 deletion validator/client/mock_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
}

// WaitForActivation for mocking.
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ <-chan struct{}) error {
func (fv *FakeValidator) WaitForActivation(_ context.Context) error {
fv.WaitForActivationCalled++
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
return errConnectionIssue
Expand Down
27 changes: 2 additions & 25 deletions validator/client/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Validator interface {
Done()
WaitForChainStart(ctx context.Context) error
WaitForSync(ctx context.Context) error
WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error
WaitForActivation(ctx context.Context) error
SlasherReady(ctx context.Context) error
CanonicalHeadSlot(ctx context.Context) (types.Slot, error)
NextSlot() <-chan types.Slot
Expand Down Expand Up @@ -75,7 +75,6 @@ func run(ctx context.Context, v Validator) {

var headSlot types.Slot
firstTime := true
accountsChangedChan := make(chan struct{}, 1)
for {
if !firstTime {
if ctx.Err() != nil {
Expand All @@ -102,7 +101,7 @@ func run(ctx context.Context, v Validator) {
if err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
err = v.WaitForActivation(ctx, accountsChangedChan)
err = v.WaitForActivation(ctx)
if isConnectionError(err) {
log.Warnf("Could not wait for validator activation: %v", err)
continue
Expand All @@ -121,7 +120,6 @@ func run(ctx context.Context, v Validator) {
break
}

go handleAccountsChanged(ctx, v, accountsChangedChan)
connectionErrorChannel := make(chan error, 1)
go v.ReceiveBlocks(ctx, connectionErrorChannel)
if err := v.UpdateDuties(ctx, headSlot); err != nil {
Expand Down Expand Up @@ -233,24 +231,3 @@ func handleAssignmentError(err error, slot types.Slot) {
log.WithField("error", err).Error("Failed to update assignments")
}
}

func handleAccountsChanged(ctx context.Context, v Validator, accountsChangedChan chan<- struct{}) {
validatingPubKeysChan := make(chan [][48]byte, 1)
var sub = v.GetKeymanager().SubscribeAccountChanges(validatingPubKeysChan)
defer func() {
sub.Unsubscribe()
close(validatingPubKeysChan)
}()

for {
select {
case <-validatingPubKeysChan:
accountsChangedChan <- struct{}{}
case err := <-sub.Err():
log.WithError(err).Error("accounts changed subscription failed")
return
case <-ctx.Done():
return
}
}
}
52 changes: 0 additions & 52 deletions validator/client/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,55 +206,3 @@ func TestAllValidatorsAreExited_NextSlot(t *testing.T) {
run(ctx, v)
assert.LogsContain(t, hook, "All validators are exited")
}

func TestHandleAccountsChanged_Ok(t *testing.T) {
ctx := context.Background()
defer ctx.Done()

km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{})
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

select {
case _, ok := <-channel:
if !ok {
t.Error("Account changed channel is closed")
}
default:
t.Error("Accounts changed channel is empty")
}
}

func TestHandleAccountsChanged_CtxCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

km := &mockKeymanager{accountsChangedFeed: &event.Feed{}}
v := &FakeValidator{Keymanager: km}
channel := make(chan struct{}, 2)
go handleAccountsChanged(ctx, v, channel)
time.Sleep(time.Second) // Allow time for subscribing to changes.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

cancel()
time.Sleep(time.Second) // Allow time for handling cancellation.
km.SimulateAccountChanges()
time.Sleep(time.Second) // Allow time for handling subscribed changes.

var values int
for loop := true; loop == true; {
select {
case _, ok := <-channel:
if ok {
values++
}
default:
loop = false
}
}
assert.Equal(t, 1, values, "Incorrect number of values were passed to the channel")
}
7 changes: 5 additions & 2 deletions validator/client/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (m *mockKeymanager) Sign(ctx context.Context, req *validatorpb.SignRequest)
}

func (m *mockKeymanager) SubscribeAccountChanges(pubKeysChan chan [][48]byte) event.Subscription {
if m.accountsChangedFeed == nil {
m.accountsChangedFeed = &event.Feed{}
}
return m.accountsChangedFeed.Subscribe(pubKeysChan)
}

Expand Down Expand Up @@ -362,7 +365,7 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
resp,
nil,
)
require.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
require.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
require.LogsContain(t, hook, "Validator activated")
}

Expand Down Expand Up @@ -400,7 +403,7 @@ func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
}

func TestWaitSync_ContextCanceled(t *testing.T) {
Expand Down
30 changes: 25 additions & 5 deletions validator/client/wait_for_activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,28 @@ import (

// WaitForActivation checks whether the validator pubkey is in the active
// validator set. If not, this operation will block until an activation message is
// received.
func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <-chan struct{}) error {
// received. This method also monitors the keymanager for updates while waiting for an activation
// from the gRPC server.
func (v *validator) WaitForActivation(ctx context.Context) error {
// Monitor the key manager for updates.
accountsChangedChan := make(chan [][48]byte)
sub := v.GetKeymanager().SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
close(accountsChangedChan)
}()

return v.waitForActivation(ctx, accountsChangedChan)
}

// waitForActivation performs the following:
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
// 2) Open a server side stream for activation events against the given keys.
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
// the accountsChangedChan. When an event signal is received, restart the waitForActivation routine.
// 4) If the stream is reset in error, restart the routine.
// 5) If the stream returns a response indicating one or more validators are active, exit the routine.
func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <-chan [][48]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()

Expand Down Expand Up @@ -63,13 +83,13 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
}
for {
select {
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.WaitForActivation(ctx, accountsChangedChan)
return v.waitForActivation(ctx, accountsChangedChan)
default:
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
Expand All @@ -87,7 +107,7 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan <
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(mathutil.Min(uint64(attempts), 60)))
return v.WaitForActivation(incrementRetries(ctx), accountsChangedChan)
return v.waitForActivation(incrementRetries(ctx), accountsChangedChan)
}
valActivated := v.checkAndLogValidatorStatus(res.Statuses)

Expand Down
25 changes: 12 additions & 13 deletions validator/client/wait_for_activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package client
import (
"context"
"fmt"
constant "github.com/prysmaticlabs/prysm/validator/testing"
"testing"
"time"

Expand All @@ -16,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil/require"
walletMock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
"github.com/prysmaticlabs/prysm/validator/keymanager/derived"
constant "github.com/prysmaticlabs/prysm/validator/testing"
logTest "github.com/sirupsen/logrus/hooks/test"
"github.com/tyler-smith/go-bip39"
util "github.com/wealdtech/go-eth2-util"
Expand Down Expand Up @@ -52,7 +52,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, make(chan struct{})))
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx))
}

func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestWaitActivation_StreamSetupFails_AttemptsToReconnect(t *testing.T) {
resp := generateMockStatusResponse([][]byte{pubKey[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream.EXPECT().Recv().Return(resp, nil)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
assert.NoError(t, v.WaitForActivation(context.Background()))
}

func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testing.T) {
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestWaitForActivation_ReceiveErrorFromStream_AttemptsReconnection(t *testin
nil,
errors.New("fails"),
).Return(resp, nil)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
assert.NoError(t, v.WaitForActivation(context.Background()))
}

func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestWaitActivation_LogsActivationEpochOK(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
assert.NoError(t, v.WaitForActivation(context.Background()), "Could not wait for activation")
assert.LogsContain(t, hook, "Validator activated")
}

Expand Down Expand Up @@ -188,7 +188,7 @@ func TestWaitForActivation_Exiting(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})))
assert.NoError(t, v.WaitForActivation(context.Background()))
}

func TestWaitForActivation_RefetchKeys(t *testing.T) {
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), make(chan struct{})), "Could not wait for activation")
assert.NoError(t, v.waitForActivation(context.Background(), make(chan [][48]byte)), "Could not wait for activation")
assert.LogsContain(t, hook, msgNoKeysFetched)
assert.LogsContain(t, hook, "Validator activated")
}
Expand Down Expand Up @@ -291,15 +291,14 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
nil,
)

channel := make(chan struct{})
go func() {
// We add the active key into the keymanager and simulate a key refresh.
time.Sleep(time.Second * 1)
km.keysMap[activePubKey] = activePrivKey
channel <- struct{}{}
km.SimulateAccountChanges()
}()

assert.NoError(t, v.WaitForActivation(context.Background(), channel))
assert.NoError(t, v.WaitForActivation(context.Background()))
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})
Expand Down Expand Up @@ -365,16 +364,16 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
nil,
)

channel := make(chan struct{})
channel := make(chan [][48]byte)
go func() {
// We add the active key into the keymanager and simulate a key refresh.
time.Sleep(time.Second * 1)
err = km.RecoverAccountsFromMnemonic(ctx, constant.TestMnemonic, "", 2)
require.NoError(t, err)
channel <- struct{}{}
channel <- [][48]byte{}
}()

assert.NoError(t, v.WaitForActivation(context.Background(), channel))
assert.NoError(t, v.waitForActivation(context.Background(), channel))
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
assert.LogsContain(t, hook, "Validator activated")
})
Expand Down
2 changes: 1 addition & 1 deletion validator/keymanager/derived/keymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package derived
import (
"context"
"fmt"
constant "github.com/prysmaticlabs/prysm/validator/testing"
"testing"

validatorpb "github.com/prysmaticlabs/prysm/proto/validator/accounts/v2"
Expand All @@ -12,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
mock "github.com/prysmaticlabs/prysm/validator/accounts/testing"
constant "github.com/prysmaticlabs/prysm/validator/testing"
"github.com/tyler-smith/go-bip39"
util "github.com/wealdtech/go-eth2-util"
)
Expand Down

0 comments on commit f074c5e

Please sign in to comment.