-
Notifications
You must be signed in to change notification settings - Fork 65
/
rename.go
112 lines (92 loc) · 2.29 KB
/
rename.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package rename
import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
)
/*{ introduction
It renames the fields of the event. You can provide an unlimited number of config parameters. Each parameter is handled as `cfg.FieldSelector`:`string`.
When `override` is set to `false`, the field won't be renamed in the case of a field name collision.
The order of rename operations isn't guaranteed. Use separate actions if the order matters.
**Example:**
```yaml
pipelines:
example_pipeline:
...
actions:
- type: rename
override: false
my_object.field.subfield: new_sub_field
...
```
The resulting event could look like:
```yaml
{
"my_object": {
"field": {
"new_sub_field":"value"
}
}
}
```
}*/
// overrideKey is the config key that holds the override setting. Start reads
// it and removes it from the config before the remaining entries are
// interpreted as rename rules.
const overrideKey = "override"
// Plugin is the rename action. It keeps the rename rules as two parallel
// slices filled by Start: paths[i] is the parsed selector of the field to
// move and names[i] is the name it receives.
type Plugin struct {
	// paths holds the parsed field selectors of the source fields.
	paths [][]string
	// names holds the new field names, index-aligned with paths.
	names []string
	// preserveFields makes Do skip a rule when a field with the new name
	// already exists at the event root.
	preserveFields bool
}
func init() {
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
Type: "rename",
Factory: factory,
})
}
// factory returns a fresh, unconfigured Plugin together with an empty Config
// value for it.
func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
	plugin := &Plugin{}
	config := &Config{}
	return plugin, config
}
// Start builds the rename rules from config, which must be a *Config.
//
// The override setting is read from the overrideKey entry: existing fields
// are preserved when the entry is absent or its value is "false". The entry is
// then removed, keys are unescaped with unescapeMap, and every remaining
// key/value pair becomes one rule (key parsed as a field selector, value used
// as the new name).
func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
	// The config object is shared, so all modifications below are made on a clone.
	shared := *config.(*Config)
	renames := shared.Clone()

	overrideVal, overrideIdx := renames.Find(overrideKey)
	overrideUnset := overrideIdx == NotFoundIdx
	p.preserveFields = overrideUnset || overrideVal == "false"

	// Drop the setting so it is not mistaken for a rename rule.
	renames.Remove(overrideKey)
	renames = unescapeMap(renames)

	renames.ForEach(func(from, to string) {
		p.paths = append(p.paths, cfg.ParseFieldSelector(from))
		p.names = append(p.names, to)
	})
}
// Stop is a no-op: this method performs no cleanup.
func (p *Plugin) Stop() {}
// Do applies every rename rule to the event and always returns
// pipeline.ActionPass.
//
// For each rule the source node is looked up by its path from the event root.
// Rules whose source is missing are skipped, and so are rules whose new name
// already exists at the root when preserveFields is set. Otherwise the source
// node is detached and its value is stored in a field with the new name that
// is added directly on the event root.
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
	root := event.Root

	for i := range p.paths {
		// Name collision at the root: keep the existing field untouched.
		if p.preserveFields && root.Dig(p.names[i]) != nil {
			continue
		}

		source := root.Dig(p.paths[i]...)
		if source == nil {
			continue
		}

		// Detach the node first, then re-attach its value under the new name.
		source.Suicide()
		root.AddFieldNoAlloc(root, p.names[i]).MutateToNode(source)
	}

	return pipeline.ActionPass
}
// unescapeMap returns a new Config built from fields in which a single
// leading '_' is stripped from every key. Entries with an empty key are
// dropped; values are copied unchanged.
//
// NOTE(review): the leading underscore presumably lets a config refer to a
// field whose name collides with a reserved key such as "override" — confirm
// against the plugin documentation.
func unescapeMap(fields Config) Config {
	unescaped := make(Config, 0, len(fields))

	fields.ForEach(func(key string, value string) {
		switch {
		case key == "":
			return
		case key[0] == '_':
			key = key[1:]
		}
		unescaped.Append(key, value)
	})

	return unescaped
}