Skip to content

Commit

Permalink
Backport of elastic#1375
Browse files Browse the repository at this point in the history
A direct commit backport was not possible as too many things changed in the prospector and harvester
  • Loading branch information
ruflin committed Apr 19, 2016
1 parent 4bca562 commit 1baadc2
Show file tree
Hide file tree
Showing 12 changed files with 356 additions and 106 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ https://github.com/elastic/beats/compare/v1.2.0...v1.2.1[View commits]
- Fixed name of the setting `stats.proc` to `stats.process` in the default configuration file. {pull}1343[1343]
- Fix issue with cpu.system_p being greater than 1 on Windows {pull}1128[1128]
*Filebeat*
- Improvements in registrar dealing with file rotation. {pull}1281[1281]
==== Added
*Topbeat*
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beat/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func makeEvents(name string, n int) []*input.FileEvent {
DocumentType: "log",
Bytes: 100,
Offset: int64(i),
Source: &name,
Source: name,
}
events = append(events, event)
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i
logp.Debug("prospector", "Waiting for %d prospectors to initialise", pendingProspectorCnt)

for event := range crawler.Registrar.Persist {
if event.Source == nil {
if event.Source == "" {

pendingProspectorCnt--
if pendingProspectorCnt == 0 {
Expand All @@ -67,15 +67,15 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i
}
continue
}
crawler.Registrar.State[*event.Source] = event
logp.Debug("prospector", "Registrar will re-save state for %s", *event.Source)
crawler.Registrar.state[event.Source] = event
logp.Debug("prospector", "Registrar will re-save state for %s", event.Source)

if !crawler.running {
break
}
}

logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State))
logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.getStateCopy()))
}

func (crawler *Crawler) Stop() {
Expand Down
16 changes: 11 additions & 5 deletions filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (p *Prospector) logRun(spoolChan chan *input.FileEvent) {

// This signals we finished considering the previous state
event := &input.FileState{
Source: nil,
Source: "",
}
p.registrar.Persist <- event

Expand Down Expand Up @@ -214,7 +214,7 @@ func (p *Prospector) stdinRun(spoolChan chan *input.FileEvent) {

// This signals we finished considering the previous state
event := &input.FileState{
Source: nil,
Source: "",
}
p.registrar.Persist <- event

Expand Down Expand Up @@ -344,7 +344,7 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
if resuming {
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)

h.Offset = offset
h.SetOffset(offset)
h.Start()
} else {
// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
Expand All @@ -354,11 +354,13 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
file)
newinfo.Skip(newinfo.Fileinfo.Size())
}
p.registrar.Persist <- h.GetState()
} else if previousFile, err := p.getPreviousFile(file, newinfo.Fileinfo); err == nil {
// This file was simply renamed (known inode+dev) - link the same harvester channel as the old file
logp.Debug("prospector", "File rename was detected: %s -> %s", previousFile, file)
lastinfo := p.prospectorList[previousFile]
newinfo.Continue(&lastinfo)
p.registrar.Persist <- h.GetState()
} else {

// Are we resuming a file or is this a completely new file?
Expand All @@ -369,8 +371,9 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
}

// Launch the harvester
h.Offset = offset
h.SetOffset(offset)
h.Start()
p.registrar.Persist <- h.GetState()
}
}

Expand Down Expand Up @@ -402,6 +405,7 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp

lastinfo := p.prospectorList[previousFile]
newinfo.Continue(&lastinfo)
p.registrar.Persist <- h.GetState()
} else {
// File is not the same file we saw previously, it must have rotated and is a new file
logp.Debug("prospector", "Launching harvester on rotated file: %s", file)
Expand All @@ -411,6 +415,7 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp

// Start a new harvester on the path
h.Start()
p.registrar.Persist <- h.GetState()
}

// Keep the old file in missingFiles so we don't rescan it if it was renamed and we've not yet reached the new filename
Expand All @@ -423,8 +428,9 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp

