-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathoption.go
305 lines (239 loc) · 8.47 KB
/
option.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package leafGooglePubsub
import (
"errors"
leafZap "github.com/paulusrobin/leaf-utilities/logger/integrations/zap"
leafLogger "github.com/paulusrobin/leaf-utilities/logger/logger"
"time"
google "cloud.google.com/go/pubsub"
)
type (
Option interface {
Apply(o *option)
}
option struct {
// Logger is interface used for logging purpose
// Logger default value is logs.DefaultLogger
logger leafLogger.Logger
// googleProject is used for connection project to google pubsub
// googleProject is required
googleProject string
// subscription is used for connection subscription to google pubsub
// subscription is required
// Map key is subscription topic
// Map value is subscription name
subscription map[string]string
// GoogleCredentialPath is used for connection project to google pubsub
googleCredentialPath string
// MaxRetry is used for retrying handler process when returning error
// before call error handler
//
// MaxRetry configuration can be disabled by specifying a
// number less than (or equal to) 0.
maxRetry int
// AckDeadline is the maximum period for which the subscription should
// automatically extend the ack deadline for each message.
//
// The subscription will automatically extend the ack deadline of all
// fetched Messages up to the duration specified. Automatic deadline
// extension beyond the initial receipt may be disabled by specifying a
// duration less than 0.
ackDeadline time.Duration
// MaxExtensionDeadline is the maximum duration by which to extend the ack
// deadline at a time. The ack deadline will continue to be extended by up
// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
// bounds the maximum amount of time before a message redelivery in the
// event the subscriber fails to extend the deadline.
//
// MaxExtensionDeadline configuration can be disabled by specifying a
// duration less than (or equal to) 0.
maxExtensionDeadline time.Duration
// MaxOutstandingMessages is the maximum number of unprocessed messages
// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
// If the value is negative, then there will be no limit on the number of
// unprocessed messages.
maxOutstandingMessages int
// MaxOutstandingBytes is the maximum size of unprocessed messages
// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
// the value is negative, then there will be no limit on the number of bytes
// for unprocessed messages.
maxOutstandingBytes int
// BufSize is used for buffer channel to listen messages
// BufSize default value is 100
bufSize int
// If Asynchronous is false, then no more than MaxOutstandingMessages will be in
// memory at one time. (In contrast, when ASynchronous is true, more than
// MaxOutstandingMessages may have been received from the service and in memory
// before being processed.) MaxOutstandingBytes still refers to the total bytes
// processed, rather than in memory.
// The default is false.
//
// NumGoroutines is ignored when the Asynchronous is false (Synchronous)
asynchronous bool
// NumGoroutines only used if Asynchronous is true
//
// NumGoroutines is the number of goroutines that each data structure along
// the Receive path will spawn. Adjusting this value adjusts concurrency
// along the receive path.
//
// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
//
// NumGoroutines does not limit the number of messages that can be processed
// concurrently. Even with one goroutine, many messages might be processed at
// once, because that goroutine may continually receive messages and invoke the
// function passed to Receive on them. To limit the number of messages being
// processed concurrently, set MaxOutstandingMessages.
numGoroutines int
failedDeadline time.Duration
slackNotification SlackNotification
}
SlackNotification struct {
Active bool
Hook string
Timeout time.Duration
}
)
func (o option) validate() error {
if o.googleProject == "" {
return errors.New("google project is required")
}
if o.slackNotification.Active && o.slackNotification.Hook == "" {
return errors.New("hook is required when slack notification is active")
}
if len(o.subscription) == 0 {
return errors.New("subscription is required")
}
for _, s := range o.subscription {
if s == "" {
return errors.New("subscription is required")
}
}
return nil
}
func defaultOption() option {
return option{
logger: leafZap.DefaultLog(),
maxRetry: 0,
ackDeadline: 60 * time.Second,
maxExtensionDeadline: 60 * time.Second,
failedDeadline: 60 * time.Second,
maxOutstandingMessages: google.DefaultReceiveSettings.MaxOutstandingMessages,
maxOutstandingBytes: google.DefaultReceiveSettings.MaxOutstandingBytes,
bufSize: 100,
asynchronous: false,
numGoroutines: google.DefaultReceiveSettings.NumGoroutines,
slackNotification: SlackNotification{
Active: true,
Hook: "https://hooks.slack.com/services/",
Timeout: 5 * time.Second,
},
}
}
type withLog struct{ leafLogger.Logger }
func WithLog(logger leafLogger.Logger) Option {
return withLog{logger}
}
func (w withLog) Apply(o *option) {
o.logger = w
}
type withGoogleProject string
func (w withGoogleProject) Apply(o *option) {
o.googleProject = string(w)
}
func WithGoogleProject(googleProject string) Option {
return withGoogleProject(googleProject)
}
type withSubscription map[string]string
func (w withSubscription) Apply(o *option) {
o.subscription = w
}
func WithSubscription(subscription map[string]string) Option {
return withSubscription(subscription)
}
type withGoogleCredentialPath string
func (w withGoogleCredentialPath) Apply(o *option) {
o.googleCredentialPath = string(w)
}
func WithGoogleCredentialPath(GoogleCredentialPath string) Option {
return withGoogleCredentialPath(GoogleCredentialPath)
}
type withMaxRetry int
func (w withMaxRetry) Apply(o *option) {
o.maxRetry = int(w)
}
func WithMaxRetry(MaxRetry int) Option {
return withMaxRetry(MaxRetry)
}
type withAckDeadline time.Duration
func (w withAckDeadline) Apply(o *option) {
o.ackDeadline = time.Duration(w)
}
func WithAckDeadline(AckDeadline time.Duration) Option {
return withAckDeadline(AckDeadline)
}
type withMaxExtensionDeadline time.Duration
func (w withMaxExtensionDeadline) Apply(o *option) {
o.maxExtensionDeadline = time.Duration(w)
}
func WithMaxExtensionDeadline(MaxExtensionDeadline time.Duration) Option {
return withMaxExtensionDeadline(MaxExtensionDeadline)
}
type withFailedDeadline time.Duration
func (w withFailedDeadline) Apply(o *option) {
o.failedDeadline = time.Duration(w)
}
func WithFailedDeadline(failedDeadline time.Duration) Option {
return withFailedDeadline(failedDeadline)
}
type withMaxOutstandingMessages int
func (w withMaxOutstandingMessages) Apply(o *option) {
o.maxOutstandingMessages = int(w)
}
func WithMaxOutstandingMessages(MaxOutstandingMessages int) Option {
return withMaxOutstandingMessages(MaxOutstandingMessages)
}
type withMaxOutstandingBytes int
func (w withMaxOutstandingBytes) Apply(o *option) {
o.maxOutstandingBytes = int(w)
}
func WithMaxOutstandingBytes(MaxOutstandingBytes int) Option {
return withMaxOutstandingBytes(MaxOutstandingBytes)
}
type withBufSize int
func (w withBufSize) Apply(o *option) {
o.bufSize = int(w)
}
func WithBufSize(BufSize int) Option {
return withBufSize(BufSize)
}
type withAsynchronous bool
func (w withAsynchronous) Apply(o *option) {
o.asynchronous = bool(w)
}
func WithAsynchronous(Asynchronous bool) Option {
return withAsynchronous(Asynchronous)
}
type withNumGoroutines int
func (w withNumGoroutines) Apply(o *option) {
o.numGoroutines = int(w)
}
func WithNumGoroutines(NumGoroutines int) Option {
return withNumGoroutines(NumGoroutines)
}
type withSlackNotification SlackNotification
func (w withSlackNotification) Apply(o *option) {
if o.slackNotification.Active {
o.slackNotification = SlackNotification(w)
}
}
func WithSlackNotification(notification SlackNotification) Option {
return withSlackNotification(notification)
}
type withoutSlackNotification bool
func (w withoutSlackNotification) Apply(o *option) {
o.slackNotification.Active = false
}
func WithoutSlackNotification() Option {
return withoutSlackNotification(true)
}