/
options.go
459 lines (401 loc) · 11 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
package server
import (
"context"
"crypto/tls"
"net"
"sync"
"time"
"github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/tracer"
)
// Option is a function that mutates an Options value in place.
// NewOptions applies every supplied Option, in order, on top of the defaults.
type Option func(*Options)
// Options holds the server configuration. NewOptions pre-populates a number
// of fields with package defaults; Option functions mutate the value in place.
type Options struct {
// Context holds the external options and can be used for server shutdown
Context context.Context
// Broker holds the server broker
Broker broker.Broker
// Register holds the register
Register register.Register
// Tracer holds the tracer
Tracer tracer.Tracer
// Auth holds the auth
Auth auth.Auth
// Logger holds the logger
Logger logger.Logger
// Meter holds the meter
Meter meter.Meter
// Transport holds the transport
Transport transport.Transport
/*
// Router for requests
Router Router
*/
// Listener may be passed if already created
Listener net.Listener
// Wait holds the wait group set by the Wait option
Wait *sync.WaitGroup
// TLSConfig specifies tls.Config for secure serving
TLSConfig *tls.Config
// Metadata holds the server metadata
Metadata metadata.Metadata
// RegisterCheck is run before the server is registered
RegisterCheck func(context.Context) error
// Codecs maps a content-type to the codec that handles it
Codecs map[string]codec.Codec
// ID holds the id of the server
ID string
// Namespace for the server
Namespace string
// Name holds the server name
Name string
// Address holds the server address
Address string
// Advertise holds the advertise address
Advertise string
// Version holds the server version
Version string
// SubWrappers holds the server subscribe wrappers
SubWrappers []SubscriberWrapper
// BatchSubWrappers holds the server batch subscribe wrappers
BatchSubWrappers []BatchSubscriberWrapper
// HdlrWrappers holds the handler wrappers
HdlrWrappers []HandlerWrapper
// RegisterAttempts holds the number of register attempts before error
RegisterAttempts int
// RegisterInterval holds the interval for re-register
RegisterInterval time.Duration
// RegisterTTL specifies TTL for register record
RegisterTTL time.Duration
// MaxConn limits number of connections
MaxConn int
// DeregisterAttempts holds the number of deregister attempts before error
DeregisterAttempts int
}
// NewOptions builds an Options value seeded with the package defaults and
// then runs every supplied Option against it in the order given, so each
// option sees (and may overwrite) what the defaults and earlier options set.
func NewOptions(opts ...Option) Options {
	var cfg Options

	// Identity and addressing defaults.
	cfg.ID = DefaultID
	cfg.Name = DefaultName
	cfg.Namespace = DefaultNamespace
	cfg.Version = DefaultVersion
	cfg.Address = DefaultAddress

	// Registration defaults.
	cfg.RegisterCheck = DefaultRegisterCheck
	cfg.RegisterInterval = DefaultRegisterInterval
	cfg.RegisterTTL = DefaultRegisterTTL

	// Default collaborators.
	cfg.Auth = auth.DefaultAuth
	cfg.Broker = broker.DefaultBroker
	cfg.Logger = logger.DefaultLogger
	cfg.Meter = meter.DefaultMeter
	cfg.Register = register.DefaultRegister
	cfg.Tracer = tracer.DefaultTracer
	cfg.Transport = transport.DefaultTransport

	// Freshly allocated containers and a background context.
	cfg.Codecs = make(map[string]codec.Codec)
	cfg.Context = context.Background()
	cfg.Metadata = metadata.New(0)

	for i := range opts {
		opts[i](&cfg)
	}
	return cfg
}
// Name returns an Option that assigns n to Options.Name.
func Name(n string) Option {
	setName := func(opts *Options) {
		opts.Name = n
	}
	return setName
}
// Namespace returns an Option that assigns n to Options.Namespace, the
// namespace handlers are registered in.
func Namespace(n string) Option {
	setNamespace := func(opts *Options) {
		opts.Namespace = n
	}
	return setNamespace
}
// Logger returns an Option that assigns l to Options.Logger.
func Logger(l logger.Logger) Option {
	setLogger := func(opts *Options) {
		opts.Logger = l
	}
	return setLogger
}
// Meter returns an Option that assigns m to Options.Meter.
func Meter(m meter.Meter) Option {
	setMeter := func(opts *Options) {
		opts.Meter = m
	}
	return setMeter
}
// ID returns an Option that assigns id to Options.ID, the unique server id.
func ID(id string) Option {
	setID := func(opts *Options) {
		opts.ID = id
	}
	return setID
}
// Version returns an Option that assigns v to Options.Version, the version
// of the service.
func Version(v string) Option {
	setVersion := func(opts *Options) {
		opts.Version = v
	}
	return setVersion
}
// Address returns an Option that assigns a to Options.Address, the
// host:port address to bind to.
func Address(a string) Option {
	setAddress := func(opts *Options) {
		opts.Address = a
	}
	return setAddress
}
// Advertise returns an Option that assigns a to Options.Advertise, the
// host:port address to advertise for discovery.
func Advertise(a string) Option {
	setAdvertise := func(opts *Options) {
		opts.Advertise = a
	}
	return setAdvertise
}
// Broker returns an Option that assigns b to Options.Broker, the broker
// used for pub/sub.
func Broker(b broker.Broker) Option {
	setBroker := func(opts *Options) {
		opts.Broker = b
	}
	return setBroker
}
// Codec returns an Option that registers c in Options.Codecs under
// contentType, replacing any codec already stored for that content type.
// The codec is used to encode/decode requests for the given content type.
func Codec(contentType string, c codec.Codec) Option {
	return func(o *Options) {
		// An Options value that was not produced by NewOptions has a nil
		// Codecs map, and assigning into a nil map panics; allocate lazily.
		if o.Codecs == nil {
			o.Codecs = make(map[string]codec.Codec)
		}
		o.Codecs[contentType] = c
	}
}
// Context returns an Option that assigns ctx to Options.Context.
// The context can be used to signal shutdown of the service and to carry
// extra option values.
func Context(ctx context.Context) Option {
	setContext := func(opts *Options) {
		opts.Context = ctx
	}
	return setContext
}
// Register returns an Option that assigns r to Options.Register, the
// register used for discovery.
func Register(r register.Register) Option {
	setRegister := func(opts *Options) {
		opts.Register = r
	}
	return setRegister
}
// Tracer returns an Option that assigns t to Options.Tracer, the mechanism
// used for distributed tracing.
func Tracer(t tracer.Tracer) Option {
	setTracer := func(opts *Options) {
		opts.Tracer = t
	}
	return setTracer
}
// Auth returns an Option that assigns a to Options.Auth, the mechanism
// used for role based access control.
func Auth(a auth.Auth) Option {
	setAuth := func(opts *Options) {
		opts.Auth = a
	}
	return setAuth
}
// Transport returns an Option that assigns t to Options.Transport, the
// communication mechanism (e.g. http, rabbitmq, etc).
func Transport(t transport.Transport) Option {
	setTransport := func(opts *Options) {
		opts.Transport = t
	}
	return setTransport
}
// Metadata returns an Option that stores the result of metadata.Copy(md)
// in Options.Metadata as the metadata associated with the server.
func Metadata(md metadata.Metadata) Option {
	setMetadata := func(opts *Options) {
		cloned := metadata.Copy(md)
		opts.Metadata = cloned
	}
	return setMetadata
}
// RegisterCheck returns an Option that assigns fn to Options.RegisterCheck,
// the function run before the service is registered.
func RegisterCheck(fn func(context.Context) error) Option {
	setCheck := func(opts *Options) {
		opts.RegisterCheck = fn
	}
	return setCheck
}
// RegisterTTL returns an Option that assigns t to Options.RegisterTTL, the
// TTL the service is registered with.
func RegisterTTL(t time.Duration) Option {
	setTTL := func(opts *Options) {
		opts.RegisterTTL = t
	}
	return setTTL
}
// RegisterInterval returns an Option that assigns t to
// Options.RegisterInterval, the interval at which the service re-registers.
func RegisterInterval(t time.Duration) Option {
	setInterval := func(opts *Options) {
		opts.RegisterInterval = t
	}
	return setInterval
}
// TLSConfig returns an Option that stores t in Options.TLSConfig and hands
// it to the transport by calling Init with transport.TLSConfig(t).
// If no transport has been set yet, transport.DefaultTransport is installed
// first so the Init call has a receiver.
func TLSConfig(t *tls.Config) Option {
	return func(o *Options) {
		// set the internal tls
		o.TLSConfig = t

		// Set the default transport if one is not already set. Without
		// this, the Init call below is a method call on a nil interface
		// value and panics when Options was not built by NewOptions.
		if o.Transport == nil {
			o.Transport = transport.DefaultTransport
		}

		// Set the transport tls. Option has no error return, so an Init
		// failure cannot be reported from here and is deliberately dropped.
		_ = o.Transport.Init(
			transport.TLSConfig(t),
		)
	}
}
/*
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {
o.Router = r
}
}
*/
// Wait returns an Option that stores a wait group in Options.Wait, telling
// the server to wait for requests to finish before exiting.
// A non-nil wg is stored as given, for callers that need finer grained
// control over what the server waits on at stop. A nil wg is replaced by a
// newly allocated sync.WaitGroup; the replacement is kept in the captured
// variable, so applying the same Option again reuses that wait group.
func Wait(wg *sync.WaitGroup) Option {
	return func(opts *Options) {
		if wg != nil {
			opts.Wait = wg
			return
		}
		// Remember the allocation in the closure before publishing it.
		wg = new(sync.WaitGroup)
		opts.Wait = wg
	}
}
// WrapHandler returns an Option that appends w to Options.HdlrWrappers.
func WrapHandler(w HandlerWrapper) Option {
	addWrapper := func(opts *Options) {
		opts.HdlrWrappers = append(opts.HdlrWrappers, w)
	}
	return addWrapper
}
// WrapSubscriber returns an Option that appends w to Options.SubWrappers.
func WrapSubscriber(w SubscriberWrapper) Option {
	addWrapper := func(opts *Options) {
		opts.SubWrappers = append(opts.SubWrappers, w)
	}
	return addWrapper
}
// WrapBatchSubscriber returns an Option that appends w to
// Options.BatchSubWrappers.
func WrapBatchSubscriber(w BatchSubscriberWrapper) Option {
	addWrapper := func(opts *Options) {
		opts.BatchSubWrappers = append(opts.BatchSubWrappers, w)
	}
	return addWrapper
}
// MaxConn returns an Option that assigns n to Options.MaxConn, the maximum
// number of simultaneous connections to the server.
func MaxConn(n int) Option {
	setMaxConn := func(opts *Options) {
		opts.MaxConn = n
	}
	return setMaxConn
}
// Listener returns an Option that assigns l to Options.Listener, the
// net.Listener to use instead of the default.
func Listener(l net.Listener) Option {
	setListener := func(opts *Options) {
		opts.Listener = l
	}
	return setListener
}
// HandlerOption is a function that mutates a HandlerOptions value in place.
// NewHandlerOptions applies every supplied HandlerOption in order.
type HandlerOption func(*HandlerOptions)
// HandlerOptions holds the settings for a handler.
type HandlerOptions struct {
// Context holds external options
Context context.Context
// Metadata for handler, keyed by the name passed to EndpointMetadata
Metadata map[string]metadata.Metadata
}
// NewHandlerOptions builds a HandlerOptions value with a background context
// and an empty, non-nil Metadata map, then runs every supplied HandlerOption
// against it in the order given.
func NewHandlerOptions(opts ...HandlerOption) HandlerOptions {
	var ho HandlerOptions
	ho.Metadata = map[string]metadata.Metadata{}
	ho.Context = context.Background()

	for i := range opts {
		opts[i](&ho)
	}
	return ho
}
// SubscriberOption is a function that mutates a SubscriberOptions value in
// place.
type SubscriberOption func(*SubscriberOptions)

