/
service.go
179 lines (163 loc) · 5.25 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package client
import (
"context"
"github.com/dgraph-io/ristretto"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/validator/db"
"github.com/prysmaticlabs/prysm/validator/keymanager"
"github.com/sirupsen/logrus"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var log = logrus.WithField("prefix", "validator")
// ValidatorService represents a service to manage the validator client
// routine.
type ValidatorService struct {
ctx context.Context
cancel context.CancelFunc
validator Validator
graffiti []byte
conn *grpc.ClientConn
endpoint string
withCert string
dataDir string
keyManager keymanager.KeyManager
logValidatorBalances bool
emitAccountMetrics bool
maxCallRecvMsgSize int
grpcRetries uint
}
// Config for the validator service.
type Config struct {
Endpoint string
DataDir string
CertFlag string
GraffitiFlag string
KeyManager keymanager.KeyManager
LogValidatorBalances bool
EmitAccountMetrics bool
GrpcMaxCallRecvMsgSizeFlag int
GrpcRetriesFlag uint
}
// NewValidatorService creates a new validator service for the service
// registry.
func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, error) {
ctx, cancel := context.WithCancel(ctx)
return &ValidatorService{
ctx: ctx,
cancel: cancel,
endpoint: cfg.Endpoint,
withCert: cfg.CertFlag,
dataDir: cfg.DataDir,
graffiti: []byte(cfg.GraffitiFlag),
keyManager: cfg.KeyManager,
logValidatorBalances: cfg.LogValidatorBalances,
emitAccountMetrics: cfg.EmitAccountMetrics,
maxCallRecvMsgSize: cfg.GrpcMaxCallRecvMsgSizeFlag,
grpcRetries: cfg.GrpcRetriesFlag,
}, nil
}
// Start the validator service. Launches the main go routine for the validator
// client.
func (v *ValidatorService) Start() {
var dialOpt grpc.DialOption
var maxCallRecvMsgSize int
if v.withCert != "" {
creds, err := credentials.NewClientTLSFromFile(v.withCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
return
}
dialOpt = grpc.WithTransportCredentials(creds)
} else {
dialOpt = grpc.WithInsecure()
log.Warn("You are using an insecure gRPC connection! Please provide a certificate and key to use a secure connection.")
}
if v.maxCallRecvMsgSize != 0 {
maxCallRecvMsgSize = v.maxCallRecvMsgSize
} else {
maxCallRecvMsgSize = 10 * 5 << 20 // Default 50Mb
}
opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxCallRecvMsgSize),
grpc_retry.WithMax(v.grpcRetries),
),
grpc.WithStatsHandler(&ocgrpc.ClientHandler{}),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(
grpc_opentracing.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_retry.StreamClientInterceptor(),
)),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(
grpc_opentracing.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_retry.UnaryClientInterceptor(),
logDebugRequestInfoUnaryInterceptor,
)),
}
conn, err := grpc.DialContext(v.ctx, v.endpoint, opts...)
if err != nil {
log.Errorf("Could not dial endpoint: %s, %v", v.endpoint, err)
return
}
log.Info("Successfully started gRPC connection")
pubkeys, err := v.keyManager.FetchValidatingKeys()
if err != nil {
log.Errorf("Could not get validating keys: %v", err)
return
}
valDB, err := db.NewKVStore(v.dataDir, pubkeys)
if err != nil {
log.Errorf("Could not initialize db: %v", err)
return
}
v.conn = conn
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1280, // number of keys to track.
MaxCost: 128, // maximum cost of cache, 1 item = 1 cost.
BufferItems: 64, // number of keys per Get buffer.
})
if err != nil {
panic(err)
}
v.validator = &validator{
db: valDB,
validatorClient: ethpb.NewBeaconNodeValidatorClient(v.conn),
beaconClient: ethpb.NewBeaconChainClient(v.conn),
node: ethpb.NewNodeClient(v.conn),
keyManager: v.keyManager,
graffiti: v.graffiti,
logValidatorBalances: v.logValidatorBalances,
emitAccountMetrics: v.emitAccountMetrics,
prevBalance: make(map[[48]byte]uint64),
attLogs: make(map[[32]byte]*attSubmitted),
domainDataCache: cache,
}
go run(v.ctx, v.validator)
}
// Stop the validator service.
func (v *ValidatorService) Stop() error {
v.cancel()
log.Info("Stopping service")
if v.conn != nil {
return v.conn.Close()
}
return nil
}
// Status ...
//
// WIP - not done.
func (v *ValidatorService) Status() error {
if v.conn == nil {
return errors.New("no connection to beacon RPC")
}
return nil
}