-
Notifications
You must be signed in to change notification settings - Fork 179
/
engine.go
164 lines (137 loc) · 5.04 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Package rpc implements accepting transactions into the system.
// It implements a subset of the Observation API.
package rpc
import (
"context"
"fmt"
"net"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/rpc"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
)
// Backend defines the core functionality required by the RPC API.
// The gRPC handler hands every successfully converted transaction to it.
type Backend interface {
// ProcessTransaction handles validating and ingesting a new transaction,
// ultimately for inclusion in a future collection.
// A non-nil error is passed back unchanged to the gRPC caller by
// SendTransaction.
ProcessTransaction(*flow.TransactionBody) error
}
// Config defines the configurable options for the ingress server.
type Config struct {
// ListenAddr is the address passed to net.Listen("tcp", ...) when serving.
ListenAddr string
// MaxMsgSize is applied as both the maximum receive and the maximum send
// message size of the gRPC server.
MaxMsgSize uint // in bytes
// RpcMetricsEnabled installs the Prometheus unary interceptor and registers
// the server with grpc_prometheus when set.
RpcMetricsEnabled bool // enable GRPC metrics
}
// Engine implements a gRPC server with a simplified version of the Observation
// API to enable receiving transactions into the system.
type Engine struct {
unit *engine.Unit // launches serve and provides the Ready/Done channels
log zerolog.Logger // logger tagged with engine=collection_rpc
handler *handler // the gRPC service implementation
server *grpc.Server // the gRPC server
config Config // options as passed to New; ListenAddr is read by serve
}
// New returns a new ingress server.
//
// It builds a gRPC server whose receive and send message sizes are both capped
// at config.MaxMsgSize, chains the optional unary interceptors (Prometheus
// metrics first when config.RpcMetricsEnabled is set, then the rate limiter
// when apiRatelimits is non-empty) and registers the handler as the Access API
// service. New does not open a listener; that happens in serve.
//
// backend receives every transaction accepted by SendTransaction, and chainID
// selects the chain used when converting incoming transaction messages.
func New(
	config Config,
	backend Backend,
	log zerolog.Logger,
	chainID flow.ChainID,
	apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, ExecuteScriptAtBlockID->300
	apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the gRPC API e.g. Ping->50, ExecuteScriptAtBlockID->10
) *Engine {
	// create a GRPC server to serve GRPC clients
	grpcOpts := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(int(config.MaxMsgSize)),
		grpc.MaxSendMsgSize(int(config.MaxMsgSize)),
	}

	var interceptors []grpc.UnaryServerInterceptor // ordered list of interceptors

	// if rpc metrics is enabled, add the grpc metrics interceptor as a server option
	if config.RpcMetricsEnabled {
		interceptors = append(interceptors, grpc_prometheus.UnaryServerInterceptor)
	}

	if len(apiRatelimits) > 0 {
		// create a rate limit interceptor
		rateLimitInterceptor := rpc.NewRateLimiterInterceptor(log, apiRatelimits, apiBurstLimits).UnaryServerInterceptor
		// append the rate limit interceptor to the list of interceptors
		interceptors = append(interceptors, rateLimitInterceptor)
	}

	// create a chained unary interceptor
	chainedInterceptors := grpc.ChainUnaryInterceptor(interceptors...)
	grpcOpts = append(grpcOpts, chainedInterceptors)

	server := grpc.NewServer(grpcOpts...)

	e := &Engine{
		unit: engine.NewUnit(),
		log:  log.With().Str("engine", "collection_rpc").Logger(),
		handler: &handler{
			UnimplementedAccessAPIServer: access.UnimplementedAccessAPIServer{},
			backend:                      backend,
			chainID:                      chainID,
		},
		server: server,
		config: config,
	}

	// Register the service BEFORE handing the server to grpc_prometheus:
	// Register pre-initializes metrics for the services already registered on
	// the server, so calling it first would pre-initialize nothing.
	access.RegisterAccessAPIServer(e.server, e.handler)

	if config.RpcMetricsEnabled {
		grpc_prometheus.EnableHandlingTimeHistogram()
		grpc_prometheus.Register(server)
	}

	return e
}
// Ready launches the gRPC serving routine on the engine's unit and returns the
// unit's ready channel.
//
// NOTE(review): serve is started asynchronously through unit.Launch, and
// nothing visible here ties the returned channel to the listener having been
// bound — confirm against engine.Unit before relying on "ready" meaning
// "accepting connections".
func (e *Engine) Ready() <-chan struct{} {
	e.unit.Launch(e.serve)
	ready := e.unit.Ready()
	return ready
}
// Done returns the unit's done channel. The gRPC server's GracefulStop is
// handed to unit.Done as the shutdown action (presumably run before the
// channel is closed — see engine.Unit).
func (e *Engine) Done() <-chan struct{} {
	stopServer := e.server.GracefulStop
	return e.unit.Done(stopServer)
}
// serve starts the gRPC server.
//
// It binds a TCP listener on the configured ListenAddr and passes it to the
// gRPC server's Serve. A listen failure is logged at fatal level; a non-nil
// error from Serve is logged at error level.
//
// NOTE(review): Serve is expected to block for the lifetime of the server, so
// this function returning signals that serving has ended rather than that the
// server is ready — confirm against the grpc-go documentation.
func (e *Engine) serve() {
	addr := e.config.ListenAddr
	e.log.Info().Msgf("starting server on address %s", addr)

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		e.log.Fatal().Err(err).Msg("failed to start server")
		return
	}

	if serveErr := e.server.Serve(listener); serveErr != nil {
		e.log.Error().Err(serveErr).Msg("fatal error in server")
	}
}
// handler implements a subset of the Observation API.
// In the visible source only Ping and SendTransaction are defined on it; all
// other Access API methods come from the embedded UnimplementedAccessAPIServer
// (presumably answering them as unimplemented).
type handler struct {
access.UnimplementedAccessAPIServer
// backend receives each transaction after it has been converted.
backend Backend
// chainID selects the chain passed to the transaction converter.
chainID flow.ChainID
}
// Ping is a liveness check: it ignores its arguments and always answers with
// an empty response and a nil error.
func (h *handler) Ping(_ context.Context, _ *access.PingRequest) (*access.PingResponse, error) {
	pong := &access.PingResponse{}
	return pong, nil
}
// SendTransaction converts the transaction carried by the request, passes it
// to the backend for processing and, on success, replies with the
// transaction's ID.
//
// A conversion failure is reported as a gRPC InvalidArgument status. An error
// from the backend is returned to the caller unchanged.
func (h *handler) SendTransaction(_ context.Context, req *access.SendTransactionRequest) (*access.SendTransactionResponse, error) {
	tx, convErr := convert.MessageToTransaction(req.Transaction, h.chainID.Chain())
	if convErr != nil {
		msg := fmt.Sprintf("failed to convert transaction: %v", convErr)
		return nil, status.Error(codes.InvalidArgument, msg)
	}

	if procErr := h.backend.ProcessTransaction(&tx); procErr != nil {
		return nil, procErr
	}

	// The ID is an array; slice it to obtain the byte slice the response carries.
	id := tx.ID()
	resp := &access.SendTransactionResponse{Id: id[:]}
	return resp, nil
}