Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve wait for activation #13448

Merged
merged 26 commits into from Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
723ad4b
removing timeout on wait for activation, instead switched to an event…
james-prysm Jan 10, 2024
c81fb14
fixing unit tests
james-prysm Jan 10, 2024
72d6962
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 10, 2024
c0a4afd
linting
james-prysm Jan 10, 2024
f79b411
simplifying return
james-prysm Jan 11, 2024
d8b3be4
adding sleep for the remaining slot to avoid cpu spikes
james-prysm Jan 11, 2024
5d141df
removing ifstatement on log
james-prysm Jan 11, 2024
9435b40
removing ifstatement on log
james-prysm Jan 11, 2024
58eab7b
improving switch statement
james-prysm Jan 11, 2024
e9d6615
removing the loop entirely
james-prysm Jan 11, 2024
4bf7cf9
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 11, 2024
d5326f5
fixing unit test
james-prysm Jan 11, 2024
a991302
fixing manu's reported issue with deletion of json file
james-prysm Jan 12, 2024
a587681
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 12, 2024
fc5180b
missed change around writefile at path
james-prysm Jan 12, 2024
02672d5
gofmt
james-prysm Jan 12, 2024
e3e63c6
fixing deepsource issue with reading file
james-prysm Jan 12, 2024
580cd9b
trying to clean file to avoid deepsource issue
james-prysm Jan 12, 2024
12186e3
still getting error trying a different approach
james-prysm Jan 12, 2024
1def8b8
fixing stream loop
james-prysm Jan 13, 2024
959e4a4
fixing unit test
james-prysm Jan 13, 2024
fd5cab0
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 15, 2024
cf1e5e4
Merge branch 'develop' into improve-wait-for-activation
nalepae Jan 16, 2024
90eec05
Update validator/keymanager/local/keymanager.go
james-prysm Jan 16, 2024
4fd773e
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 16, 2024
3ac8251
fixing linting
james-prysm Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion validator/accounts/iface/wallet.go
Expand Up @@ -23,7 +23,7 @@ type Wallet interface {
// Read methods for important wallet and accounts-related files.
ReadFileAtPath(ctx context.Context, filePath string, fileName string) ([]byte, error)
// Write methods to persist important wallet and accounts-related files to disk.
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) error
WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) (bool, error)
// Method for initializing a new keymanager.
InitializeKeymanager(ctx context.Context, cfg InitKeymanagerConfig) (keymanager.IKeymanager, error)
}
6 changes: 3 additions & 3 deletions validator/accounts/testing/mock.go
Expand Up @@ -55,19 +55,19 @@ func (w *Wallet) Password() string {
}

// WriteFileAtPath --
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) (bool, error) {
w.lock.Lock()
defer w.lock.Unlock()
if w.HasWriteFileError {
// reset the flag to not contaminate other tests
w.HasWriteFileError = false
return errors.New("could not write keystore file for accounts")
return false, errors.New("could not write keystore file for accounts")
}
if w.Files[pathName] == nil {
w.Files[pathName] = make(map[string][]byte)
}
w.Files[pathName][fileName] = data
return nil
return true, nil
}

// ReadFileAtPath --
Expand Down
11 changes: 6 additions & 5 deletions validator/accounts/wallet/wallet.go
Expand Up @@ -366,26 +366,27 @@ func (w *Wallet) InitializeKeymanager(ctx context.Context, cfg iface.InitKeymana
}

// WriteFileAtPath within the wallet directory given the desired path, filename, and raw data.
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) error {
func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) (bool /* exited previously */, error) {
accountPath := filepath.Join(w.accountsPath, filePath)
hasDir, err := file.HasDir(accountPath)
if err != nil {
return err
return false, err
}
if !hasDir {
if err := file.MkdirAll(accountPath); err != nil {
return errors.Wrapf(err, "could not create path: %s", accountPath)
return false, errors.Wrapf(err, "could not create path: %s", accountPath)
}
}
fullPath := filepath.Join(accountPath, fileName)
existedPreviously := file.Exists(fullPath)
if err := file.WriteFile(fullPath, data); err != nil {
return errors.Wrapf(err, "could not write %s", filePath)
return false, errors.Wrapf(err, "could not write %s", filePath)
}
log.WithFields(logrus.Fields{
"path": fullPath,
"fileName": fileName,
}).Debug("Wrote new file at path")
return nil
return existedPreviously, nil
}

