// File: eventreader_fsnotify.go (forked from elastic/beats).

// +build linux freebsd openbsd netbsd windows darwin

package file
import (
"path/filepath"
"syscall"
"time"
"github.com/fsnotify/fsnotify"
"github.com/elastic/beats/libbeat/logp"
)
// NewEventReader returns an EventReader that uses fsnotify to receive file
// system notifications. The only failure mode is the creation of the
// underlying fsnotify watcher, whose error is returned unchanged.
func NewEventReader(c Config) (EventReader, error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	// Both channels get a buffer of one element.
	r := &reader{
		watcher: w,
		config:  c,
		outC:    make(chan Event, 1),
		errC:    make(chan error, 1),
	}
	return r, nil
}
// reader is the fsnotify-backed value returned by NewEventReader.
//
// NOTE: NewEventReader builds this struct with a positional (unkeyed)
// literal, so the field order below must not change without updating it.
type reader struct {
// watcher is the fsnotify watcher that the configured paths are added to.
watcher *fsnotify.Watcher
// config supplies the paths to watch, the maximum file size to hash and
// the hash types to compute.
config Config
// outC carries converted events; it is the channel returned by Start.
outC chan Event
// errC receives errors reported by the watcher. Nothing in this file
// reads from it.
errC chan error
}
// Start adds every configured path to the fsnotify watcher and launches a
// goroutine that converts watcher notifications into Events and delivers them
// on the returned channel.
//
// A path that cannot be watched is logged as a warning and skipped; it never
// causes Start to fail, so the returned error is always nil.
//
// The goroutine stops when done is closed or when the watcher closes one of
// its channels. On exit it closes the watcher first and then the returned
// event channel, so consumers ranging over the channel terminate.
func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
	for _, p := range r.config.Paths {
		if err := r.watcher.Add(p); err != nil {
			if err == syscall.EMFILE {
				logp.Warn("%v Failed to watch %v: %v (check the max number of "+
					"open files allowed with 'ulimit -a')", logPrefix, p, err)
			} else {
				logp.Warn("%v Failed to watch %v: %v", logPrefix, p, err)
			}
		}
	}

	go func() {
		// Deferred calls run in reverse order: the watcher is closed before
		// the output channel.
		defer close(r.outC)
		defer r.watcher.Close()

		for {
			select {
			case <-done:
				return

			case event, ok := <-r.watcher.Events:
				if !ok {
					// The watcher closed its channel; without this check the
					// loop would spin on zero-value events forever.
					return
				}
				if event.Name == "" {
					continue
				}

				fileEvent := convertToFileEvent(event, r.config.MaxFileSizeBytes, r.config.HashTypes)

				// Do not let a stalled consumer keep the goroutine alive
				// after shutdown has been requested.
				select {
				case r.outC <- fileEvent:
				case <-done:
					return
				}

			case err, ok := <-r.watcher.Errors:
				if !ok {
					return
				}
				logp.Warn("%v fsnotify watcher error: %v", logPrefix, err)

				// errC has a small buffer and may have no reader. The error
				// is already logged, so drop it when the buffer is full
				// rather than blocking event delivery.
				select {
				case r.errC <- err:
				default:
				}
			}
		}
	}()

	return r.outC, nil
}
// convertToFileEvent builds an Event describing the path named by an fsnotify
// notification.
//
// The event is stamped with the current UTC time, the notified path and the
// action derived from e.Op. The path is then inspected with Stat:
//   - if no file info is available, the event is returned as is;
//   - for a "file" whose size does not exceed maxFileSize, the hashes named
//     by hashTypes are computed and stored in Hashes;
//   - for a "symlink", the link is resolved and stored in TargetPath.
//
// Failures are not returned to the caller. Errors from Stat, hashing and
// symlink resolution are all appended to the event's errors slice.
func convertToFileEvent(e fsnotify.Event, maxFileSize uint64, hashTypes []string) Event {
	event := Event{
		Timestamp: time.Now().UTC(),
		Path:      e.Name,
		Action:    opToAction(e.Op).String(),
	}

	var err error
	event.Info, err = Stat(event.Path)
	if err != nil {
		event.errors = append(event.errors, err)
	}
	if event.Info == nil {
		// Nothing more can be learned about the path without file info.
		return event
	}

	switch event.Info.Type {
	case "file":
		// Files larger than the configured limit are not hashed.
		if uint64(event.Info.Size) <= maxFileSize {
			hashes, err := hashFile(event.Path, hashTypes...)
			if err != nil {
				event.errors = append(event.errors, err)
			} else {
				event.Hashes = hashes
			}
		}
	case "symlink":
		// Record a resolution failure (for example a dangling link) instead
		// of discarding it, consistent with the Stat and hash errors above.
		target, err := filepath.EvalSymlinks(event.Path)
		if err != nil {
			event.errors = append(event.errors, err)
		}
		event.TargetPath = target
	}

	return event
}
// opToAction maps an fsnotify operation to the Action reported for it.
//
// fsnotify.Op is a bitmask and a single notification may carry several
// operations at once (for example Create|Write), so each bit is tested
// individually instead of comparing the whole value for equality. When more
// than one bit is set the first match in this order wins:
//
//	Remove > Rename > Create > Write > Chmod
//
// Remove and Rename come first because the path is no longer present under
// that name. Create precedes Write so a newly created file that was also
// written is still reported as created. An op with none of these bits set
// yields Unknown.
func opToAction(op fsnotify.Op) Action {
	switch {
	case op&fsnotify.Remove != 0:
		return Deleted
	case op&fsnotify.Rename != 0:
		return Moved
	case op&fsnotify.Create != 0:
		return Created
	case op&fsnotify.Write != 0:
		return Updated
	case op&fsnotify.Chmod != 0:
		return AttributesModified
	default:
		return Unknown
	}
}