-
Notifications
You must be signed in to change notification settings - Fork 50
/
config_postprocessor_publisher.go
156 lines (118 loc) · 5.39 KB
/
config_postprocessor_publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package mdlsub
import (
	"fmt"
	"sort"
	"time"

	"github.com/justtrackio/gosoline/pkg/cfg"
	"github.com/justtrackio/gosoline/pkg/stream"
)
// init registers PublisherConfigPostProcessor with the cfg package under the
// name "gosoline.mdlsub.publisher".
//
// NOTE(review): the leading 8 is presumably the post processor's priority or
// ordering value — confirm against cfg.AddPostProcessor.
func init() {
	const processorName = "gosoline.mdlsub.publisher"

	cfg.AddPostProcessor(8, processorName, PublisherConfigPostProcessor)
}
// sharedName is the topic id (SNS) or queue id (SQS) used instead of the
// publisher name when a publisher is flagged as shared.
const sharedName = "publisher"

// publisherOutputTypeHandler builds the output configuration for a single
// publisher. It receives the config, the publisher's settings and the
// producer settings of that publisher; a handler may adjust the producer
// settings in place (the Kinesis handler does).
type publisherOutputTypeHandler func(
	config cfg.Config,
	publisherSettings *PublisherSettings,
	producerSettings *stream.ProducerSettings,
) stream.BaseOutputConfigurationAware

