Skip to content

Commit

Permalink
Improve wait for activation (#13448)
Browse files Browse the repository at this point in the history
* removing timeout on wait for activation, instead switched to an event driven approach

* fixing unit tests

* linting

* simplifying return

* adding sleep for the remaining slot to avoid cpu spikes

* removing ifstatement on log

* removing ifstatement on log

* improving switch statement

* removing the loop entirely

* fixing unit test

* fixing manu's reported issue with deletion of json file

* missed change around writefile at path

* gofmt

* fixing deepsource issue with reading file

* trying to clean file to avoid deepsource issue

* still getting error trying a different approach

* fixing stream loop

* fixing unit test

* Update validator/keymanager/local/keymanager.go

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* fixing linting

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
  • Loading branch information
james-prysm and nalepae committed Jan 16, 2024
1 parent 46387a9 commit 790a09f
Show file tree
Hide file tree
Showing 11 changed files with 161 additions and 160 deletions.
2 changes: 1 addition & 1 deletion validator/accounts/iface/wallet.go
Expand Up @@ -23,7 +23,7 @@ type Wallet interface {
// Read methods for important wallet and accounts-related files.
ReadFileAtPath(ctx context.Context, filePath string, fileName string) ([]byte, error)
// Write methods to persist important wallet and accounts-related files to disk.
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) error
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) (bool, error)
// Method for initializing a new keymanager.
InitializeKeymanager(ctx context.Context, cfg InitKeymanagerConfig) (keymanager.IKeymanager, error)
}
6 changes: 3 additions & 3 deletions validator/accounts/testing/mock.go
Expand Up @@ -55,19 +55,19 @@ func (w *Wallet) Password() string {
}

// WriteFileAtPath --
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) (bool, error) {
w.lock.Lock()
defer w.lock.Unlock()
if w.HasWriteFileError {
// reset the flag to not contaminate other tests
w.HasWriteFileError = false
return errors.New("could not write keystore file for accounts")
return false, errors.New("could not write keystore file for accounts")
}
if w.Files[pathName] == nil {
w.Files[pathName] = make(map[string][]byte)
}
w.Files[pathName][fileName] = data
return nil
return true, nil
}

// ReadFileAtPath --
Expand Down
11 changes: 6 additions & 5 deletions validator/accounts/wallet/wallet.go
Expand Up @@ -366,26 +366,27 @@ func (w *Wallet) InitializeKeymanager(ctx context.Context, cfg iface.InitKeymana
}

// WriteFileAtPath within the wallet directory given the desired path, filename, and raw data.
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) (bool /* exited previously */, error) {
accountPath := filepath.Join(w.accountsPath, filePath)
hasDir, err := file.HasDir(accountPath)
if err != nil {
return err
return false, err
}
if !hasDir {
if err := file.MkdirAll(accountPath); err != nil {
return errors.Wrapf(err, "could not create path: %s", accountPath)
return false, errors.Wrapf(err, "could not create path: %s", accountPath)
}
}
fullPath := filepath.Join(accountPath, fileName)
existedPreviously := file.Exists(fullPath)
if err := file.WriteFile(fullPath, data); err != nil {
return errors.Wrapf(err, "could not write %s", filePath)
return false, errors.Wrapf(err, "could not write %s", filePath)
}
log.WithFields(logrus.Fields{
"path": fullPath,
"fileName": fileName,
}).Debug("Wrote new file at path")
return nil
return existedPreviously, nil
}

// ReadFileAtPath within the wallet directory given the desired path and filename.
Expand Down
3 changes: 2 additions & 1 deletion validator/accounts/wallet_create.go
Expand Up @@ -32,7 +32,8 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error)
if err != nil {
return nil, err
}
if err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); err != nil {
_, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts)
if err != nil {
return nil, err
}
log.WithField("--wallet-dir", acm.walletDir).Info(
Expand Down
7 changes: 1 addition & 6 deletions validator/client/key_reload.go
Expand Up @@ -48,10 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar
valCount = int64(valCounts[0].Count)
}

anyActive = v.checkAndLogValidatorStatus(statuses, valCount)
if anyActive {
logActiveValidatorStatus(statuses)
}

