Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make validator stable when beacon node goes offline #8278

Merged
merged 21 commits into from Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 28 additions & 9 deletions validator/client/mock_validator.go
Expand Up @@ -15,12 +15,8 @@ var _ Validator = (*FakeValidator)(nil)
type FakeValidator struct {
DoneCalled bool
WaitForWalletInitializationCalled bool
WaitForActivationCalled bool
WaitForChainStartCalled bool
WaitForSyncCalled bool
SlasherReadyCalled bool
NextSlotCalled bool
CanonicalHeadSlotCalled bool
UpdateDutiesCalled bool
UpdateProtectionsCalled bool
RoleAtCalled bool
Expand All @@ -30,6 +26,12 @@ type FakeValidator struct {
SaveProtectionsCalled bool
DeleteProtectionCalled bool
SlotDeadlineCalled bool
WaitForChainStartCalled int
WaitForSyncCalled int
WaitForActivationCalled int
CanonicalHeadSlotCalled int
ReceiveBlocksCalled int
RetryTillSuccess int
ProposeBlockArg1 uint64
AttestToBlockHeadArg1 uint64
RoleAtArg1 uint64
Expand Down Expand Up @@ -62,19 +64,28 @@ func (fv *FakeValidator) WaitForWalletInitialization(_ context.Context) error {

// WaitForChainStart for mocking.
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
fv.WaitForChainStartCalled = true
fv.WaitForChainStartCalled++
if fv.RetryTillSuccess >= fv.WaitForChainStartCalled {
return errConnectionIssue
}
return nil
}

// WaitForActivation for mocking.
func (fv *FakeValidator) WaitForActivation(_ context.Context, _ chan struct{}) error {
fv.WaitForActivationCalled = true
fv.WaitForActivationCalled++
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
return errConnectionIssue
}
return nil
}

// WaitForSync for mocking.
func (fv *FakeValidator) WaitForSync(_ context.Context) error {
fv.WaitForSyncCalled = true
fv.WaitForSyncCalled++
if fv.RetryTillSuccess >= fv.WaitForSyncCalled {
return errConnectionIssue
}
return nil
}

Expand All @@ -86,7 +97,10 @@ func (fv *FakeValidator) SlasherReady(_ context.Context) error {

// CanonicalHeadSlot for mocking.
func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (uint64, error) {
fv.CanonicalHeadSlotCalled = true
fv.CanonicalHeadSlotCalled++
if fv.RetryTillSuccess > fv.CanonicalHeadSlotCalled {
return 0, errConnectionIssue
}
return 0, nil
}

Expand Down Expand Up @@ -195,4 +209,9 @@ func (fv *FakeValidator) GetKeymanager() keymanager.IKeymanager {
}

// ReceiveBlocks for mocking
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context) {}
func (fv *FakeValidator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
fv.ReceiveBlocksCalled++
if fv.RetryTillSuccess > fv.ReceiveBlocksCalled {
connectionErrorChannel <- errConnectionIssue
}
}
86 changes: 67 additions & 19 deletions validator/client/runner.go
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
Expand All @@ -16,6 +17,10 @@ import (
"google.golang.org/grpc/status"
)

// time to wait before trying to reconnect with the eth1 node.
var backOffPeriod = 10 * time.Second
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
var errConnectionIssue = errors.New("connection issue, beacon node might be offline...")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved

// Validator interface defines the primary methods of a validator client.
type Validator interface {
Done()
Expand All @@ -38,7 +43,7 @@ type Validator interface {
WaitForWalletInitialization(ctx context.Context) error
AllValidatorsAreExited(ctx context.Context) (bool, error)
GetKeymanager() keymanager.IKeymanager
ReceiveBlocks(ctx context.Context)
ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error)
}

// Run the main validator routine. This routine exits if the context is
Expand All @@ -64,37 +69,80 @@ func run(ctx context.Context, v Validator) {
log.Fatalf("Slasher is not ready: %v", err)
}
}
if err := v.WaitForChainStart(ctx); err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
if err := v.WaitForSync(ctx); err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}

ticker := time.NewTicker(backOffPeriod)
var headSlot uint64
defer ticker.Stop()
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
accountsChangedChan := make(chan struct{}, 1)
go handleAccountsChanged(ctx, v, accountsChangedChan)
if err := v.WaitForActivation(ctx, accountsChangedChan); err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
for {
var done bool
err := v.WaitForChainStart(ctx)
if err != nil && errors.Is(err, errConnectionIssue) {
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon chain started: %v", err)
}
err = v.WaitForSync(ctx)
if err != nil && errors.Is(err, errConnectionIssue) {
log.Warnf("Could not determine if beacon chain started: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not determine if beacon node synced: %v", err)
}
err = v.WaitForActivation(ctx, accountsChangedChan)
if err != nil && errors.Is(err, errConnectionIssue) {
log.Warnf("Could not wait for validator activation: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not wait for validator activation: %v", err)
}
headSlot, err = v.CanonicalHeadSlot(ctx)
if err != nil && errors.Is(err, errConnectionIssue) {
log.Warnf("Could not get current canonical head slot: %v", err)
continue
}
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
done = true
select {
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
return // Exit if context is canceled.
case <-ticker.C:
if done {
break
}
continue
}
break
}

go v.ReceiveBlocks(ctx)

headSlot, err := v.CanonicalHeadSlot(ctx)
if err != nil {
log.Fatalf("Could not get current canonical head slot: %v", err)
}
go handleAccountsChanged(ctx, v, accountsChangedChan)
connectionErrorChannel := make(chan error, 1)
go v.ReceiveBlocks(ctx, connectionErrorChannel)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was no error handling for ReceiveBlocks while it was running in a separate goroutine

