forked from lovoo/goka
/
options.go
440 lines (371 loc) · 13.4 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
package goka
import (
"fmt"
"path/filepath"
"time"
"github.com/lovoo/goka/kafka"
"github.com/lovoo/goka/storage"
metrics "github.com/rcrowley/go-metrics"
)
// UpdateCallback is invoked upon arrival of a message for a table partition.
// The partition storage shall be updated in the callback.
//
// The callback receives the storage s of the partition, the partition number
// and the key and value of the message, and returns an error.
type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error
// StorageBuilder creates a local storage (a persistent cache) for a topic
// table. StorageBuilder creates one storage for each partition of the topic.
//
// It receives the topic name, the partition number, the codec of the table
// and a metrics registry, and returns the storage or an error.
type StorageBuilder func(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error)
///////////////////////////////////////////////////////////////////////////////
// default values
///////////////////////////////////////////////////////////////////////////////
const (
// defaultClientID is the client ID that the processor and emitter
// applyOptions methods assign before any user-supplied option is applied.
defaultClientID = "goka"
)
// DefaultUpdate is the default callback used to update the local storage with
// messages from the table topic in Kafka. It is called for every message
// received during recovery of processors and during the normal operation of
// views. It stores value under key in s via SetEncoded and ignores the
// partition argument.
//
// DefaultUpdate can be used in the function passed to WithUpdateCallback and
// WithViewCallback.
func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error {
	err := s.SetEncoded(key, value)
	return err
}
// consumerBuilder creates a kafka.Consumer from a list of brokers, a consumer
// group name and a metrics registry.
type consumerBuilder func(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error)
// producerBuilder creates a kafka.Producer from a list of brokers and a
// metrics registry.
type producerBuilder func(brokers []string, registry metrics.Registry) (kafka.Producer, error)
// topicmgrBuilder creates a kafka.TopicManager from a list of brokers.
type topicmgrBuilder func(brokers []string) (kafka.TopicManager, error)
// defaultConsumerBuilder is the consumerBuilder used when no consumer was
// configured explicitly; it delegates to kafka.NewSaramaConsumer.
func defaultConsumerBuilder(brokers []string, group string, registry metrics.Registry) (kafka.Consumer, error) {
	consumer, err := kafka.NewSaramaConsumer(brokers, group, registry)
	return consumer, err
}
// defaultProducerBuilder is the producerBuilder used when no producer was
// configured explicitly; it delegates to kafka.NewProducer.
func defaultProducerBuilder(brokers []string, registry metrics.Registry) (kafka.Producer, error) {
	producer, err := kafka.NewProducer(brokers, registry)
	return producer, err
}
// defaultTopicManagerBuilder is the topicmgrBuilder used when no topic manager
// was configured explicitly; it delegates to kafka.NewSaramaTopicManager.
func defaultTopicManagerBuilder(brokers []string) (kafka.TopicManager, error) {
	manager, err := kafka.NewSaramaTopicManager(brokers)
	return manager, err
}
///////////////////////////////////////////////////////////////////////////////
// processor options
///////////////////////////////////////////////////////////////////////////////
// ProcessorOption defines a configuration option to be used when creating a processor.
// Each option is a function that mutates the processor options it is given.
type ProcessorOption func(*poptions)
// poptions holds the configuration of a processor. Its fields are filled in
// by applyOptions from the ProcessorOption values passed by the caller.
type poptions struct {
// clientID is the client ID used to identify with kafka; applyOptions
// initializes it to defaultClientID and WithClientID overrides it.
clientID string
// updateCallback is set by WithUpdateCallback.
updateCallback UpdateCallback
// storagePath is the base path for the local storage on disk, set by
// WithStoragePath.
storagePath string
// storageSnapshotInterval is handed to storage.New by defaultStorageBuilder;
// applyOptions replaces a zero value with
// storage.DefaultStorageSnapshotInterval.
storageSnapshotInterval time.Duration
// registry is the root metrics registry; applyOptions always creates a new one.
registry metrics.Registry
// partitionChannelSize is set by WithPartitionChannelSize.
partitionChannelSize int
// builders holds the factories for the processor's dependencies. Fields left
// nil by the options are set to the default builders in applyOptions.
builders struct {
storage StorageBuilder
consumer consumerBuilder
producer producerBuilder
topicmgr topicmgrBuilder
}
// gokaRegistry is a child of registry prefixed with "goka.processor-<group>.".
gokaRegistry metrics.Registry
// kafkaRegistry is the registry meant for the kafka producer/consumer
// builders. It is set by WithKafkaMetrics; otherwise applyOptions uses a
// child of registry prefixed with "kafka.processor-<group>.".
kafkaRegistry metrics.Registry
}
// WithUpdateCallback returns a ProcessorOption that installs cb as the
// callback called upon recovering a message from the log.
func WithUpdateCallback(cb UpdateCallback) ProcessorOption {
	set := func(p *poptions) {
		p.updateCallback = cb
	}
	return set
}
// WithClientID returns a ProcessorOption that sets the client ID used to
// identify with kafka.
func WithClientID(clientID string) ProcessorOption {
	set := func(p *poptions) {
		p.clientID = clientID
	}
	return set
}
// WithStorageBuilder returns a ProcessorOption that sets sb as the builder
// for the storage of each partition.
func WithStorageBuilder(sb StorageBuilder) ProcessorOption {
	set := func(p *poptions) {
		p.builders.storage = sb
	}
	return set
}
// WithTopicManager returns a ProcessorOption that makes the processor use tm
// as its topic manager. The installed builder ignores the broker list and
// returns tm, or an error if tm is nil.
func WithTopicManager(tm kafka.TopicManager) ProcessorOption {
	build := func(_ []string) (kafka.TopicManager, error) {
		if tm != nil {
			return tm, nil
		}
		return nil, fmt.Errorf("TopicManager cannot be nil")
	}
	return func(p *poptions) {
		p.builders.topicmgr = build
	}
}
// WithConsumer returns a ProcessorOption that replaces goka's default
// consumer with c. Mainly for testing. The installed builder ignores its
// arguments and returns c, or an error if c is nil.
func WithConsumer(c kafka.Consumer) ProcessorOption {
	build := func(_ []string, _ string, _ metrics.Registry) (kafka.Consumer, error) {
		if c != nil {
			return c, nil
		}
		return nil, fmt.Errorf("consumer cannot be nil")
	}
	return func(p *poptions) {
		p.builders.consumer = build
	}
}
// WithProducer returns a ProcessorOption that replaces goka's default
// producer with p. Mainly for testing. The installed builder ignores its
// arguments and returns p, or an error if p is nil.
func WithProducer(p kafka.Producer) ProcessorOption {
	build := func(_ []string, _ metrics.Registry) (kafka.Producer, error) {
		if p != nil {
			return p, nil
		}
		return nil, fmt.Errorf("producer cannot be nil")
	}
	return func(po *poptions) {
		po.builders.producer = build
	}
}
// WithStoragePath returns a ProcessorOption that sets the base path for the
// local storage on disk.
func WithStoragePath(storagePath string) ProcessorOption {
	set := func(p *poptions) {
		p.storagePath = storagePath
	}
	return set
}
// WithKafkaMetrics returns a ProcessorOption that sets the go-metrics
// registry used to collect kafka metrics.
// The metric points are described at https://godoc.org/github.com/Shopify/sarama
func WithKafkaMetrics(registry metrics.Registry) ProcessorOption {
	set := func(p *poptions) {
		p.kafkaRegistry = registry
	}
	return set
}
// WithPartitionChannelSize returns a ProcessorOption that replaces the
// default partition channel size with size.
// This is mostly used for testing, where a size of 0 gives goka synchronous
// behavior.
func WithPartitionChannelSize(size int) ProcessorOption {
	set := func(p *poptions) {
		p.partitionChannelSize = size
	}
	return set
}
// WithStorageSnapshotInterval returns a ProcessorOption that sets the
// interval in which the storage will snapshot to disk (if the storage
// supports it at all).
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithStorageSnapshotInterval(interval time.Duration) ProcessorOption {
	set := func(p *poptions) {
		p.storageSnapshotInterval = interval
	}
	return set
}
// applyOptions initializes the client ID to its default, runs every option in
// opts against opt in order, and then fills in whatever is still unset: the
// storage, consumer, producer and topic-manager builders, the storage
// snapshot interval and the metrics registries, whose prefixes contain group.
// It always returns nil.
func (opt *poptions) applyOptions(group string, opts ...ProcessorOption) error {
	opt.clientID = defaultClientID
	for i := range opts {
		opts[i](opt)
	}

	// Fall back to the default builders for everything the options left nil.
	b := &opt.builders
	if b.storage == nil {
		b.storage = opt.defaultStorageBuilder
	}
	if b.consumer == nil {
		b.consumer = defaultConsumerBuilder
	}
	if b.producer == nil {
		b.producer = defaultProducerBuilder
	}
	if b.topicmgr == nil {
		b.topicmgr = defaultTopicManagerBuilder
	}

	if opt.storageSnapshotInterval == 0 {
		opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
	}

	// A fresh root registry; the goka registry is a prefixed child of it.
	root := metrics.NewRegistry()
	opt.registry = root
	opt.gokaRegistry = metrics.NewPrefixedChildRegistry(root, fmt.Sprintf("goka.processor-%s.", group))

	// Without a user-supplied kafka registry, use a prefixed child of the
	// root for the kafka producer/consumer builders.
	if opt.kafkaRegistry == nil {
		opt.kafkaRegistry = metrics.NewPrefixedChildRegistry(root, fmt.Sprintf("kafka.processor-%s.", group))
	}
	return nil
}
// storagePathForPartition returns the storage path of one partition of topic:
// <storagePath>/processor/<topic>.<partitionID>, joined with filepath.Join.
func (opt *poptions) storagePathForPartition(topic string, partitionID int32) string {
	leaf := fmt.Sprintf("%s.%d", topic, partitionID)
	return filepath.Join(opt.storagePath, "processor", leaf)
}
// defaultStorageBuilder is the StorageBuilder used when none was configured.
// It calls storage.New with the partition's path below the processor storage
// path, the given codec and registry, and the configured snapshot interval.
func (opt *poptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
	path := opt.storagePathForPartition(topic, partition)
	return storage.New(path, codec, reg, opt.storageSnapshotInterval)
}
///////////////////////////////////////////////////////////////////////////////
// view options
///////////////////////////////////////////////////////////////////////////////
// ViewOption defines a configuration option to be used when creating a view.
// Each option is a function that mutates the view options it is given.
type ViewOption func(*voptions)
// voptions holds the configuration of a view. Its fields are filled in by
// applyOptions from the ViewOption values passed by the caller.
type voptions struct {
// tableCodec is not assigned by any option in this file; presumably it is
// set by the code that creates the view — TODO confirm.
tableCodec Codec
// updateCallback is set by WithViewCallback.
updateCallback UpdateCallback
// storagePath is the base path for the local storage on disk, set by
// WithViewStoragePath.
storagePath string
// storageSnapshotInterval is handed to storage.New by defaultStorageBuilder;
// applyOptions replaces a zero value with
// storage.DefaultStorageSnapshotInterval.
storageSnapshotInterval time.Duration
// registry is the root metrics registry; applyOptions always creates a new one.
registry metrics.Registry
// partitionChannelSize is set by WithViewPartitionChannelSize.
partitionChannelSize int
// builders holds the factories for the view's dependencies. Fields left nil
// by the options are set to the default builders in applyOptions.
builders struct {
storage StorageBuilder
consumer consumerBuilder
topicmgr topicmgrBuilder
}
// gokaRegistry is a child of registry prefixed with "goka.view-<topic>.".
gokaRegistry metrics.Registry
// kafkaRegistry is the registry meant for the kafka consumer builders. It is
// set by WithViewKafkaMetrics; otherwise applyOptions uses a child of
// registry prefixed with "kafka.view-<topic>.".
kafkaRegistry metrics.Registry
}
// WithViewCallback returns a ViewOption that installs cb as the callback
// called upon recovering a message from the log.
func WithViewCallback(cb UpdateCallback) ViewOption {
	set := func(v *voptions) {
		v.updateCallback = cb
	}
	return set
}
// WithViewStorageBuilder returns a ViewOption that sets sb as the builder for
// the storage of each partition.
func WithViewStorageBuilder(sb StorageBuilder) ViewOption {
	set := func(v *voptions) {
		v.builders.storage = sb
	}
	return set
}
// WithViewConsumer returns a ViewOption that replaces goka's default view
// consumer with c. Mainly for testing. The installed builder ignores its
// arguments and returns c, or an error if c is nil.
func WithViewConsumer(c kafka.Consumer) ViewOption {
	build := func(_ []string, _ string, _ metrics.Registry) (kafka.Consumer, error) {
		if c != nil {
			return c, nil
		}
		return nil, fmt.Errorf("consumer cannot be nil")
	}
	return func(v *voptions) {
		v.builders.consumer = build
	}
}
// WithViewTopicManager returns a ViewOption that makes the view use tm as its
// topic manager. The installed builder ignores the broker list and returns
// tm, or an error if tm is nil.
func WithViewTopicManager(tm kafka.TopicManager) ViewOption {
	build := func(_ []string) (kafka.TopicManager, error) {
		if tm != nil {
			return tm, nil
		}
		return nil, fmt.Errorf("TopicManager cannot be nil")
	}
	return func(v *voptions) {
		v.builders.topicmgr = build
	}
}
// WithViewStoragePath returns a ViewOption that sets the base path for the
// local storage on disk.
func WithViewStoragePath(storagePath string) ViewOption {
	set := func(v *voptions) {
		v.storagePath = storagePath
	}
	return set
}
// WithViewStorageSnapshotInterval returns a ViewOption that sets the interval
// in which the storage will snapshot to disk (if the storage supports it at
// all).
// Greater interval -> less writes to disk, more memory usage
// Smaller interval -> more writes to disk, less memory usage
func WithViewStorageSnapshotInterval(interval time.Duration) ViewOption {
	set := func(v *voptions) {
		v.storageSnapshotInterval = interval
	}
	return set
}
// WithViewKafkaMetrics returns a ViewOption that sets the go-metrics registry
// used to collect kafka metrics.
// The metric points are described at https://godoc.org/github.com/Shopify/sarama
func WithViewKafkaMetrics(registry metrics.Registry) ViewOption {
	set := func(v *voptions) {
		v.kafkaRegistry = registry
	}
	return set
}
// WithViewPartitionChannelSize returns a ViewOption that replaces the default
// partition channel size with size.
// This is mostly used for testing, where a size of 0 gives goka synchronous
// behavior.
func WithViewPartitionChannelSize(size int) ViewOption {
	set := func(v *voptions) {
		v.partitionChannelSize = size
	}
	return set
}
// applyOptions runs every option in opts against opt in order and then fills
// in whatever is still unset: the storage, consumer and topic-manager
// builders, the storage snapshot interval and the metrics registries, whose
// prefixes contain topic. It always returns nil.
func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
	for i := range opts {
		opts[i](opt)
	}

	// Fall back to the default builders for everything the options left nil.
	b := &opt.builders
	if b.storage == nil {
		b.storage = opt.defaultStorageBuilder
	}
	if b.consumer == nil {
		b.consumer = defaultConsumerBuilder
	}
	if b.topicmgr == nil {
		b.topicmgr = defaultTopicManagerBuilder
	}

	if opt.storageSnapshotInterval == 0 {
		opt.storageSnapshotInterval = storage.DefaultStorageSnapshotInterval
	}

	// A fresh root registry; the goka registry is a prefixed child of it.
	root := metrics.NewRegistry()
	opt.registry = root
	opt.gokaRegistry = metrics.NewPrefixedChildRegistry(root, fmt.Sprintf("goka.view-%s.", topic))

	// Without a user-supplied kafka registry, use a prefixed child of the
	// root for the kafka consumer builders.
	if opt.kafkaRegistry == nil {
		opt.kafkaRegistry = metrics.NewPrefixedChildRegistry(root, fmt.Sprintf("kafka.view-%s.", topic))
	}
	return nil
}
// storagePathForPartition returns the storage path of one partition of topic:
// <storagePath>/view/<topic>.<partitionID>, joined with filepath.Join.
func (opt *voptions) storagePathForPartition(topic string, partitionID int32) string {
	leaf := fmt.Sprintf("%s.%d", topic, partitionID)
	return filepath.Join(opt.storagePath, "view", leaf)
}
// defaultStorageBuilder is the StorageBuilder used when none was configured.
// It calls storage.New with the partition's path below the view storage path,
// the given codec and registry, and the configured snapshot interval.
func (opt *voptions) defaultStorageBuilder(topic string, partition int32, codec Codec, reg metrics.Registry) (storage.Storage, error) {
	path := opt.storagePathForPartition(topic, partition)
	return storage.New(path, codec, reg, opt.storageSnapshotInterval)
}
///////////////////////////////////////////////////////////////////////////////
// emitter options
///////////////////////////////////////////////////////////////////////////////
// EmitterOption defines a configuration option to be used when creating an
// emitter. Each option is a function that mutates the emitter options it is
// given.
type EmitterOption func(*eoptions)
// eoptions holds the configuration of an emitter. Its fields are filled in by
// applyOptions from the EmitterOption values passed by the caller.
type eoptions struct {
// clientID is the client ID used to identify with kafka; applyOptions
// initializes it to defaultClientID and WithEmitterClientID overrides it.
clientID string
// registry is set by applyOptions to a new registry prefixed with
// "goka.producer.".
registry metrics.Registry
// codec is not assigned by any option in this file; presumably it is set by
// the code that creates the emitter — TODO confirm.
codec Codec
// builders holds the factories for the emitter's dependencies. Fields left
// nil by the options are set to the default builders in applyOptions.
builders struct {
topicmgr topicmgrBuilder
producer producerBuilder
}
// kafkaRegistry is set by WithEmitterKafkaMetrics; otherwise applyOptions
// uses a new registry prefixed with "goka.kafka.producer.".
kafkaRegistry metrics.Registry
}
// WithEmitterClientID returns an EmitterOption that sets the client ID used
// to identify with kafka.
func WithEmitterClientID(clientID string) EmitterOption {
	set := func(e *eoptions) {
		e.clientID = clientID
	}
	return set
}
// WithEmitterTopicManager returns an EmitterOption that makes the emitter use
// tm as its topic manager. The installed builder ignores the broker list and
// returns tm, or an error if tm is nil.
func WithEmitterTopicManager(tm kafka.TopicManager) EmitterOption {
	build := func(_ []string) (kafka.TopicManager, error) {
		if tm != nil {
			return tm, nil
		}
		return nil, fmt.Errorf("TopicManager cannot be nil")
	}
	return func(e *eoptions) {
		e.builders.topicmgr = build
	}
}
// WithEmitterProducer returns an EmitterOption that replaces goka's default
// producer with p. Mainly for testing. The installed builder ignores its
// arguments and returns p, or an error if p is nil.
func WithEmitterProducer(p kafka.Producer) EmitterOption {
	build := func(_ []string, _ metrics.Registry) (kafka.Producer, error) {
		if p != nil {
			return p, nil
		}
		return nil, fmt.Errorf("producer cannot be nil")
	}
	return func(e *eoptions) {
		e.builders.producer = build
	}
}
// WithEmitterKafkaMetrics returns an EmitterOption that sets the go-metrics
// registry used to collect kafka metrics.
// The metric points are described at https://godoc.org/github.com/Shopify/sarama
func WithEmitterKafkaMetrics(registry metrics.Registry) EmitterOption {
	set := func(e *eoptions) {
		e.kafkaRegistry = registry
	}
	return set
}
// applyOptions initializes the client ID to its default, runs every option in
// opts against opt in order, and then fills in whatever is still unset: the
// producer and topic-manager builders and the kafka metrics registry. The
// emitter's own registry is always replaced by a new prefixed registry.
// It always returns nil.
func (opt *eoptions) applyOptions(opts ...EmitterOption) error {
	opt.clientID = defaultClientID
	for i := range opts {
		opts[i](opt)
	}

	// Fall back to the default builders for everything the options left nil.
	b := &opt.builders
	if b.producer == nil {
		b.producer = defaultProducerBuilder
	}
	if b.topicmgr == nil {
		b.topicmgr = defaultTopicManagerBuilder
	}

	// Without a user-supplied kafka registry, create a prefixed one for the
	// kafka producer builder.
	if opt.kafkaRegistry == nil {
		opt.kafkaRegistry = metrics.NewPrefixedRegistry("goka.kafka.producer.")
	}

	opt.registry = metrics.NewPrefixedRegistry("goka.producer.")
	return nil
}