From e920fdd4f8248c1de284857c0ddda38b7cadadc3 Mon Sep 17 00:00:00 2001 From: Vihas Splunk Date: Thu, 21 Mar 2024 17:45:02 +0530 Subject: [PATCH] chore: add new tracker interface --- pkg/stanza/fileconsumer/config.go | 23 ++-- pkg/stanza/fileconsumer/file.go | 73 +++--------- pkg/stanza/fileconsumer/file_other.go | 16 +-- pkg/stanza/fileconsumer/file_test.go | 4 +- pkg/stanza/fileconsumer/file_windows.go | 7 -- .../fileconsumer/internal/tracker/tracker.go | 106 ++++++++++++++++++ .../internal/tracker/tracker_other.go | 21 ++++ .../internal/tracker/tracker_windows.go | 20 ++++ pkg/stanza/fileconsumer/util_test.go | 2 +- 9 files changed, 179 insertions(+), 93 deletions(-) create mode 100644 pkg/stanza/fileconsumer/internal/tracker/tracker.go create mode 100644 pkg/stanza/fileconsumer/internal/tracker/tracker_other.go create mode 100644 pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 18ad9d84fc38b..a90fbe2de6835 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -16,10 +16,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -164,20 +164,15 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli HeaderConfig: hCfg, DeleteAtEOF: c.DeleteAfterRead, } - knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) - for i := 0; i < len(knownFiles); i++ { - knownFiles[i] = fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2) - } + return &Manager{ - SugaredLogger: logger.With("component", "fileconsumer"), - readerFactory: readerFactory, - fileMatcher: fileMatcher, - pollInterval: c.PollInterval, - maxBatchFiles: c.MaxConcurrentFiles / 2, - maxBatches: c.MaxBatches, - currentPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2), - previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2), - knownFiles: knownFiles, + SugaredLogger: logger.With("component", "fileconsumer"), + readerFactory: readerFactory, + fileMatcher: fileMatcher, + pollInterval: c.PollInterval, + maxBatchFiles: c.MaxConcurrentFiles / 2, + maxBatches: c.MaxBatches, + tracker: tracker.New(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2), }, nil } diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go index edc4f89cb0045..7414b60f953ba 100644 --- a/pkg/stanza/fileconsumer/file.go +++ b/pkg/stanza/fileconsumer/file.go @@ -13,9 +13,9 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" ) @@ -27,15 +27,12 @@ type Manager struct { readerFactory reader.Factory fileMatcher *matcher.Matcher + tracker *tracker.Tracker pollInterval time.Duration persister operator.Persister maxBatches int maxBatchFiles int - - currentPollFiles *fileset.Fileset[*reader.Reader] - previousPollFiles *fileset.Fileset[*reader.Reader] - knownFiles []*fileset.Fileset[*reader.Metadata] } func (m *Manager) Start(persister operator.Persister) error { @@ -55,7 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error { if len(offsets) > 0 { m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.") m.readerFactory.FromBeginning = true - m.knownFiles[0].Add(offsets...) + m.tracker.LoadMetadata(offsets) } } @@ -65,21 +62,6 @@ func (m *Manager) Start(persister operator.Persister) error { return nil } -func (m *Manager) closePreviousFiles() { - // m.previousPollFiles -> m.knownFiles[0] - - for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() { - m.knownFiles[0].Add(r.Close()) - } -} - -func (m *Manager) rotateFilesets() { - // shift the filesets at end of every consume() call - // m.knownFiles[0] -> m.knownFiles[1] -> m.knownFiles[2] - copy(m.knownFiles[1:], m.knownFiles) - m.knownFiles[0] = fileset.New[*reader.Metadata](m.maxBatchFiles / 2) -} - // Stop will stop the file monitoring process func (m *Manager) Stop() error { if m.cancel != nil { @@ -87,13 +69,9 @@ func (m *Manager) Stop() error { m.cancel = nil } m.wg.Wait() - m.closePreviousFiles() + m.tracker.ClosePreviousFiles() if m.persister != nil { - checkpoints := make([]*reader.Metadata, 0, m.totalReaders()) - for _, knownFiles := range m.knownFiles { - checkpoints = append(checkpoints, 
knownFiles.Get()...) - } - if err := checkpoint.Save(context.Background(), m.persister, checkpoints); err != nil { + if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil { m.Errorw("save offsets", zap.Error(err)) } } @@ -151,20 +129,12 @@ func (m *Manager) poll(ctx context.Context) { // Any new files that appear should be consumed entirely m.readerFactory.FromBeginning = true if m.persister != nil { - allCheckpoints := make([]*reader.Metadata, 0, m.totalReaders()) - for _, knownFiles := range m.knownFiles { - allCheckpoints = append(allCheckpoints, knownFiles.Get()...) - } - - for _, r := range m.previousPollFiles.Get() { - allCheckpoints = append(allCheckpoints, r.Metadata) - } - if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil { + if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil { m.Errorw("save offsets", zap.Error(err)) } } // rotate at end of every poll() - m.rotateFilesets() + m.tracker.RotateFilesets() } func (m *Manager) consume(ctx context.Context, paths []string) { @@ -175,7 +145,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { // read new readers to end var wg sync.WaitGroup - for _, r := range m.currentPollFiles.Get() { + for _, r := range m.tracker.CurrentPollFiles() { wg.Add(1) go func(r *reader.Reader) { defer wg.Done() @@ -184,7 +154,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) { } wg.Wait() - m.postConsume() + m.tracker.EndPoll() } func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) { @@ -216,7 +186,6 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi // discarding any that have a duplicate fingerprint to other files that have already // been read this polling interval func (m *Manager) makeReaders(paths []string) { - m.currentPollFiles = fileset.New[*reader.Reader](m.maxBatchFiles / 2) for _, path := range 
paths { fp, file := m.makeFingerprint(path) if fp == nil { @@ -225,9 +194,9 @@ func (m *Manager) makeReaders(paths []string) { // Exclude duplicate paths with the same content. This can happen when files are // being rotated with copy/truncate strategy. (After copy, prior to truncate.) - if r := m.currentPollFiles.Match(fp, fileset.Equal); r != nil { + if r := m.tracker.GetCurrentFile(fp); r != nil { // re-add the reader as Match() removes duplicates - m.currentPollFiles.Add(r) + m.tracker.Add(r) if err := file.Close(); err != nil { m.Debugw("problem closing file", zap.Error(err)) } @@ -240,32 +209,22 @@ func (m *Manager) makeReaders(paths []string) { continue } - m.currentPollFiles.Add(r) + m.tracker.Add(r) } } func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) { // Check previous poll cycle for match - if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil { + if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil { return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close()) } - // Iterate backwards to match newest first - for i := 0; i < len(m.knownFiles); i++ { - if oldMetadata := m.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil { - return m.readerFactory.NewReaderFromMetadata(file, oldMetadata) - } + // Check closed files for a match + if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil { + return m.readerFactory.NewReaderFromMetadata(file, oldMetadata) } // If we don't match any previously known files, create a new reader from scratch m.Infow("Started watching file", "path", file.Name()) return m.readerFactory.NewReader(file, fp) } - -func (m *Manager) totalReaders() int { - total := m.previousPollFiles.Len() - for i := 0; i < len(m.knownFiles); i++ { - total += m.knownFiles[i].Len() - } - return total -} diff --git a/pkg/stanza/fileconsumer/file_other.go b/pkg/stanza/fileconsumer/file_other.go index cfa0c80fae093..053643743888e 100644 --- 
a/pkg/stanza/fileconsumer/file_other.go +++ b/pkg/stanza/fileconsumer/file_other.go @@ -16,10 +16,11 @@ import ( // this can mean either files which were removed, or rotated into a name not matching the pattern // we do this before reading existing files to ensure we emit older log lines before newer ones func (m *Manager) preConsume(ctx context.Context) { - lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len()) + previousPollFiles := m.tracker.PreviousPollFiles() + lostReaders := make([]*reader.Reader, 0, len(previousPollFiles)) OUTER: - for _, oldReader := range m.previousPollFiles.Get() { - for _, newReader := range m.currentPollFiles.Get() { + for _, oldReader := range previousPollFiles { + for _, newReader := range m.tracker.CurrentPollFiles() { if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) { continue OUTER } @@ -49,12 +50,3 @@ OUTER: } lostWG.Wait() } - -// On non-windows platforms, we keep files open between poll cycles so that we can detect -// and read "lost" files, which have been moved out of the matching pattern. 
-func (m *Manager) postConsume() { - m.closePreviousFiles() - - // m.currentPollFiles -> m.previousPollFiles - m.previousPollFiles = m.currentPollFiles -} diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index d816089f228e1..4368677a2967a 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -1143,7 +1143,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) { require.NoError(t, longFile.Close()) // Verify we have no checkpointed files - require.Equal(t, 0, operator.totalReaders()) + require.Equal(t, 0, operator.tracker.TotalReaders()) // Wait until the only line in the short file and // at least one line from the long file have been consumed @@ -1285,7 +1285,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) { operator.wg.Wait() if runtime.GOOS != "windows" { // On windows, we never keep files in previousPollFiles, so we don't expect to see them here - require.Equal(t, operator.previousPollFiles.Len(), 1) + require.Equal(t, len(operator.tracker.PreviousPollFiles()), 1) } // keep append data to file1 and file2 diff --git a/pkg/stanza/fileconsumer/file_windows.go b/pkg/stanza/fileconsumer/file_windows.go index 5481278301dd9..cb296de7f62bf 100644 --- a/pkg/stanza/fileconsumer/file_windows.go +++ b/pkg/stanza/fileconsumer/file_windows.go @@ -11,10 +11,3 @@ import ( func (m *Manager) preConsume(ctx context.Context) { } - -// On windows, we close files immediately after reading because they cannot be moved while open. 
-func (m *Manager) postConsume() { - // m.currentPollFiles -> m.previousPollFiles - m.previousPollFiles = m.currentPollFiles - m.closePreviousFiles() -} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker.go b/pkg/stanza/fileconsumer/internal/tracker/tracker.go new file mode 100644 index 0000000000000..e9e3360c66161 --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" + +import ( + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" +) + +type Tracker struct { + *zap.SugaredLogger + + maxBatchFiles int + + currentPollFiles *fileset.Fileset[*reader.Reader] + previousPollFiles *fileset.Fileset[*reader.Reader] + knownFiles []*fileset.Fileset[*reader.Metadata] +} + +func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker { + knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3) + for i := 0; i < len(knownFiles); i++ { + knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles) + } + return &Tracker{ + SugaredLogger: logger.With("component", "fileconsumer"), + maxBatchFiles: maxBatchFiles, + currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles), + previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles), + knownFiles: knownFiles, + } +} + +func (t *Tracker) Add(reader *reader.Reader) { + // add a new reader for tracking + t.currentPollFiles.Add(reader) +} + +func (t *Tracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader { + return t.currentPollFiles.Match(fp, fileset.Equal) +} + +func (t *Tracker) 
GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader { + return t.previousPollFiles.Match(fp, fileset.StartsWith) +} + +func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata { + for i := 0; i < len(t.knownFiles); i++ { + if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil { + return oldMetadata + } + } + return nil +} + +func (t *Tracker) GetMetadata() []*reader.Metadata { + // return all known metadata for checkpointing + allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders()) + for _, knownFiles := range t.knownFiles { + allCheckpoints = append(allCheckpoints, knownFiles.Get()...) + } + + for _, r := range t.previousPollFiles.Get() { + allCheckpoints = append(allCheckpoints, r.Metadata) + } + return allCheckpoints +} + +func (t *Tracker) LoadMetadata(metadata []*reader.Metadata) { + t.knownFiles[0].Add(metadata...) +} + +func (t *Tracker) CurrentPollFiles() []*reader.Reader { + return t.currentPollFiles.Get() +} + +func (t *Tracker) PreviousPollFiles() []*reader.Reader { + return t.previousPollFiles.Get() +} + +func (t *Tracker) ClosePreviousFiles() { + // t.previousPollFiles -> t.knownFiles[0] + + for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() { + t.knownFiles[0].Add(r.Close()) + } +} + +func (t *Tracker) RotateFilesets() { + // shift the filesets at end of every poll() call + // t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2] + copy(t.knownFiles[1:], t.knownFiles) + t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles / 2) +} + +func (t *Tracker) TotalReaders() int { + total := t.previousPollFiles.Len() + for i := 0; i < len(t.knownFiles); i++ { + total += t.knownFiles[i].Len() + } + return total +} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go new file mode 100644 index 0000000000000..69a389b1169f7 --- /dev/null +++ 
b/pkg/stanza/fileconsumer/internal/tracker/tracker_other.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !windows + +package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" +) + +// On non-windows platforms, we keep files open between poll cycles so that we can detect +// and read "lost" files, which have been moved out of the matching pattern. +func (t *Tracker) EndPoll() { + t.ClosePreviousFiles() + + // t.currentPollFiles -> t.previousPollFiles + t.previousPollFiles = t.currentPollFiles + t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles) +} diff --git a/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go b/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go new file mode 100644 index 0000000000000..aecdcce14c60e --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/tracker/tracker_windows.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build windows +// +build windows + +package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker" + +import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" +) + +// On windows, we close files immediately after reading because they cannot be moved while open. 
+func (t *Tracker) EndPoll() { + // t.currentPollFiles -> t.previousPollFiles + t.previousPollFiles = t.currentPollFiles + t.ClosePreviousFiles() + t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles) +} diff --git a/pkg/stanza/fileconsumer/util_test.go b/pkg/stanza/fileconsumer/util_test.go index aa6c8564d5b10..550ed1ee0245f 100644 --- a/pkg/stanza/fileconsumer/util_test.go +++ b/pkg/stanza/fileconsumer/util_test.go @@ -20,6 +20,6 @@ func testManager(t *testing.T, cfg *Config) (*Manager, *emittest.Sink) { func testManagerWithSink(t *testing.T, cfg *Config, sink *emittest.Sink) *Manager { input, err := cfg.Build(testutil.Logger(t), sink.Callback) require.NoError(t, err) - t.Cleanup(func() { input.closePreviousFiles() }) + t.Cleanup(func() { input.tracker.ClosePreviousFiles() }) return input }