-
Notifications
You must be signed in to change notification settings - Fork 489
/
factory.go
259 lines (217 loc) · 7.21 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package factory
import (
"crypto/tls"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"path/filepath"
"runtime/debug"
"strconv"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
_ "github.com/talos-systems/talos/pkg/grpc/codec" // register codec
grpclog "github.com/talos-systems/talos/pkg/grpc/middleware/log"
)
// Registrator describes the set of methods required in order for a concrete
// type to be used with NewServer and ListenAndServe.
type Registrator interface {
// Register is invoked by NewServer with the newly created gRPC server,
// before the server is returned to the caller.
Register(*grpc.Server)
}
// Options is the functional options struct. It is populated by the Option
// setters in this package and finalized by NewDefaultOptions.
type Options struct {
// Address is the host part of the TCP listen address; NewListener joins
// it with Port.
Address string
// Port is the TCP listen port. NewListener rejects 0 when Network is "tcp".
Port int
// SocketPath is the unix socket file path used when Network is "unix".
SocketPath string
// Network selects the listener type; NewListener accepts "tcp" and "unix".
Network string
// Config is the TLS configuration stored by the Config option.
// NOTE(review): nothing in this file reads this field — presumably
// credentials are supplied through ServerOptions; confirm.
Config *tls.Config
// ServerOptions are the gRPC server options handed to grpc.NewServer.
// NewDefaultOptions appends window sizes and the chained interceptors.
ServerOptions []grpc.ServerOption
// UnaryInterceptors are the unary interceptors to chain. NewDefaultOptions
// prepends the recovery interceptor and, when logging is enabled, the
// logging interceptor.
UnaryInterceptors []grpc.UnaryServerInterceptor
// StreamInterceptors are the stream interceptors to chain; they receive
// the same recovery/logging prefix as UnaryInterceptors.
StreamInterceptors []grpc.StreamServerInterceptor
// Reflection makes NewServer register the gRPC reflection service.
Reflection bool
// logPrefix is the prefix passed to the request logger (set by WithLog).
logPrefix string
// logDestination is where request logs are written. When nil, no logger
// is created and the logging interceptors are not installed.
logDestination io.Writer
}
// Option is the functional option func: it mutates the Options value it is
// given. NewDefaultOptions applies Options in the order they are passed.
type Option func(*Options)
// Address returns an Option that records a as the listen address of the
// server.
func Address(a string) Option {
	return func(opts *Options) {
		opts.Address = a
	}
}
// Port returns an Option that records o as the listen port of the server.
func Port(o int) Option {
	return func(opts *Options) {
		opts.Port = o
	}
}
// SocketPath returns an Option that records o as the unix socket file path
// the server listens on.
func SocketPath(o string) Option {
	return func(opts *Options) {
		opts.SocketPath = o
	}
}
// Network returns an Option that records o as the network type of the
// listener.
func Network(o string) Option {
	return func(opts *Options) {
		opts.Network = o
	}
}
// Config returns an Option that stores o as the TLS configuration in Options.
func Config(o *tls.Config) Option {
	return func(opts *Options) {
		opts.Config = o
	}
}
// ServerOptions returns an Option that appends o to the gRPC server options
// already collected; earlier entries are kept.
func ServerOptions(o ...grpc.ServerOption) Option {
	return func(opts *Options) {
		opts.ServerOptions = append(opts.ServerOptions, o...)
	}
}
// WithUnaryInterceptor returns an Option that appends i to the list of gRPC
// server unary interceptors.
func WithUnaryInterceptor(i grpc.UnaryServerInterceptor) Option {
	return func(opts *Options) {
		opts.UnaryInterceptors = append(opts.UnaryInterceptors, i)
	}
}
// WithStreamInterceptor returns an Option that appends i to the list of gRPC
// server stream interceptors.
func WithStreamInterceptor(i grpc.StreamServerInterceptor) Option {
	return func(opts *Options) {
		opts.StreamInterceptors = append(opts.StreamInterceptors, i)
	}
}
// WithLog returns an Option that enables request logging to w, with prefix
// used as the logger prefix.
func WithLog(prefix string, w io.Writer) Option {
	return func(opts *Options) {
		opts.logPrefix, opts.logDestination = prefix, w
	}
}
// WithDefaultLog returns an Option that enables request logging to the
// standard logger's current output (log.Writer()), looked up when the option
// is applied. The log prefix is left unchanged.
func WithDefaultLog() Option {
	return func(opts *Options) {
		opts.logDestination = log.Writer()
	}
}
// WithReflection returns an Option that turns on the gRPC reflection APIs.
//
// See https://github.com/grpc/grpc/blob/master/doc/server-reflection.md
func WithReflection() Option {
	return func(opts *Options) {
		opts.Reflection = true
	}
}
// recoveryHandler returns the handler given to the gRPC recovery middleware.
//
// The handler turns a recovered panic value p into a codes.Internal status
// error whose message is p formatted with %v. When logger is non-nil, the
// panic value and the current stack trace are logged first.
func recoveryHandler(logger *log.Logger) grpc_recovery.RecoveryHandlerFunc {
	return func(p interface{}) error {
		if logger != nil {
			// %s formats the []byte stack trace as text directly.
			logger.Printf("panic: %v\n%s", p, debug.Stack())
		}

		return status.Errorf(codes.Internal, "%v", p)
	}
}
// NewDefaultOptions builds an Options value. It starts from the defaults
// (network "tcp", socket path "/run/factory/factory.sock"), applies setters in
// the order given, and then finalizes the interceptor chains and the gRPC
// server options.
//
// The resulting interceptor order is: logging (only when a log destination
// was configured), recovery, then the interceptors supplied via setters.
// The window-size and chained-interceptor server options are appended after
// any server options supplied via setters.
func NewDefaultOptions(setters ...Option) *Options {
	opts := &Options{
		Network:    "tcp",
		SocketPath: "/run/factory/factory.sock",
	}

	for i := range setters {
		setters[i](opts)
	}

	// A logger exists only when a destination was configured.
	var logger *log.Logger
	if dst := opts.logDestination; dst != nil {
		logger = log.New(dst, opts.logPrefix, log.Flags())
	}

	// Recovery goes ahead of every caller-supplied interceptor so that it
	// handles panics (via defer and recover()) in all subsequent middlewares.
	recoveryOpt := grpc_recovery.WithRecoveryHandler(recoveryHandler(logger))
	unaryChain := []grpc.UnaryServerInterceptor{grpc_recovery.UnaryServerInterceptor(recoveryOpt)}
	streamChain := []grpc.StreamServerInterceptor{grpc_recovery.StreamServerInterceptor(recoveryOpt)}

	if logger != nil {
		// Logging is placed first in the chain, even before recovery, so that
		// the request is logged in the form it was received and the status is
		// logged as sent on the wire (error/success). It also tracks the whole
		// duration of the request, including other middleware overhead.
		logMiddleware := grpclog.NewMiddleware(logger)
		unaryChain = append(
			[]grpc.UnaryServerInterceptor{logMiddleware.UnaryInterceptor()},
			unaryChain...,
		)
		streamChain = append(
			[]grpc.StreamServerInterceptor{logMiddleware.StreamInterceptor()},
			streamChain...,
		)
	}

	opts.UnaryInterceptors = append(unaryChain, opts.UnaryInterceptors...)
	opts.StreamInterceptors = append(streamChain, opts.StreamInterceptors...)

	opts.ServerOptions = append(
		opts.ServerOptions,
		grpc.InitialWindowSize(65535*32),
		grpc.InitialConnWindowSize(65535*16),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(opts.UnaryInterceptors...)),
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(opts.StreamInterceptors...)),
	)

	return opts
}
// NewServer creates a gRPC server from the options produced by
// NewDefaultOptions(setters...), lets r register itself on it, and registers
// the reflection service when the Reflection option is set.
func NewServer(r Registrator, setters ...Option) *grpc.Server {
	opts := NewDefaultOptions(setters...)

	srv := grpc.NewServer(opts.ServerOptions...)
	r.Register(srv)

	if !opts.Reflection {
		return srv
	}

	reflection.Register(srv)

	return srv
}
// NewListener builds the listener for a gRPC server.
//
// Options are resolved through NewDefaultOptions, so the network defaults to
// "tcp" and the socket path to "/run/factory/factory.sock". Supported networks:
//
//   - "tcp": listens on Address:Port; a non-zero Port is required.
//   - "unix": listens on SocketPath. Whatever already exists at that path is
//     removed first, and the containing directory is created (mode 0o700) if
//     it is missing.
//
// Any other network, or a failure in one of the steps above, yields a nil
// listener and a non-nil error.
func NewListener(setters ...Option) (net.Listener, error) {
	opts := NewDefaultOptions(setters...)

	var address string

	switch opts.Network {
	case "unix":
		address = opts.SocketPath

		// Unlink the address or we will get the error:
		// bind: address already in use.
		//
		// Remove is called unconditionally instead of Stat-then-Remove: that
		// avoids the window between the check and the removal, and it also
		// clears entries Stat cannot resolve (presumably including a dangling
		// symlink). A path that does not exist is not an error.
		if err := os.Remove(address); err != nil && !errors.Is(err, os.ErrNotExist) {
			return nil, fmt.Errorf("error removing stale file socket; %w", err)
		}

		// Make any dirs on the path to the listening socket.
		if err := os.MkdirAll(filepath.Dir(address), 0o700); err != nil {
			return nil, fmt.Errorf("error creating containing directory for the file socket; %w", err)
		}
	case "tcp":
		if opts.Port == 0 {
			return nil, errors.New("a port is required for TCP listener")
		}

		address = net.JoinHostPort(opts.Address, strconv.Itoa(opts.Port))
	default:
		return nil, fmt.Errorf("unknown network: %s", opts.Network)
	}

	return net.Listen(opts.Network, address)
}
// ListenAndServe builds the gRPC server (NewServer) and its listener
// (NewListener) from the same setters, then serves on that listener.
//
// The server is built, and r registered on it, before the listener is
// created. If the listener cannot be created, that error is returned and the
// server is never started; otherwise the result of Serve is returned.
func ListenAndServe(r Registrator, setters ...Option) (err error) {
	srv := NewServer(r, setters...)

	lis, listenErr := NewListener(setters...)
	if listenErr != nil {
		return listenErr
	}

	return srv.Serve(lis)
}