forked from ahjdzx/libbeat
-
Notifications
You must be signed in to change notification settings - Fork 1
/
filters_runner.go
141 lines (123 loc) · 3.72 KB
/
filters_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package filters
import (
"fmt"
"github.com/johann8384/libbeat/common"
"github.com/johann8384/libbeat/logp"
)
// FilterRunner executes an ordered chain of filter plugins on the events
// it receives.
type FilterRunner struct {
// FiltersQueue is the input channel; Run consumes events from it.
FiltersQueue chan common.MapStr
// results is the output channel that Run writes events to once the
// filter chain has been applied.
results chan common.MapStr
// The order in which the plugins are
// executed. A filter plugin can be loaded
// more than once.
order []FilterPlugin
}
// Run reads events from FiltersQueue, passes each one through every plugin
// in runner.order (in that order) and writes the resulting event to the
// results channel. It is meant to be started in its own goroutine.
//
// If any plugin returns an error, the error is logged and the event is
// dropped: the remaining plugins are skipped and nothing is written to the
// results channel for that event.
//
// Run returns only once FiltersQueue has been closed and drained; the
// returned error is always nil.
func (runner *FilterRunner) Run() error {
events:
	for event := range runner.FiltersQueue {
		for _, plugin := range runner.order {
			var err error
			event, err = plugin.Filter(event)
			if err != nil {
				logp.Err("Error executing filter %s: %v. Dropping event.", plugin, err)
				// Skip the send below so the failed event is really dropped
				// instead of being forwarded in a partially filtered state.
				continue events
			}
		}
		runner.results <- event
	}
	return nil
}
// NewFilterRunner builds a FilterRunner that applies the plugins in order
// and publishes its output on results. The runner's input channel,
// FiltersQueue, is created here with a buffer of 1000 events.
func NewFilterRunner(results chan common.MapStr, order []FilterPlugin) *FilterRunner {
	return &FilterRunner{
		FiltersQueue: make(chan common.MapStr, 1000),
		results:      results,
		order:        order,
	}
}
// LoadConfiguredFilters interprets the [filters] configuration, loads the configured
// plugins and returns the order in which they need to be executed.
//
// config["filters"] must be a list of filter names. For each name:
//   - if config has no entry under that name, the name itself is looked up
//     as a filter type and the plugin is created with a nil configuration;
//   - otherwise the entry must be a map containing a string "type" key that
//     selects the filter type, and the map is handed to the plugin's New
//     as its configuration.
//
// When the "filters" key is absent an empty, non-nil slice is returned.
// Any malformed entry or unknown filter type aborts loading with an error.
func LoadConfiguredFilters(config map[string]interface{}) ([]FilterPlugin, error) {
	var err error
	plugins := []FilterPlugin{}

	filtersList, exists := config["filters"]
	if !exists {
		return plugins, nil
	}
	filtersIface, ok := filtersList.([]interface{})
	if !ok {
		return nil, fmt.Errorf("Expected the filters to be an array of strings")
	}

	for _, filterIface := range filtersIface {
		filter, ok := filterIface.(string)
		if !ok {
			return nil, fmt.Errorf("Expected the filters array to only contain strings")
		}

		cfg, exists := config[filter]
		var pluginType Filter
		var pluginConfig map[string]interface{}
		if !exists {
			// Maybe default configuration by name
			pluginType, err = FilterFromName(filter)
			if err != nil {
				return nil, fmt.Errorf("No such filter type and no corresponding configuration: %s", filter)
			}
		} else {
			logp.Debug("filters", "%v", cfg)
			rawConfig, ok := cfg.(map[interface{}]interface{})
			if !ok {
				return nil, fmt.Errorf("Invalid configuration for: %s", filter)
			}
			typeStr, ok := rawConfig["type"].(string)
			if !ok {
				return nil, fmt.Errorf("Couldn't get type for filter: %s", filter)
			}
			pluginType, err = FilterFromName(typeStr)
			if err != nil {
				return nil, fmt.Errorf("No such filter type: %s", typeStr)
			}

			// Copy the section into pluginConfig so the plugin actually
			// receives it. Assigning with := here would declare a new,
			// block-local variable and leave the one passed to New nil.
			// Non-string keys are stringified with %v.
			pluginConfig = make(map[string]interface{}, len(rawConfig))
			for key, value := range rawConfig {
				pluginConfig[fmt.Sprintf("%v", key)] = value
			}
		}

		filterPlugin := Filters.Get(pluginType)
		if filterPlugin == nil {
			return nil, fmt.Errorf("No plugin loaded for %s", pluginType)
		}
		plugin, err := filterPlugin.New(filter, pluginConfig)
		if err != nil {
			return nil, fmt.Errorf("Initializing filter plugin %s failed: %v",
				pluginType, err)
		}
		plugins = append(plugins, plugin)
	}

	return plugins, nil
}
// FiltersRun registers the given plugins, loads the filters named in config
// and wires them in front of the next channel.
//
// When at least one filter is configured, a FilterRunner is started in a
// new goroutine and its FiltersQueue is returned as the channel to publish
// events to; the runner forwards its output to next. If the runner ever
// returns an error, it is logged as critical and stopCb is invoked.
// When no filters are configured, next itself is returned and no goroutine
// is started.
//
// An error is returned only if loading the configured filters fails.
func FiltersRun(config common.MapStr, plugins map[Filter]FilterPlugin,
	next chan common.MapStr, stopCb func()) (input chan common.MapStr, err error) {

	logp.Debug("filters", "Initializing filters plugins")
	for name, impl := range plugins {
		Filters.Register(name, impl)
	}

	ordered, loadErr := LoadConfiguredFilters(config)
	if loadErr != nil {
		return nil, fmt.Errorf("Error loading filters plugins: %v", loadErr)
	}
	logp.Debug("filters", "Filters plugins order: %v", ordered)

	// Nothing to run: events go straight to the next stage.
	if len(ordered) == 0 {
		return next, nil
	}

	runner := NewFilterRunner(next, ordered)
	go func() {
		if runErr := runner.Run(); runErr != nil {
			logp.Critical("Filters runner failed: %v", runErr)
			// shutting down
			stopCb()
		}
	}()
	return runner.FiltersQueue, nil
}