// ReadFileAtPath within the wallet directory given the desired path and filename.
Expand Down
3 changes: 2 additions & 1 deletion validator/accounts/wallet_create.go
Expand Up @@ -32,7 +32,8 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error)
if err != nil {
return nil, err
}
if err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); err != nil {
_, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts)
if err != nil {
return nil, err
}
log.WithField("--wallet-dir", acm.walletDir).Info(
Expand Down
7 changes: 1 addition & 6 deletions validator/client/key_reload.go
Expand Up @@ -48,10 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar
valCount = int64(valCounts[0].Count)
}

anyActive = v.checkAndLogValidatorStatus(statuses, valCount)
if anyActive {
logActiveValidatorStatus(statuses)
}

return anyActive, nil
return v.checkAndLogValidatorStatus(statuses, valCount), nil
}
19 changes: 5 additions & 14 deletions validator/client/validator.go
Expand Up @@ -54,14 +54,13 @@ import (
// keyFetchPeriod is the frequency that we try to refetch validating keys
// in case no keys were fetched previously.
var (
keyRefetchPeriod = 30 * time.Second
ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful")
ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...")
)

var (
msgCouldNotFetchKeys = "could not fetch validating keys"
msgNoKeysFetched = "No validating keys fetched. Trying again"
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
)

type validator struct {
Expand Down Expand Up @@ -403,6 +402,10 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
}
case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING:
validatorActivated = true
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)),
"index": status.index,
}).Info("Validator activated")
case ethpb.ValidatorStatus_EXITED:
log.Info("Validator exited")
case ethpb.ValidatorStatus_INVALID:
Expand All @@ -416,18 +419,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
return validatorActivated
}

func logActiveValidatorStatus(statuses []*validatorStatus) {
for _, s := range statuses {
if s.status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)),
"index": s.index,
}).Info("Validator activated")
}
}

// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) {
Expand Down
37 changes: 0 additions & 37 deletions validator/client/validator_test.go
Expand Up @@ -388,43 +388,6 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) {
require.LogsContain(t, hook, "Validator activated")
}

func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
validatorClient := validatormock.NewMockValidatorClient(ctrl)
beaconClient := validatormock.NewMockBeaconChainClient(ctrl)
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)

kp := randKeypair(t)
v := validator{
validatorClient: validatorClient,
keyManager: newMockKeymanager(t, kp),
beaconClient: beaconClient,
prysmBeaconClient: prysmBeaconClient,
}
resp := generateMockStatusResponse([][]byte{kp.pub[:]})
resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE
clientStream := mock2.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
gomock.Any(),
).Return(clientStream, nil)
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
[]validatorType.Status{validatorType.Active},
).Return([]iface.ValidatorCount{}, nil).Times(2)
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
clientStream.EXPECT().Recv().Return(
resp,
nil,
)
assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation")
}

func TestWaitSync_ContextCanceled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
72 changes: 23 additions & 49 deletions validator/client/wait_for_activation.go
Expand Up @@ -5,17 +5,14 @@ import (
"io"
"time"

validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"

"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"go.opencensus.io/trace"
)

Expand All @@ -33,18 +30,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
if err != nil {
return err
}
// subscribe to the channel if it's the first time
sub := km.SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
close(accountsChangedChan)
}()
}

return v.internalWaitForActivation(ctx, accountsChangedChan)
}

// internalWaitForActivation performs the following:
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist.
// 2) Open a server side stream for activation events against the given keys.
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
// the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine.
Expand All @@ -53,39 +50,26 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()

validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating keys")
return errors.Wrap(err, msgCouldNotFetchKeys)
}
// if there are no validating keys, wait for some
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)

ticker := time.NewTicker(keyRefetchPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
continue
}
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
}
break
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// if the accounts changed try it again
return v.internalWaitForActivation(ctx, accountsChangedChan)
}
}

req := &ethpb.ValidatorActivationRequest{
stream, err := v.validatorClient.WaitForActivation(ctx, &ethpb.ValidatorActivationRequest{
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
}
stream, err := v.validatorClient.WaitForActivation(ctx, req)
})
if err != nil {
tracing.AnnotateError(span, err)
attempts := streamAttempts(ctx)
Expand All @@ -96,22 +80,17 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
}

if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil {
return err
}

v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
return nil
}

func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error {
for {
someAreActive := false
for !someAreActive {
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
return v.internalWaitForActivation(ctx, accountsChangedChan)
default:
res, err := (*stream).Recv()
res, err := (stream).Recv() // retrieve from stream one loop at a time
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
Expand Down Expand Up @@ -150,15 +129,10 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh
valCount = int64(valCounts[0].Count)
}

valActivated := v.checkAndLogValidatorStatus(statuses, valCount)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
continue
}
someAreActive = v.checkAndLogValidatorStatus(statuses, valCount)
}
break
}

return nil
}

Expand Down