forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
harvester.go
84 lines (70 loc) · 2.15 KB
/
harvester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
The harvester package harvest different inputs for new information. Currently
two harvester types exist:
* log
* stdin
The log harvester reads a file line by line. In case the end of a file is found
with an incomplete line, the line pointer stays at the beginning of the incomplete
line. As soon as the line is completed, it is read and returned.
The stdin harvesters reads data from stdin.
*/
package harvester
import (
"errors"
"fmt"
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
)
var (
ErrFileTruncate = errors.New("detected file being truncated")
ErrRenamed = errors.New("file was renamed")
ErrRemoved = errors.New("file was removed")
ErrInactive = errors.New("file inactive")
ErrClosed = errors.New("reader closed")
)
type Harvester struct {
config harvesterConfig
state file.State
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
done chan struct{}
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
}
func NewHarvester(
cfg *common.Config,
state file.State,
prospectorChan chan *input.Event,
done chan struct{},
) (*Harvester, error) {
h := &Harvester{
config: defaultConfig,
state: state,
prospectorChan: prospectorChan,
done: done,
}
if err := cfg.Unpack(&h.config); err != nil {
return nil, err
}
encodingFactory, ok := encoding.FindEncoding(h.config.Encoding)
if !ok || encodingFactory == nil {
return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
}
h.encodingFactory = encodingFactory
return h, nil
}
// open does open the file given under h.Path and assigns the file handler to h.file
func (h *Harvester) open() error {
switch h.config.InputType {
case config.StdinInputType:
return h.openStdin()
case config.LogInputType:
return h.openFile()
default:
return fmt.Errorf("Invalid input type")
}
}