/
service.go
147 lines (135 loc) · 4.32 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package client
import (
"context"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var log = logrus.WithField("prefix", "validator")
// ValidatorService represents a service to manage the validator client
// routine.
type ValidatorService struct {
ctx context.Context
cancel context.CancelFunc
validator Validator
graffiti []byte
conn *grpc.ClientConn
endpoint string
withCert string
dataDir string
keyManager keymanager.KeyManager
logValidatorBalances bool
}
// Config for the validator service.
type Config struct {
Endpoint string
DataDir string
CertFlag string
GraffitiFlag string
KeyManager keymanager.KeyManager
LogValidatorBalances bool
}
// NewValidatorService creates a new validator service for the service
// registry.
func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, error) {
ctx, cancel := context.WithCancel(ctx)
return &ValidatorService{
ctx: ctx,
cancel: cancel,
endpoint: cfg.Endpoint,
withCert: cfg.CertFlag,
dataDir: cfg.DataDir,
graffiti: []byte(cfg.GraffitiFlag),
keyManager: cfg.KeyManager,
logValidatorBalances: cfg.LogValidatorBalances,
}, nil
}
// Start the validator service. Launches the main go routine for the validator
// client.
func (v *ValidatorService) Start() {
var dialOpt grpc.DialOption
if v.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(v.withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}
opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10 * 5 << 20), // 10Mb
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
)),
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err)
return
}
log.Info("Successfully started gRPC connection")
pubkeys, err := v.keyManager.FetchValidatingKeys()
if err != nil {
log.Errorf("Could not get validating keys: %v", err)
return
}
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
}
v.conn = conn
v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
beaconClient: ethpb.NewBeaconChainClient(v.conn),
aggregatorClient: pb.NewAggregatorServiceClient(v.conn),
node: ethpb.NewNodeClient(v.conn),
keyManager: v.keyManager,
graffiti: v.graffiti,
logValidatorBalances: v.logValidatorBalances,
prevBalance: make(map[[48]byte]uint64),
attLogs: make(map[[32]byte]*attSubmitted),
pubKeyToID: make(map[[48]byte]uint64),
}
go run(v.ctx, v.validator)
}
// Stop the validator service.
func (v *ValidatorService) Stop() error {
v.cancel()
log.Info("Stopping service")
if v.conn != nil {
return v.conn.Close()
}
return nil
}
// Status ...
//
// WIP - not done.
func (v *ValidatorService) Status() error {
if v.conn == nil {
return errors.New("no connection to beacon RPC")
}
return nil
}