-
Notifications
You must be signed in to change notification settings - Fork 50
/
output_configurable_multiple.go
91 lines (71 loc) · 2.3 KB
/
output_configurable_multiple.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package stream
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"github.com/justtrackio/gosoline/pkg/cfg"
"github.com/justtrackio/gosoline/pkg/log"
)
// multiOutput is an Output that wraps a list of other outputs; its methods
// iterate over all of them.
type multiOutput struct {
// outputs holds the wrapped outputs in the order they were added.
outputs []Output
}
// WriteOne hands a single message to every wrapped output.
//
// A failing output does not abort the loop: every output is attempted and all
// returned errors are collected into one combined error. The result is nil if
// no output reported an error.
func (m *multiOutput) WriteOne(ctx context.Context, msg WritableMessage) error {
	var collected *multierror.Error

	for i := range m.outputs {
		if writeErr := m.outputs[i].WriteOne(ctx, msg); writeErr != nil {
			collected = multierror.Append(collected, writeErr)
		}
	}

	return collected.ErrorOrNil()
}
// Write hands the whole batch to every wrapped output.
//
// A failing output does not abort the loop: every output is attempted and all
// returned errors are collected into one combined error. The result is nil if
// no output reported an error.
func (m *multiOutput) Write(ctx context.Context, batch []WritableMessage) error {
	var collected *multierror.Error

	for i := range m.outputs {
		if writeErr := m.outputs[i].Write(ctx, batch); writeErr != nil {
			collected = multierror.Append(collected, writeErr)
		}
	}

	return collected.ErrorOrNil()
}
// IsPartitionedOutput reports whether at least one wrapped output implements
// PartitionedOutput and declares itself partitioned. Outputs that do not
// implement PartitionedOutput are ignored.
func (m *multiOutput) IsPartitionedOutput() bool {
	for _, candidate := range m.outputs {
		partitioned, implements := candidate.(PartitionedOutput)
		if !implements {
			continue
		}

		if partitioned.IsPartitionedOutput() {
			return true
		}
	}

	return false
}
// GetMaxMessageSize returns the smallest max message size reported by the
// wrapped outputs that implement SizeRestrictedOutput.
//
// Outputs that do not implement SizeRestrictedOutput, or that report nil, are
// skipped. The result is nil if no output reports a limit. The returned
// pointer is the one handed out by the most restrictive output, not a copy.
func (m *multiOutput) GetMaxMessageSize() (maxMessageSize *int) {
	for _, candidate := range m.outputs {
		restricted, implements := candidate.(SizeRestrictedOutput)
		if !implements {
			continue
		}

		limit := restricted.GetMaxMessageSize()
		if limit == nil {
			// this output does not restrict the message size
			continue
		}

		// the first limit seen always wins, afterwards only strictly smaller ones do
		if maxMessageSize == nil || *limit < *maxMessageSize {
			maxMessageSize = limit
		}
	}

	return maxMessageSize
}
// GetMaxBatchSize returns the smallest max batch size reported by the wrapped
// outputs that implement SizeRestrictedOutput.
//
// Outputs that do not implement SizeRestrictedOutput, or that report nil, are
// skipped. The result is nil if no output reports a limit. The returned
// pointer is the one handed out by the most restrictive output, not a copy.
func (m *multiOutput) GetMaxBatchSize() (maxBatchSize *int) {
	for _, candidate := range m.outputs {
		restricted, implements := candidate.(SizeRestrictedOutput)
		if !implements {
			continue
		}

		limit := restricted.GetMaxBatchSize()
		if limit == nil {
			// this output does not restrict the batch size
			continue
		}

		// the first limit seen always wins, afterwards only strictly smaller ones do
		if maxBatchSize == nil || *limit < *maxBatchSize {
			maxBatchSize = limit
		}
	}

	return maxBatchSize
}
// NewConfigurableMultiOutput builds an Output that wraps one child output per
// entry of the settings map found at "<ConfigurableOutputKey(base)>.types".
//
// Every key of that map names one child, which is created through
// NewConfigurableOutput under the name "<base>.types.<outputName>". The
// children are created while ranging over the map, so their relative order is
// unspecified.
//
// An error is returned if the types setting does not hold a map or if any
// child output can not be created; in both cases the returned Output is nil.
func NewConfigurableMultiOutput(ctx context.Context, config cfg.Config, logger log.Logger, base string) (Output, error) {
	key := fmt.Sprintf("%s.types", ConfigurableOutputKey(base))

	// A checked assertion turns a missing or malformed setting into a regular
	// error instead of a runtime panic.
	raw := config.Get(key)

	types, ok := raw.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("can not create multi output %s: config key %s should hold a map of output types, got %T", base, key, raw)
	}

	outputs := make([]Output, 0, len(types))

	for outputName := range types {
		name := fmt.Sprintf("%s.types.%s", base, outputName)

		output, err := NewConfigurableOutput(ctx, config, logger, name)
		if err != nil {
			return nil, fmt.Errorf("can not create multi output %s: %w", base, err)
		}

		outputs = append(outputs, output)
	}

	return &multiOutput{outputs: outputs}, nil
}