-
Notifications
You must be signed in to change notification settings - Fork 175
/
engine.go
355 lines (302 loc) · 11.8 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
package rpc
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"time"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
accessproto "github.com/onflow/flow/protobuf/go/flow/access"
legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/onflow/flow-go/access"
legacyaccess "github.com/onflow/flow-go/access/legacy"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/grpcutils"
)
// Config holds the configurable options for the access node server.
//
// A "secure" GRPC server here means a server that presents a self-signed TLS
// certificate, with the client authenticating the server via a pre-shared
// public key.
type Config struct {
	// UnsecureGRPCListenAddr is the non-secure GRPC server address as ip:port.
	UnsecureGRPCListenAddr string
	// SecureGRPCListenAddr is the secure GRPC server address as ip:port.
	SecureGRPCListenAddr string
	// TransportCredentials are the secure GRPC credentials.
	TransportCredentials credentials.TransportCredentials
	// HTTPListenAddr is the HTTP web proxy address as ip:port.
	HTTPListenAddr string
	// RESTListenAddr is the REST server address as ip:port. If empty, the
	// REST server will not be started.
	RESTListenAddr string
	// CollectionAddr is the address of the upstream collection node.
	CollectionAddr string
	// HistoricalAccessAddrs is the list of all access nodes from the previous spork.
	HistoricalAccessAddrs string
	// MaxMsgSize is the GRPC max message size.
	MaxMsgSize int
	// ExecutionClientTimeout is the execution API GRPC client timeout.
	ExecutionClientTimeout time.Duration
	// CollectionClientTimeout is the collection API GRPC client timeout.
	CollectionClientTimeout time.Duration
	// MaxHeightRange is the max size of height range requests.
	MaxHeightRange uint
	// PreferredExecutionNodeIDs is the preferred list of upstream execution node IDs.
	PreferredExecutionNodeIDs []string
	// FixedExecutionNodeIDs is the fixed list of execution node IDs to choose
	// from if no node ID can be chosen from PreferredExecutionNodeIDs.
	FixedExecutionNodeIDs []string
}
// Engine exposes the server with a simplified version of the Access API.
//
// It brings up an unsecured GRPC server (default port 9000), a secure GRPC
// server (default port 9001) and an HTTP web proxy (default port 8000). A REST
// server is additionally started when Config.RESTListenAddr is non-empty.
type Engine struct {
	unit *engine.Unit
	log  zerolog.Logger

	// backend is the gRPC service implementation shared by all servers.
	backend *backend.Backend

	// unsecureGrpcServer is the unsecure gRPC server.
	unsecureGrpcServer *grpc.Server
	// secureGrpcServer is the secure gRPC server.
	secureGrpcServer *grpc.Server
	// httpServer is the HTTP proxy wrapping the unsecure gRPC server.
	httpServer *http.Server
	// restServer is created by serveREST; it stays nil when REST is disabled.
	restServer *http.Server

	config Config

	// The addresses below are recorded by the respective serve routines once
	// their listener has been opened; they are nil until then.
	unsecureGrpcAddress net.Addr
	secureGrpcAddress   net.Addr
	restAPIAddress      net.Addr
}
// New builds the RPC engine: it creates the unsecure and secure gRPC servers
// (sharing the same message size limits and unary interceptor chain), the HTTP
// proxy in front of the unsecure server, and the backend that implements the
// Access API, then registers the current and legacy Access API handlers on
// both gRPC servers.
//
// Nothing is listening yet when New returns; the servers are started by Ready.
func New(log zerolog.Logger,
	state protocol.State,
	config Config,
	collectionRPC accessproto.AccessAPIClient,
	historicalAccessNodes []accessproto.AccessAPIClient,
	blocks storage.Blocks,
	headers storage.Headers,
	collections storage.Collections,
	transactions storage.Transactions,
	executionReceipts storage.ExecutionReceipts,
	executionResults storage.ExecutionResults,
	chainID flow.ChainID,
	transactionMetrics module.TransactionMetrics,
	collectionGRPCPort uint,
	executionGRPCPort uint,
	retryEnabled bool,
	rpcMetricsEnabled bool,
	apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the Access API e.g. Ping->100, GetTransaction->300
	apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the Access API e.g. Ping->50, GetTransaction->10
) *Engine {

	log = log.With().Str("engine", "rpc").Logger()

	// fall back to the default message size when none was configured
	if config.MaxMsgSize == 0 {
		config.MaxMsgSize = grpcutils.DefaultMaxMsgSize
	}

	// options shared by both GRPC servers
	serverOpts := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(config.MaxMsgSize),
		grpc.MaxSendMsgSize(config.MaxMsgSize),
	}

	// Ordered unary interceptor chain: metrics (optional) first, then logging,
	// then rate limiting (optional).
	var unaryChain []grpc.UnaryServerInterceptor
	if rpcMetricsEnabled {
		unaryChain = append(unaryChain, grpc_prometheus.UnaryServerInterceptor)
	}
	unaryChain = append(unaryChain, loggingInterceptor(log)...)
	if len(apiRatelimits) > 0 {
		limiter := NewRateLimiterInterceptor(log, apiRatelimits, apiBurstLimits)
		unaryChain = append(unaryChain, limiter.unaryServerInterceptor)
	}
	if len(unaryChain) > 0 {
		serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(unaryChain...))
	}

	// the unsecured server uses the shared options as they are
	plainServer := grpc.NewServer(serverOpts...)

	// the secure server additionally gets the credentials passed in via config
	serverOpts = append(serverOpts, grpc.Creds(config.TransportCredentials))
	tlsServer := grpc.NewServer(serverOpts...)

	// HTTP clients are served by a proxy wrapped around the unsecured server
	proxy := NewHTTPServer(plainServer, config.HTTPListenAddr)

	api := backend.New(
		state,
		collectionRPC,
		historicalAccessNodes,
		blocks,
		headers,
		collections,
		transactions,
		executionReceipts,
		executionResults,
		chainID,
		transactionMetrics,
		&backend.ConnectionFactoryImpl{
			CollectionGRPCPort:        collectionGRPCPort,
			ExecutionGRPCPort:         executionGRPCPort,
			CollectionNodeGRPCTimeout: config.CollectionClientTimeout,
			ExecutionNodeGRPCTimeout:  config.ExecutionClientTimeout,
		},
		retryEnabled,
		config.MaxHeightRange,
		config.PreferredExecutionNodeIDs,
		config.FixedExecutionNodeIDs,
		log,
	)

	e := &Engine{
		log:                log,
		unit:               engine.NewUnit(),
		backend:            api,
		unsecureGrpcServer: plainServer,
		secureGrpcServer:   tlsServer,
		httpServer:         proxy,
		config:             config,
	}

	// unsecure first, then secure — same order for every registration step
	grpcServers := []*grpc.Server{e.unsecureGrpcServer, e.secureGrpcServer}

	for _, srv := range grpcServers {
		accessproto.RegisterAccessAPIServer(srv, access.NewHandler(api, chainID.Chain()))
	}

	if rpcMetricsEnabled {
		// Not interested in legacy metrics, so initialize here, i.e. before the
		// legacy handlers are registered below.
		grpc_prometheus.EnableHandlingTimeHistogram()
		for _, srv := range grpcServers {
			grpc_prometheus.Register(srv)
		}
	}

	// Legacy gRPC handlers are kept for backwards compatibility, to be removed
	// at a later date.
	for _, srv := range grpcServers {
		legacyaccessproto.RegisterAccessAPIServer(srv, legacyaccess.NewHandler(api, chainID.Chain()))
	}

	return e
}
// Ready launches the serving routines on the engine's unit — the unsecure gRPC
// server, the secure gRPC server, the gRPC web proxy and, when a REST listen
// address is configured, the REST server — and returns the unit's ready
// channel.
func (e *Engine) Ready() <-chan struct{} {
	routines := []func(){
		e.serveUnsecureGRPC,
		e.serveSecureGRPC,
		e.serveGRPCWebProxy,
	}
	// the REST server is optional and only started when an address is set
	if e.config.RESTListenAddr != "" {
		routines = append(routines, e.serveREST)
	}

	for _, serve := range routines {
		e.unit.Launch(serve)
	}
	return e.unit.Ready()
}
// Done hands the shutdown actions for all servers to the engine's unit and
// returns the unit's done channel. The actions are: graceful stop of both gRPC
// servers, shutdown of the HTTP proxy, and shutdown of the REST server if one
// was created. Shutdown errors are logged, not returned.
func (e *Engine) Done() <-chan struct{} {
	stopProxy := func() {
		if err := e.httpServer.Shutdown(context.Background()); err != nil {
			e.log.Error().Err(err).Msg("error stopping http server")
		}
	}

	stopREST := func() {
		// restServer is only set by serveREST, so it may never have been created
		if e.restServer == nil {
			return
		}
		if err := e.restServer.Shutdown(context.Background()); err != nil {
			e.log.Error().Err(err).Msg("error stopping http REST server")
		}
	}

	return e.unit.Done(
		e.unsecureGrpcServer.GracefulStop,
		e.secureGrpcServer.GracefulStop,
		stopProxy,
		stopREST,
	)
}
// SubmitLocal submits an event originating on the local node. The event is
// processed in a routine launched on the engine's unit; a processing error is
// logged and not reported back to the caller.
func (e *Engine) SubmitLocal(event interface{}) {
	handle := func() {
		if err := e.process(event); err != nil {
			e.log.Error().Err(err).Msg("could not process submitted event")
		}
	}
	e.unit.Launch(handle)
}
// UnsecureGRPCAddress returns the address the unsecure gRPC listener is bound
// to. The value is recorded by serveUnsecureGRPC after the listener has been
// opened and is nil before that.
func (e *Engine) UnsecureGRPCAddress() net.Addr {
	return e.unsecureGrpcAddress
}
// SecureGRPCAddress returns the address the secure gRPC listener is bound to.
// The value is recorded by serveSecureGRPC after the listener has been opened
// and is nil before that.
func (e *Engine) SecureGRPCAddress() net.Addr {
	return e.secureGrpcAddress
}
// RestApiAddress returns the address the REST listener is bound to. The value
// is recorded by serveREST after the listener has been opened; it is nil
// before that, and stays nil when the REST server is not started.
func (e *Engine) RestApiAddress() net.Addr {
	return e.restAPIAddress
}
// process handles an event submitted to the engine. The only supported event
// is a *flow.Block, whose header height is forwarded to the backend as the
// finalized block height. Any other event type yields an error.
func (e *Engine) process(event interface{}) error {
	block, isBlock := event.(*flow.Block)
	if !isBlock {
		return fmt.Errorf("invalid event type (%T)", event)
	}

	e.backend.NotifyFinalizedBlockHeight(block.Header.Height)
	return nil
}
// serveUnsecureGRPC opens the listener for the unsecure gRPC server and serves
// on it. Serving is a blocking call, so this function only returns once the
// server has stopped or the listener could not be opened.
//
// A failure to open the listener is logged and the function returns without
// serving. A serve error is logged as fatal, except for grpc.ErrServerStopped:
// that error is returned when the server was already stopped before Serve was
// reached (e.g. shutdown racing with startup) and is treated as a clean exit,
// in line with how the HTTP servers treat http.ErrServerClosed.
func (e *Engine) serveUnsecureGRPC() {
	e.log.Info().Str("grpc_address", e.config.UnsecureGRPCListenAddr).Msg("starting grpc server on address")

	l, err := net.Listen("tcp", e.config.UnsecureGRPCListenAddr)
	if err != nil {
		e.log.Err(err).Msg("failed to start the grpc server")
		return
	}

	// save the actual address on which we are listening (may be different from
	// e.config.UnsecureGRPCListenAddr if no port was specified)
	e.unsecureGrpcAddress = l.Addr()
	e.log.Debug().Str("unsecure_grpc_address", e.unsecureGrpcAddress.String()).Msg("listening on port")

	err = e.unsecureGrpcServer.Serve(l) // blocking call
	if errors.Is(err, grpc.ErrServerStopped) {
		// the server was stopped before it began serving; not a failure
		return
	}
	if err != nil {
		e.log.Fatal().Err(err).Msg("fatal error in unsecure grpc server")
	}
}
// serveSecureGRPC opens the listener for the secure gRPC server and serves on
// it. Serving is a blocking call, so this function only returns once the
// server has stopped or the listener could not be opened.
//
// A failure to open the listener is logged and the function returns without
// serving. A serve error is logged as fatal, except for grpc.ErrServerStopped:
// that error is returned when the server was already stopped before Serve was
// reached (e.g. shutdown racing with startup) and is treated as a clean exit,
// in line with how the HTTP servers treat http.ErrServerClosed.
func (e *Engine) serveSecureGRPC() {
	e.log.Info().Str("secure_grpc_address", e.config.SecureGRPCListenAddr).Msg("starting grpc server on address")

	l, err := net.Listen("tcp", e.config.SecureGRPCListenAddr)
	if err != nil {
		e.log.Err(err).Msg("failed to start the grpc server")
		return
	}

	// save the actual address on which we are listening
	e.secureGrpcAddress = l.Addr()
	e.log.Debug().Str("secure_grpc_address", e.secureGrpcAddress.String()).Msg("listening on port")

	err = e.secureGrpcServer.Serve(l) // blocking call
	if errors.Is(err, grpc.ErrServerStopped) {
		// the server was stopped before it began serving; not a failure
		return
	}
	if err != nil {
		e.log.Fatal().Err(err).Msg("fatal error in secure grpc server")
	}
}
// serveGRPCWebProxy runs the gRPC web proxy HTTP server. ListenAndServe blocks
// until the server stops; http.ErrServerClosed is the expected result of a
// shutdown and is ignored, any other error is logged.
func (e *Engine) serveGRPCWebProxy() {
	proxyLog := e.log.With().Str("http_proxy_address", e.config.HTTPListenAddr).Logger()
	proxyLog.Info().Msg("starting http proxy server on address")

	err := e.httpServer.ListenAndServe() // blocking call
	if err == nil || errors.Is(err, http.ErrServerClosed) {
		return
	}
	e.log.Err(err).Msg("failed to start the http proxy server")
}
// serveREST creates the HTTP REST server on top of the engine's backend, opens
// its listener and serves on it. Serving blocks until the server stops;
// http.ErrServerClosed is the expected result of a shutdown and is ignored,
// any other serve error is logged. A failure to open the listener is logged
// and the function returns without serving.
func (e *Engine) serveREST() {
	e.log.Info().Str("rest_api_address", e.config.RESTListenAddr).Msg("starting REST server on address")

	// the server is stored on the engine before listening, so Done can shut it down
	e.restServer = rest.NewServer(rest.NewHandlers(e.backend, e.log), e.config.RESTListenAddr, e.log)

	listener, err := net.Listen("tcp", e.config.RESTListenAddr)
	if err != nil {
		e.log.Err(err).Msg("failed to start the REST server")
		return
	}

	// record the address we are actually bound to
	e.restAPIAddress = listener.Addr()
	e.log.Debug().Str("rest_api_address", e.restAPIAddress.String()).Msg("listening on port")

	serveErr := e.restServer.Serve(listener) // blocking call
	if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
		return
	}
	e.log.Error().Err(serveErr).Msg("fatal error in REST server")
}