if err := v.UpdateDuties(ctx, headSlot); err != nil {
handleAssignmentError(err, headSlot)
}

for {
slotCtx, cancel := context.WithCancel(ctx)
ctx, span := trace.StartSpan(ctx, "validator.processSlot")

select {
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
span.End()
cancel()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing call to cancel? @rauljordan goland detected a possible context leak

return // Exit if context is canceled.
case blocksError := <-connectionErrorChannel:
if blocksError != nil && errors.Is(blocksError, errConnectionIssue) {
go v.ReceiveBlocks(ctx, connectionErrorChannel)
continue
}
log.WithError(blocksError).Fatal("block stream interrupted")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
case slot := <-v.NextSlot():
span.AddAttributes(trace.Int64Attribute("slot", int64(slot)))

Expand All @@ -108,7 +156,7 @@ func run(ctx context.Context, v Validator) {
}

deadline := v.SlotDeadline(slot)
slotCtx, cancel := context.WithDeadline(ctx, deadline)
slotCtx, cancel = context.WithDeadline(ctx, deadline)
log := log.WithField("slot", slot)
log.WithField("deadline", deadline).Debug("Set deadline for proposals and attestations")

Expand Down
25 changes: 23 additions & 2 deletions validator/client/runner_test.go
Expand Up @@ -30,13 +30,34 @@ func TestCancelledContext_CleansUpValidator(t *testing.T) {
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, 1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
}

func TestRetry_On_ConnectionError(t *testing.T) {
retry := 10
v := &FakeValidator{
Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
RetryTillSuccess: retry,
}
backOffPeriod = 10 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
go run(ctx, v)
// each step will fail (retry times)=10 this sleep times will wait more then
// the time it takes for all steps to succeed before main loop.
time.Sleep(time.Duration(retry*6) * backOffPeriod)
cancel()
// every call will fail retry=10 times so first one will be called 4 * retry=10.
assert.Equal(t, retry*4, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
assert.Equal(t, retry*3, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
assert.Equal(t, retry*2, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, retry, v.ReceiveBlocksCalled, "Expected WaitForActivation() to be called")
}

func TestCancelledContext_WaitsForActivation(t *testing.T) {
v := &FakeValidator{Keymanager: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
run(cancelledContext(), v)
assert.Equal(t, true, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
}

func TestCancelledContext_ChecksSlasherReady(t *testing.T) {
Expand Down
20 changes: 12 additions & 8 deletions validator/client/validator.go
Expand Up @@ -135,7 +135,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
// First, check if the beacon chain has started.
stream, err := v.validatorClient.WaitForChainStart(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not setup beacon chain ChainStart streaming client")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not setup beacon chain ChainStart streaming client").Error())
}

log.Info("Waiting for beacon chain start log from the ETH 1.0 deposit contract")
Expand All @@ -145,7 +145,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
return errors.Wrap(ctx.Err(), "context has been canceled so shutting down the loop")
}
if err != nil {
return errors.Wrap(err, "could not receive ChainStart from stream")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not receive ChainStart from stream").Error())
}
v.genesisTime = chainStartRes.GenesisTime
curGenValRoot, err := v.db.GenesisValidatorsRoot(ctx)
Expand All @@ -169,6 +169,8 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
)
}
}
} else {
return errConnectionIssue
}

// Once the ChainStart log is received, we update the genesis time of the validator client
Expand All @@ -185,7 +187,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {

s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not get sync status")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
}
if !s.Syncing {
return nil
Expand All @@ -197,7 +199,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
case <-time.After(slotutil.DivideSlotBy(2 /* twice per slot */)):
s, err := v.node.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
return errors.Wrap(err, "could not get sync status")
return errors.Wrap(errConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
}
if !s.Syncing {
return nil
Expand Down Expand Up @@ -243,10 +245,11 @@ func (v *validator) SlasherReady(ctx context.Context) error {
// ReceiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other usages to subscribe to.
func (v *validator) ReceiveBlocks(ctx context.Context) {
func (v *validator) ReceiveBlocks(ctx context.Context, connectionErrorChannel chan error) {
stream, err := v.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{VerifiedOnly: true})
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
log.WithError(err).Error("Failed to retrieve blocks stream, " + errConnectionIssue.Error())
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
return
}

Expand All @@ -257,7 +260,8 @@ func (v *validator) ReceiveBlocks(ctx context.Context) {
}
res, err := stream.Recv()
if err != nil {
log.WithError(err).Error("Could not receive blocks from beacon node")
log.WithError(err).Error("Could not receive blocks from beacon node, " + errConnectionIssue.Error())
connectionErrorChannel <- errors.Wrap(errConnectionIssue, err.Error())
return
}
if res == nil || res.Block == nil {
Expand Down Expand Up @@ -328,7 +332,7 @@ func (v *validator) CanonicalHeadSlot(ctx context.Context) (uint64, error) {
defer span.End()
head, err := v.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil {
return 0, err
return 0, errors.Wrap(errConnectionIssue, err.Error())
}
return head.HeadSlot, nil
}
Expand Down
6 changes: 4 additions & 2 deletions validator/client/validator_test.go
Expand Up @@ -912,7 +912,8 @@ func TestService_ReceiveBlocks_NilBlock(t *testing.T) {
).Do(func() {
cancel()
})
v.ReceiveBlocks(ctx)
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, uint64(0), v.highestValidSlot)
}

Expand All @@ -939,6 +940,7 @@ func TestService_ReceiveBlocks_SetHighest(t *testing.T) {
).Do(func() {
cancel()
})
v.ReceiveBlocks(ctx)
connectionErrorChannel := make(chan error)
v.ReceiveBlocks(ctx, connectionErrorChannel)
require.Equal(t, slot, v.highestValidSlot)
}