// SubscriberOptions holds the settings for a subscriber.
type SubscriberOptions struct {
	// Context holds the external options
	Context context.Context
	// Queue holds the subscription queue
	Queue string
	// AutoAck flag for auto ack messages after processing
	AutoAck bool
	// BodyOnly flag specifies that message without headers
	BodyOnly bool
	// Batch flag specifies that message processed in batches
	Batch bool
	// BatchSize flag specifies max size of batch
	BatchSize int
	// BatchWait flag specifies max wait time for batch filling
	BatchWait time.Duration
}

// NewSubscriberOptions builds a SubscriberOptions value with AutoAck enabled
// and a background context, then runs every supplied SubscriberOption
// against it in the order given. All other fields start at their zero value.
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
	var so SubscriberOptions
	so.Context = context.Background()
	so.AutoAck = true

	for i := range opts {
		opts[i](&so)
	}
	return so
}
// EndpointMetadata is a HandlerOption that allows metadata to be added to
// individual endpoints. It stores the result of metadata.Copy(md) in
// HandlerOptions.Metadata under name, replacing any entry already there.
func EndpointMetadata(name string, md metadata.Metadata) HandlerOption {
	return func(o *HandlerOptions) {
		// A HandlerOptions value that was not produced by
		// NewHandlerOptions has a nil Metadata map, and assigning into a
		// nil map panics; allocate lazily.
		if o.Metadata == nil {
			o.Metadata = make(map[string]metadata.Metadata)
		}
		o.Metadata[name] = metadata.Copy(md)
	}
}
// DisableAutoAck returns a SubscriberOption that sets
// SubscriberOptions.AutoAck to false, disabling auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
	disable := func(opts *SubscriberOptions) {
		opts.AutoAck = false
	}
	return disable
}
// SubscriberQueue returns a SubscriberOption that assigns n to
// SubscriberOptions.Queue, the shared queue name used to distribute
// messages across subscribers.
func SubscriberQueue(n string) SubscriberOption {
	setQueue := func(opts *SubscriberOptions) {
		opts.Queue = n
	}
	return setQueue
}
// SubscriberGroup returns a SubscriberOption that sets the shared group
// name used to distribute messages across subscribers. The name is stored
// in SubscriberOptions.Queue, the same field SubscriberQueue writes.
func SubscriberGroup(n string) SubscriberOption {
	setGroup := func(opts *SubscriberOptions) {
		opts.Queue = n
	}
	return setGroup
}
// SubscriberBodyOnly returns a SubscriberOption that assigns b to
// SubscriberOptions.BodyOnly, telling the broker that the message contains
// raw data rather than the micro broker.Message format.
func SubscriberBodyOnly(b bool) SubscriberOption {
	setBodyOnly := func(opts *SubscriberOptions) {
		opts.BodyOnly = b
	}
	return setBodyOnly
}
// SubscriberContext returns a SubscriberOption that assigns ctx to
// SubscriberOptions.Context, allowing broker subscriber options to be
// passed through the context.
func SubscriberContext(ctx context.Context) SubscriberOption {
	setContext := func(opts *SubscriberOptions) {
		opts.Context = ctx
	}
	return setContext
}
// SubscriberAck returns a SubscriberOption that assigns b to
// SubscriberOptions.AutoAck, controlling auto ack processing for the handler.
func SubscriberAck(b bool) SubscriberOption {
	setAck := func(opts *SubscriberOptions) {
		opts.AutoAck = b
	}
	return setAck
}
// SubscriberBatch returns a SubscriberOption that assigns b to
// SubscriberOptions.Batch, controlling batch processing for the handler.
func SubscriberBatch(b bool) SubscriberOption {
	setBatch := func(opts *SubscriberOptions) {
		opts.Batch = b
	}
	return setBatch
}
// SubscriberBatchSize returns a SubscriberOption that assigns n to
// SubscriberOptions.BatchSize, the batch filling size for the handler.
// The maximum time to wait for a batch to fill is set separately with
// SubscriberBatchWait.
func SubscriberBatchSize(n int) SubscriberOption {
	setSize := func(opts *SubscriberOptions) {
		opts.BatchSize = n
	}
	return setSize
}
// SubscriberBatchWait returns a SubscriberOption that assigns td to
// SubscriberOptions.BatchWait, the batch filling wait time for the handler.
func SubscriberBatchWait(td time.Duration) SubscriberOption {
	setWait := func(opts *SubscriberOptions) {
		opts.BatchWait = td
	}
	return setWait
}