forked from ruflin/filebeat
/
registrar.go
216 lines (172 loc) · 5.77 KB
/
registrar.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package crawler
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
cfg "github.com/elastic/filebeat/config"
"github.com/elastic/filebeat/input"
. "github.com/elastic/filebeat/input"
"github.com/elastic/libbeat/logp"
)
// Registrar records how far each file has been read and persists that
// bookkeeping as JSON in a registry file on disk.
type Registrar struct {
	// registryFile is the location of the registry file; Init turns it into
	// an absolute path.
	registryFile string

	// State maps a file path to the last state recorded for that file.
	State map[string]*FileState

	// Persist carries FileStates sent by the prospector and crawler that
	// must be stored and written to the registry.
	Persist chan *FileState

	// running is set by Run and cleared by Stop; processEvents stops
	// consuming a batch once it is false.
	running bool

	// Channel carries batches of events whose states Run records.
	Channel chan []*FileEvent

	// done is closed by Stop to make Run return.
	done chan struct{}
}
// NewRegistrar creates a Registrar for the given registry file path and runs
// Init on it. The Registrar is returned even when Init fails; the Init error
// is passed back unchanged.
func NewRegistrar(registryFile string) (*Registrar, error) {
	registrar := new(Registrar)
	registrar.registryFile = registryFile
	registrar.done = make(chan struct{})
	return registrar, registrar.Init()
}
// Init (re)initialises the Registrar.
//
// It does the following:
//   - creates the Persist and Channel channels and an empty State map;
//   - falls back to cfg.DefaultRegistryFile when no registry path is set;
//   - converts the registry path to an absolute path;
//   - creates the directory that holds the registry file (mode 0755) if it is
//     missing.
//
// Any State held before the call is discarded. The returned error says which
// path step failed.
func (r *Registrar) Init() error {
	// Init state
	r.Persist = make(chan *FileState)
	r.State = make(map[string]*FileState)
	// One buffered slot: a sender can hand over a batch without blocking as
	// long as the previous batch has already been taken by the receiver.
	r.Channel = make(chan []*FileEvent, 1)

	// Set to default in case it is not set
	if r.registryFile == "" {
		r.registryFile = cfg.DefaultRegistryFile
	}

	// Resolve the path once, so later reads/writes do not depend on the
	// working directory.
	absPath, err := filepath.Abs(r.registryFile)
	if err != nil {
		return fmt.Errorf("Failed to get the absolute path of %s: %v",
			r.registryFile, err)
	}
	r.registryFile = absPath

	// Create the directory holding the registry file if it does not exist yet.
	registryPath := filepath.Dir(r.registryFile)
	if err := os.MkdirAll(registryPath, 0755); err != nil {
		return fmt.Errorf("Failed to create registry file dir %s: %v",
			registryPath, err)
	}

	logp.Info("Registry file set to: %s", r.registryFile)
	return nil
}
// LoadState reads the previously persisted reading state from the configured
// registry file into r.State.
//
// A missing registry file is not an error: the Registrar keeps the State it
// already has (empty after Init). Any other open error, and any JSON decode
// error, is logged and otherwise ignored so startup can continue.
func (r *Registrar) LoadState() {
	existing, err := os.Open(r.registryFile)
	if err != nil {
		if !os.IsNotExist(err) {
			logp.Err("Failed to open registry file %s: %v", r.registryFile, err)
		}
		return
	}
	defer existing.Close()

	logp.Info("Loading registrar data from %s", r.registryFile)
	if err = json.NewDecoder(existing).Decode(&r.State); err != nil {
		logp.Err("Failed to decode registry file %s: %v", r.registryFile, err)
	}

	// JSON null decodes into a nil map. Writing to a nil map panics (Run
	// stores into r.State), so fall back to an empty map.
	if r.State == nil {
		r.State = make(map[string]*FileState)
	}
}
// Run is the Registrar's main loop. It blocks until Stop closes r.done.
//
// Each iteration does one of the following, then rewrites the whole registry
// file:
//   - takes a single FileState from r.Persist and stores it under its Source
//     path;
//   - takes a batch of events from r.Channel and applies it via
//     processEvents.
//
// When r.done is closed, the loop returns and the registry is written one
// final time by the deferred call.
func (r *Registrar) Run() {
	logp.Info("Starting Registrar")
	r.running = true

	// Writes registry on shutdown; report a failure instead of dropping it.
	defer func() {
		if e := r.writeRegistry(); e != nil {
			logp.Err("Writing of registry on shutdown returned error: %v", e)
		}
	}()

	for {
		select {
		case <-r.done:
			logp.Info("Ending Registrar")
			return
		// NOTE: when several cases are ready, select picks one at random, so
		// Persist is not prioritised over Channel.
		case state := <-r.Persist:
			r.State[*state.Source] = state
			logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source)
		case events := <-r.Channel:
			r.processEvents(events)
		}

		// A failed write is logged and the loop keeps going. writeRegistry
		// always encodes the complete State, so the next successful write
		// catches up.
		if e := r.writeRegistry(); e != nil {
			logp.Err("Writing of registry returned error: %v. Continuing..", e)
		}
	}
}
// processEvents stores the state of every event in r.State, keyed by the
// event's Source path. A later event for the same source overwrites an
// earlier one, so the last event per file wins.
//
// Events whose source is "-" (stdin) are skipped. Processing stops early as
// soon as the Registrar is no longer running.
func (r *Registrar) processEvents(events []*FileEvent) {
	logp.Debug("registrar", "Processing %d events", len(events))

	for i := 0; i < len(events) && r.running; i++ {
		ev := events[i]
		if source := *ev.Source; source != "-" {
			r.State[source] = ev.GetState()
		}
	}
}
// Stop asks Run to exit by closing r.done. It also clears r.running so that
// processEvents abandons a batch in progress.
//
// Stop does not wait for Run to return, because it is invoked from an
// asynchronous signal handler.
//
// A repeated call finds r.done already closed and skips the close instead of
// panicking with "close of closed channel".
// NOTE(review): this guards sequential repeat calls only; two truly
// concurrent Stop calls could still both reach close(). Use a sync.Once on
// the struct if that matters.
func (r *Registrar) Stop() {
	logp.Info("Stopping Registrar")
	r.running = false
	select {
	case <-r.done:
		// Already stopped; closing again would panic.
	default:
		close(r.done)
	}
	// Note: don't block using waitGroup, cause this method is run by async signal handler
}
// GetFileState returns the state recorded for path. The boolean reports
// whether r.State has an entry for that path.
func (r *Registrar) GetFileState(path string) (*FileState, bool) {
	if found, ok := r.State[path]; ok {
		return found, true
	}
	return nil, false
}
// writeRegistry encodes r.State as JSON into a temporary file next to the
// registry file (same path plus ".new"). It then moves the temporary file
// into place with SafeFileRotate.
//
// If encoding or closing the temporary file fails, the temporary file is
// removed and an error is returned without rotating. A partially written file
// therefore never replaces the existing registry.
func (r *Registrar) writeRegistry() error {
	logp.Debug("registrar", "Write registry file: %s", r.registryFile)

	tempfile := r.registryFile + ".new"
	file, e := os.Create(tempfile)
	if e != nil {
		logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, e)
		return e
	}

	encodeErr := json.NewEncoder(file).Encode(r.State)
	// Close the file right away, before rotating it into place; this is
	// needed on Windows.
	closeErr := file.Close()

	if encodeErr != nil || closeErr != nil {
		// Best effort cleanup of the incomplete temp file; the current
		// registry file stays untouched.
		os.Remove(tempfile)
		if encodeErr != nil {
			return fmt.Errorf("Failed to encode registry state into %s: %v", tempfile, encodeErr)
		}
		return fmt.Errorf("Failed to close registry tempfile %s: %v", tempfile, closeErr)
	}

	if e = SafeFileRotate(r.registryFile, tempfile); e != nil {
		return e
	}
	logp.Info("Registry file updated. %d states written.", len(r.State))
	return nil
}
// fetchState works out the offset at which reading of filePath should resume.
//
// It returns (offset, true) when a previous state is found in either of two
// ways:
//   - a state is stored under filePath itself and input.IsSameFile confirms
//     it is the same file;
//   - getPreviousFile finds a state stored under another path (a rename).
//
// In both cases the state is also sent on r.Persist so Run stores it again
// under filePath. Otherwise fetchState returns (0, false).
//
// Persist is created unbuffered in Init, so the send blocks until a receiver
// (Run) takes it. Calling fetchState from the goroutine running Run would
// deadlock.
func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bool) {
	// Check if there is a state for this file
	lastState, isFound := r.GetFileState(filePath)
	if isFound && input.IsSameFile(filePath, fileInfo) {
		logp.Debug("registrar", "Same file as before found. Fetch the state and persist it.")
		// We're resuming - throw the last state back downstream so we resave it
		// And return the offset - also force harvest in case the file is old and we're about to skip it
		r.Persist <- lastState
		return lastState.Offset, true
	}

	if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil {
		// File has rotated between shutdown and startup
		// We return last state downstream, with a modified event source with the new file name
		// And return the offset - also force harvest in case the file is old and we're about to skip it
		logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)
		oldState, _ := r.GetFileState(previous)
		// Work on a shallow copy. Setting Source on oldState directly would
		// also rewrite the entry still stored under the old path in r.State.
		// NOTE(review): assumes FileState is safe to copy by value (no lock
		// fields) — confirm against the input package.
		renamed := *oldState
		renamed.Source = &filePath
		r.Persist <- &renamed
		return renamed.Offset, true
	}

	if isFound {
		logp.Info("Not resuming rotated file: %s", filePath)
	}

	// New file so just start from an automatic position
	return 0, false
}
// getPreviousFile searches r.State for an entry that matches newFileInfo but
// is stored under a path other than newFilePath. A match means the OS file
// state from input.GetOSFileState is IsSame with the entry's FileStateOS,
// which indicates the same file recorded under an older name.
//
// It returns that older path, or an error if no entry matches. Entries with a
// nil state are skipped. Map iteration order is random, so if several entries
// match, which one is returned is unspecified.
func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) {
	newState := input.GetOSFileState(&newFileInfo)

	for oldFilePath, oldState := range r.State {
		// The entry for the same path is not a "previous" file; nil entries
		// (e.g. a JSON null in the registry) have nothing to compare.
		if oldFilePath == newFilePath || oldState == nil {
			continue
		}

		if newState.IsSame(oldState.FileStateOS) {
			logp.Info("Old file with new name found: %s is now %s", oldFilePath, newFilePath)
			return oldFilePath, nil
		}
	}

	return "", fmt.Errorf("No previous file found")
}