Skip to content

Commit

Permalink
Make validator stable when beacon node goes offline (#8278)
Browse files Browse the repository at this point in the history
* Make validator stable POC

* fix feedback raul and nishant

* fix wait till first iteration

* fix imports

* retry tests

* fix init

* test retry receive blocks

* remove redundant return statement

* terence feedback

* terence feedback

* remove log

* to check for context after fist call

* remove fatal

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
shayzluf and rauljordan committed Feb 1, 2021
1 parent d53fdcf commit 4595789
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 39 deletions.
37 changes: 28 additions & 9 deletions validator/client/mock_validator.go
Expand Up @@ -15,12 +15,8 @@ var _ Validator = (*FakeValidator)(nil)
type FakeValidator struct {
DoneCalled bool
WaitForWalletInitializationCalled bool
WaitForActivationCalled bool
WaitForChainStartCalled bool
WaitForSyncCalled bool
SlasherReadyCalled bool
NextSlotCalled bool
CanonicalHeadSlotCalled bool
UpdateDutiesCalled bool
UpdateProtectionsCalled bool
RoleAtCalled bool
Expand All @@ -30,6 +26,12 @@ type FakeValidator struct {
SaveProtectionsCalled bool
DeleteProtectionCalled bool
SlotDeadlineCalled bool
WaitForChainStartCalled int
WaitForSyncCalled int
WaitForActivationCalled int
CanonicalHeadSlotCalled int
ReceiveBlocksCalled int
RetryTillSuccess int
ProposeBlockArg1 uint64
AttestToBlockHeadArg1 uint64
RoleAtArg1 uint64
Expand Down Expand Up @@ -62,19 +64,28 @@ func (fv *FakeValidator) WaitForWalletInitialization(_ context.Context) error {

// WaitForChainStart for mocking.
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
fv.WaitForChainStartCalled = true
fv.WaitForChainStartCalled++
if fv.RetryTillSuccess >= fv.WaitForChainStartCalled {
return errConnectionIssue
}
return nil
}

// WaitForActivation for mocking.
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error {
fv.WaitForActivationCalled = true
fv.WaitForActivationCalled++
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
return errConnectionIssue
}
return nil
}

// WaitForSync for mocking.
func (fv *FakeValidator) WaitForSync(_ context.Context) error {
fv.WaitForSyncCalled = true
fv.WaitForSyncCalled++
if fv.RetryTillSuccess >= fv.WaitForSyncCalled {
return errConnectionIssue
}
return nil
}

Expand All @@ -86,7 +97,10 @@ func (fv *FakeValidator) SlasherReady(_ context.Context) error {

// CanonicalHeadSlot for mocking.
func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (uint64, error) {
fv.CanonicalHeadSlotCalled = true
fv.CanonicalHeadSlotCalled++
if fv.RetryTillSuccess > fv.CanonicalHeadSlotCalled {
return 0, errConnectionIssue
}
return 0, nil
}

Expand Down Expand Up @@ -195,4 +209,9 @@ func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager {
}

// ReceiveBlocks for mocking
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {}
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
fv.ReceiveBlocksCalled++
if fv.RetryTillSuccess > fv.ReceiveBlocksCalled {
connectionErrorChannel <- errConnectionIssue
}
}
87 changes: 69 additions & 18 deletions validator/client/runner.go
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
Expand All @@ -16,6 +17,10 @@ import (
"google.golang.org/grpc/status"
)

// time to wait before trying to reconnect with beacon node.
var backOffPeriod = 10 * time.Second
var errConnectionIssue = errors.New("could not connect")

// Validator interface defines the primary methods of a validator client.
type Validator interface {
Done()
Expand All @@ -38,7 +43,7 @@ type Validator interface {
WaitForWalletInitialization(ctx context.Context) error
AllValidatorsAreExited(ctx context.Context) (bool, error)
GetKeymanager() keymanager.IKeymanager
ReceiveBlocks(ctx context.Context)
ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error)
}

// Run the main validator routine. This routine exits if the context is
Expand All @@ -64,37 +69,79 @@ func run(ctx context.Context, v Validator) {
log.Fatalf("Slasher is not ready: %v", err)
}
}
if err := v.WaitForChainStart(ctx); err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
if err := v.WaitForSync(ctx); err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()

var headSlot uint64
firstTime := true
accountsChangedChan := make(chan struct{}, 1)
go handleAccountsChanged(ctx, v, accountsChangedChan)
if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
for {
if !firstTime {
if ctx.Err() != nil {
log.Info("Context canceled, stopping validator")
return // Exit if context is canceled.
}
<-ticker.C
} else {
firstTime = false
}
err := v.WaitForChainStart(ctx)
if isConnectionError(err) {
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
err = v.WaitForSync(ctx)
if isConnectionError(err) {
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
err = v.WaitForActivation(ctx, accountsChangedChan)
if isConnectionError(err) {
log.Warnf("Could not wait for validator activation: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) {
log.Warnf("Could not get current canonical head slot: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
break
}

go v.ReceiveBlocks(ctx)

headSlot, err := v.CanonicalHeadSlot(ctx)
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
go handleAccountsChanged(ctx, v, accountsChangedChan)
connectionErrorChannel := make(chan error, 1)
go v.ReceiveBlocks(ctx, connectionErrorChannel)
if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}

for {
slotCtx, cancel := context.WithCancel(ctx)
ctx, span := trace.StartSpan(ctx, "validator.processSlot")

select {
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
span.End()
cancel()
return // Exit if context is canceled.
case blocksError := <-connectionErrorChannel:
if blocksError != nil {
log.WithError(blocksError).Warn("block stream interrupted")
go v.ReceiveBlocks(ctx, connectionErrorChannel)
continue
}
case slot := <-v.NextSlot():
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))

Expand All @@ -108,7 +155,7 @@ func run(ctx context.Context, v Validator) {
}

deadline := v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
slotCtx, cancel = context.WithDeadline(ctx, deadline)
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")

Expand Down Expand Up @@ -172,6 +219,10 @@ func run(ctx context.Context, v Validator) {
}
}

func isConnectionError(err error) bool {
return err != nil && errors.Is(err, errConnectionIssue)
}

func handleAssignmentError(err error, slot uint64) {
if errCode, ok := status.FromError(err); ok && errCode.Code() == codes.NotFound {
log.WithField(
Expand Down
25 changes: 23 additions & 2 deletions validator/client/runner_test.go
Expand Up @@ -30,13 +30,34 @@ func TestCancelledContext_CleansUpValidator(t *testing.T) {
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, 1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
}

func TestRetry_On_ConnectionError(t *testing.T) {
retry := 10
v := &FakeValidator{
Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
RetryTillSuccess: retry,
}
backOffPeriod = 10 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
go run(ctx, v)
// each step will fail (retry times)=10 this sleep times will wait more then
// the time it takes for all steps to succeed before main loop.
time.Sleep(time.Duration(retry*6) * backOffPeriod)
cancel()
// every call will fail retry=10 times so first one will be called 4 * retry=10.
assert.Equal(t, retry*4, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, retry*3, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
assert.Equal(t, retry*2, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, retry, v.ReceiveBlocksCalled, "Expected WaitForActivation() to be called")
}

func TestCancelledContext_WaitsForActivation(t *testing.T) {
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}

func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
Expand Down
20 changes: 12 additions & 8 deletions validator/client/validator.go
Expand Up @@ -138,7 +138,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
// First, check if the beacon chain has started.
stream, err := v.validatorClient.WaitForChainStart(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not setup beacon chain ChainStart streaming client")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not setup beacon chain ChainStart streaming client").Error())
}

log.Info("Waiting for beacon chain start log from the ETH 1.0 deposit contract")
Expand All @@ -148,7 +148,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}
if err != nil {
return errors.Wrap(err, "could not receive ChainStart from stream")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not receive ChainStart from stream").Error())
}
v.genesisTime = chainStartRes.GenesisTime
curGenValRoot, err := v.db.GenesisValidatorsRoot(ctx)
Expand All @@ -172,6 +172,8 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
)
}
}
} else {
return errConnectionIssue
}

// Once the ChainStart log is received, we update the genesis time of the validator client
Expand All @@ -188,7 +190,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {

s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not get sync status")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
}
if !s.Syncing {
return nil
Expand All @@ -200,7 +202,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
case <-time.After(slotutil.DivideSlotBy(2 /* twice per slot */)):
s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not get sync status")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
}
if !s.Syncing {
return nil
Expand Down Expand Up @@ -246,10 +248,11 @@ func (v *validator) SlasherReady(ctx context.Context) error {
// ReceiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other usages to subscribe to.
func (v *validator) ReceiveBlocks(ctx context.Context) {
func (v *validator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
stream, err := v.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{VerifiedOnly: true})
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
log.WithError(err).Error("Failed to retrieve blocks stream, " + errConnectionIssue.Error())
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
return
}

Expand All @@ -260,7 +263,8 @@ func (v *validator) ReceiveBlocks(ctx context.Context) {
}
res, err := stream.Recv()
if err != nil {
log.WithError(err).Error("Could not receive blocks from beacon node")
log.WithError(err).Error("Could not receive blocks from beacon node, " + errConnectionIssue.Error())
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
return
}
if res == nil || res.Block == nil {
Expand Down Expand Up @@ -331,7 +335,7 @@ func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) {
defer span.End()
head, err := v.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil {
return 0, err
return 0, errors.Wrap(errConnectionIssue, err.Error())
}
return head.HeadSlot, nil
}
Expand Down
6 changes: 4 additions & 2 deletions validator/client/validator_test.go
Expand Up @@ -912,7 +912,8 @@ func TestService_ReceiveBlocks_NilBlock(t *testing.T) {
).Do(func() {
cancel()
})
v.ReceiveBlocks(ctx)
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, uint64(0), v.highestValidSlot)
}

Expand All @@ -939,6 +940,7 @@ func TestService_ReceiveBlocks_SetHighest(t *testing.T) {
).Do(func() {
cancel()
})
v.ReceiveBlocks(ctx)
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, slot, v.highestValidSlot)
}

0 comments on commit 4595789

Please sign in to comment.