From 3b0da2bddc7dc9032824af7de4d4d0722feef24e Mon Sep 17 00:00:00 2001 From: Rock Baek Date: Tue, 8 Jun 2021 23:39:26 -0700 Subject: [PATCH 1/4] track_rotated, increase poll interval and timeout for unit tests --- docs/operators/file_input.md | 7 +- operator/builtin/input/file/file.go | 72 ++++++++++++-------- operator/builtin/input/file/file_test.go | 11 ++- operator/builtin/input/file/reader.go | 14 ++-- operator/builtin/input/file/rotation_test.go | 36 ++++++++++ operator/builtin/input/file/util_test.go | 10 +-- 6 files changed, 102 insertions(+), 48 deletions(-) diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index 7f15089d..7e237536 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -19,7 +19,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` | | `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to forgotten, meaning that all files will be read from the beginning (one time). | | `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | -| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. One batch will be processed per `poll_interval`. | +| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. 
| | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | @@ -37,6 +37,11 @@ match either the beginning of a new log entry, or the end of a log entry. Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control. +### File rotation + +When files are rotated and their new names are no longer captured by the `include` pattern (i.e. when tailing symlinked files), data loss can result. +To avoid this data loss, use the move/create rotation method and set `max_concurrent_files` to more than twice the number of files being tailed. + ### Supported encodings | Key | Description diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 52688822..0056cd91 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -49,8 +49,10 @@ type InputOperator struct { persister operator.Persister - knownFiles []*Reader - queuedMatches []string + knownFiles []*Reader + queuedMatches []string + maxBatchFiles int + lastPollReaders []*Reader startAtBeginning bool @@ -85,6 +87,12 @@ func (f *InputOperator) Start(persister operator.Persister) error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() + for _, reader := range f.lastPollReaders { + reader.Close() + } + for _, reader := range f.knownFiles { + reader.Close() + } f.knownFiles = nil f.cancel = nil return nil @@ -113,9 +121,10 @@ func (f *InputOperator) startPoller(ctx context.Context) { // poll checks all the watched paths for new entries func (f *InputOperator) poll(ctx context.Context) { + f.maxBatchFiles = f.MaxConcurrentFiles / 2 var matches []string - if len(f.queuedMatches) > f.MaxConcurrentFiles { - matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:] + if len(f.queuedMatches) > f.maxBatchFiles { + matches, f.queuedMatches = 
f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:] } else { if len(f.queuedMatches) > 0 { matches, f.queuedMatches = f.queuedMatches, make([]string, 0) @@ -130,8 +139,8 @@ func (f *InputOperator) poll(ctx context.Context) { matches = getMatches(f.Include, f.Exclude) if f.firstCheck && len(matches) == 0 { f.Warnw("no files match the configured include patterns", "include", f.Include) - } else if len(matches) > f.MaxConcurrentFiles { - matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:] + } else if len(matches) > f.maxBatchFiles { + matches, f.queuedMatches = matches[:f.maxBatchFiles], matches[f.maxBatchFiles:] } } } @@ -139,7 +148,27 @@ func (f *InputOperator) poll(ctx context.Context) { readers := f.makeReaders(matches) f.firstCheck = false + // Detect files that have been rotated out of matching pattern + lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) +OUTER: + for _, oldReader := range f.lastPollReaders { + for _, reader := range readers { + if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { + continue OUTER + } + } + lostReaders = append(lostReaders, oldReader) + } + var wg sync.WaitGroup + for _, reader := range lostReaders { + wg.Add(1) + go func(r *Reader) { + defer wg.Done() + r.ReadToEnd(ctx) + }(reader) + } + for _, reader := range readers { wg.Add(1) go func(r *Reader) { @@ -151,6 +180,13 @@ func (f *InputOperator) poll(ctx context.Context) { // Wait until all the reader goroutines are finished wg.Wait() + // Close all files + for _, reader := range f.lastPollReaders { + reader.Close() + } + + f.lastPollReaders = readers + f.saveCurrent(readers) f.syncLastPollFiles(ctx) } @@ -215,9 +251,8 @@ func (f *InputOperator) makeReaders(filesPaths []string) []*Reader { fps = append(fps, fp) } - // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files -OUTER: - for i := 0; i < len(fps); { + // Exclude any empty fingerprints + for i := 0; i < 
len(fps); i++ { fp := fps[i] if len(fp.FirstBytes) == 0 { if err := files[i].Close(); err != nil { @@ -227,25 +262,6 @@ OUTER: fps = append(fps[:i], fps[i+1:]...) files = append(files[:i], files[i+1:]...) } - - for j := 0; j < len(fps); j++ { - if i == j { - // Skip checking itself - continue - } - - fp2 := fps[j] - if fp.StartsWith(fp2) || fp2.StartsWith(fp) { - // Exclude - if err := files[i].Close(); err != nil { - f.Errorf("problem closing file", "file", files[i].Name()) - } - fps = append(fps[:i], fps[i+1:]...) - files = append(files[:i], files[i+1:]...) - continue OUTER - } - } - i++ } readers := make([]*Reader, 0, len(fps)) diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 663876b8..1ad8467f 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -429,10 +429,11 @@ func TestFileBatching(t *testing.T) { files := 100 linesPerFile := 10 - maxConcurrentFiles := 10 + maxConcurrentFiles := 20 + maxBatchFiles := maxConcurrentFiles / 2 - expectedBatches := files / maxConcurrentFiles // assumes no remainder - expectedLinesPerBatch := maxConcurrentFiles * linesPerFile + expectedBatches := files / maxBatchFiles // assumes no remainder + expectedLinesPerBatch := maxBatchFiles * linesPerFile expectedMessages := make([]string, 0, files*linesPerFile) actualMessages := make([]string, 0, files*linesPerFile) @@ -442,7 +443,7 @@ func TestFileBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles }, func(out *testutil.FakeOutput) { - out.Received = make(chan *entry.Entry, expectedLinesPerBatch) + out.Received = make(chan *entry.Entry, expectedLinesPerBatch*2) }, ) operator.persister = testutil.NewMockPersister("test") @@ -465,7 +466,6 @@ func TestFileBatching(t *testing.T) { // poll once so we can validate that files were batched operator.poll(context.Background()) actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...) 
- expectNoMessagesUntil(t, logReceived, 10*time.Millisecond) } require.ElementsMatch(t, expectedMessages, actualMessages) @@ -483,7 +483,6 @@ func TestFileBatching(t *testing.T) { // poll once so we can validate that files were batched operator.poll(context.Background()) actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...) - expectNoMessagesUntil(t, logReceived, 10*time.Millisecond) } require.ElementsMatch(t, expectedMessages, actualMessages) diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index ac79bd5c..51603e17 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -84,12 +84,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { // ReadToEnd will read until the end of the file func (f *Reader) ReadToEnd(ctx context.Context) { - defer func() { - if err := f.file.Close(); err != nil { - f.Errorw("Failed to close", zap.Error(err)) - } - }() - if _, err := f.file.Seek(f.Offset, 0); err != nil { f.Errorw("Failed to seek", zap.Error(err)) return @@ -122,8 +116,12 @@ func (f *Reader) ReadToEnd(ctx context.Context) { } // Close will close the file -func (f *Reader) Close() error { - return f.file.Close() +func (f *Reader) Close() { + if f.file != nil { + if err := f.file.Close(); err != nil { + f.Errorw("Problem closing reader", "Error", err.Error()) + } + } } // Emit creates an entry with the decoded message and sends it to the next diff --git a/operator/builtin/input/file/rotation_test.go b/operator/builtin/input/file/rotation_test.go index 72a43b28..2b4a76f6 100644 --- a/operator/builtin/input/file/rotation_test.go +++ b/operator/builtin/input/file/rotation_test.go @@ -355,6 +355,42 @@ func TestMoveFile(t *testing.T) { expectNoMessages(t, logReceived) } +func TestTrackMovedAwayFiles(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Moving files while open is unsupported on Windows") + } + t.Parallel() + operator, 
logReceived, tempDir := newTestFileOperator(t, nil, nil) + operator.persister = testutil.NewMockPersister("test") + + temp1 := openTemp(t, tempDir) + writeString(t, temp1, "testlog1\n") + temp1.Close() + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + + // Wait until all goroutines are finished before renaming + operator.wg.Wait() + + newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/") + err := os.Mkdir(newDir, 0777) + require.NoError(t, err) + newFileName := fmt.Sprintf("%s%s", newDir, "newfile.log") + + err = os.Rename(temp1.Name(), newFileName) + require.NoError(t, err) + + movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + writeString(t, movedFile, "testlog2\n") + operator.poll(context.Background()) + + waitForMessage(t, logReceived, "testlog2") +} + // TruncateThenWrite tests that, after a file has been truncated, // any new writes are picked up func TestTruncateThenWrite(t *testing.T) { diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go index eba5808f..d5514d79 100644 --- a/operator/builtin/input/file/util_test.go +++ b/operator/builtin/input/file/util_test.go @@ -35,7 +35,7 @@ import ( func newDefaultConfig(tempDir string) *InputConfig { cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 200 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} cfg.OutputIDs = []string{"fake"} @@ -122,7 +122,7 @@ func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { select { case e := <-c: return e - case <-time.After(time.Second): + case <-time.After(5 * time.Second): require.FailNow(t, "Timed out waiting for message") return nil } @@ -134,7 +134,7 @@ func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { select { case e := 
<-c: messages = append(messages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(5 * time.Second): require.FailNow(t, "Timed out waiting for message") return nil } @@ -146,7 +146,7 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: require.Equal(t, expected, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(5 * time.Second): require.FailNow(t, "Timed out waiting for message", expected) } } @@ -158,7 +158,7 @@ LOOP: select { case e := <-c: receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(time.Second): + case <-time.After(5 * time.Second): break LOOP } } From a97a6270d4a67d025e5a2e3b22be3fd5abaf39dc Mon Sep 17 00:00:00 2001 From: Rock Baek Date: Wed, 9 Jun 2021 11:51:33 -0700 Subject: [PATCH 2/4] stop operator at the end of each test --- operator/builtin/input/file/file_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index 1ad8467f..1724c475 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -111,6 +111,7 @@ func TestReadExistingAndNewLogs(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) operator.persister = testutil.NewMockPersister("test") + defer operator.Stop() // Start with a file with an entry in it, and expect that entry // to come through when we poll for the first time @@ -134,6 +135,7 @@ func TestStartAtEnd(t *testing.T) { cfg.StartAt = "end" }, nil) operator.persister = testutil.NewMockPersister("test") + defer operator.Stop() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\n") @@ -156,9 +158,9 @@ func TestStartAtEndNewFile(t *testing.T) { operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) operator.persister = testutil.NewMockPersister("test") operator.startAtBeginning = false + defer 
operator.Stop() operator.poll(context.Background()) - temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\ntestlog2\n") @@ -205,6 +207,7 @@ func TestSplitWrite(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) operator.persister = testutil.NewMockPersister("test") + defer operator.Stop() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1") @@ -447,6 +450,7 @@ func TestFileBatching(t *testing.T) { }, ) operator.persister = testutil.NewMockPersister("test") + defer operator.Stop() temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { @@ -492,6 +496,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + defer operator.Stop() temp := openTemp(t, tempDir) tempCopy := openFile(t, temp.Name()) @@ -499,6 +504,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { require.NoError(t, err) reader, err := operator.NewReader(temp.Name(), tempCopy, fp) require.NoError(t, err) + defer reader.Close() writeString(t, temp, "testlog1\n") reader.ReadToEnd(context.Background()) From 72af70be8d2961a318392c3aa5a0275679c7f312 Mon Sep 17 00:00:00 2001 From: Rock Baek Date: Wed, 9 Jun 2021 14:40:55 -0700 Subject: [PATCH 3/4] fix bench test and change it to debug message --- operator/builtin/input/file/benchmark_test.go | 2 +- operator/builtin/input/file/reader.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/operator/builtin/input/file/benchmark_test.go b/operator/builtin/input/file/benchmark_test.go index 5df09e48..0656487d 100644 --- a/operator/builtin/input/file/benchmark_test.go +++ b/operator/builtin/input/file/benchmark_test.go @@ -104,7 +104,7 @@ func BenchmarkFileInput(b *testing.B) { cfg.Include = []string{ "file*.log", } - cfg.MaxConcurrentFiles = 1 + cfg.MaxConcurrentFiles = 2 return cfg }, }, diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index 
51603e17..a7b370c9 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -119,7 +119,7 @@ func (f *Reader) ReadToEnd(ctx context.Context) { func (f *Reader) Close() { if f.file != nil { if err := f.file.Close(); err != nil { - f.Errorw("Problem closing reader", "Error", err.Error()) + f.Debugf("Problem closing reader", "Error", err.Error()) } } } From e4664e56c2188e13081f4edd60f599f7a4852c82 Mon Sep 17 00:00:00 2001 From: Rock Baek Date: Thu, 10 Jun 2021 10:23:56 -0700 Subject: [PATCH 4/4] minimum of 2 for max_concurrent_file. --- docs/operators/file_input.md | 2 +- operator/builtin/input/file/config.go | 4 ++-- operator/builtin/input/file/util_test.go | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index 7e237536..64f84c07 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -19,7 +19,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` | | `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to forgotten, meaning that all files will be read from the beginning (one time). | | `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | -| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. 
| +| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. | | `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes | | `resource` | {} | A map of `key: value` pairs to add to the entry's resource | diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index 27daaf16..59227a39 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -98,8 +98,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, fmt.Errorf("`max_log_size` must be positive") } - if c.MaxConcurrentFiles <= 0 { - return nil, fmt.Errorf("`max_concurrent_files` must be positive") + if c.MaxConcurrentFiles <= 1 { + return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1") } if c.FingerprintSize == 0 { diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go index d5514d79..e1f8d167 100644 --- a/operator/builtin/input/file/util_test.go +++ b/operator/builtin/input/file/util_test.go @@ -35,7 +35,7 @@ import ( func newDefaultConfig(tempDir string) *InputConfig { cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 200 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} cfg.OutputIDs = []string{"fake"} @@ -122,7 +122,7 @@ func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { select { case e := <-c: return e - case <-time.After(5 * time.Second): + case <-time.After(3 * time.Second): require.FailNow(t, "Timed out waiting for message") return nil } @@ -134,7 +134,7 @@ func waitForN(t *testing.T, c chan *entry.Entry, n 
int) []string { select { case e := <-c: messages = append(messages, e.Body.(string)) - case <-time.After(5 * time.Second): + case <-time.After(3 * time.Second): require.FailNow(t, "Timed out waiting for message") return nil } @@ -146,7 +146,7 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: require.Equal(t, expected, e.Body.(string)) - case <-time.After(5 * time.Second): + case <-time.After(3 * time.Second): require.FailNow(t, "Timed out waiting for message", expected) } } @@ -158,7 +158,7 @@ LOOP: select { case e := <-c: receivedMessages = append(receivedMessages, e.Body.(string)) - case <-time.After(5 * time.Second): + case <-time.After(3 * time.Second): break LOOP } }