forked from monicasarbu/topbeat
/
preprocess.go
188 lines (161 loc) · 4.61 KB
/
preprocess.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package publisher
import (
"errors"
"fmt"
"github.com/elastic/libbeat/common"
"github.com/elastic/libbeat/logp"
"github.com/elastic/libbeat/outputs"
)
// preprocessor is a message handler stage that processes incoming messages
// using the settings of a publisher and then delegates to another handler.
type preprocessor struct {
// handler is the next handler; it receives onMessage and onStop calls.
handler messageHandler
// pub is the publisher whose settings are read while processing events.
pub *PublisherType
}
// newPreprocessor builds a preprocessor bound to publisher p that hands its
// output to handler h.
func newPreprocessor(p *PublisherType, h messageHandler) *preprocessor {
	pp := new(preprocessor)
	pp.handler = h
	pp.pub = p
	return pp
}
// onStop propagates the stop notification to the wrapped handler.
func (p *preprocessor) onStop() {
	next := p.handler
	next.onStop()
}
// onMessage validates and enriches the event(s) carried by m and forwards the
// surviving ones to the wrapped handler.
//
// A message carries either a single event (m.event) or a batch (m.events).
// Each event is checked with filterEvent and then passed through
// updateEventAddresses; an event rejected by either step is dropped. Accepted
// events receive the "shipper" field and, when the publisher has tags, the
// "tags" field.
//
// If no event survives, or if the publisher is disabled, the message's signal
// is completed via outputs.SignalCompleted and nothing is forwarded.
//
// Surviving events are compacted in place in a single pass, so the backing
// array of m.events is modified; the relative order of events is preserved.
func (p *preprocessor) onMessage(m message) {
	publisher := p.pub
	single := false
	events := m.events
	if m.event != nil {
		single = true
		events = []common.MapStr{m.event}
	}

	debug("preprocessor")

	// Filter in place: kept shares the backing array with events. The write
	// index never overtakes the read index, so no unread element is clobbered.
	kept := events[:0]
	for i, event := range events {
		// validate some required field
		if err := filterEvent(event); err != nil {
			logp.Err("Publishing event failed: %v", err)
			debug("remove event[%v]", i)
			continue
		}

		// update address and geo-ip information. Ignore event
		// if address is invalid or event is found to be a duplicate
		if ok := updateEventAddresses(publisher, event); !ok {
			debug("remove event[%v]", i)
			continue
		}

		// add additional meta data
		event["shipper"] = publisher.name
		if len(publisher.tags) > 0 {
			event["tags"] = publisher.tags
		}

		if logp.IsDebug("publish") {
			PrintPublishEvent(event)
		}

		kept = append(kept, event)
	}

	// return if no event is left
	if len(kept) == 0 {
		debug("no event left, complete send")
		outputs.SignalCompleted(m.context.signal)
		return
	}

	if publisher.disabled {
		debug("publisher disabled")
		outputs.SignalCompleted(m.context.signal)
		return
	}

	debug("preprocessor forward")
	if single {
		p.handler.onMessage(message{context: m.context, event: kept[0]})
	} else {
		p.handler.onMessage(message{context: m.context, events: kept})
	}
}
// filterEvent checks that an event carries the common required fields with
// the expected types: an "@timestamp" of type common.Time, whatever
// EnsureCountField enforces, and a "type" of type string.
// A nil return means the event is acceptable; otherwise the returned error
// states why the event must be filtered out.
func filterEvent(event common.MapStr) error {
	rawTimestamp, found := event["@timestamp"]
	if !found {
		return errors.New("Missing '@timestamp' field from event")
	}
	if _, isTime := rawTimestamp.(common.Time); !isTime {
		return errors.New("Invalid '@timestamp' field from event.")
	}

	if countErr := event.EnsureCountField(); countErr != nil {
		return countErr
	}

	rawType, found := event["type"]
	if !found {
		return errors.New("Missing 'type' field from event.")
	}
	if _, isString := rawType.(string); !isString {
		return errors.New("Invalid 'type' field from event.")
	}

	return nil
}
// updateEventAddresses rewrites the endpoint information of event in place
// and reports whether the event should be kept.
//
// If event["src"] holds a *common.Endpoint it is replaced by the flat fields
// "client_ip", "client_port", "client_proc" and "client_server". If
// event["dst"] holds a *common.Endpoint it is replaced by "ip", "port",
// "proc" and "server", and "direction" is set to "out" or "in" depending on
// publisher.IsPublisherIP(dst.Ip).
//
// The function returns false (event is to be ignored) when
// publisher.IgnoreOutgoing is set and the destination server name is
// non-empty and different from publisher.name. Otherwise it returns true.
//
// When publisher.GeoLite is configured, "client_location" is set to
// "<latitude>, <longitude>" using event["real_ip"] if that is a non-empty
// string, or else the source endpoint IP when the source has no known server
// name. A "real_ip" value that is not a string is treated as absent rather
// than causing a panic.
func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool {
	var srcServer, dstServer string

	src, ok := event["src"].(*common.Endpoint)
	if ok {
		srcServer = publisher.GetServerName(src.Ip)
		event["client_ip"] = src.Ip
		event["client_port"] = src.Port
		event["client_proc"] = src.Proc
		event["client_server"] = srcServer
		delete(event, "src")
	}

	dst, ok := event["dst"].(*common.Endpoint)
	if ok {
		dstServer = publisher.GetServerName(dst.Ip)
		event["ip"] = dst.Ip
		event["port"] = dst.Port
		event["proc"] = dst.Proc
		event["server"] = dstServer
		delete(event, "dst")

		//get the direction of the transaction: outgoing (as client)/incoming (as server)
		if publisher.IsPublisherIP(dst.Ip) {
			// outgoing transaction
			event["direction"] = "out"
		} else {
			//incoming transaction
			event["direction"] = "in"
		}
	}

	if publisher.IgnoreOutgoing && dstServer != "" &&
		dstServer != publisher.name {
		// duplicated transaction -> ignore it
		debug("Ignore duplicated transaction on %s: %s -> %s",
			publisher.name, srcServer, dstServer)
		return false
	}

	if publisher.GeoLite != nil {
		// Checked assertion: a missing or non-string "real_ip" yields "",
		// which falls through to the source-endpoint lookup below.
		realIP, _ := event["real_ip"].(string)
		if len(realIP) > 0 {
			loc := publisher.GeoLite.GetLocationByIP(realIP)
			if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 {
				event["client_location"] = fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude)
			}
		} else if len(srcServer) == 0 && src != nil { // only for external IP addresses
			loc := publisher.GeoLite.GetLocationByIP(src.Ip)
			if loc != nil && loc.Latitude != 0 && loc.Longitude != 0 {
				event["client_location"] = fmt.Sprintf("%f, %f", loc.Latitude, loc.Longitude)
			}
		}
	}

	return true
}