[chore][pkg/stanza] move file handling to internal package #31506

Merged

Changes from 4 commits
23 changes: 9 additions & 14 deletions pkg/stanza/fileconsumer/config.go
@@ -17,11 +17,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
@@ -162,20 +162,15 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback, opts ...Opt
HeaderConfig: hCfg,
DeleteAtEOF: c.DeleteAfterRead,
}
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](c.MaxConcurrentFiles / 2)
}

return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
currentPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
previousPollFiles: fileset.New[*reader.Reader](c.MaxConcurrentFiles / 2),
knownFiles: knownFiles,
SugaredLogger: logger.With("component", "fileconsumer"),
readerFactory: readerFactory,
fileMatcher: fileMatcher,
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
tracker: tracker.New(logger.With("component", "fileconsumer"), c.MaxConcurrentFiles/2),
}, nil
}
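
Reviewer note: the capacity arithmetic is unchanged by this refactor. Build still halves MaxConcurrentFiles for maxBatchFiles and now passes the same value to tracker.New, which (per the new tracker.go later in this diff) sizes its current, previous, and knownFiles filesets from it. A minimal sketch, with 1024 as a hypothetical MaxConcurrentFiles value rather than anything from this diff:

// buildTracker sketches the construction Build now delegates; 1024 is a
// hypothetical MaxConcurrentFiles value, not a default from this diff.
func buildTracker(logger *zap.SugaredLogger) *tracker.Tracker {
	maxConcurrentFiles := 1024
	// the tracker receives MaxConcurrentFiles / 2, the same figure used
	// for maxBatchFiles in the Manager construction above
	return tracker.New(logger.With("component", "fileconsumer"), maxConcurrentFiles/2)
}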

72 changes: 16 additions & 56 deletions pkg/stanza/fileconsumer/file.go
@@ -13,9 +13,9 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
@@ -27,15 +27,12 @@ type Manager struct {

readerFactory reader.Factory
fileMatcher *matcher.Matcher
tracker *tracker.Tracker

pollInterval time.Duration
persister operator.Persister
maxBatches int
maxBatchFiles int

currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]
}

func (m *Manager) Start(persister operator.Persister) error {
@@ -55,7 +52,7 @@ func (m *Manager) Start(persister operator.Persister) error {
if len(offsets) > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.FromBeginning = true
m.knownFiles[0].Add(offsets...)
m.tracker.LoadMetadata(offsets)
}
}

@@ -65,34 +62,16 @@ func (m *Manager) Start(persister operator.Persister) error {
return nil
}

func (m *Manager) closePreviousFiles() {
// m.previousPollFiles -> m.knownFiles[0]
for r, _ := m.previousPollFiles.Pop(); r != nil; r, _ = m.previousPollFiles.Pop() {
m.knownFiles[0].Add(r.Close())
}
}

func (m *Manager) rotateFilesets() {
// shift the filesets at end of every consume() call
// m.knownFiles[0] -> m.knownFiles[1] -> m.knownFiles[2]
copy(m.knownFiles[1:], m.knownFiles)
m.knownFiles[0] = fileset.New[*reader.Metadata](m.maxBatchFiles / 2)
}

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
if m.cancel != nil {
m.cancel()
m.cancel = nil
}
m.wg.Wait()
m.closePreviousFiles()
m.tracker.ClosePreviousFiles()
if m.persister != nil {
checkpoints := make([]*reader.Metadata, 0, m.totalReaders())
for _, knownFiles := range m.knownFiles {
checkpoints = append(checkpoints, knownFiles.Get()...)
}
if err := checkpoint.Save(context.Background(), m.persister, checkpoints); err != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
@@ -150,20 +129,12 @@ func (m *Manager) poll(ctx context.Context) {
// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true
if m.persister != nil {
allCheckpoints := make([]*reader.Metadata, 0, m.totalReaders())
for _, knownFiles := range m.knownFiles {
allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
}

for _, r := range m.previousPollFiles.Get() {
allCheckpoints = append(allCheckpoints, r.Metadata)
}
if err := checkpoint.Save(context.Background(), m.persister, allCheckpoints); err != nil {
if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
m.Errorw("save offsets", zap.Error(err))
}
}
// rotate at end of every poll()
m.rotateFilesets()
m.tracker.EndPoll()
}

func (m *Manager) consume(ctx context.Context, paths []string) {
@@ -174,7 +145,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

// read new readers to end
var wg sync.WaitGroup
for _, r := range m.currentPollFiles.Get() {
for _, r := range m.tracker.CurrentPollFiles() {
wg.Add(1)
go func(r *reader.Reader) {
defer wg.Done()
@@ -183,7 +154,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}
wg.Wait()

m.postConsume()
m.tracker.EndConsume()
}

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -215,7 +186,6 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File)
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReaders(paths []string) {
m.currentPollFiles = fileset.New[*reader.Reader](m.maxBatchFiles / 2)
for _, path := range paths {
fp, file := m.makeFingerprint(path)
if fp == nil {
@@ -224,9 +194,9 @@

// Exclude duplicate paths with the same content. This can happen when files are
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
if r := m.currentPollFiles.Match(fp, fileset.Equal); r != nil {
if r := m.tracker.GetCurrentFile(fp); r != nil {
// re-add the reader as Match() removes duplicates
m.currentPollFiles.Add(r)
m.tracker.Add(r)
if err := file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
}
@@ -239,32 +209,22 @@
continue
}

m.currentPollFiles.Add(r)
m.tracker.Add(r)
}
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check previous poll cycle for match
if oldReader := m.previousPollFiles.Match(fp, fileset.StartsWith); oldReader != nil {
if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
}

// Iterate backwards to match newest first
for i := 0; i < len(m.knownFiles); i++ {
if oldMetadata := m.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}
// Check closed files for a match
if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
return m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
}

// If we don't match any previously known files, create a new reader from scratch
m.Infow("Started watching file", "path", file.Name())
return m.readerFactory.NewReader(file, fp)
}

func (m *Manager) totalReaders() int {
total := m.previousPollFiles.Len()
for i := 0; i < len(m.knownFiles); i++ {
total += m.knownFiles[i].Len()
}
return total
}
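
Worth spelling out the two match modes the tracker now mediates: makeReaders deduplicates with fileset.Equal (an identical fingerprint within one poll means the same content visible at two paths, as during copy/truncate rotation), while newReader resumes with fileset.StartsWith (a file that grew since an earlier poll still starts with its recorded fingerprint). A sketch of the distinction, assuming Fingerprint exposes an Equal method alongside the StartsWith used in this diff:

// classify sketches how a candidate fingerprint relates to a known one.
func classify(known, candidate *fingerprint.Fingerprint) string {
	switch {
	case candidate.Equal(known):
		// same content at two paths within one poll: copy/truncate rotation
		return "duplicate, keep one reader and close the extra file"
	case candidate.StartsWith(known):
		// the file grew since we last saw it: resume from saved metadata
		return "known file with appended data"
	default:
		return "new file, read from the configured start_at"
	}
}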
16 changes: 4 additions & 12 deletions pkg/stanza/fileconsumer/file_other.go
@@ -21,10 +21,11 @@ func (m *Manager) readLostFiles(ctx context.Context) {
// since we are deleting the files before they can become lost.
return
}
lostReaders := make([]*reader.Reader, 0, m.previousPollFiles.Len())
previousPollFiles := m.tracker.PreviousPollFiles()
lostReaders := make([]*reader.Reader, 0, len(previousPollFiles))
OUTER:
for _, oldReader := range m.previousPollFiles.Get() {
for _, newReader := range m.currentPollFiles.Get() {
for _, oldReader := range previousPollFiles {
for _, newReader := range m.tracker.CurrentPollFiles() {
if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
continue OUTER
}
@@ -54,12 +55,3 @@ OUTER:
}
lostWG.Wait()
}

// On non-windows platforms, we keep files open between poll cycles so that we can detect
// and read "lost" files, which have been moved out of the matching pattern.
func (m *Manager) postConsume() {
m.closePreviousFiles()

// m.currentPollFiles -> m.previousPollFiles
m.previousPollFiles = m.currentPollFiles
}
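
For reviewers tracing the non-Windows path: a previous-poll reader is considered lost when no current-poll fingerprint starts with its own, i.e. the file was moved or renamed out of the match pattern. A condensed sketch of the test the OUTER loop above performs, using only names from this diff:

// isLost reports whether a previous-poll reader no longer corresponds to
// any file matched in the current poll (moved/renamed out of the pattern).
func isLost(old *reader.Reader, current []*reader.Reader) bool {
	for _, cur := range current {
		if cur.Fingerprint.StartsWith(old.Fingerprint) {
			return false // still present this poll
		}
	}
	return true // lost: read it to end, then close it
}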
4 changes: 2 additions & 2 deletions pkg/stanza/fileconsumer/file_test.go
@@ -1156,7 +1156,7 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
require.NoError(t, longFile.Close())

// Verify we have no checkpointed files
require.Equal(t, 0, operator.totalReaders())
require.Equal(t, 0, operator.tracker.TotalReaders())

// Wait until the only line in the short file and
// at least one line from the long file have been consumed
@@ -1298,7 +1298,7 @@ func TestStalePartialFingerprintDiscarded(t *testing.T) {
operator.wg.Wait()
if runtime.GOOS != "windows" {
// On windows, we never keep files in previousPollFiles, so we don't expect to see them here
require.Equal(t, operator.previousPollFiles.Len(), 1)
require.Equal(t, len(operator.tracker.PreviousPollFiles()), 1)
}

// keep append data to file1 and file2
7 changes: 0 additions & 7 deletions pkg/stanza/fileconsumer/file_windows.go
@@ -12,10 +12,3 @@ import (
// Noop on windows because we close files immediately after reading.
func (m *Manager) readLostFiles(ctx context.Context) {
}

// On windows, we close files immediately after reading because they cannot be moved while open.
func (m *Manager) postConsume() {
// m.currentPollFiles -> m.previousPollFiles
m.previousPollFiles = m.currentPollFiles
m.closePreviousFiles()
}
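
Both platform-specific postConsume implementations are deleted here; the m.tracker.EndConsume() call in consume() implies equivalent platform-specific methods on Tracker that are not shown in this diff. A sketch of what they would look like inside the tracker package, inferred from the removed code (method names and placement are assumptions, not part of this PR):

// non-Windows (from the removed file_other.go postConsume): close last
// poll's readers into knownFiles[0], then keep current readers open as
// previousPollFiles so lost files can still be detected and read.
func (t *Tracker) endConsumeUnix() {
	t.ClosePreviousFiles()
	t.previousPollFiles = t.currentPollFiles
}

// Windows (from the removed file_windows.go postConsume): promote first,
// then close immediately, because open files cannot be moved.
func (t *Tracker) endConsumeWindows() {
	t.previousPollFiles = t.currentPollFiles
	t.ClosePreviousFiles()
}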
106 changes: 106 additions & 0 deletions pkg/stanza/fileconsumer/internal/tracker/tracker.go
@@ -0,0 +1,106 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

type Tracker struct {
*zap.SugaredLogger

maxBatchFiles int

currentPollFiles *fileset.Fileset[*reader.Reader]
previousPollFiles *fileset.Fileset[*reader.Reader]
knownFiles []*fileset.Fileset[*reader.Metadata]
}

func New(logger *zap.SugaredLogger, maxBatchFiles int) *Tracker {
knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
for i := 0; i < len(knownFiles); i++ {
knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
}
return &Tracker{
SugaredLogger: logger.With("component", "fileconsumer"),
maxBatchFiles: maxBatchFiles,
currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
knownFiles: knownFiles,
}
}

func (t *Tracker) Add(reader *reader.Reader) {
// add a new reader for tracking
t.currentPollFiles.Add(reader)
}

func (t *Tracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *Tracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader {
return t.previousPollFiles.Match(fp, fileset.StartsWith)
}

func (t *Tracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
for i := 0; i < len(t.knownFiles); i++ {
if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
return oldMetadata
}
}
return nil
}

func (t *Tracker) GetMetadata() []*reader.Metadata {
// return all known metadata for checkpointing
allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders())
for _, knownFiles := range t.knownFiles {
allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
}

for _, r := range t.previousPollFiles.Get() {
allCheckpoints = append(allCheckpoints, r.Metadata)
}
return allCheckpoints
}

func (t *Tracker) LoadMetadata(metadata []*reader.Metadata) {
t.knownFiles[0].Add(metadata...)
}

func (t *Tracker) CurrentPollFiles() []*reader.Reader {
return t.currentPollFiles.Get()
}

func (t *Tracker) PreviousPollFiles() []*reader.Reader {
return t.previousPollFiles.Get()
}

func (t *Tracker) ClosePreviousFiles() {
// t.previousPollFiles -> t.knownFiles[0]

for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
t.knownFiles[0].Add(r.Close())
}
}

func (t *Tracker) EndPoll() {
// shift the filesets at end of every poll() call
// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]
copy(t.knownFiles[1:], t.knownFiles)
t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles / 2)
}

func (t *Tracker) TotalReaders() int {
total := t.previousPollFiles.Len()
for i := 0; i < len(t.knownFiles); i++ {
total += t.knownFiles[i].Len()
}
return total
}
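
Putting the new API together, a sketch of one poll cycle as the Manager now drives it. EndConsume is assumed to live in platform-specific tracker files (mirroring the removed postConsume), and the 512 capacity stands in for a hypothetical MaxConcurrentFiles/2; neither is taken verbatim from this diff:

package main // illustrative only; these packages are internal to pkg/stanza

import (
	"context"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

// onePollCycle shows the order of Tracker calls per this diff.
func onePollCycle(ctx context.Context, logger *zap.SugaredLogger, persister operator.Persister, newReaders []*reader.Reader) {
	t := tracker.New(logger, 512)

	for _, r := range newReaders {
		t.Add(r) // makeReaders: register each reader for the current poll
	}
	// consume: Manager reads each of t.CurrentPollFiles() to EOF here

	t.EndConsume() // current -> previous (assumed platform-specific)

	// previous-poll reader metadata plus all three knownFiles generations
	if err := checkpoint.Save(ctx, persister, t.GetMetadata()); err != nil {
		logger.Errorw("save offsets", zap.Error(err))
	}

	t.EndPoll() // age generations: knownFiles[0] -> [1] -> [2], oldest dropped
}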