Skip to content

Commit

Permalink
Move grpc dialing out of status.go
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelhly committed May 19, 2020
1 parent a15de80 commit 5472aac
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 136 deletions.
6 changes: 6 additions & 0 deletions validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ go_library(
"//shared/params:go_default_library",
"//shared/version:go_default_library",
"//validator/accounts:go_default_library",
"//validator/client:go_default_library",
"//validator/flags:go_default_library",
"//validator/node:go_default_library",
"@com_github_joonix_log//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_x_cray_logrus_prefixed_formatter//:go_default_library",
"@in_gopkg_urfave_cli_v2//:go_default_library",
"@in_gopkg_urfave_cli_v2//altsrc:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_uber_go_automaxprocs//:go_default_library",
],
)
Expand Down Expand Up @@ -60,13 +63,16 @@ go_image(
"//shared/params:go_default_library",
"//shared/version:go_default_library",
"//validator/accounts:go_default_library",
"//validator/client:go_default_library",
"//validator/flags:go_default_library",
"//validator/node:go_default_library",
"@com_github_joonix_log//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_x_cray_logrus_prefixed_formatter//:go_default_library",
"@in_gopkg_urfave_cli_v2//:go_default_library",
"@in_gopkg_urfave_cli_v2//altsrc:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_uber_go_automaxprocs//:go_default_library",
],
)
Expand Down
5 changes: 0 additions & 5 deletions validator/accounts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ go_library(
"//shared/keystore:go_default_library",
"//shared/params:go_default_library",
"//validator/flags:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@in_gopkg_urfave_cli_v2//:go_default_library",
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],
)

Expand Down
75 changes: 2 additions & 73 deletions validator/accounts/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,12 @@ import (
"encoding/hex"
"fmt"
"sort"
"strings"
"time"

grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

// ValidatorStatusMetadata holds all status information about a validator.
Expand All @@ -27,81 +21,16 @@ type ValidatorStatusMetadata struct {

// RunStatusCommand is the entry point to the `validator status` command.
func RunStatusCommand(
pubkeys [][]byte,
withCert string,
endpoint string,
maxCallRecvMsgSize int,
grpcRetries uint,
grpcHeaders []string) error {
dialOpts, err := constructDialOptions(maxCallRecvMsgSize, withCert, grpcHeaders, grpcRetries)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second /* Cancel if cannot connect to beacon node in 10 seconds. */)
defer cancel()
conn, err := grpc.DialContext(ctx, endpoint, dialOpts...)
if err != nil {
return errors.Wrapf(err, "Failed to dial beacon node endpoint at %s", endpoint)
}
pubkeys [][]byte, beaconNodeRPCProvider ethpb.BeaconNodeValidatorClient) error {
statuses, err := FetchAccountStatuses(
context.Background(), ethpb.NewBeaconNodeValidatorClient(conn), pubkeys)
if e := conn.Close(); e != nil {
log.WithError(e).Error("Could not close connection to beacon node")
}
context.Background(), beaconNodeRPCProvider, pubkeys)
if err != nil {
return errors.Wrap(err, "Could not fetch account statuses from the beacon node")
}
printStatuses(statuses)
return nil
}

func constructDialOptions(
maxCallRecvMsgSize int,
withCert string,
grpcHeaders []string,
grpcRetries uint) ([]grpc.DialOption, error) {
var transportSecurity grpc.DialOption
if withCert != "" {
creds, err := credentials.NewClientTLSFromFile(withCert, "")
if err != nil {
return nil, errors.Wrapf(err, "Could not get valid credentials: %v", err)
}
transportSecurity = grpc.WithTransportCredentials(creds)
} else {
transportSecurity = grpc.WithInsecure()
log.Warn(
"You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}

if maxCallRecvMsgSize == 0 {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}

md := make(metadata.MD)
for _, hdr := range grpcHeaders {
if hdr != "" {
ss := strings.Split(hdr, "=")
if len(ss) != 2 {
log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr)
continue
}
md.Set(ss[0], ss[1])
}
}

return []grpc.DialOption{
transportSecurity,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(grpcRetries),
grpc.Header(&md),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithBlock(),
}, nil
}

// FetchAccountStatuses fetches validator statuses from the BeaconNodeValidatorClient
// for each validator public key.
func FetchAccountStatuses(
Expand Down
123 changes: 70 additions & 53 deletions validator/client/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,60 +86,17 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
// Start the validator service. Launches the main go routine for the validator
// client.
func (v *ValidatorService) Start() {
var dialOpt grpc.DialOption
var maxCallRecvMsgSize int

if v.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(v.withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}

if v.maxCallRecvMsgSize != 0 {
maxCallRecvMsgSize = v.maxCallRecvMsgSize
} else {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}

md := make(metadata.MD)
for _, hdr := range v.grpcHeaders {
if hdr != "" {
ss := strings.Split(hdr, "=")
if len(ss) != 2 {
log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr)
continue
}
md.Set(ss[0], ss[1])
}
}

opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(v.grpcRetries),
grpc.Header(&md),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_retry.StreamClientInterceptor(),
)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
streamInterceptor := grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_retry.StreamClientInterceptor(),
))
dialOpts := ConstructDialOptions(
v.maxCallRecvMsgSize, v.withCert, v.grpcHeaders, v.grpcRetries, streamInterceptor)
if dialOpts == nil {
return
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
conn, err := grpc.DialContext(v.ctx, v.endpoint, dialOpts...)
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err)
return
Expand Down Expand Up @@ -226,3 +183,63 @@ func (v *validator) signObject(pubKey [48]byte, object interface{}, domain []byt
}
return v.keyManager.Sign(pubKey, root)
}

