/
default_builders.go
111 lines (98 loc) · 3.85 KB
/
default_builders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package kstream
import (
"github.com/tryfix/kstream/admin"
"github.com/tryfix/kstream/backend"
"github.com/tryfix/kstream/backend/memory"
"github.com/tryfix/kstream/consumer"
"github.com/tryfix/kstream/kstream/changelog"
"github.com/tryfix/kstream/kstream/encoding"
"github.com/tryfix/kstream/kstream/offsets"
"github.com/tryfix/kstream/kstream/store"
"github.com/tryfix/kstream/producer"
)
// DefaultBuilders is the collection of component builders a stream builder
// falls back to when no user-supplied override is given via a BuilderOption.
// Exported fields may be replaced by options before build() fills in the
// remaining nil builders with defaults wired to configs.
type DefaultBuilders struct {
	Producer producer.Builder // builds producers; defaulted to a shared pool by build()
	// changelog is unexported: managed internally, not overridable by options here.
	changelog changelog.Builder
	Consumer          consumer.Builder                  // group-consumer builder
	PartitionConsumer consumer.PartitionConsumerBuilder // per-partition consumer builder
	Store             store.Builder                     // key/value store builder
	IndexedStore      store.IndexedStoreBuilder         // store builder with secondary indexes
	Backend           backend.Builder                   // storage backend; defaults to in-memory
	StateStore        store.StateStoreBuilder           // state-store builder (set via options only)
	OffsetManager     offsets.Manager                   // topic offset manager
	KafkaAdmin        admin.KafkaAdmin                  // kafka admin client
	// configs is the stream builder configuration all defaults are derived from.
	configs *StreamBuilderConfig
}
// build finalizes the builder set: it first applies any user-supplied
// BuilderOptions (which may replace individual builders), then fills every
// builder that is still nil with a default wired to dbs.configs.
// It terminates the process via Logger.Fatal if the producer pool cannot
// be created.
func (dbs *DefaultBuilders) build(options ...BuilderOption) {
	// Apply user overrides first so an option can replace any default below.
	for _, option := range options {
		option(dbs)
	}

	// Default backend builder is the in-memory store.
	if dbs.configs.Store.BackendBuilder == nil {
		backendBuilderConfig := memory.NewConfig()
		backendBuilderConfig.Logger = dbs.configs.Logger
		backendBuilderConfig.MetricsReporter = dbs.configs.MetricsReporter
		dbs.configs.Store.BackendBuilder = memory.Builder(backendBuilderConfig)
	}
	// Keep the exported field in sync with the (possibly user-supplied) config.
	dbs.Backend = dbs.configs.Store.BackendBuilder

	dbs.Store = func(name string, keyEncoder encoding.Builder, valEncoder encoding.Builder, options ...store.Options) (store.Store, error) {
		return store.NewStore(name, keyEncoder(), valEncoder(), append(
			options,
			store.WithBackendBuilder(dbs.configs.Store.BackendBuilder),
			store.WithLogger(dbs.configs.Logger),
		)...)
	}
	dbs.IndexedStore = func(name string, keyEncoder encoding.Builder, valEncoder encoding.Builder, indexes []store.Index, options ...store.Options) (store.IndexedStore, error) {
		return store.NewIndexedStore(name, keyEncoder(), valEncoder(), indexes, append(
			options,
			store.WithBackendBuilder(dbs.configs.Store.BackendBuilder),
			store.WithLogger(dbs.configs.Logger),
		)...)
	}

	if dbs.Producer == nil {
		// The per-worker config passed by the pool is ignored; every worker
		// is built from the stream builder's shared producer config.
		// NOTE(review): this mutates dbs.configs.Producer in place, so all
		// workers share a single *producer.Config instance — confirm that is
		// intended before making worker configs diverge.
		pool, err := producer.NewPool(dbs.configs.Producer.Pool.NumOfWorkers, func(_ *producer.Config) (producer.Producer, error) {
			cfg := dbs.configs.Producer
			cfg.BootstrapServers = dbs.configs.BootstrapServers
			cfg.Logger = dbs.configs.Logger
			cfg.MetricsReporter = dbs.configs.MetricsReporter
			return producer.NewProducer(cfg)
		})
		if err != nil {
			dbs.configs.Logger.Fatal(err)
		}
		// Every invocation of the Producer builder hands back the shared pool.
		dbs.Producer = func(_ *producer.Config) (producer.Producer, error) {
			return pool, nil
		}
	}

	if dbs.Consumer == nil {
		dbs.Consumer = consumer.NewBuilder()
	}
	// Consumer config is always synced from the stream config, even when the
	// builder itself was supplied via an option.
	dbs.Consumer.Config().GroupId = dbs.configs.ApplicationId
	dbs.Consumer.Config().BootstrapServers = dbs.configs.BootstrapServers
	dbs.Consumer.Config().MetricsReporter = dbs.configs.MetricsReporter
	dbs.Consumer.Config().Logger = dbs.configs.Logger
	dbs.Consumer.Config().Consumer = dbs.configs.Consumer.Consumer

	if dbs.OffsetManager == nil {
		dbs.OffsetManager = offsets.NewManager(&offsets.Config{
			Config:           dbs.configs.Config,
			BootstrapServers: dbs.configs.BootstrapServers,
			Logger:           dbs.configs.Logger,
		})
	}

	if dbs.KafkaAdmin == nil {
		dbs.KafkaAdmin = admin.NewKafkaAdmin(dbs.configs.BootstrapServers,
			admin.WithKafkaVersion(dbs.configs.Consumer.Version),
			admin.WithLogger(dbs.configs.Logger),
		)
	}

	if dbs.PartitionConsumer == nil {
		dbs.PartitionConsumer = consumer.NewPartitionConsumerBuilder()
	}
	// Partition-consumer config is likewise always synced from the stream config.
	dbs.PartitionConsumer.Config().BootstrapServers = dbs.configs.BootstrapServers
	dbs.PartitionConsumer.Config().MetricsReporter = dbs.configs.MetricsReporter
	dbs.PartitionConsumer.Config().Logger = dbs.configs.Logger
	dbs.PartitionConsumer.Config().Config.Consumer = dbs.configs.Consumer.Consumer
}