Skip to content

Commit

Permalink
Add all necessary dial options
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelhly committed May 10, 2020
1 parent 7b7efef commit 3bc7daf
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 57 deletions.
1 change: 1 addition & 0 deletions validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//shared/params:go_default_library",
"//shared/version:go_default_library",
"//validator/accounts:go_default_library",
"//validator/client:go_default_library",
"//validator/flags:go_default_library",
"//validator/node:go_default_library",
"@com_github_joonix_log//:go_default_library",
Expand Down
111 changes: 58 additions & 53 deletions validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,60 +86,17 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
// Start the validator service. Launches the main go routine for the validator
// client.
func (v *ValidatorService) Start() {
var dialOpt grpc.DialOption
var maxCallRecvMsgSize int

if v.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(v.withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}

if v.maxCallRecvMsgSize != 0 {
maxCallRecvMsgSize = v.maxCallRecvMsgSize
} else {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}

md := make(metadata.MD)
for _, hdr := range v.grpcHeaders {
if hdr != "" {
ss := strings.Split(hdr, "=")
if len(ss) != 2 {
log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr)
continue
}
md.Set(ss[0], ss[1])
}
}

opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(v.grpcRetries),
grpc.Header(&md),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_retry.StreamClientInterceptor(),
)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
dialOpts := ConstructDialOptions(v.maxCallRecvMsgSize, v.withCert, v.grpcHeaders, v.grpcRetries)
if dialOpts == nil {
return
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
streamInterceptor := grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_retry.StreamClientInterceptor(),
))
dialOpts = append(dialOpts, streamInterceptor)
conn, err := grpc.DialContext(v.ctx, v.endpoint, dialOpts...)
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err)
return
Expand Down Expand Up @@ -226,3 +183,51 @@ func (v *validator) signObject(pubKey [48]byte, object interface{}, domain []byt
}
return v.keyManager.Sign(pubKey, root)
}

func ConstructDialOptions(
maxCallRecvMsgSize int, withCert string, grpcHeaders []string, grpcRetries uint) []grpc.DialOption {
var dialOpt grpc.DialOption
if withCert != "" {
creds, err := credentials.NewClientTLSFromFile(withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return nil
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}

if maxCallRecvMsgSize == 0 {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}

md := make(metadata.MD)
for _, hdr := range grpcHeaders {
if hdr != "" {
ss := strings.Split(hdr, "=")
if len(ss) != 2 {
log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr)
continue
}
md.Set(ss[0], ss[1])
}
}

return []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(grpcRetries),
grpc.Header(&md),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
}
}
19 changes: 15 additions & 4 deletions validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"os"
"runtime"
runtimeDebug "runtime/debug"
"strings"
"time"

joonix "github.com/joonix/log"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/version"
"github.com/prysmaticlabs/prysm/validator/accounts"
"github.com/prysmaticlabs/prysm/validator/client"
"github.com/prysmaticlabs/prysm/validator/flags"
"github.com/prysmaticlabs/prysm/validator/node"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -153,11 +156,19 @@ contract in order to activate the validator client`,
if err != nil {
log.WithError(err).Errorf("Could not list private and public keys in path %s", keystorePath)
}

conn, err := grpc.Dial(
flags.BeaconRPCProviderFlag.Value, grpc.WithInsecure(), grpc.WithBlock())
endpoint := cliCtx.String(flags.BeaconRPCProviderFlag.Name)
fmt.Println(endpoint)
cert := cliCtx.String(flags.CertFlag.Name)
maxCallRecvMsgSize := cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name)
grpcHeaders := strings.Split(cliCtx.String(flags.GrpcHeadersFlag.Name), ",")
grpcRetries := cliCtx.Uint(flags.GrpcRetriesFlag.Name)
dialOpts := client.ConstructDialOptions(maxCallRecvMsgSize, cert, grpcHeaders, grpcRetries)
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, grpc.WithTimeout(
10*time.Second /* Block for 10 seconds to see if we can connect to beacon node */))
conn, err := grpc.DialContext(cliCtx, endpoint, dialOpts...)
if err != nil {
log.WithError(err).Fatal("Could not connect to beacon node.")
log.WithError(err).Fatalf("Could not dial beacon node endpoint %s", endpoint)
}
beaconNodeRPC := ethpb.NewBeaconNodeValidatorClient(conn)
statuses, err := accounts.FetchAccountStatuses(cliCtx, beaconNodeRPC, keyPairs)
Expand Down

0 comments on commit 3bc7daf

Please sign in to comment.