Skip to content

Commit 2fdfda2

Browse files
rkapkarauljordanprylabs-bulldozer[bot]
authored
Fix remote keymanager's dynamic key reload (#8817)
* Fix remote keymanager's dynamic key reload * wait for activation test * runner test * rename mock creation func * fix compile error Co-authored-by: Raul Jordan <raul@prysmaticlabs.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
1 parent a063952 commit 2fdfda2

File tree

5 files changed

+132
-37
lines changed

5 files changed

+132
-37
lines changed

validator/client/runner.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prysmaticlabs/prysm/shared/featureconfig"
1414
"github.com/prysmaticlabs/prysm/shared/params"
1515
"github.com/prysmaticlabs/prysm/validator/client/iface"
16+
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
1617
"go.opencensus.io/trace"
1718
"google.golang.org/grpc/codes"
1819
"google.golang.org/grpc/status"
@@ -135,6 +136,14 @@ func run(ctx context.Context, v iface.Validator) {
135136
case slot := <-v.NextSlot():
136137
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))
137138

139+
remoteKm, ok := v.GetKeymanager().(remote.RemoteKeymanager)
140+
if ok {
141+
_, err := remoteKm.ReloadPublicKeys(ctx)
142+
if err != nil {
143+
log.WithError(err).Error(msgCouldNotFetchKeys)
144+
}
145+
}
146+
138147
allExited, err := v.AllValidatorsAreExited(ctx)
139148
if err != nil {
140149
log.WithError(err).Error("Could not check if validators are exited")

validator/client/runner_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prysmaticlabs/prysm/shared/testutil/require"
1414
"github.com/prysmaticlabs/prysm/validator/client/iface"
1515
"github.com/prysmaticlabs/prysm/validator/client/testutil"
16+
"github.com/prysmaticlabs/prysm/validator/keymanager/remote"
1617
logTest "github.com/sirupsen/logrus/hooks/test"
1718
)
1819

@@ -238,3 +239,19 @@ func TestKeyReload_NoActiveKey(t *testing.T) {
238239
assert.Equal(t, true, v.HandleKeyReloadCalled)
239240
assert.Equal(t, 2, v.WaitForActivationCalled)
240241
}
242+
243+
func TestKeyReload_RemoteKeymanager(t *testing.T) {
244+
ctx, cancel := context.WithCancel(context.Background())
245+
km := remote.NewMock()
246+
v := &testutil.FakeValidator{Keymanager: &km}
247+
248+
ticker := make(chan types.Slot)
249+
v.NextSlotRet = ticker
250+
go func() {
251+
ticker <- types.Slot(55)
252+
253+
cancel()
254+
}()
255+
run(ctx, v)
256+
assert.Equal(t, true, km.ReloadPublicKeysCalled)
257+
}

