Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Commit

Permalink
track_rotated
Browse files Browse the repository at this point in the history
  • Loading branch information
rockb1017 committed Jun 2, 2021
1 parent fa68892 commit dab0b7f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 49 deletions.
60 changes: 34 additions & 26 deletions operator/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ type InputOperator struct {

persister operator.Persister

knownFiles []*Reader
queuedMatches []string
knownFiles []*Reader
queuedMatches []string
maxBatchFiles int
lastPollReaders []*Reader

startAtBeginning bool

Expand Down Expand Up @@ -113,9 +115,10 @@ func (f *InputOperator) startPoller(ctx context.Context) {

// poll checks all the watched paths for new entries
func (f *InputOperator) poll(ctx context.Context) {
f.maxBatchFiles = f.MaxConcurrentFiles / 2
var matches []string
if len(f.queuedMatches) > f.MaxConcurrentFiles {
matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:]
if len(f.queuedMatches) > f.maxBatchFiles {
matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:]
} else {
if len(f.queuedMatches) > 0 {
matches, f.queuedMatches = f.queuedMatches, make([]string, 0)
Expand All @@ -130,16 +133,36 @@ func (f *InputOperator) poll(ctx context.Context) {
matches = getMatches(f.Include, f.Exclude)
if f.firstCheck && len(matches) == 0 {
f.Warnw("no files match the configured include patterns", "include", f.Include)
} else if len(matches) > f.MaxConcurrentFiles {
matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:]
} else if len(matches) > f.maxBatchFiles {
matches, f.queuedMatches = matches[:f.maxBatchFiles], matches[f.maxBatchFiles:]
}
}
}

readers := f.makeReaders(matches)
f.firstCheck = false

// Remove the same files that are both in f.lastPollReaders and readers
filteredReaders := make([]*Reader, 0, len(f.lastPollReaders))
OUTER:
for _, oldReader := range f.lastPollReaders {
for _, reader := range readers {
if reader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
}
filteredReaders = append(filteredReaders, oldReader)
}

var wg sync.WaitGroup
for _, reader := range filteredReaders {
wg.Add(1)
go func(r *Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
}(reader)
}

for _, reader := range readers {
wg.Add(1)
go func(r *Reader) {
Expand All @@ -148,14 +171,15 @@ func (f *InputOperator) poll(ctx context.Context) {
}(reader)
}

// Wait until all the reader goroutines are finished
wg.Wait()

// Close all files
for _, reader := range readers {
for _, reader := range f.lastPollReaders {
reader.Close()
}

f.lastPollReaders = readers

f.saveCurrent(readers)
f.syncLastPollFiles(ctx)
}
Expand Down Expand Up @@ -220,8 +244,7 @@ func (f *InputOperator) makeReaders(filesPaths []string) []*Reader {
fps = append(fps, fp)
}

// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
OUTER:
// Exclude any empty fingerprints
for i := 0; i < len(fps); {
fp := fps[i]
if len(fp.FirstBytes) == 0 {
Expand All @@ -230,22 +253,6 @@ OUTER:
fps = append(fps[:i], fps[i+1:]...)
files = append(files[:i], files[i+1:]...)
}

for j := 0; j < len(fps); j++ {
if i == j {
// Skip checking itself
continue
}

fp2 := fps[j]
if fp.StartsWith(fp2) || fp2.StartsWith(fp) {
// Exclude
files[i].Close()
fps = append(fps[:i], fps[i+1:]...)
files = append(files[:i], files[i+1:]...)
continue OUTER
}
}
i++
}

Expand Down Expand Up @@ -289,6 +296,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
return nil, err
}
newReader.Path = file.Name()
oldReader.Close()
return newReader, nil
}

Expand Down
25 changes: 4 additions & 21 deletions operator/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,11 @@ func TestFileBatching(t *testing.T) {

files := 100
linesPerFile := 10
maxConcurrentFiles := 10
maxConcurrentFiles := 20
maxBatchFiles := maxConcurrentFiles / 2

expectedBatches := files / maxConcurrentFiles // assumes no remainder
expectedLinesPerBatch := maxConcurrentFiles * linesPerFile
expectedBatches := files / maxBatchFiles // assumes no remainder
expectedLinesPerBatch := maxBatchFiles * linesPerFile

expectedMessages := make([]string, 0, files*linesPerFile)
actualMessages := make([]string, 0, files*linesPerFile)
Expand Down Expand Up @@ -469,24 +470,6 @@ func TestFileBatching(t *testing.T) {
}

require.ElementsMatch(t, expectedMessages, actualMessages)

// Write more logs to each file so we can validate that all files are still known
for i, temp := range temps {
for j := 0; j < linesPerFile; j++ {
message := fmt.Sprintf("%s %d %d", stringWithLength(20), i, j)
temp.WriteString(message + "\n")
expectedMessages = append(expectedMessages, message)
}
}

for b := 0; b < expectedBatches; b++ {
// poll once so we can validate that files were batched
operator.poll(context.Background())
actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...)
expectNoMessagesUntil(t, logReceived, 10*time.Millisecond)
}

require.ElementsMatch(t, expectedMessages, actualMessages)
}

func TestFileReader_FingerprintUpdated(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions operator/builtin/input/file/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {

// ReadToEnd will read until the end of the file
func (f *Reader) ReadToEnd(ctx context.Context) {
defer f.file.Close()

if _, err := f.file.Seek(f.Offset, 0); err != nil {
f.Errorw("Failed to seek", zap.Error(err))
return
Expand Down
36 changes: 36 additions & 0 deletions operator/builtin/input/file/rotation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,42 @@ func TestMoveFile(t *testing.T) {
expectNoMessages(t, logReceived)
}

// TestTrackMovedAwayFiles verifies that a file that was read once and then
// renamed into a different directory is still tracked: content appended to
// the moved file is picked up on the next poll cycle.
func TestTrackMovedAwayFiles(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("Moving files while open is unsupported on Windows")
	}
	t.Parallel()
	operator, logReceived, tempDir := newTestFileOperator(t, nil, nil)
	operator.persister = testutil.NewMockPersister("test")

	temp1 := openTemp(t, tempDir)
	writeString(t, temp1, "testlog1\n")
	temp1.Close()

	operator.poll(context.Background())
	defer operator.Stop()

	waitForMessage(t, logReceived, "testlog1")

	// Wait until all reader goroutines are finished before renaming,
	// so the rename cannot race with an in-flight read.
	operator.wg.Wait()

	newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/")
	err := os.Mkdir(newDir, 0777)
	require.NoError(t, err)
	newFileName := fmt.Sprintf("%s%s", newDir, "newfile.log")

	err = os.Rename(temp1.Name(), newFileName)
	require.NoError(t, err)

	movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	require.NoError(t, err)
	// Close the handle when the test ends; the original leaked it.
	defer movedFile.Close()
	writeString(t, movedFile, "testlog2\n")
	operator.poll(context.Background())

	waitForMessage(t, logReceived, "testlog2")
}

// TruncateThenWrite tests that, after a file has been truncated,
// any new writes are picked up
func TestTruncateThenWrite(t *testing.T) {
Expand Down

0 comments on commit dab0b7f

Please sign in to comment.