forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multiline.go
290 lines (242 loc) · 6.39 KB
/
multiline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package processor
import (
"errors"
"fmt"
"regexp"
"time"
"github.com/elastic/beats/filebeat/config"
)
// MultiLine is a line processor combining multiple line events into one
// multi-line event.
//
// Lines to be combined are matched by a configurable predicate based on a
// regular expression.
//
// The maximum number of bytes and lines stored in a returned event is
// configurable. Even if a limit is reached, subsequent lines are still
// matched (and counted in the reported byte size) until the event is
// fully finished.
//
// Errors force the multiline processor to return the currently active
// multiline event first; the actual error is returned on the next call to Next.
type MultiLine struct {
// reader is the source of single line events.
reader LineProcessor
// pred is called with the content of the previously added line and the
// content of the current line to decide whether the current line
// continues the buffered event.
pred matcher
// maxBytes limits the size of content; values <= 0 are checked as
// "no limit" in addLine.
maxBytes int // bytes stored in content
// maxLines limits the number of lines stored in content; values <= 0
// are checked as "no limit" in addLine.
maxLines int
// ts is the timestamp of the line that started the buffered event; it is
// reported as Ts of the returned event.
ts time.Time
// content holds the accumulated content of the buffered event.
content []byte
// last is the content of the most recently added line, handed to pred.
last []byte
// readBytes sums up Bytes of all lines added to the buffered event,
// including lines whose content was dropped due to the limits.
readBytes int // bytes as read from input source
// numLines counts the lines actually stored in content.
numLines int
// err is an error held back until the buffered event has been returned.
err error // last seen error
// state is the current state function executed by Next.
state func(*MultiLine) (Line, error)
}
// Defaults used by NewMultiline when the configuration leaves the
// corresponding setting unset.
const (
	// defaultMaxLines is the maximum number of lines stored in one
	// multi-line event.
	defaultMaxLines = 500

	// defaultMultilineTimeout is the timeout after which a pending
	// multi-line event is finished.
	defaultMultilineTimeout time.Duration = 5 * time.Second
)
// matcher is the predicate comparing two consecutive lines in order to find
// the start and end of multiline events in a stream of line events. It is
// called with the content of the last line added to the current event and the
// content of the line just read; returning true means the current line is
// added to the current event, returning false makes the processor finish the
// current event and start a new one with the current line.
type matcher func(last, current []byte) bool
var (
	// errMultilineTimeout is the sentinel error passed to the timeout
	// processor wrapping the line reader. readFirst and readNext recognise it
	// by identity as the signal that the multiline timeout fired; it is never
	// returned to callers of Next.
	errMultilineTimeout = errors.New("multiline timeout")
)
// NewMultiline creates a new multi-line processor turning the stream of line
// events read from r into a stream of multi-line events.
//
// maxBytes limits the content stored per event. config selects the matcher
// ("before" or "after"), its pattern, optional negation, the maximum number
// of lines (defaultMaxLines if unset) and the timeout (defaultMultilineTimeout
// if unset). If the resulting timeout is greater than zero, r is wrapped into
// a timeout processor signalling errMultilineTimeout.
//
// An error is returned for an unknown matcher type, a pattern that fails to
// compile, or a timeout that cannot be parsed or is negative.
func NewMultiline(
	r LineProcessor,
	maxBytes int,
	config *config.MultilineConfig,
) (*MultiLine, error) {
	// pick the matcher constructor by its configured name
	var newMatcher func(pattern string) (matcher, error)
	switch config.Match {
	case "before":
		newMatcher = beforeMatcher
	case "after":
		newMatcher = afterMatcher
	default:
		return nil, fmt.Errorf("unknown matcher type: %s", config.Match)
	}

	pred, err := newMatcher(config.Pattern)
	if err != nil {
		return nil, err
	}
	if config.Negate {
		pred = negatedMatcher(pred)
	}

	lineLimit := defaultMaxLines
	if configured := config.MaxLines; configured != nil {
		lineLimit = *configured
	}

	wait := defaultMultilineTimeout
	if raw := config.Timeout; raw != "" {
		parsed, err := time.ParseDuration(raw)
		if err != nil {
			return nil, fmt.Errorf("failed to parse duration '%s': %v", raw, err)
		}
		if parsed < 0 {
			return nil, fmt.Errorf("timeout %v must not be negative", raw)
		}
		wait = parsed
	}

	// a timeout of zero disables the timeout processor
	source := r
	if wait > 0 {
		source = newTimeoutProcessor(r, errMultilineTimeout, wait)
	}

	return &MultiLine{
		reader:   source,
		pred:     pred,
		state:    (*MultiLine).readFirst,
		maxBytes: maxBytes,
		maxLines: lineLimit,
	}, nil
}
// Next returns the next multi-line event by running the processor's current
// state function.
func (mlr *MultiLine) Next() (Line, error) {
	step := mlr.state
	return step(mlr)
}
// readFirst is the state waiting for the first line of a new multiline event.
//
// Timeout signals and lines without any bytes are skipped. The first line
// carrying bytes starts a new event and hands over to readNext. Any other
// error is returned right away together with whatever the reader returned.
func (mlr *MultiLine) readFirst() (Line, error) {
	for {
		first, err := mlr.reader.Next()
		switch {
		case err == errMultilineTimeout:
			// no lines buffered -> ignore timeout
			continue
		case err != nil:
			// something is wrong here
			return first, err
		case first.Bytes == 0:
			// nothing read, keep waiting for a line with content
			continue
		}

		mlr.startNewLine(first)
		mlr.state = (*MultiLine).readNext
		return mlr.readNext()
	}
}
// readNext is the state collecting further lines for the multiline event
// currently being buffered.
//
// Lines accepted by the predicate are added to the buffered event. The
// buffered event is returned once a line is rejected by the predicate (that
// line then starts the next event), once the timeout signal is received, or
// once the reader reports an error. A reader error is never returned together
// with an event: if an event is buffered it is returned first and the error
// is kept for readFailed, which runs on the next call to Next.
func (mlr *MultiLine) readNext() (Line, error) {
for {
l, err := mlr.reader.Next()
if err != nil {
// handle multiline timeout signal
if err == errMultilineTimeout {
// no lines buffered -> ignore timeout
if mlr.numLines == 0 {
continue
}
// return collected multiline event and
// empty buffer for new multiline event;
// reset switches the state back to readFirst
l := mlr.pushLine()
mlr.reset()
return l, nil
}
// handle error without any bytes returned from reader
if l.Bytes == 0 {
// no lines buffered -> return error
if mlr.numLines == 0 {
return Line{}, err
}
// lines buffered, return multiline and error on next read
l := mlr.pushLine()
mlr.err = err
mlr.state = (*MultiLine).readFailed
return l, nil
}
// handle error with some content being returned by reader and
// line matching multiline criteria or no multiline started yet
if mlr.readBytes == 0 || mlr.pred(mlr.last, l.Content) {
mlr.addLine(l)
// return multiline and error on next read
l := mlr.pushLine()
mlr.err = err
mlr.state = (*MultiLine).readFailed
return l, nil
}
// no match, return current multiline and keep the current line buffered
// as start of the next event. The error is not stored here; the next
// call to readNext awaits the error being reproduced (or resolved)
// in next call to Next
l := mlr.startNewLine(l)
return l, nil
}
// if predicate does not match current multiline -> return multiline event
// and start buffering a new one with the current line
if mlr.readBytes > 0 && !mlr.pred(mlr.last, l.Content) {
l := mlr.startNewLine(l)
return l, nil
}
// add line to current multiline event
mlr.addLine(l)
}
}
// readFailed is the state entered after a buffered event had to be returned
// ahead of a reader error. It hands out the stored error with an empty Line,
// clears it and switches the processor back to its initial state.
func (mlr *MultiLine) readFailed() (Line, error) {
	var pending error
	pending, mlr.err = mlr.err, nil
	mlr.reset()
	return Line{}, pending
}
// startNewLine finishes the event buffered so far and begins a new one with l
// as its first line. The finished event is returned; the timestamp of l
// becomes the timestamp of the new event.
func (mlr *MultiLine) startNewLine(l Line) Line {
	// pushLine must run first: it reports the timestamp of the old event
	finished := mlr.pushLine()

	mlr.ts = l.Ts
	mlr.addLine(l)
	return finished
}
// pushLine returns the buffered multiline event and empties the buffer.
//
// The returned Line carries the event timestamp, the accumulated content and
// the total number of bytes read for the event. Content, last line, counters
// and the stored error are cleared; the timestamp and state are left as is.
func (mlr *MultiLine) pushLine() Line {
	event := Line{
		Ts:      mlr.ts,
		Content: mlr.content,
		Bytes:   mlr.readBytes,
	}

	mlr.content, mlr.last = nil, nil
	mlr.readBytes, mlr.numLines = 0, 0
	mlr.err = nil

	return event
}
// addLine adds l to the multiline event currently being buffered.
//
// Lines reporting no bytes read are ignored. The content of l is stored only
// while both the byte limit (maxBytes) and the line limit (maxLines) leave
// room; a limit that is zero or negative means "no limit". A line that only
// partially fits into the byte limit is truncated to the remaining space.
//
// Independent of the limits, l is remembered as the last seen line for the
// predicate and its size is added to readBytes, so lines keep being matched
// until the event is finished.
func (mlr *MultiLine) addLine(l Line) {
	if l.Bytes <= 0 {
		return
	}

	noByteLimit := mlr.maxBytes <= 0
	space := mlr.maxBytes - len(mlr.content)

	bytesLeft := noByteLimit || space > 0
	linesLeft := mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines

	if bytesLeft && linesLeft {
		// Without a byte limit the complete line is stored. The limit itself
		// is checked (rather than space < 0) so that maxBytes == 0 with an
		// empty buffer, where space is exactly 0, does not drop the content.
		if noByteLimit || space > len(l.Content) {
			space = len(l.Content)
		}
		mlr.content = append(mlr.content, l.Content[:space]...)
		mlr.numLines++
	}

	mlr.last = l.Content
	mlr.readBytes += l.Bytes
}
// reset puts the processor back into its initial state, so the next call to
// Next waits for the first line of a new multiline event. Buffered data is
// not touched.
func (mlr *MultiLine) reset() {
	initial := (*MultiLine).readFirst
	mlr.state = initial
}
// matchers
// afterMatcher builds a matcher applying pattern to the current line.
func afterMatcher(pattern string) (matcher, error) {
	pickCurrent := func(_, current []byte) []byte {
		return current
	}
	return genPatternMatcher(pattern, pickCurrent)
}
// beforeMatcher builds a matcher applying pattern to the last line, i.e. the
// line preceding the current one.
func beforeMatcher(pattern string) (matcher, error) {
	pickLast := func(last, _ []byte) []byte {
		return last
	}
	return genPatternMatcher(pattern, pickLast)
}
// negatedMatcher wraps m into a matcher returning the opposite result.
func negatedMatcher(m matcher) matcher {
	inverted := func(last, current []byte) bool {
		matched := m(last, current)
		return !matched
	}
	return inverted
}
// genPatternMatcher compiles pattern as a POSIX regular expression and
// returns a matcher reporting whether the line chosen by sel (from the last
// and the current line) matches it. The compile error is returned as is if
// pattern is invalid.
func genPatternMatcher(
	pattern string,
	sel func(last, current []byte) []byte,
) (matcher, error) {
	re, err := regexp.CompilePOSIX(pattern)
	if err != nil {
		return nil, err
	}

	return func(last, current []byte) bool {
		return re.Match(sel(last, current))
	}, nil
}