return anyActive, nil
return v.checkAndLogValidatorStatus(statuses, valCount), nil
}
19 changes: 5 additions & 14 deletions validator/client/validator.go
Expand Up @@ -54,14 +54,13 @@ import (
// keyFetchPeriod is the frequency that we try to refetch validating keys
// in case no keys were fetched previously.
var (
keyRefetchPeriod = 30 * time.Second
ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful")
ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...")
)

var (
msgCouldNotFetchKeys = "could not fetch validating keys"
msgNoKeysFetched = "No validating keys fetched. Trying again"
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
)

type validator struct {
Expand Down Expand Up @@ -403,6 +402,10 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
}
case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING:
validatorActivated = true
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)),
"index": status.index,
}).Info("Validator activated")
case ethpb.ValidatorStatus_EXITED:
log.Info("Validator exited")
case ethpb.ValidatorStatus_INVALID:
Expand All @@ -416,18 +419,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
return validatorActivated
}

func logActiveValidatorStatus(statuses []*validatorStatus) {
for _, s := range statuses {
if s.status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)),
"index": s.index,
}).Info("Validator activated")
}
}

// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) {
Expand Down
37 changes: 0 additions & 37 deletions validator/client/validator_test.go
Expand Up @@ -388,43 +388,6 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
require.LogsContain(t, hook, "Validator activated")
}

func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
validatorClient := validatormock.NewMockValidatorClient(ctrl)
beaconClient := validatormock.NewMockBeaconChainClient(ctrl)
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)

kp := randKeypair(t)
v := validator{
validatorClient: validatorClient,
keyManager: newMockKeymanager(t, kp),
beaconClient: beaconClient,
prysmBeaconClient: prysmBeaconClient,
}
resp := generateMockStatusResponse([][]byte{kp.pub[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream := mock2.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
gomock.Any(),
).Return(clientStream, nil)
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
[]validatorType.Status{validatorType.Active},
).Return([]iface.ValidatorCount{}, nil).Times(2)
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
clientStream.EXPECT().Recv().Return(
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation")
}

func TestWaitSync_ContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
72 changes: 23 additions & 49 deletions validator/client/wait_for_activation.go
Expand Up @@ -5,17 +5,14 @@ import (
"io"
"time"

validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"

"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"go.opencensus.io/trace"
)

Expand All @@ -33,18 +30,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
if err != nil {
return err
}
// subscribe to the channel if it's the first time
sub := km.SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
close(accountsChangedChan)
}()
}

return v.internalWaitForActivation(ctx, accountsChangedChan)
}

// internalWaitForActivation performs the following:
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist.
// 2) Open a server side stream for activation events against the given keys.
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
// the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine.
Expand All @@ -53,39 +50,26 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()

validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating keys")
return errors.Wrap(err, msgCouldNotFetchKeys)
}
// if there are no validating keys, wait for some
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)

ticker := time.NewTicker(keyRefetchPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
continue
}
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
}
break
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// if the accounts changed try it again
return v.internalWaitForActivation(ctx, accountsChangedChan)
}
}

req := &ethpb.ValidatorActivationRequest{
stream, err := v.validatorClient.WaitForActivation(ctx, &ethpb.ValidatorActivationRequest{
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
}
stream, err := v.validatorClient.WaitForActivation(ctx, req)
})
if err != nil {
tracing.AnnotateError(span, err)
attempts := streamAttempts(ctx)
Expand All @@ -96,22 +80,17 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
}

if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil {
return err
}

v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
return nil
}

func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error {
for {
someAreActive := false
for !someAreActive {
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.internalWaitForActivation(ctx, accountsChangedChan)
default:
res, err := (*stream).Recv()
res, err := (stream).Recv() // retrieve from stream one loop at a time
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
Expand Down Expand Up @@ -150,15 +129,10 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh
valCount = int64(valCounts[0].Count)
}

valActivated := v.checkAndLogValidatorStatus(statuses, valCount)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
}
someAreActive = v.checkAndLogValidatorStatus(statuses, valCount)
}
break
}

return nil
}

Expand Down

0 comments on commit 790a09f

Please sign in to comment.