Skip to content

Commit 446bfca

Browse files
authored
Slasher redial (#6889)
* Restart streams on internal error
* debug instead of fatal on retry
* Merge branch 'master' of github.com:prysmaticlabs/prysm into slasher_redial
* goimports
* conn status fix
* Merge branch 'master' into slasher_redial
1 parent 492944d commit 446bfca

File tree

3 files changed

+27
-2
lines changed

3 files changed

+27
-2
lines changed

slasher/beaconclient/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ go_library(
3737
"@io_opencensus_go//trace:go_default_library",
3838
"@org_golang_google_grpc//:go_default_library",
3939
"@org_golang_google_grpc//codes:go_default_library",
40+
"@org_golang_google_grpc//connectivity:go_default_library",
4041
"@org_golang_google_grpc//credentials:go_default_library",
4142
"@org_golang_google_grpc//status:go_default_library",
4243
],

slasher/beaconclient/receivers.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"github.com/prysmaticlabs/prysm/shared/slotutil"
1414
"github.com/sirupsen/logrus"
1515
"go.opencensus.io/trace"
16+
"google.golang.org/grpc"
1617
"google.golang.org/grpc/codes"
18+
"google.golang.org/grpc/connectivity"
1719
"google.golang.org/grpc/status"
1820
)
1921

@@ -46,7 +48,7 @@ func (bs *Service) ReceiveBlocks(ctx context.Context) {
4648
if err != nil {
4749
if e, ok := status.FromError(err); ok {
4850
switch e.Code() {
49-
case codes.Canceled:
51+
case codes.Canceled, codes.Internal:
5052
stream, err = bs.restartBlockStream(ctx)
5153
if err != nil {
5254
log.WithError(err).Error("Could not restart stream")
@@ -109,7 +111,7 @@ func (bs *Service) ReceiveAttestations(ctx context.Context) {
109111
if err != nil {
110112
if e, ok := status.FromError(err); ok {
111113
switch e.Code() {
112-
case codes.Canceled:
114+
case codes.Canceled, codes.Internal:
113115
stream, err = bs.restartIndexedAttestationStream(ctx)
114116
if err != nil {
115117
log.WithError(err).Error("Could not restart stream")
@@ -179,6 +181,16 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B
179181
select {
180182
case <-ticker.C:
181183
log.Info("Context closed, attempting to restart attestation stream")
184+
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
185+
if err != nil {
186+
log.Debug("Failed to dial beacon node")
187+
continue
188+
}
189+
log.Debugf("connection status %v", conn.GetState())
190+
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
191+
log.Debug("Beacon node is still down")
192+
continue
193+
}
182194
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
183195
if err != nil {
184196
continue
@@ -199,6 +211,16 @@ func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_St
199211
select {
200212
case <-ticker.C:
201213
log.Info("Context closed, attempting to restart block stream")
214+
conn, err := grpc.DialContext(bs.ctx, bs.provider, bs.beaconDialOptions...)
215+
if err != nil {
216+
log.Debug("Failed to dial beacon node")
217+
continue
218+
}
219+
log.Debugf("connection status %v", conn.GetState())
220+
if conn.GetState() == connectivity.TransientFailure || conn.GetState() == connectivity.Idle {
221+
log.Debug("Beacon node is still down")
222+
continue
223+
}
202224
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
203225
if err != nil {
204226
continue

slasher/beaconclient/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Service struct {
5959
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
6060
publicKeyCache *cache.PublicKeyCache
6161
genesisValidatorRoot []byte
62+
beaconDialOptions []grpc.DialOption
6263
}
6364

6465
// Config options for the beaconclient service.
@@ -173,6 +174,7 @@ func (bs *Service) Start() {
173174
if err != nil {
174175
log.Fatalf("Could not dial endpoint: %s, %v", bs.provider, err)
175176
}
177+
bs.beaconDialOptions = beaconOpts
176178
log.Info("Successfully started gRPC connection")
177179
bs.conn = conn
178180
bs.beaconClient = ethpb.NewBeaconChainClient(bs.conn)

0 commit comments

Comments
 (0)