forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
config.go
123 lines (114 loc) · 4.91 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package processor
import (
yaml "gopkg.in/yaml.v3"
"github.com/nehal119/benthos-119/pkg/docs"
)
// Config is the all encompassing configuration struct for all processor types.
// Deprecated: Do not add new components here. Instead, use the public plugin
// APIs. Examples can be found in: ./internal/impl.
type Config struct {
Label string `json:"label" yaml:"label"`
Type string `json:"type" yaml:"type"`
Bloblang string `json:"bloblang" yaml:"bloblang"`
BoundsCheck BoundsCheckConfig `json:"bounds_check" yaml:"bounds_check"`
Branch BranchConfig `json:"branch" yaml:"branch"`
Cache CacheConfig `json:"cache" yaml:"cache"`
Catch []Config `json:"catch" yaml:"catch"`
Compress CompressConfig `json:"compress" yaml:"compress"`
Decompress DecompressConfig `json:"decompress" yaml:"decompress"`
Dedupe DedupeConfig `json:"dedupe" yaml:"dedupe"`
ForEach []Config `json:"for_each" yaml:"for_each"`
Grok GrokConfig `json:"grok" yaml:"grok"`
GroupBy GroupByConfig `json:"group_by" yaml:"group_by"`
GroupByValue GroupByValueConfig `json:"group_by_value" yaml:"group_by_value"`
InsertPart InsertPartConfig `json:"insert_part" yaml:"insert_part"`
JMESPath JMESPathConfig `json:"jmespath" yaml:"jmespath"`
JQ JQConfig `json:"jq" yaml:"jq"`
JSONSchema JSONSchemaConfig `json:"json_schema" yaml:"json_schema"`
Log LogConfig `json:"log" yaml:"log"`
Metric MetricConfig `json:"metric" yaml:"metric"`
Noop struct{} `json:"noop" yaml:"noop"`
Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
Parallel ParallelConfig `json:"parallel" yaml:"parallel"`
ParseLog ParseLogConfig `json:"parse_log" yaml:"parse_log"`
RateLimit RateLimitConfig `json:"rate_limit" yaml:"rate_limit"`
Resource string `json:"resource" yaml:"resource"`
SelectParts SelectPartsConfig `json:"select_parts" yaml:"select_parts"`
Sleep SleepConfig `json:"sleep" yaml:"sleep"`
Split SplitConfig `json:"split" yaml:"split"`
Subprocess SubprocessConfig `json:"subprocess" yaml:"subprocess"`
Switch SwitchConfig `json:"switch" yaml:"switch"`
SyncResponse struct{} `json:"sync_response" yaml:"sync_response"`
Try []Config `json:"try" yaml:"try"`
While WhileConfig `json:"while" yaml:"while"`
Workflow WorkflowConfig `json:"workflow" yaml:"workflow"`
XML XMLConfig `json:"xml" yaml:"xml"`
}
// NewConfig returns a configuration struct fully populated with default values.
// Deprecated: Do not add new components here. Instead, use the public plugin
// APIs. Examples can be found in: ./internal/impl.
func NewConfig() Config {
return Config{
Label: "",
Type: "bounds_check",
Bloblang: "",
BoundsCheck: NewBoundsCheckConfig(),
Branch: NewBranchConfig(),
Cache: NewCacheConfig(),
Catch: []Config{},
Compress: NewCompressConfig(),
Decompress: NewDecompressConfig(),
Dedupe: NewDedupeConfig(),
ForEach: []Config{},
Grok: NewGrokConfig(),
GroupBy: NewGroupByConfig(),
GroupByValue: NewGroupByValueConfig(),
InsertPart: NewInsertPartConfig(),
JMESPath: NewJMESPathConfig(),
JQ: NewJQConfig(),
JSONSchema: NewJSONSchemaConfig(),
Log: NewLogConfig(),
Metric: NewMetricConfig(),
Noop: struct{}{},
Plugin: nil,
Parallel: NewParallelConfig(),
ParseLog: NewParseLogConfig(),
RateLimit: NewRateLimitConfig(),
Resource: "",
SelectParts: NewSelectPartsConfig(),
Sleep: NewSleepConfig(),
Split: NewSplitConfig(),
Subprocess: NewSubprocessConfig(),
Switch: NewSwitchConfig(),
SyncResponse: struct{}{},
Try: []Config{},
While: NewWhileConfig(),
Workflow: NewWorkflowConfig(),
XML: NewXMLConfig(),
}
}
// UnmarshalYAML ensures that when parsing configs that are in a slice the
// default values are still applied.
func (conf *Config) UnmarshalYAML(value *yaml.Node) error {
type confAlias Config
aliased := confAlias(NewConfig())
err := value.Decode(&aliased)
if err != nil {
return docs.NewLintError(value.Line, docs.LintFailedRead, err)
}
var spec docs.ComponentSpec
if aliased.Type, spec, err = docs.GetInferenceCandidateFromYAML(docs.DeprecatedProvider, docs.TypeProcessor, value); err != nil {
return docs.NewLintError(value.Line, docs.LintComponentMissing, err)
}
if spec.Plugin {
pluginNode, err := docs.GetPluginConfigYAML(aliased.Type, value)
if err != nil {
return docs.NewLintError(value.Line, docs.LintFailedRead, err)
}
aliased.Plugin = &pluginNode
} else {
aliased.Plugin = nil
}
*conf = Config(aliased)
return nil
}