// ConstructDialOptions constructs a list of grpc dial options
func ConstructDialOptions(
maxCallRecvMsgSize int,
withCert string,
grpcHeaders []string,
grpcRetries uint,
extraOpts ...grpc.DialOption,
) []grpc.DialOption {
var transportSecurity grpc.DialOption
if withCert != "" {
creds, err := credentials.NewClientTLSFromFile(withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return nil
}
transportSecurity = grpc.WithTransportCredentials(creds)
} else {
transportSecurity = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}

if maxCallRecvMsgSize == 0 {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}

md := make(metadata.MD)
for _, hdr := range grpcHeaders {
if hdr != "" {
ss := strings.Split(hdr, "=")
if len(ss) != 2 {
log.Warnf("Incorrect gRPC header flag format. Skipping %v", hdr)
continue
}
md.Set(ss[0], ss[1])
}
}

dialOpts := []grpc.DialOption{
transportSecurity,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(grpcRetries),
grpc.Header(&md),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
}

for _, opt := range extraOpts {
dialOpts = append(dialOpts, opt)
}

return dialOpts
}
28 changes: 23 additions & 5 deletions validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
package main

import (
"context"
"fmt"
"os"
"runtime"
runtimeDebug "runtime/debug"
"strings"
"time"

joonix "github.com/joonix/log"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/debug"
Expand All @@ -19,11 +22,13 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/version"
"github.com/prysmaticlabs/prysm/validator/accounts"
"github.com/prysmaticlabs/prysm/validator/client"
"github.com/prysmaticlabs/prysm/validator/flags"
"github.com/prysmaticlabs/prysm/validator/node"
"github.com/sirupsen/logrus"
prefixed "github.com/x-cray/logrus-prefixed-formatter"
_ "go.uber.org/automaxprocs"
"google.golang.org/grpc"
"gopkg.in/urfave/cli.v2"
"gopkg.in/urfave/cli.v2/altsrc"
)
Expand Down Expand Up @@ -165,13 +170,26 @@ contract in order to activate the validator client`,
if err != nil {
return err
}
return accounts.RunStatusCommand(
pubKeys,
cliCtx.String(flags.CertFlag.Name),
cliCtx.String(flags.BeaconRPCProviderFlag.Name),
ctx, cancel := context.WithTimeout(
context.Background(), 10*time.Second /* Cancel if cannot connect to beacon node in 10 seconds. */)
defer cancel()
dialOpts := client.ConstructDialOptions(
cliCtx.Int(cmd.GrpcMaxCallRecvMsgSizeFlag.Name),
cliCtx.String(flags.CertFlag.Name),
strings.Split(cliCtx.String(flags.GrpcHeadersFlag.Name), ","),
cliCtx.Uint(flags.GrpcRetriesFlag.Name),
strings.Split(cliCtx.String(flags.GrpcHeadersFlag.Name), ","))
grpc.WithBlock())
endpoint := cliCtx.String(flags.BeaconRPCProviderFlag.Name)
conn, err := grpc.DialContext(ctx, endpoint, dialOpts...)
if err != nil {
log.WithError(err).Fatalf("Failed to dial beacon node endpoint at %s", endpoint)
return err
}
err = accounts.RunStatusCommand(pubKeys, ethpb.NewBeaconNodeValidatorClient(conn))
if closed := conn.Close(); closed != nil {
log.WithError(closed).Error("Could not close connection to beacon node")
}
return err
},
},
{
Expand Down

0 comments on commit 5472aac

Please sign in to comment.