Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve wait for activation #13448

Merged
merged 26 commits into from Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
723ad4b
removing timeout on wait for activation, instead switched to an event…
james-prysm Jan 10, 2024
c81fb14
fixing unit tests
james-prysm Jan 10, 2024
72d6962
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 10, 2024
c0a4afd
linting
james-prysm Jan 10, 2024
f79b411
simplifying return
james-prysm Jan 11, 2024
d8b3be4
adding sleep for the remaining slot to avoid cpu spikes
james-prysm Jan 11, 2024
5d141df
removing ifstatement on log
james-prysm Jan 11, 2024
9435b40
removing ifstatement on log
james-prysm Jan 11, 2024
58eab7b
improving switch statement
james-prysm Jan 11, 2024
e9d6615
removing the loop entirely
james-prysm Jan 11, 2024
4bf7cf9
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 11, 2024
d5326f5
fixing unit test
james-prysm Jan 11, 2024
a991302
fixing manu's reported issue with deletion of json file
james-prysm Jan 12, 2024
a587681
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 12, 2024
fc5180b
missed change around writefile at path
james-prysm Jan 12, 2024
02672d5
gofmt
james-prysm Jan 12, 2024
e3e63c6
fixing deepsource issue with reading file
james-prysm Jan 12, 2024
580cd9b
trying to clean file to avoid deepsource issue
james-prysm Jan 12, 2024
12186e3
still getting error trying a different approach
james-prysm Jan 12, 2024
1def8b8
fixing stream loop
james-prysm Jan 13, 2024
959e4a4
fixing unit test
james-prysm Jan 13, 2024
fd5cab0
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 15, 2024
cf1e5e4
Merge branch 'develop' into improve-wait-for-activation
nalepae Jan 16, 2024
90eec05
Update validator/keymanager/local/keymanager.go
james-prysm Jan 16, 2024
4fd773e
Merge branch 'develop' into improve-wait-for-activation
james-prysm Jan 16, 2024
3ac8251
fixing linting
james-prysm Jan 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 1 addition & 6 deletions validator/client/key_reload.go
Expand Up @@ -48,10 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar
valCount = int64(valCounts[0].Count)
}

anyActive = v.checkAndLogValidatorStatus(statuses, valCount)
if anyActive {
logActiveValidatorStatus(statuses)
}

return anyActive, nil
return v.checkAndLogValidatorStatus(statuses, valCount), nil
}
19 changes: 6 additions & 13 deletions validator/client/validator.go
Expand Up @@ -53,7 +53,6 @@ import (
// keyFetchPeriod is the frequency that we try to refetch validating keys
// in case no keys were fetched previously.
var (
keyRefetchPeriod = 30 * time.Second
ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful")
ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...")
)
Expand Down Expand Up @@ -387,6 +386,12 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
}
case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING:
validatorActivated = true
if status.status.Status == ethpb.ValidatorStatus_ACTIVE {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe separate the two cases ethpb.ValidatorStatus_ACTIVE and ethpb.ValidatorStatus_EXITING to avoid the

if status.status.Status == ethpb.ValidatorStatus_ACTIVE

in the switch/case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably makes more sense to just drop the if statement, actually. "Activated" should mean that the validator is using the account.

log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)),
"index": status.index,
}).Info("Validator activated")
}
case ethpb.ValidatorStatus_EXITED:
log.Info("Validator exited")
case ethpb.ValidatorStatus_INVALID:
Expand All @@ -400,18 +405,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti
return validatorActivated
}

// logActiveValidatorStatus iterates over the given statuses and, for every
// entry whose status is ethpb.ValidatorStatus_ACTIVE, emits an info-level
// "Validator activated" log line carrying the hex-formatted result of
// bytesutil.Trunc on the public key and the validator index. Entries with
// any other status are skipped.
func logActiveValidatorStatus(statuses []*validatorStatus) {
for _, s := range statuses {
// Only ACTIVE validators are reported; everything else is ignored here.
if s.status.Status != ethpb.ValidatorStatus_ACTIVE {
continue
}
log.WithFields(logrus.Fields{
"publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)),
"index": s.index,
}).Info("Validator activated")
}
}

// CanonicalHeadSlot returns the slot of canonical block currently found in the
// beacon chain via RPC.
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) {
Expand Down
90 changes: 33 additions & 57 deletions validator/client/wait_for_activation.go
Expand Up @@ -5,17 +5,16 @@ import (
"io"
"time"

validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"

"github.com/pkg/errors"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"go.opencensus.io/trace"
)

Expand All @@ -33,18 +32,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
if err != nil {
return err
}
// subscribe to the channel if it's the first time
sub := km.SubscribeAccountChanges(accountsChangedChan)
defer func() {
sub.Unsubscribe()
close(accountsChangedChan)
}()
}

return v.internalWaitForActivation(ctx, accountsChangedChan)
}

// internalWaitForActivation performs the following:
// 1) While the key manager is empty, poll the key manager until some validator keys exist.
// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist.
// 2) Open a server side stream for activation events against the given keys.
// 3) In another go routine, the key manager is monitored for updates and emits an update event on
// the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine.
Expand All @@ -53,65 +52,41 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c
func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error {
ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation")
defer span.End()

validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, "could not fetch validating keys")
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)