// Start a harvester on the path; an old file was just modified and it doesn't have a harvester
// The offset to continue from will be stored in the harvester channel - so take that to use and also clear the channel
h.Offset = <-newinfo.Return
h.SetOffset(<-newinfo.Return)
h.Start()
p.registrar.Persist <- h.GetState()
} else {
logp.Debug("prospector", "Not harvesting, file didn't change: %s", file)
}
Expand Down
61 changes: 48 additions & 13 deletions filebeat/crawler/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -16,7 +17,8 @@ type Registrar struct {
// Path to the Registry File
registryFile string
// Map with all file paths inside and the corresponding state
State map[string]*FileState
state map[string]*FileState
stateMutex sync.Mutex
// Channel used by the prospector and crawler to send FileStates to be persisted
Persist chan *input.FileState
running bool
Expand All @@ -39,7 +41,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) {
func (r *Registrar) Init() error {
// Init state
r.Persist = make(chan *FileState)
r.State = make(map[string]*FileState)
r.state = map[string]*FileState{}
r.Channel = make(chan []*FileEvent, 1)

// Set to default in case it is not set
Expand Down Expand Up @@ -71,11 +73,13 @@ func (r *Registrar) Init() error {
// loadState fetches the previous reading state from the configure RegistryFile file
// The default file is .filebeat file which is stored in the same path as the binary is running
func (r *Registrar) LoadState() {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
if existing, e := os.Open(r.registryFile); e == nil {
defer existing.Close()
logp.Info("Loading registrar data from %s", r.registryFile)
decoder := json.NewDecoder(existing)
decoder.Decode(&r.State)
decoder.Decode(&r.state)
}
}

Expand All @@ -94,8 +98,9 @@ func (r *Registrar) Run() {
return
// Treats new log files to persist with higher priority then new events
case state := <-r.Persist:
r.State[*state.Source] = state
logp.Debug("prospector", "Registrar will re-save state for %s", *state.Source)
source := state.Source
r.setState(source, state)
logp.Debug("prospector", "Registrar will re-save state for %s", source)
case events := <-r.Channel:
r.processEvents(events)
}
Expand All @@ -121,7 +126,7 @@ func (r *Registrar) processEvents(events []*FileEvent) {
continue
}

r.State[*event.Source] = event.GetState()
r.setState(event.Source, event.GetState())
}
}

Expand All @@ -133,7 +138,7 @@ func (r *Registrar) Stop() {
}

func (r *Registrar) GetFileState(path string) (*FileState, bool) {
state, exist := r.State[path]
state, exist := r.getState(path)
return state, exist
}

Expand All @@ -149,12 +154,13 @@ func (r *Registrar) writeRegistry() error {
}

encoder := json.NewEncoder(file)
encoder.Encode(r.State)
state := r.getStateCopy()
encoder.Encode(state)

// Directly close file because of windows
file.Close()

logp.Info("Registry file updated. %d states written.", len(r.State))
logp.Info("Registry file updated. %d states written.", len(state))

return SafeFileRotate(r.registryFile, tempfile)
}
Expand All @@ -168,7 +174,6 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Debug("registar", "Same file as before found. Fetch the state and persist it.")
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
r.Persist <- lastState
return lastState.Offset, true
}

Expand All @@ -179,8 +184,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath)

lastState, _ := r.GetFileState(previous)
lastState.Source = &filePath
r.Persist <- lastState
r.updateStateSource(lastState, filePath)
return lastState.Offset, true
}

Expand All @@ -198,7 +202,7 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

newState := input.GetOSFileState(&newFileInfo)

for oldFilePath, oldState := range r.State {
for oldFilePath, oldState := range r.getStateCopy() {

// Skipping when path the same
if oldFilePath == newFilePath {
Expand All @@ -214,3 +218,34 @@ func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo)

return "", fmt.Errorf("No previous file found")
}

func (r *Registrar) setState(path string, state *FileState) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
r.state[path] = state
}

func (r *Registrar) getState(path string) (*FileState, bool) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
state, exist := r.state[path]
return state, exist
}

func (r *Registrar) updateStateSource(state *FileState, path string) {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()
state.Source = path
}

func (r *Registrar) getStateCopy() map[string]FileState {
r.stateMutex.Lock()
defer r.stateMutex.Unlock()

copy := make(map[string]FileState)
for k, v := range r.state {
copy[k] = *v
}

return copy
}
4 changes: 3 additions & 1 deletion filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"os"
"regexp"
"sync"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
Expand All @@ -27,7 +28,8 @@ type Harvester struct {
Path string /* the file path to harvest */
ProspectorConfig config.ProspectorConfig
Config *config.HarvesterConfig
Offset int64
offset int64
offsetLock sync.Mutex
Stat *FileStat
SpoolerChan chan *input.FileEvent
encoding encoding.EncodingFactory
Expand Down
2 changes: 1 addition & 1 deletion filebeat/harvester/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestExampleTest(t *testing.T) {

h := Harvester{
Path: "/var/log/",
Offset: 0,
offset: 0,
}

assert.Equal(t, "/var/log/", h.Path)
Expand Down
Loading

0 comments on commit 1baadc2

Please sign in to comment.