From 2ab3a6b967d22f1239294124c6105accece9f346 Mon Sep 17 00:00:00 2001 From: michaelhly Date: Sun, 10 May 2020 18:17:18 -0500 Subject: [PATCH] Add all necessary dial options --- validator/BUILD.bazel | 1 + validator/client/service.go | 112 +++++++++++++++++++----------------- validator/main.go | 18 ++++-- 3 files changed, 74 insertions(+), 57 deletions(-) diff --git a/validator/BUILD.bazel b/validator/BUILD.bazel index 1e297c933520..bd479d06bc03 100644 --- a/validator/BUILD.bazel +++ b/validator/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//shared/params:go_default_library", "//shared/version:go_default_library", "//validator/accounts:go_default_library", + "//validator/client:go_default_library", "//validator/flags:go_default_library", "//validator/node:go_default_library", "@com_github_joonix_log//:go_default_library", diff --git a/validator/client/service.go b/validator/client/service.go index 4a3f437cd54a..1f4eafae5696 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -86,60 +86,17 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e // Start the validator service. Launches the main go routine for the validator // client. func (v *ValidatorService) Start() { - var dialOpt grpc.DialOption - var maxCallRecvMsgSize int - - if v.withCert != "" { - creds, err := credentials.NewClientTLSFromFile(v.withCert, "") - if err != nil { - log.Errorf("Could not get valid credentials: %v", err) - return - } - dialOpt = grpc.WithTransportCredentials(creds) - } else { - dialOpt = grpc.WithInsecure() - log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.") - } - - if v.maxCallRecvMsgSize != 0 { - maxCallRecvMsgSize = v.maxCallRecvMsgSize - } else { - maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb - } - - md := make(metadata.MD) - for _, hdr := range v.grpcHeaders { - if hdr != "" { - ss := strings.Split(hdr, "=") - if len(ss) != 2 { - log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr) - continue - } - md.Set(ss[0], ss[1]) - } - } - - opts := []grpc.DialOption{ - dialOpt, - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize), - grpc_retry.WithMax(v.grpcRetries), - grpc.Header(&md), - ), - grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), - grpc.WithStreamInterceptor(middleware.ChainStreamClient( - grpc_opentracing.StreamClientInterceptor(), - grpc_prometheus.StreamClientInterceptor, - grpc_retry.StreamClientInterceptor(), - )), - grpc.WithUnaryInterceptor(middleware.ChainUnaryClient( - grpc_opentracing.UnaryClientInterceptor(), - grpc_prometheus.UnaryClientInterceptor, - grpc_retry.UnaryClientInterceptor(), - logDebugRequestInfoUnaryInterceptor, - )), + dialOpts := ConstructDialOptions(v.maxCallRecvMsgSize, v.withCert, v.grpcHeaders, v.grpcRetries) + if dialOpts == nil { + return } - conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...) + streamInterceptor := grpc.WithStreamInterceptor(middleware.ChainStreamClient( + grpc_opentracing.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_retry.StreamClientInterceptor(), + )) + dialOpts = append(dialOpts, streamInterceptor) + conn, err := grpc.DialContext(v.ctx, v.endpoint, dialOpts...) if err != nil { log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err) return @@ -226,3 +183,52 @@ func (v *validator) signObject(pubKey [48]byte, object interface{}, domain []byt } return v.keyManager.Sign(pubKey, root) } + +// Construct standard set of grpc dial options +func ConstructDialOptions( + maxCallRecvMsgSize int, withCert string, grpcHeaders []string, grpcRetries uint) []grpc.DialOption { + var dialOpt grpc.DialOption + if withCert != "" { + creds, err := credentials.NewClientTLSFromFile(withCert, "") + if err != nil { + log.Errorf("Could not get valid credentials: %v", err) + return nil + } + dialOpt = grpc.WithTransportCredentials(creds) + } else { + dialOpt = grpc.WithInsecure() + log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.") + } + + if maxCallRecvMsgSize == 0 { + maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb + } + + md := make(metadata.MD) + for _, hdr := range grpcHeaders { + if hdr != "" { + ss := strings.Split(hdr, "=") + if len(ss) != 2 { + log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr) + continue + } + md.Set(ss[0], ss[1]) + } + } + + return []grpc.DialOption{ + dialOpt, + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize), + grpc_retry.WithMax(grpcRetries), + grpc.Header(&md), + ), + grpc.WithStatsHandler(&ocgrpc.ClientHandler{}), + grpc.WithUnaryInterceptor(middleware.ChainUnaryClient( + grpc_opentracing.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_retry.UnaryClientInterceptor(), + logDebugRequestInfoUnaryInterceptor, + )), + } +} diff --git a/validator/main.go b/validator/main.go index c9ccf0216d9f..927a22535d9f 100644 --- a/validator/main.go +++ b/validator/main.go @@ -8,6 +8,8 @@ import ( "os" "runtime" runtimeDebug "runtime/debug" + "strings" + "time" joonix "github.com/joonix/log" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -18,6 +20,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/version" "github.com/prysmaticlabs/prysm/validator/accounts" + "github.com/prysmaticlabs/prysm/validator/client" "github.com/prysmaticlabs/prysm/validator/flags" "github.com/prysmaticlabs/prysm/validator/node" "github.com/sirupsen/logrus" @@ -153,11 +156,18 @@ contract in order to activate the validator client`, if err != nil { log.WithError(err).Errorf("Could not list private and public keys in path %s", keystorePath) } - - conn, err := grpc.Dial( - flags.BeaconRPCProviderFlag.Value, grpc.WithInsecure(), grpc.WithBlock()) + endpoint := cliCtx.String(flags.BeaconRPCProviderFlag.Name) + cert := cliCtx.String(flags.CertFlag.Name) + maxCallRecvMsgSize := cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name) + grpcHeaders := strings.Split(cliCtx.String(flags.GrpcHeadersFlag.Name), ",") + grpcRetries := cliCtx.Uint(flags.GrpcRetriesFlag.Name) + dialOpts := client.ConstructDialOptions(maxCallRecvMsgSize, cert, grpcHeaders, grpcRetries) + dialOpts = append(dialOpts, grpc.WithBlock()) + dialOpts = append(dialOpts, grpc.WithTimeout( + 10*time.Second /* Block for 10 seconds to see if we can connect to beacon node */)) + conn, err := grpc.DialContext(cliCtx, endpoint, dialOpts...) if err != nil { - log.WithError(err).Fatal("Could not connect to beacon node.") + log.WithError(err).Fatalf("Could not dial beacon node endpoint %s", endpoint) } beaconNodeRPC := ethpb.NewBeaconNodeValidatorClient(conn) statuses, err := accounts.FetchAccountStatuses(cliCtx, beaconNodeRPC, keyPairs)