// publisherOutputTypeHandlers maps an output type to the handler that builds
// the matching output configuration. Output types without an entry here are
// rejected by PublisherConfigPostProcessor.
var publisherOutputTypeHandlers = map[string]publisherOutputTypeHandler{
	stream.OutputTypeInMemory: handlePublisherOutputTypeInMemory,
	stream.OutputTypeKinesis:  handlePublisherOutputTypeKinesis,
	stream.OutputTypeSns:      handlePublisherOutputTypeSns,
	stream.OutputTypeSqs:      handlePublisherOutputTypeSqs,
}
// PublisherConfigPostProcessor derives the producer and output configuration
// for every publisher configured below ConfigKeyMdlSubPublishers.
//
// For each publisher it reads the publisher settings, builds producer
// settings with defaults, lets the handler registered for the publisher's
// output type build the output settings, and writes all three back into
// config. Producer and output settings are written with the cfg.SkipExisting
// option, the publisher settings without it.
//
// Publishers are processed in ascending name order. Go does not specify map
// iteration order, so sorting keeps the sequence of config writes and the
// first reported error reproducible between runs.
//
// It returns (false, nil) when no publishers are configured, (true, nil) once
// all publishers were processed, and (false, err) when an output type has no
// registered handler or the settings can not be applied.
func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error) {
	if !config.IsSet(ConfigKeyMdlSubPublishers) {
		return false, nil
	}

	publishers := config.GetStringMap(ConfigKeyMdlSubPublishers)

	// Collect and sort the names first so the loop below is deterministic.
	names := make([]string, 0, len(publishers))
	for name := range publishers {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		publisherKey := getPublisherConfigKey(name)
		publisherSettings := readPublisherSetting(config, name)

		// Producer and output share the same default name; only the producer
		// name can be overridden through the publisher settings.
		defaultName := fmt.Sprintf("publisher-%s", name)
		outputName := defaultName

		if publisherSettings.Producer == "" {
			publisherSettings.Producer = defaultName
		}
		producerName := publisherSettings.Producer

		producerSettings := &stream.ProducerSettings{}
		config.UnmarshalDefaults(producerSettings)

		producerSettings.Output = outputName
		// NOTE(review): this write panics if MessageAttributes is a nil map —
		// it relies on UnmarshalDefaults having initialized it; confirm.
		producerSettings.Daemon.MessageAttributes[AttributeModelId] = publisherSettings.ModelId.String()

		outputTypeHandler, ok := publisherOutputTypeHandlers[publisherSettings.OutputType]
		if !ok {
			return false, fmt.Errorf("no publisherOutputTypeHandler found for publisher %s and output type %s", publisherSettings.Name, publisherSettings.OutputType)
		}

		// The handler may adjust producerSettings in place, so it has to run
		// before the producer settings are written to the config.
		outputSettings := outputTypeHandler(config, publisherSettings, producerSettings)

		producerKey := stream.ConfigurableProducerKey(producerName)
		outputKey := stream.ConfigurableOutputKey(outputName)

		configOptions := []cfg.Option{
			cfg.WithConfigSetting(producerKey, producerSettings, cfg.SkipExisting),
			cfg.WithConfigSetting(outputKey, outputSettings, cfg.SkipExisting),
			cfg.WithConfigSetting(publisherKey, publisherSettings),
		}

		if err := config.Option(configOptions...); err != nil {
			return false, fmt.Errorf("can not apply config settings for publisher %s: %w", publisherSettings.Name, err)
		}
	}

	return true, nil
}
// handlePublisherOutputTypeInMemory returns an in-memory output configuration
// filled with the defaults provided by config. The publisher and producer
// settings are not used.
func handlePublisherOutputTypeInMemory(config cfg.Config, _ *PublisherSettings, _ *stream.ProducerSettings) stream.BaseOutputConfigurationAware {
	inMemory := new(stream.InMemoryOutputConfiguration)
	config.UnmarshalDefaults(inMemory)

	return inMemory
}
// handlePublisherOutputTypeKinesis builds the Kinesis output configuration
// for a publisher and tunes the producer daemon for Kinesis.
//
// It modifies producerSettings in place: the daemon is enabled with a one
// second interval and batch limits chosen to stay below the Kinesis limits.
// The returned output configuration starts from the config defaults, takes
// project, family, group and application from the publisher settings, uses
// the publisher name as stream name and has tracing disabled.
func handlePublisherOutputTypeKinesis(config cfg.Config, publisherSettings *PublisherSettings, producerSettings *stream.ProducerSettings) stream.BaseOutputConfigurationAware {
	const (
		// kinesis batches have a max size of 5mb. we're using 4.5mb to give it some headroom
		batchMaxSize = 4_500_000
		// kinesis can handle up to 500 records per put records call
		batchSize = 500
		// kinesis limit for 1 record in size is 1mb, so we limit it to 950kb to give it some headroom
		aggregationMaxSize = 950_000
	)

	producerSettings.Daemon.Enabled = true
	producerSettings.Daemon.Interval = time.Second
	producerSettings.Daemon.BatchMaxSize = batchMaxSize
	producerSettings.Daemon.BatchSize = batchSize
	producerSettings.Daemon.AggregationMaxSize = aggregationMaxSize

	kinesis := new(stream.KinesisOutputConfiguration)
	config.UnmarshalDefaults(kinesis)

	kinesis.Project = publisherSettings.Project
	kinesis.Family = publisherSettings.Family
	kinesis.Group = publisherSettings.Group
	kinesis.Application = publisherSettings.Application
	kinesis.StreamName = publisherSettings.Name
	kinesis.Tracing.Enabled = false

	return kinesis
}
// handlePublisherOutputTypeSns builds the SNS output configuration for a
// publisher. It starts from the config defaults and takes project, family,
// group and application from the publisher settings. The topic id is the
// publisher name, or sharedName when the publisher is flagged as shared. The
// producer settings are not used.
func handlePublisherOutputTypeSns(config cfg.Config, publisherSettings *PublisherSettings, _ *stream.ProducerSettings) stream.BaseOutputConfigurationAware {
	sns := new(stream.SnsOutputConfiguration)
	config.UnmarshalDefaults(sns)

	// Shared publishers all use the same topic id instead of their own name.
	topicID := publisherSettings.Name
	if publisherSettings.Shared {
		topicID = sharedName
	}

	sns.Project = publisherSettings.Project
	sns.Family = publisherSettings.Family
	sns.Group = publisherSettings.Group
	sns.Application = publisherSettings.Application
	sns.TopicId = topicID

	return sns
}
// handlePublisherOutputTypeSqs builds the SQS output configuration for a
// publisher. It starts from the config defaults and takes project, family,
// group and application from the publisher settings. The queue id is the
// publisher name, or sharedName when the publisher is flagged as shared. The
// producer settings are not used.
func handlePublisherOutputTypeSqs(config cfg.Config, publisherSettings *PublisherSettings, _ *stream.ProducerSettings) stream.BaseOutputConfigurationAware {
	sqs := new(stream.SqsOutputConfiguration)
	config.UnmarshalDefaults(sqs)

	// Shared publishers all use the same queue id instead of their own name.
	queueID := publisherSettings.Name
	if publisherSettings.Shared {
		queueID = sharedName
	}

	sqs.Project = publisherSettings.Project
	sqs.Family = publisherSettings.Family
	sqs.Group = publisherSettings.Group
	sqs.Application = publisherSettings.Application
	sqs.QueueId = queueID

	return sqs
}
// getPublisherConfigKey returns the config key of the publisher with the
// given name: ConfigKeyMdlSubPublishers and name joined by a dot.
func getPublisherConfigKey(name string) string {
	const keyFormat = "%s.%s"

	key := fmt.Sprintf(keyFormat, ConfigKeyMdlSubPublishers, name)

	return key
}
// readPublisherSetting unmarshals the settings stored under the publisher's
// config key. When the unmarshalled settings carry no name, the publisher's
// config name is used as the name.
func readPublisherSetting(config cfg.Config, name string) *PublisherSettings {
	var settings PublisherSettings

	config.UnmarshalKey(getPublisherConfigKey(name), &settings)

	// Fall back to the key the publisher is configured under.
	if settings.Name == "" {
		settings.Name = name
	}

	return &settings
}