Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

feat(operator/recombine): do not combine logs before first_entry matches line #416

Merged
merged 5 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (c *RecombineOperatorConfig) Build(logger *zap.SugaredLogger) (operator.Ope
ticker: time.NewTicker(c.ForceFlushTimeout),
chClose: make(chan struct{}),
sourceIdentifier: c.SourceIdentifier,
firstLineMatched: make(map[string]bool),
}, nil
}

Expand All @@ -139,6 +140,9 @@ type RecombineOperator struct {
chClose chan struct{}
sourceIdentifier entry.Field

// firstLineMatched records, per source, whether any entry has matched the first-entry condition
firstLineMatched map[string]bool

sync.Mutex
batchMap map[string][]*entry.Entry
}
Expand Down Expand Up @@ -220,17 +224,26 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
s = DefaultSourceIdentifier
}

// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
// Flush the existing batch
err := r.flushSource(s)
if err != nil {
return err
// Handling for when a match marks the first entry of a batch
if r.matchIndicatesFirst() {
// If entry matches, set firstLineMatched for source to true
if !r.firstLineMatched[s] && matches {
r.firstLineMatched[s] = true
}

// Add the current log to the new batch
r.addToBatch(ctx, e, s)
return nil
// This entry starts the next batch: either it matches the first-entry
// condition, or no entry from this source has matched it yet
if matches || !r.firstLineMatched[s] {
// Flush the existing batch
err := r.flushSource(s)
if err != nil {
return err
}

// Add the current log to the new batch
r.addToBatch(ctx, e, s)
return nil
}
djaglowski marked this conversation as resolved.
Show resolved Hide resolved
}

// This is the last entry in a complete batch
Expand Down
49 changes: 49 additions & 0 deletions operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,55 @@ func TestRecombineOperator(t *testing.T) {
entryWithBody(t2, "test1\ntest2"),
},
},
{
"EntriesNonMatchingForFirstEntry",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
return cfg
}(),
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
entryWithBody(t2, "test5"),
},
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
},
},
{
"EntriesMatchingForFirstEntryOneFileOnly",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "$body == 'file1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file4", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file1\nfile2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
},
},
{
"CombineWithEmptyString",
func() *RecombineOperatorConfig {
Expand Down