forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
factory.go
131 lines (109 loc) · 3.1 KB
/
factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package channel
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
type OutletFactory struct {
done <-chan struct{}
pipeline beat.Pipeline
eventer beat.ClientEventer
wgEvents eventCounter
}
type eventCounter interface {
Add(n int)
Done()
}
// clientEventer adjusts wgEvents if events are dropped during shutdown.
type clientEventer struct {
wgEvents eventCounter
}
// prospectorOutletConfig defines common prospector settings
// for the publisher pipline.
type prospectorOutletConfig struct {
// event processing
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
// implicit event fields
Type string `config:"type"` // prospector.type
// hidden filebeat modules settings
Module string `config:"_module_name"` // hidden setting
Fileset string `config:"_fileset_name"` // hidden setting
// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
}
// NewOutletFactory creates a new outlet factory for
// connecting a prospector to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline beat.Pipeline,
wgEvents eventCounter,
) *OutletFactory {
o := &OutletFactory{
done: done,
pipeline: pipeline,
wgEvents: wgEvents,
}
if wgEvents != nil {
o.eventer = &clientEventer{wgEvents}
}
return o
}
// Create builds a new Outleter, while applying common prospector settings.
// Prospectors and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := prospectorOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
processors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
setMeta := func(to common.MapStr, key, value string) {
if value != "" {
to[key] = value
}
}
meta := common.MapStr{}
setMeta(meta, "pipeline", config.Pipeline)
fields := common.MapStr{}
setMeta(fields, "module", config.Module)
setMeta(fields, "name", config.Fileset)
if len(fields) > 0 {
fields = common.MapStr{
"fileset": fields,
}
}
if config.Type != "" {
fields["prospector"] = common.MapStr{
"type": config.Type,
}
}
client, err := f.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
Events: f.eventer,
})
if err != nil {
return nil, err
}
outlet := newOutlet(client, f.wgEvents)
if f.done != nil {
return CloseOnSignal(outlet, f.done), nil
}
return outlet, nil
}
func (*clientEventer) Closing() {}
func (*clientEventer) Closed() {}
func (*clientEventer) Published() {}
func (c *clientEventer) FilteredOut(_ beat.Event) {}
func (c *clientEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}