From 114e5cb4fb544bbb17c02aae3e198c4f394862af Mon Sep 17 00:00:00 2001 From: jsirianni Date: Thu, 24 Jun 2021 23:34:03 -0400 Subject: [PATCH] port otel file rotation tracking https://github.com/open-telemetry/opentelemetry-log-collection/pull/182 --- docs/operators/file_input.md | 9 ++- operator/builtin/input/file/benchmark_test.go | 2 +- operator/builtin/input/file/config.go | 4 +- operator/builtin/input/file/file.go | 57 ++++++++++++++----- operator/builtin/input/file/file_test.go | 18 ++++-- operator/builtin/input/file/reader.go | 10 ++-- operator/builtin/input/file/rotation_test.go | 55 ++++++++++++++++-- operator/builtin/input/file/util_test.go | 8 +-- 8 files changed, 127 insertions(+), 36 deletions(-) diff --git a/docs/operators/file_input.md b/docs/operators/file_input.md index 5bd4ffa92..facda5370 100644 --- a/docs/operators/file_input.md +++ b/docs/operators/file_input.md @@ -19,7 +19,7 @@ The `file_input` operator reads logs from files. It will place the lines read in | `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` | | `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to forgotten, meaning that all files will be read from the beginning (one time). | | `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory | -| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. One batch will be processed per `poll_interval`. | +| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). 
If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. | | `labels` | {} | A map of `key: value` labels to add to the entry's labels | | `resource` | {} | A map of `key: value` labels to add to the entry's resource | @@ -35,6 +35,13 @@ If set, the `multiline` configuration block instructs the `file_input` operator The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that match either the beginning of a new log entry, or the end of a log entry. +Also refer to the [recombine](/docs/operators/recombine.md) operator for merging events with greater control. + +### File rotation + +When files are rotated and their new names no longer match the `include` pattern (e.g. when tailing symlinked files), data loss can occur. +To avoid data loss, choose the move/create rotation method and set `max_concurrent_files` to more than twice the number of files to tail. 
+ ### Supported encodings | Key | Description diff --git a/operator/builtin/input/file/benchmark_test.go b/operator/builtin/input/file/benchmark_test.go index 11a22ff49..9cd4280cb 100644 --- a/operator/builtin/input/file/benchmark_test.go +++ b/operator/builtin/input/file/benchmark_test.go @@ -90,7 +90,7 @@ func BenchmarkFileInput(b *testing.B) { cfg.Include = []string{ "file*.log", } - cfg.MaxConcurrentFiles = 1 + cfg.MaxConcurrentFiles = 2 return cfg }, }, diff --git a/operator/builtin/input/file/config.go b/operator/builtin/input/file/config.go index bf0eb9a92..f560f253f 100644 --- a/operator/builtin/input/file/config.go +++ b/operator/builtin/input/file/config.go @@ -83,8 +83,8 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator, return nil, fmt.Errorf("`max_log_size` must be positive") } - if c.MaxConcurrentFiles <= 0 { - return nil, fmt.Errorf("`max_concurrent_files` must be positive") + if c.MaxConcurrentFiles <= 1 { + return nil, fmt.Errorf("`max_concurrent_files` must be greater than 1") } if c.FingerprintSize == 0 { diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index a7a4734bd..82f347bc6 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -33,8 +33,10 @@ type InputOperator struct { persist helper.Persister - knownFiles []*Reader - queuedMatches []string + knownFiles []*Reader + queuedMatches []string + maxBatchFiles int + lastPollReaders []*Reader startAtBeginning bool @@ -69,6 +71,12 @@ func (f *InputOperator) Start() error { func (f *InputOperator) Stop() error { f.cancel() f.wg.Wait() + for _, reader := range f.lastPollReaders { + reader.Close() + } + for _, reader := range f.knownFiles { + reader.Close() + } f.knownFiles = nil f.cancel = nil return nil @@ -99,8 +107,8 @@ func (f *InputOperator) startPoller(ctx context.Context) { func (f *InputOperator) poll(ctx context.Context) { var matches []string - if len(f.queuedMatches) > 
f.MaxConcurrentFiles { - matches, f.queuedMatches = f.queuedMatches[:f.MaxConcurrentFiles], f.queuedMatches[f.MaxConcurrentFiles:] + if len(f.queuedMatches) > f.maxBatchFiles { + matches, f.queuedMatches = f.queuedMatches[:f.maxBatchFiles], f.queuedMatches[f.maxBatchFiles:] } else if len(f.queuedMatches) > 0 { matches, f.queuedMatches = f.queuedMatches, make([]string, 0) } else { @@ -114,15 +122,35 @@ func (f *InputOperator) poll(ctx context.Context) { matches = getMatches(f.Include, f.Exclude) if f.firstCheck && len(matches) == 0 { f.Warnw("no files match the configured include patterns", "include", f.Include) - } else if len(matches) > f.MaxConcurrentFiles { - matches, f.queuedMatches = matches[:f.MaxConcurrentFiles], matches[f.MaxConcurrentFiles:] + } else if len(matches) > f.maxBatchFiles { + matches, f.queuedMatches = matches[:f.maxBatchFiles], matches[f.maxBatchFiles:] } } readers := f.makeReaders(matches) f.firstCheck = false + // Detect files that have been rotated out of matching pattern + lostReaders := make([]*Reader, 0, len(f.lastPollReaders)) +OUTER: + for _, oldReader := range f.lastPollReaders { + for _, reader := range readers { + if reader.Fingerprint.StartsWith(oldReader.Fingerprint) { + continue OUTER + } + } + lostReaders = append(lostReaders, oldReader) + } + var wg sync.WaitGroup + for _, reader := range lostReaders { + wg.Add(1) + go func(r *Reader) { + defer wg.Done() + r.ReadToEnd(ctx) + }(reader) + } + for _, reader := range readers { wg.Add(1) go func(r *Reader) { @@ -134,6 +162,13 @@ func (f *InputOperator) poll(ctx context.Context) { // Wait until all the reader goroutines are finished wg.Wait() + // Close all files + for _, reader := range f.lastPollReaders { + reader.Close() + } + + f.lastPollReaders = readers + f.saveCurrent(readers) f.syncLastPollFiles() } @@ -200,7 +235,7 @@ func (f *InputOperator) makeReaders(filePaths []string) []*Reader { // Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on 
copy-truncate files OUTER: - for i := 0; i < len(fps); { + for i := 0; i < len(fps); i++ { fp := fps[i] if len(fp.FirstBytes) == 0 { files[i].Close() @@ -210,12 +245,7 @@ OUTER: } - for j := 0; j < len(fps); j++ { - if i == j { - // Skip checking itself - continue - } - + for j := i + 1; j < len(fps); j++ { fp2 := fps[j] if fp.StartsWith(fp2) || fp2.StartsWith(fp) { // Exclude @@ -224,7 +254,6 @@ OUTER: continue OUTER } } - i++ } readers := make([]*Reader, 0, len(fps)) diff --git a/operator/builtin/input/file/file_test.go b/operator/builtin/input/file/file_test.go index c0da5f1e2..a261038cc 100644 --- a/operator/builtin/input/file/file_test.go +++ b/operator/builtin/input/file/file_test.go @@ -94,6 +94,7 @@ func TestReadNewLogs(t *testing.T) { func TestReadExistingAndNewLogs(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + defer operator.Stop() // Start with a file with an entry in it, and expect that entry // to come through when we poll for the first time @@ -116,6 +117,7 @@ func TestStartAtEnd(t *testing.T) { operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *InputConfig) { cfg.StartAt = "end" }, nil) + defer operator.Stop() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1\n") @@ -137,6 +139,7 @@ func TestStartAtEndNewFile(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) operator.startAtBeginning = false + defer operator.Stop() operator.poll(context.Background()) @@ -185,6 +188,7 @@ func TestSkipEmpty(t *testing.T) { func TestSplitWrite(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + defer operator.Stop() temp := openTemp(t, tempDir) writeString(t, temp, "testlog1") @@ -406,10 +410,11 @@ func TestFileBatching(t *testing.T) { files := 100 linesPerFile := 10 - maxConcurrentFiles := 10 + maxConcurrentFiles := 20 + maxBatchFiles := maxConcurrentFiles / 2 - expectedBatches := files / 
maxConcurrentFiles // assumes no remainder - expectedLinesPerBatch := maxConcurrentFiles * linesPerFile + expectedBatches := files / maxBatchFiles // assumes no remainder + expectedLinesPerBatch := maxBatchFiles * linesPerFile expectedMessages := make([]string, 0, files*linesPerFile) actualMessages := make([]string, 0, files*linesPerFile) @@ -419,9 +424,10 @@ func TestFileBatching(t *testing.T) { cfg.MaxConcurrentFiles = maxConcurrentFiles }, func(out *testutil.FakeOutput) { - out.Received = make(chan *entry.Entry, expectedLinesPerBatch) + out.Received = make(chan *entry.Entry, expectedLinesPerBatch*2) }, ) + defer operator.Stop() temps := make([]*os.File, 0, files) for i := 0; i < files; i++ { @@ -441,7 +447,6 @@ func TestFileBatching(t *testing.T) { // poll once so we can validate that files were batched operator.poll(context.Background()) actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...) - expectNoMessagesUntil(t, logReceived, 10*time.Millisecond) } require.ElementsMatch(t, expectedMessages, actualMessages) @@ -459,7 +464,6 @@ func TestFileBatching(t *testing.T) { // poll once so we can validate that files were batched operator.poll(context.Background()) actualMessages = append(actualMessages, waitForN(t, logReceived, expectedLinesPerBatch)...) 
- expectNoMessagesUntil(t, logReceived, 10*time.Millisecond) } require.ElementsMatch(t, expectedMessages, actualMessages) @@ -469,6 +473,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { t.Parallel() operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + defer operator.Stop() temp := openTemp(t, tempDir) tempCopy := openFile(t, temp.Name()) @@ -476,6 +481,7 @@ func TestFileReader_FingerprintUpdated(t *testing.T) { require.NoError(t, err) reader, err := operator.NewReader(temp.Name(), tempCopy, fp) require.NoError(t, err) + defer reader.Close() writeString(t, temp, "testlog1\n") reader.ReadToEnd(context.Background()) diff --git a/operator/builtin/input/file/reader.go b/operator/builtin/input/file/reader.go index dfecfefc2..ddd65db29 100644 --- a/operator/builtin/input/file/reader.go +++ b/operator/builtin/input/file/reader.go @@ -69,8 +69,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error { // ReadToEnd will read until the end of the file func (f *Reader) ReadToEnd(ctx context.Context) { - defer f.file.Close() - if _, err := f.file.Seek(f.Offset, 0); err != nil { f.Errorw("Failed to seek", zap.Error(err)) return @@ -103,8 +101,12 @@ func (f *Reader) ReadToEnd(ctx context.Context) { } // Close will close the file -func (f *Reader) Close() error { - return f.file.Close() +func (f *Reader) Close() { + if f.file != nil { + if err := f.file.Close(); err != nil { + f.Debugf("Problem closing reader", "Error", err.Error()) + } + } } // Emit creates an entry with the decoded message and sends it to the next diff --git a/operator/builtin/input/file/rotation_test.go b/operator/builtin/input/file/rotation_test.go index 8c820c68f..93738e589 100644 --- a/operator/builtin/input/file/rotation_test.go +++ b/operator/builtin/input/file/rotation_test.go @@ -17,7 +17,15 @@ import ( "github.com/stretchr/testify/require" ) +const WINDOWS_OS = "windows" + func TestMultiFileRotate(t *testing.T) { + if runtime.GOOS == WINDOWS_OS { + // Windows has 
very poor support for moving active files, so rotation is less commonly used + // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 + t.Skip() + } + t.Parallel() getMessage := func(f, k, m int) string { return fmt.Sprintf("file %d-%d, message %d", f, k, m) } @@ -67,7 +75,7 @@ func TestMultiFileRotate(t *testing.T) { } func TestMultiFileRotateSlow(t *testing.T) { - if runtime.GOOS == "windows" { + if runtime.GOOS == WINDOWS_OS { // Windows has very poor support for moving active files, so rotation is less commonly used // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 t.Skip() @@ -340,15 +348,19 @@ func TestRotation(t *testing.T) { } for _, tc := range cases { - t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) - t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) + if runtime.GOOS != WINDOWS_OS { + // Windows has very poor support for moving active files, so rotation is less commonly used + // This may possibly be handled better in Go 1.16: https://github.com/golang/go/issues/35358 + t.Run(fmt.Sprintf("%s/MoveCreateTimestamped", tc.name), tc.run(tc, false, false)) + t.Run(fmt.Sprintf("%s/MoveCreateSequential", tc.name), tc.run(tc, false, true)) + } t.Run(fmt.Sprintf("%s/CopyTruncateTimestamped", tc.name), tc.run(tc, true, false)) t.Run(fmt.Sprintf("%s/CopyTruncateSequential", tc.name), tc.run(tc, true, true)) } } func TestMoveFile(t *testing.T) { - if runtime.GOOS == "windows" { + if runtime.GOOS == WINDOWS_OS { t.Skip("Moving files while open is unsupported on Windows") } t.Parallel() @@ -372,6 +384,41 @@ func TestMoveFile(t *testing.T) { expectNoMessages(t, logReceived) } +func TestTrackMovedAwayFiles(t *testing.T) { + if runtime.GOOS == WINDOWS_OS { + t.Skip("Moving files while open is unsupported on Windows") + } + t.Parallel() + operator, logReceived, tempDir := newTestFileOperator(t, nil, nil) + + temp1 := openTemp(t, 
tempDir) + writeString(t, temp1, "testlog1\n") + temp1.Close() + + operator.poll(context.Background()) + defer operator.Stop() + + waitForMessage(t, logReceived, "testlog1") + + // Wait until all goroutines are finished before renaming + operator.wg.Wait() + + newDir := fmt.Sprintf("%s%s", tempDir[:len(tempDir)-1], "_new/") + err := os.Mkdir(newDir, 0777) + require.NoError(t, err) + newFileName := fmt.Sprintf("%s%s", newDir, "newfile.log") + + err = os.Rename(temp1.Name(), newFileName) + require.NoError(t, err) + + movedFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + require.NoError(t, err) + writeString(t, movedFile, "testlog2\n") + operator.poll(context.Background()) + + waitForMessage(t, logReceived, "testlog2") +} + // TruncateThenWrite tests that, after a file has been truncated, // any new writes are picked up func TestTruncateThenWrite(t *testing.T) { diff --git a/operator/builtin/input/file/util_test.go b/operator/builtin/input/file/util_test.go index 8f1299bb1..fa1c8f4af 100644 --- a/operator/builtin/input/file/util_test.go +++ b/operator/builtin/input/file/util_test.go @@ -20,7 +20,7 @@ import ( func newDefaultConfig(tempDir string) *InputConfig { cfg := NewInputConfig("testfile") - cfg.PollInterval = helper.Duration{Duration: 50 * time.Millisecond} + cfg.PollInterval = helper.Duration{Duration: 200 * time.Millisecond} cfg.StartAt = "beginning" cfg.Include = []string{fmt.Sprintf("%s/*", tempDir)} cfg.OutputIDs = []string{"fake"} @@ -107,7 +107,7 @@ func waitForOne(t *testing.T, c chan *entry.Entry) *entry.Entry { select { case e := <-c: return e - case <-time.After(time.Second): + case <-time.After(3 * time.Second): require.FailNow(t, "Timed out waiting for message") return nil } @@ -119,7 +119,7 @@ func waitForN(t *testing.T, c chan *entry.Entry, n int) []string { select { case e := <-c: messages = append(messages, e.Record.(string)) - case <-time.After(time.Second): + case <-time.After(3 * time.Second): 
require.FailNow(t, "Timed out waiting for message") return nil } @@ -131,7 +131,7 @@ func waitForMessage(t *testing.T, c chan *entry.Entry, expected string) { select { case e := <-c: require.Equal(t, expected, e.Record.(string)) - case <-time.After(time.Second): + case <-time.After(3 * time.Second): require.FailNow(t, "Timed out waiting for message", expected) } }