chore: add new tracker interface
Vihas Splunk committed Mar 21, 2024
1 parent 384a94d commit e920fdd
Showing 9 changed files with 179 additions and 93 deletions.
23 changes: 9 additions & 14 deletions pkg/stanza/fileconsumer/config.go
@@ -16,10 +16,10 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -164,20 +164,15 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
}
-	knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
-	for i := 0; i < len(knownFiles); i++ {
-		knownFiles[i] = fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2)
-	}

return &Manager{
-		SugaredLogger:     logger.With("component", "fileconsumer"),
-		readerFactory:     readerFactory,
-		fileMatcher:       fileMatcher,
-		pollInterval:      c.PollInterval,
-		maxBatchFiles:     c.MaxConcurrentFiles / 2,
-		maxBatches:        c.MaxBatches,
-		currentPollFiles:  fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
-		previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
-		knownFiles:        knownFiles,
+		SugaredLogger: logger.With("component", "fileconsumer"),
+		readerFactory: readerFactory,
+		fileMatcher:   fileMatcher,
+		pollInterval:  c.PollInterval,
+		maxBatchFiles: c.MaxConcurrentFiles / 2,
+		maxBatches:    c.MaxBatches,
+		tracker:       tracker.New(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2),
}, nil
}

73 changes: 16 additions & 57 deletions pkg/stanza/fileconsumer/file.go
@@ -13,9 +13,9 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
@@ -27,15 +27,12 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
+	tracker *tracker.Tracker

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

-	currentPollFiles  *fileset.Fileset[*reader.Reader]
-	previousPollFiles *fileset.Fileset[*reader.Reader]
-	knownFiles        []*fileset.Fileset[*reader.Metadata]
}

func (m *Manager) Start(persister operator.Persister) error {
@@ -55,7 +55,7 @@ func (m *Manager) Start(persister operator.Persister) error {
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
-		m.knownFiles[0].Add(offsets...)
+		m.tracker.LoadMetadata(offsets)
}
}

@@ -65,35 +62,16 @@ func (m *Manager) Start(persister operator.Persister) error {
return nil
}

-func (m *Manager) closePreviousFiles() {
-	// m.previousPollFiles -> m.knownFiles[0]
-
-	for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() {
-		m.knownFiles[0].Add(r.Close())
-	}
-}
-
-func (m *Manager) rotateFilesets() {
-	// shift the filesets at end of every consume() call
-	// m.knownFiles[0] -> m.knownFiles[1] -> m.knownFiles[2]
-	copy(m.knownFiles[1:], m.knownFiles)
-	m.knownFiles[0] = fileset.New[*reader.Metadata](m.maxBatchFiles / 2)
-}

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
m.wg.Wait()
-	m.closePreviousFiles()
+	m.tracker.ClosePreviousFiles()
if m.persister != nil {
-		checkpoints := make([]*reader.Metadata, 0, m.totalReaders())
-		for _, knownFiles := range m.knownFiles {
-			checkpoints = append(checkpoints, knownFiles.Get()...)
-		}
-		if err := checkpoint.Save(context.Background(), m.persister, checkpoints); err != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
@@ -151,20 +129,12 @@ func (m *Manager) poll(ctx context.Context) {
// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true
if m.persister != nil {
-		allCheckpoints := make([]*reader.Metadata, 0, m.totalReaders())
-		for _, knownFiles := range m.knownFiles {
-			allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
-		}
-
-		for _, r := range m.previousPollFiles.Get() {
-			allCheckpoints = append(allCheckpoints, r.Metadata)
-		}
-		if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil {
+		if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
// rotate at end of every poll()
-	m.rotateFilesets()
+	m.tracker.RotateFilesets()
}

func (m *Manager) consume(ctx context.Context, paths []string) {
@@ -175,7 +145,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

// read new readers to end
var wg sync.WaitGroup
-	for _, r := range m.currentPollFiles.Get() {
+	for _, r := range m.tracker.CurrentPollFiles() {
wg.Add(1)
go func(r *reader.Reader) {
defer wg.Done()
@@ -184,7 +154,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

-	m.postConsume()
+	m.tracker.EndPoll()
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -216,7 +186,6 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(paths []string) {
-	m.currentPollFiles = fileset.New[*reader.Reader](m.maxBatchFiles / 2)
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
@@ -225,9 +194,9 @@ func (m *Manager) makeReaders(paths []string) {

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
-		if r := m.currentPollFiles.Match(fp, fileset.Equal); r != nil {
+		if r := m.tracker.GetCurrentFile(fp); r != nil {
			// re-add the reader as Match() removes duplicates
-			m.currentPollFiles.Add(r)
+			m.tracker.Add(r)
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
@@ -240,32 +209,22 @@ func (m *Manager) makeReaders(paths []string) {
continue
}

-		m.currentPollFiles.Add(r)
+		m.tracker.Add(r)
}
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
-	if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil {
+	if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

-	// Iterate backwards to match newest first
-	for i := 0; i < len(m.knownFiles); i++ {
-		if oldMetadata := m.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
-			return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
-		}
-	}
+	// Check closed files for a match
+	if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
+		return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
+	}

// If we don't match any previously known files, create a new reader from scratch
m.Infow("Started watching file", "path", file.Name())
return m.readerFactory.NewReader(file, fp)
}

-func (m *Manager) totalReaders() int {
-	total := m.previousPollFiles.Len()
-	for i := 0; i < len(m.knownFiles); i++ {
-		total += m.knownFiles[i].Len()
-	}
-	return total
-}
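The changes above preserve two distinct matching rules: an Equal match (GetCurrentFile) detects a byte-identical duplicate within the same poll, as happens during copy/truncate rotation, while a StartsWith match (GetOpenFile, GetClosedFile) re-associates a file that has grown since its fingerprint was recorded. A minimal, self-contained sketch of those semantics — the toy Fingerprint type below only mimics the internal fingerprint package and is not part of this commit:

```go
package main

import (
	"bytes"
	"fmt"
)

// Fingerprint is a toy stand-in for the internal fingerprint package:
// it identifies a file by its first bytes.
type Fingerprint struct{ FirstBytes []byte }

// Equal reports whether two fingerprints cover identical content.
func (f *Fingerprint) Equal(other *Fingerprint) bool {
	return bytes.Equal(f.FirstBytes, other.FirstBytes)
}

// StartsWith reports whether f begins with other's bytes, i.e. the same
// file may simply have grown since other was recorded. (Empty
// fingerprints never match.)
func (f *Fingerprint) StartsWith(other *Fingerprint) bool {
	return len(other.FirstBytes) > 0 && bytes.HasPrefix(f.FirstBytes, other.FirstBytes)
}

func main() {
	known := &Fingerprint{FirstBytes: []byte("2024-03-21 first line\n")}

	// Same poll, copy/truncate rotation: the copy is byte-identical, so
	// an Equal match (GetCurrentFile) flags it as a duplicate to discard.
	copied := &Fingerprint{FirstBytes: []byte("2024-03-21 first line\n")}
	fmt.Println("duplicate in current poll:", copied.Equal(known)) // true

	// A later poll, after the file has grown: Equal no longer holds, but
	// StartsWith (GetOpenFile/GetClosedFile) still ties it to its reader.
	grown := &Fingerprint{FirstBytes: []byte("2024-03-21 first line\nsecond\n")}
	fmt.Println("equal after growth:", grown.Equal(known))            // false
	fmt.Println("starts with after growth:", grown.StartsWith(known)) // true
}
```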
16 changes: 4 additions & 12 deletions pkg/stanza/fileconsumer/file_other.go
@@ -16,10 +16,11 @@ import (
// this can mean either files which were removed, or rotated into a name not matching the pattern
// we do this before reading existing files to ensure we emit older log lines before newer ones
func (m *Manager) preConsume(ctx context.Context) {
-	lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
+	previousPollFiles := m.tracker.PreviousPollFiles()
+	lostReaders := make([]*reader.Reader, 0, len(previousPollFiles))
OUTER:
-	for _, oldReader := range m.previousPollFiles.Get() {
-		for _, newReader := range m.currentPollFiles.Get() {
+	for _, oldReader := range previousPollFiles {
+		for _, newReader := range m.tracker.CurrentPollFiles() {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
@@ -49,12 +50,3 @@ OUTER:
}
lostWG.Wait()
}

-// On non-windows platforms, we keep files open between poll cycles so that we can detect
-// and read "lost" files, which have been moved out of the matching pattern.
-func (m *Manager) postConsume() {
-	m.closePreviousFiles()
-
-	// m.currentPollFiles -> m.previousPollFiles
-	m.previousPollFiles = m.currentPollFiles
-}
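The surviving nested loop in preConsume is the heart of lost-file detection: a previous-poll reader whose fingerprint no longer prefixes any current reader has been moved out of the matching pattern or deleted, so it is drained before newer files are read. A self-contained sketch of that logic, using hypothetical toy types that are not part of this commit:

```go
package main

import (
	"fmt"
	"strings"
)

// toyReader stands in for reader.Reader, keeping only the fingerprint
// prefix needed to illustrate the matching loop.
type toyReader struct {
	name       string
	firstBytes string
}

// lostReaders mirrors the shape of Manager.preConsume: any reader from
// the previous poll whose fingerprint no longer prefixes a current
// reader has been moved or deleted and must be drained first.
func lostReaders(previous, current []toyReader) []toyReader {
	lost := make([]toyReader, 0, len(previous))
OUTER:
	for _, old := range previous {
		for _, cur := range current {
			if strings.HasPrefix(cur.firstBytes, old.firstBytes) {
				continue OUTER // still matched by the pattern; not lost
			}
		}
		lost = append(lost, old)
	}
	return lost
}

func main() {
	previous := []toyReader{
		{name: "app.log", firstBytes: "lineA\n"},
		{name: "app.log.1", firstBytes: "lineB\n"},
	}
	// app.log.1 was rotated out of the matching pattern; app.log reappears, grown.
	current := []toyReader{{name: "app.log", firstBytes: "lineA\nlineA2\n"}}

	for _, r := range lostReaders(previous, current) {
		fmt.Println("lost; draining before reading new files:", r.name)
	}
	// Output: lost; draining before reading new files: app.log.1
}
```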
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/file_test.go
@@ -1143,7 +1143,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
require.NoError(t, longFile.Close())

// Verify we have no checkpointed files
-	require.Equal(t, 0, operator.totalReaders())
+	require.Equal(t, 0, operator.tracker.TotalReaders())

// Wait until the only line in the short file and
// at least one line from the long file have been consumed
@@ -1285,7 +1285,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
operator.wg.Wait()
if runtime.GOOS != "windows" {
// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
-		require.Equal(t, operator.previousPollFiles.Len(), 1)
+		require.Equal(t, len(operator.tracker.PreviousPollFiles()), 1)
}

// keep append data to file1 and file2
7 changes: 0 additions & 7 deletions pkg/stanza/fileconsumer/file_windows.go
@@ -11,10 +11,3 @@ import (

func (m *Manager) preConsume(ctx context.Context) {
}

-// On windows, we close files immediately after reading because they cannot be moved while open.
-func (m *Manager) postConsume() {
-	// m.currentPollFiles -> m.previousPollFiles
-	m.previousPollFiles = m.currentPollFiles
-	m.closePreviousFiles()
-}
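Both platform-specific postConsume bodies are deleted, and consume() now ends with m.tracker.EndPoll(). The tracker-side EndPoll implementations are among the three changed files this view does not render, so the following is only a plausible toy sketch, assuming the non-windows variant absorbs the removed postConsume logic; on windows the removed code closed files immediately after reading, since open files cannot be moved there:

```go
package main

import "fmt"

// Toy stand-ins for the tracker's filesets; the real types live in the
// internal fileset and reader packages.
type toyFileset []string

// toyTracker models only the fields an EndPoll would plausibly touch.
type toyTracker struct {
	currentPollFiles  toyFileset
	previousPollFiles toyFileset
	knownFiles        [3]toyFileset
}

func (t *toyTracker) closePreviousFiles() {
	// previousPollFiles -> knownFiles[0], closing each reader
	t.knownFiles[0] = append(t.knownFiles[0], t.previousPollFiles...)
	t.previousPollFiles = nil
}

// endPoll mirrors the removed non-windows postConsume: close the
// previous poll's readers, then promote current -> previous. On
// windows, the removed code reversed the order so files never stay open.
func (t *toyTracker) endPoll() {
	t.closePreviousFiles()
	t.previousPollFiles = t.currentPollFiles
	t.currentPollFiles = nil // fresh set for the next poll (assumption)
}

func main() {
	t := &toyTracker{currentPollFiles: toyFileset{"app.log"}}
	t.endPoll()
	fmt.Println("previous after poll:", t.previousPollFiles) // [app.log]
	t.endPoll()
	fmt.Println("known[0] after next poll:", t.knownFiles[0]) // [app.log]
}
```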
106 changes: 106 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -0,0 +1,106 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type Tracker struct {
*zap.SugaredLogger

maxBatchFiles int

currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]
}

func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
return &Tracker{
SugaredLogger: logger.With("component", "fileconsumer"),
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
knownFiles: knownFiles,
}
}

func (t *Tracker) Add(reader *reader.Reader) {
// add a new reader for tracking
t.currentPollFiles.Add(reader)
}

func (t *Tracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *Tracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.previousPollFiles.Match(fp, fileset.StartsWith)
}

func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
for i := 0; i < len(t.knownFiles); i++ {
if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
return oldMetadata
}
}
return nil
}

func (t *Tracker) GetMetadata() []*reader.Metadata {
// return all known metadata for checkpointing
allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders())
for _, knownFiles := range t.knownFiles {
allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
}

for _, r := range t.previousPollFiles.Get() {
allCheckpoints = append(allCheckpoints, r.Metadata)
}
return allCheckpoints
}

func (t *Tracker) LoadMetadata(metadata []*reader.Metadata) {
t.knownFiles[0].Add(metadata...)
}

func (t *Tracker) CurrentPollFiles() []*reader.Reader {
return t.currentPollFiles.Get()
}

func (t *Tracker) PreviousPollFiles() []*reader.Reader {
return t.previousPollFiles.Get()
}

func (t *Tracker) ClosePreviousFiles() {
// t.previousPollFiles -> t.knownFiles[0]

for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
t.knownFiles[0].Add(r.Close())
}
}

func (t *Tracker) RotateFilesets() {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles / 2)
}

func (t *Tracker) TotalReaders() int {
total := t.previousPollFiles.Len()
for i := 0; i < len(t.knownFiles); i++ {
total += t.knownFiles[i].Len()
}
return total
}
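knownFiles acts as a three-generation buffer: ClosePreviousFiles deposits freshly closed metadata into generation 0, and RotateFilesets shifts each generation down at the end of every poll, so metadata that stays unmatched is dropped after the third rotation and its file would be treated as new. A toy model of that aging behavior (stand-in types, not part of this commit):

```go
package main

import "fmt"

// toyTracker models only the three-generation knownFiles buffer.
type toyTracker struct {
	knownFiles [3][]string // closed-file metadata stand-ins, newest first
}

func (t *toyTracker) closePreviousFiles(closed ...string) {
	t.knownFiles[0] = append(t.knownFiles[0], closed...)
}

func (t *toyTracker) rotateFilesets() {
	// copy handles the overlap like memmove: gen0 -> gen1 -> gen2,
	// and whatever sat in gen2 is dropped.
	copy(t.knownFiles[1:], t.knownFiles[:])
	t.knownFiles[0] = nil // fresh generation for the next poll
}

func main() {
	t := &toyTracker{}
	for poll, closed := range []string{"a", "b", "c", "d"} {
		t.closePreviousFiles(closed)
		t.rotateFilesets()
		fmt.Printf("after poll %d: %v\n", poll+1, t.knownFiles)
	}
	// after poll 1: [[] [a] []]
	// after poll 2: [[] [b] [a]]
	// after poll 3: [[] [c] [b]]   <- "a" has aged out
	// after poll 4: [[] [d] [c]]
}
```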
(The remaining three changed files are not rendered in this view.)
