-
Notifications
You must be signed in to change notification settings - Fork 4
/
grpc_server.go
268 lines (219 loc) · 9.41 KB
/
grpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package servenv
import (
"flag"
"fmt"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"math"
"time"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc/keepalive"
"gopkg.in/src-d/go-vitess.v1/vt/grpccommon"
"gopkg.in/src-d/go-vitess.v1/vt/log"
"gopkg.in/src-d/go-vitess.v1/vt/vttls"
)
// This file handles gRPC server, on its own port.
// Clients register servers, based on service map:
//
// servenv.RegisterGRPCFlags()
// servenv.OnRun(func() {
// if servenv.GRPCCheckServiceMap("XXX") {
// pb.RegisterXXX(servenv.GRPCServer, XXX)
// }
// }
//
// Note servenv.GRPCServer can only be used in servenv.OnRun,
// and not before, as it is initialized right before calling OnRun.
var (
// GRPCPort is the port to listen on for gRPC. If not set or zero, don't listen.
GRPCPort = flag.Int("grpc_port", 0, "Port to listen on for gRPC calls")
// GRPCCert is the cert to use if TLS is enabled
GRPCCert = flag.String("grpc_cert", "", "certificate to use, requires grpc_key, enables TLS")
// GRPCKey is the key to use if TLS is enabled
GRPCKey = flag.String("grpc_key", "", "key to use, requires grpc_cert, enables TLS")
// GRPCCA is the CA to use if TLS is enabled
GRPCCA = flag.String("grpc_ca", "", "ca to use, requires TLS, and enforces client cert check")
// GRPCAuth which auth plugin to use (at the moment now only static is supported)
GRPCAuth = flag.String("grpc_auth_mode", "", "Which auth plugin implementation to use (eg: static)")
// GRPCServer is the global server to serve gRPC.
GRPCServer *grpc.Server
// GRPCMaxConnectionAge is the maximum age of a client connection, before GoAway is sent.
// This is useful for L4 loadbalancing to ensure rebalancing after scaling.
GRPCMaxConnectionAge = flag.Duration("grpc_max_connection_age", time.Duration(math.MaxInt64), "Maximum age of a client connection before GoAway is sent.")
// GRPCMaxConnectionAgeGrace is an additional grace period after GRPCMaxConnectionAge, after which
// connections are forcibly closed.
GRPCMaxConnectionAgeGrace = flag.Duration("grpc_max_connection_age_grace", time.Duration(math.MaxInt64), "Additional grace period after grpc_max_connection_age, after which connections are forcibly closed.")
// GRPCInitialConnWindowSize ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
GRPCInitialConnWindowSize = flag.Int("grpc_server_initial_conn_window_size", 0, "grpc server initial connection window size")
// GRPCInitialWindowSize ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
GRPCInitialWindowSize = flag.Int("grpc_server_initial_window_size", 0, "grpc server initial window size")
// EnforcementPolicy MinTime that sets the keepalive enforcement policy on the server.
// This is the minimum amount of time a client should wait before sending a keepalive ping.
GRPCKeepAliveEnforcementPolicyMinTime = flag.Duration("grpc_server_keepalive_enforcement_policy_min_time", 5*time.Minute, "grpc server minimum keepalive time")
authPlugin Authenticator
)
// isGRPCEnabled returns true if gRPC server is set
func isGRPCEnabled() bool {
if GRPCPort != nil && *GRPCPort != 0 {
return true
}
if SocketFile != nil && *SocketFile != "" {
return true
}
return false
}
// createGRPCServer create the gRPC server we will be using.
// It has to be called after flags are parsed, but before
// services register themselves.
func createGRPCServer() {
// skip if not registered
if !isGRPCEnabled() {
log.Infof("Skipping gRPC server creation")
return
}
grpccommon.EnableTracingOpt()
var opts []grpc.ServerOption
if GRPCPort != nil && *GRPCCert != "" && *GRPCKey != "" {
config, err := vttls.ServerConfig(*GRPCCert, *GRPCKey, *GRPCCA)
if err != nil {
log.Exitf("Failed to log gRPC cert/key/ca: %v", err)
}
// create the creds server options
creds := credentials.NewTLS(config)
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
// Override the default max message size for both send and receive
// (which is 4 MiB in gRPC 1.0.0).
// Large messages can occur when users try to insert or fetch very big
// rows. If they hit the limit, they'll see the following error:
// grpc: received message length XXXXXXX exceeding the max size 4194304
// Note: For gRPC 1.0.0 it's sufficient to set the limit on the server only
// because it's not enforced on the client side.
log.Infof("Setting grpc max message size to %d", *grpccommon.MaxMessageSize)
opts = append(opts, grpc.MaxRecvMsgSize(*grpccommon.MaxMessageSize))
opts = append(opts, grpc.MaxSendMsgSize(*grpccommon.MaxMessageSize))
if *GRPCInitialConnWindowSize != 0 {
log.Infof("Setting grpc server initial conn window size to %d", int32(*GRPCInitialConnWindowSize))
opts = append(opts, grpc.InitialConnWindowSize(int32(*GRPCInitialConnWindowSize)))
}
if *GRPCInitialWindowSize != 0 {
log.Infof("Setting grpc server initial window size to %d", int32(*GRPCInitialWindowSize))
opts = append(opts, grpc.InitialWindowSize(int32(*GRPCInitialWindowSize)))
}
ep := keepalive.EnforcementPolicy{
MinTime: *GRPCKeepAliveEnforcementPolicyMinTime,
}
opts = append(opts, grpc.KeepaliveEnforcementPolicy(ep))
if GRPCMaxConnectionAge != nil {
ka := keepalive.ServerParameters{
MaxConnectionAge: *GRPCMaxConnectionAge,
}
if GRPCMaxConnectionAgeGrace != nil {
ka.MaxConnectionAgeGrace = *GRPCMaxConnectionAgeGrace
}
opts = append(opts, grpc.KeepaliveParams(ka))
}
if *GRPCAuth != "" {
log.Infof("enabling auth plugin %v", *GRPCAuth)
pluginInitializer := GetAuthenticator(*GRPCAuth)
authPluginImpl, err := pluginInitializer()
if err != nil {
log.Fatalf("Failed to load auth plugin: %v", err)
}
authPlugin = authPluginImpl
opts = append(opts, grpc.StreamInterceptor(streamInterceptor))
opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor))
}
if *grpccommon.EnableGRPCPrometheus {
opts = append(opts, grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor))
opts = append(opts, grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
}
GRPCServer = grpc.NewServer(opts...)
}
func serveGRPC() {
if *grpccommon.EnableGRPCPrometheus {
grpc_prometheus.Register(GRPCServer)
grpc_prometheus.EnableHandlingTimeHistogram()
}
// skip if not registered
if GRPCPort == nil || *GRPCPort == 0 {
return
}
// listen on the port
log.Infof("Listening for gRPC calls on port %v", *GRPCPort)
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", *GRPCPort))
if err != nil {
log.Exitf("Cannot listen on port %v for gRPC: %v", *GRPCPort, err)
}
// and serve on it
// NOTE: Before we call Serve(), all services must have registered themselves
// with "GRPCServer". This is the case because go/vt/servenv/run.go
// runs all OnRun() hooks after createGRPCServer() and before
// serveGRPC(). If this was not the case, the binary would crash with
// the error "grpc: Server.RegisterService after Server.Serve".
go GRPCServer.Serve(listener)
OnTermSync(func() {
log.Info("Initiated graceful stop of gRPC server")
GRPCServer.GracefulStop()
log.Info("gRPC server stopped")
})
}
// GRPCCheckServiceMap returns if we should register a gRPC service
// (and also logs how to enable / disable it)
func GRPCCheckServiceMap(name string) bool {
// Silently fail individual services if gRPC is not enabled in
// the first place (either on a grpc port or on the socket file)
if !isGRPCEnabled() {
return false
}
// then check ServiceMap
return CheckServiceMap("grpc", name)
}
func streamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newCtx, err := authPlugin.Authenticate(stream.Context(), info.FullMethod)
if err != nil {
return err
}
wrapped := WrapServerStream(stream)
wrapped.WrappedContext = newCtx
return handler(srv, wrapped)
}
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx, err := authPlugin.Authenticate(ctx, info.FullMethod)
if err != nil {
return nil, err
}
return handler(newCtx, req)
}
// WrappedServerStream is based on the service stream wrapper from: https://github.com/grpc-ecosystem/go-grpc-middleware
type WrappedServerStream struct {
grpc.ServerStream
WrappedContext context.Context
}
// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
func (w *WrappedServerStream) Context() context.Context {
return w.WrappedContext
}
// WrapServerStream returns a ServerStream that has the ability to overwrite context.
func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream {
if existing, ok := stream.(*WrappedServerStream); ok {
return existing
}
return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()}
}