forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
109 lines (92 loc) · 2.88 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package processor
import (
"io"
"time"
"github.com/elastic/beats/filebeat/harvester/encoding"
)
// Line represents a single line event: the time it was read, its content,
// and the number of bytes read from the input (before decoding) to
// produce it.
type Line struct {
	Ts      time.Time // timestamp at which the line was read
	Content []byte    // the line content as read
	Bytes   int       // total number of bytes read to generate the line
}
// LineProcessor is the interface that wraps the basic Next method for
// getting a new line.
//
// Next returns the line being read or an error. EOF is returned if the
// processor will not return any new lines on subsequent calls.
type LineProcessor interface {
	Next() (Line, error)
}
// LineSource produces lines by reading them from an io.Reader through a
// decoder that converts the reader's encoding to UTF-8.
type LineSource struct {
	reader *encoding.LineReader // underlying decoding line reader
}
// StripNewline is a LineProcessor that removes the trailing line ending
// from the lines it reads from the wrapped processor.
type StripNewline struct {
	reader LineProcessor // wrapped processor supplying the raw lines
}
// LimitProcessor sets an upper limit on line length. Lines longer than
// the configured maximum are cut short to that length.
type LimitProcessor struct {
	reader   LineProcessor // wrapped processor supplying the lines
	maxBytes int           // maximum number of content bytes kept per line
}
// NewLineSource builds a LineSource that reads from in. The codec and
// bufferSize are handed to encoding.NewLineReader to construct the
// underlying line reader.
//
// The error reported by encoding.NewLineReader is returned unchanged,
// together with a LineSource wrapping whatever reader that call produced.
func NewLineSource(
	in io.Reader,
	codec encoding.Encoding,
	bufferSize int,
) (LineSource, error) {
	lineReader, err := encoding.NewLineReader(in, codec, bufferSize)
	src := LineSource{reader: lineReader}
	return src, err
}
// Next fetches the next line from the underlying line reader and stamps it
// with the current time. Bytes carries the size reported by the reader.
// Any error from the reader is passed through alongside the Line.
func (p LineSource) Next() (Line, error) {
	content, size, err := p.reader.Next()
	line := Line{
		Ts:      time.Now(),
		Content: content,
		Bytes:   size,
	}
	return line, err
}
// NewStripNewline wraps r in a processor that strips the trailing line
// ending from every line it returns.
func NewStripNewline(r LineProcessor) *StripNewline {
	return &StripNewline{reader: r}
}
// Next returns the next line from the wrapped processor with its trailing
// line ending ("\n" or "\r\n") removed from Content. If the wrapped
// processor reports an error, the line and error are returned untouched.
func (p *StripNewline) Next() (Line, error) {
	line, err := p.reader.Next()
	if err == nil {
		content := line.Content
		end := len(content) - lineEndingChars(content)
		line.Content = content[:end]
	}
	return line, err
}
// NewLimitProcessor wraps in with a processor that limits the content of
// each line to at most maxBytes bytes.
func NewLimitProcessor(in LineProcessor, maxBytes int) *LimitProcessor {
	p := new(LimitProcessor)
	p.reader = in
	p.maxBytes = maxBytes
	return p
}
// Next returns the next line from the wrapped processor, cutting Content
// down to the configured maximum when it is longer. The cut is applied
// whether or not the wrapped processor reported an error, and the Bytes
// field is left as reported.
func (p *LimitProcessor) Next() (Line, error) {
	line, err := p.reader.Next()
	if limit := p.maxBytes; len(line.Content) > limit {
		line.Content = line.Content[:limit]
	}
	return line, err
}
// isLine reports whether l is a complete line, i.e. whether its last byte
// is a newline ('\n'). Nil and empty slices are not lines.
func isLine(l []byte) bool {
	n := len(l)
	if n == 0 {
		return false
	}
	return l[n-1] == '\n'
}
// lineEndingChars returns the number of bytes at the end of l that make up
// its line ending: 0 if l does not end with '\n', 2 if it ends with "\r\n"
// (typical for Windows files), and 1 for a plain "\n" (Unix/Linux files).
func lineEndingChars(l []byte) int {
	switch {
	case !isLine(l):
		return 0
	case len(l) > 1 && l[len(l)-2] == '\r':
		return 2
	default:
		return 1
	}
}