validator/client/wait_for_activation.go

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -91,39 +91,49 @@ func (v *validator) waitForActivation(ctx context.Context, accountsChangedChan <
9191

9292
remoteKm, ok := v.keyManager.(remote.RemoteKeymanager)
9393
if ok {
94-
for range v.NextSlot() {
95-
if ctx.Err() == context.Canceled {
96-
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
97-
}
94+
for {
95+
select {
96+
case <-accountsChangedChan:
97+
// Accounts (keys) changed, restart the process.
98+
return v.waitForActivation(ctx, accountsChangedChan)
99+
case <-v.NextSlot():
100+
if ctx.Err() == context.Canceled {
101+
return errors.Wrap(ctx.Err(), "context canceled, not waiting for activation anymore")
102+
}
98103

99-
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
100-
if err != nil {
101-
return errors.Wrap(err, msgCouldNotFetchKeys)
102-
}
103-
statusRequestKeys := make([][]byte, len(validatingKeys))
104-
for i := range validatingKeys {
105-
statusRequestKeys[i] = validatingKeys[i][:]
106-
}
107-
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{
108-
PublicKeys: statusRequestKeys,
109-
})
110-
if err != nil {
111-
return err
112-
}
113-
statuses := make([]*validatorStatus, len(resp.Statuses))
114-
for i, s := range resp.Statuses {
115-
statuses[i] = &validatorStatus{
116-
publicKey: resp.PublicKeys[i],
117-
status: s,
118-
index: resp.Indices[i],
104+
log.Error("Before ReloadPublicKeys")
105+
validatingKeys, err = remoteKm.ReloadPublicKeys(ctx)
106+
if err != nil {
107+
return errors.Wrap(err, msgCouldNotFetchKeys)
108+
}
109+
log.Error("After ReloadPublicKeys")
110+
statusRequestKeys := make([][]byte, len(validatingKeys))
111+
for i := range validatingKeys {
112+
statusRequestKeys[i] = validatingKeys[i][:]
113+
}
114+
resp, err := v.validatorClient.MultipleValidatorStatus(ctx, &ethpb.MultipleValidatorStatusRequest{
115+
PublicKeys: statusRequestKeys,
116+
})
117+
if err != nil {
118+
return err
119+
}
120+
statuses := make([]*validatorStatus, len(resp.Statuses))
121+
for i, s := range resp.Statuses {
122+
statuses[i] = &validatorStatus{
123+
publicKey: resp.PublicKeys[i],
124+
status: s,
125+
index: resp.Indices[i],
126+
}
119127
}
120-
}
121128

122-
valActivated := v.checkAndLogValidatorStatus(statuses)
123-
if valActivated {
124-
logActiveValidatorStatus(statuses)
125-
break
129+
valActivated := v.checkAndLogValidatorStatus(statuses)
130+
if valActivated {
131+
logActiveValidatorStatus(statuses)
132+
} else {
133+
continue
134+
}
126135
}
136+
break
127137
}
128138
} else {
129139
for {

validator/client/wait_for_activation_test.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,8 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
397397

398398
inactiveKey := bytesutil.ToBytes48([]byte("inactive"))
399399
activeKey := bytesutil.ToBytes48([]byte("active"))
400-
km := &remote.MockKeymanager{
401-
PublicKeys: [][48]byte{inactiveKey, activeKey},
402-
}
400+
km := remote.NewMock()
401+
km.PublicKeys = [][48]byte{inactiveKey, activeKey}
403402
slot := types.Slot(0)
404403

405404
t.Run("activated", func(t *testing.T) {
@@ -412,7 +411,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
412411
}
413412
v := validator{
414413
validatorClient: client,
415-
keyManager: km,
414+
keyManager: &km,
416415
ticker: ticker,
417416
}
418417
go func() {
@@ -447,7 +446,7 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
447446
}
448447
v := validator{
449448
validatorClient: client,
450-
keyManager: km,
449+
keyManager: &km,
451450
ticker: ticker,
452451
}
453452
go func() {
@@ -458,4 +457,52 @@ func TestWaitForActivation_RemoteKeymanager(t *testing.T) {
458457
err := v.waitForActivation(ctx, nil /* accountsChangedChan */)
459458
assert.ErrorContains(t, "context canceled, not waiting for activation anymore", err)
460459
})
460+
t.Run("reloaded", func(t *testing.T) {
461+
ctx, cancel := context.WithCancel(context.Background())
462+
hook := logTest.NewGlobal()
463+
remoteKm := remote.NewMock()
464+
remoteKm.PublicKeys = [][48]byte{inactiveKey}
465+
466+
tickerChan := make(chan types.Slot)
467+
ticker := &slotutilmock.MockTicker{
468+
Channel: tickerChan,
469+
}
470+
v := validator{
471+
validatorClient: client,
472+
keyManager: &remoteKm,
473+
ticker: ticker,
474+
}
475+
go func() {
476+
tickerChan <- slot
477+
time.Sleep(time.Second)
478+
remoteKm.PublicKeys = [][48]byte{inactiveKey, activeKey}
479+
tickerChan <- slot
480+
// Cancel after timeout to avoid waiting on channel forever in case test goes wrong.
481+
time.Sleep(time.Second)
482+
cancel()
483+
}()
484+
485+
resp := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:]})
486+
resp.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
487+
client.EXPECT().MultipleValidatorStatus(
488+
gomock.Any(),
489+
&ethpb.MultipleValidatorStatusRequest{
490+
PublicKeys: [][]byte{inactiveKey[:]},
491+
},
492+
).Return(resp, nil /* err */)
493+
resp2 := testutil.GenerateMultipleValidatorStatusResponse([][]byte{inactiveKey[:], activeKey[:]})
494+
resp2.Statuses[0].Status = ethpb.ValidatorStatus_UNKNOWN_STATUS
495+
resp2.Statuses[1].Status = ethpb.ValidatorStatus_ACTIVE
496+
client.EXPECT().MultipleValidatorStatus(
497+
gomock.Any(),
498+
&ethpb.MultipleValidatorStatusRequest{
499+
PublicKeys: [][]byte{inactiveKey[:], activeKey[:]},
500+
},
501+
).Return(resp2, nil /* err */)
502+
503+
err := v.waitForActivation(ctx, remoteKm.ReloadPublicKeysChan /* accountsChangedChan */)
504+
require.NoError(t, err)
505+
assert.LogsContain(t, hook, "Waiting for deposit to be observed by beacon node")
506+
assert.LogsContain(t, hook, "Validator activated")
507+
})
461508
}

validator/keymanager/remote/mock_keymanager.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,17 @@ import (
1010

1111
// MockKeymanager --
1212
type MockKeymanager struct {
13-
PublicKeys [][48]byte
13+
PublicKeys [][48]byte
14+
ReloadPublicKeysChan chan [][48]byte
15+
ReloadPublicKeysCalled bool
16+
accountsChangedFeed *event.Feed
17+
}
18+
19+
func NewMock() MockKeymanager {
20+
return MockKeymanager{
21+
accountsChangedFeed: new(event.Feed),
22+
ReloadPublicKeysChan: make(chan [][48]byte, 1),
23+
}
1424
}
1525

1626
// FetchValidatingPublicKeys --
@@ -24,11 +34,13 @@ func (*MockKeymanager) Sign(context.Context, *validatorpb.SignRequest) (bls.Sign
2434
}
2535

2636
// SubscribeAccountChanges --
27-
func (*MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription {
28-
panic("implement me")
37+
func (m *MockKeymanager) SubscribeAccountChanges(chan [][48]byte) event.Subscription {
38+
return m.accountsChangedFeed.Subscribe(m.ReloadPublicKeysChan)
2939
}
3040

3141
// ReloadPublicKeys --
3242
func (m *MockKeymanager) ReloadPublicKeys(context.Context) ([][48]byte, error) {
43+
m.ReloadPublicKeysCalled = true
44+
m.ReloadPublicKeysChan <- m.PublicKeys
3345
return m.PublicKeys, nil
3446
}

0 commit comments

Comments
 (0)