ticker := time.NewTicker(keyRefetchPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx)
if err != nil {
return errors.Wrap(err, msgCouldNotFetchKeys)
}
if len(validatingKeys) == 0 {
log.Warn(msgNoKeysFetched)
continue
}
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
}
break
}
}

req := &ethpb.ValidatorActivationRequest{
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
}
stream, err := v.validatorClient.WaitForActivation(ctx, req)
if err != nil {
tracing.AnnotateError(span, err)
attempts := streamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60)))
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
}

if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil {
return err
}

v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
return nil
}

func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error {
// loop while there are no validator keys ...
for {
select {
case <-ctx.Done():
log.Debug("Context closed, exiting fetching validating keys")
return ctx.Err()
case <-accountsChangedChan:
// Accounts (keys) changed, restart the process.
// if the accounts changed try it again
return v.internalWaitForActivation(ctx, accountsChangedChan)
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This default case doesn't do anything, we can remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the default case actually improved efficiency; I believe Go handles the loop efficiently when it is present.

res, err := (*stream).Recv()
if len(validatingKeys) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validatingKeys is updated only once, before the for loop.
Should it not be updated inside the for loop as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the accounts change, the function calls itself recursively. My understanding is that this loop only really needs to run while you have 0 active keys anyway. Once you have 1 active key, it exits and proceeds, and subsequent key changes are handled by the HandleKeyReload call, which runs in the loop in runner.go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I missed the recursion.

continue
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we maybe need a small sleep here? We are in an endless for loop, and if the channels are not filled for a longer period, I think we could see a CPU spike here.

stream, err := v.validatorClient.WaitForActivation(ctx, &ethpb.ValidatorActivationRequest{
PublicKeys: bytesutil.FromBytes48Array(validatingKeys),
})
if err != nil {
tracing.AnnotateError(span, err)
attempts := streamAttempts(ctx)
log.WithError(err).WithField("attempts", attempts).
Error("Stream broken while waiting for activation. Reconnecting...")
// Reconnection attempt backoff, up to 60s.
time.Sleep(time.Second * time.Duration(math.Min(uint64(attempts), 60)))
return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan)
}

// Recv polls for validator statuses
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
if errors.Is(err, io.EOF) {
break
Expand Down Expand Up @@ -150,15 +125,16 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh
valCount = int64(valCounts[0].Count)
}

valActivated := v.checkAndLogValidatorStatus(statuses, valCount)
if valActivated {
logActiveValidatorStatus(statuses)
} else {
if !v.checkAndLogValidatorStatus(statuses, valCount) {
continue
}
}
// If a validator is active, break out of this loop
break
}

// reset the ticker when they are all active
v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of this code? The ticker is already set here:

v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it; I don't think it has a use case if it's set elsewhere. I wish it were clearer where it is set, however.

return nil
}

Expand Down
41 changes: 26 additions & 15 deletions validator/client/wait_for_activation_test.go
Expand Up @@ -39,7 +39,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
beaconClient: beaconClient,
}
clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl)

ctx, cancel := context.WithCancel(context.Background())
validatorClient.EXPECT().WaitForActivation(
gomock.Any(),
&ethpb.ValidatorActivationRequest{
Expand All @@ -49,9 +49,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) {
clientStream.EXPECT().Recv().Return(
&ethpb.ValidatorActivationResponse{},
nil,
)
ctx, cancel := context.WithCancel(context.Background())
cancel()
).Do(func() { cancel() })
assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, nil))
}

Expand Down Expand Up @@ -193,12 +191,6 @@ func TestWaitForActivation_Exiting(t *testing.T) {
}

func TestWaitForActivation_RefetchKeys(t *testing.T) {
originalPeriod := keyRefetchPeriod
defer func() {
keyRefetchPeriod = originalPeriod
}()
keyRefetchPeriod = 1 * time.Second

hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand All @@ -207,8 +199,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl)

kp := randKeypair(t)
km := newMockKeymanager(t, kp)
km.fetchNoKeys = true
km := newMockKeymanager(t)

v := validator{
validatorClient: validatorClient,
Expand All @@ -233,7 +224,19 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) {
clientStream.EXPECT().Recv().Return(
resp,
nil)
assert.NoError(t, v.internalWaitForActivation(context.Background(), make(chan [][fieldparams.BLSPubkeyLength]byte)), "Could not wait for activation")
accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte)
sub := km.SubscribeAccountChanges(accountChan)
defer func() {
sub.Unsubscribe()
close(accountChan)
}()
// update the accounts after a delay
go func() {
time.Sleep(2 * time.Second)
require.NoError(t, km.add(kp))
km.SimulateAccountChanges([][48]byte{kp.pub})
}()
assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation")
assert.LogsContain(t, hook, msgNoKeysFetched)
assert.LogsContain(t, hook, "Validator activated")
}
Expand Down Expand Up @@ -265,7 +268,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactive.pub[:]},
},
).Return(inactiveClientStream, nil)
).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) {
//delay a bit so that other key can be added
time.Sleep(time.Second * 2)
return inactiveClientStream, nil
})
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
Expand Down Expand Up @@ -353,7 +360,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) {
&ethpb.ValidatorActivationRequest{
PublicKeys: [][]byte{inactivePubKey[:]},
},
).Return(inactiveClientStream, nil)
).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) {
//delay a bit so that other key can be added
time.Sleep(time.Second * 2)
return inactiveClientStream, nil
})
prysmBeaconClient.EXPECT().GetValidatorCount(
gomock.Any(),
"head",
Expand Down