Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

feat(operator/recombine): do not combine logs before first_entry matches line #416

Merged
merged 5 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions operator/transformer/recombine/recombine.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
s = DefaultSourceIdentifier
}

switch {
// This is the first entry in the next batch
if matches && r.matchIndicatesFirst() {
case matches && r.matchIndicatesFirst():
// Flush the existing batch
err := r.flushSource(s)
if err != nil {
Expand All @@ -231,16 +232,13 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error {
// Add the current log to the new batch
r.addToBatch(ctx, e, s)
return nil
}

// This is the last entry in a complete batch
if matches && r.matchIndicatesLast() {
case matches && r.matchIndicatesLast():
fallthrough
// When matching on first entry, never batch partial first. Just emit immediately
case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0:
r.addToBatch(ctx, e, s)
err := r.flushSource(s)
if err != nil {
return err
}
return nil
return r.flushSource(s)
}

// This is neither the first entry of a new log,
Expand Down
63 changes: 57 additions & 6 deletions operator/transformer/recombine/recombine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"github.com/open-telemetry/opentelemetry-log-collection/testutil"
)

const (
MatchAll string = "true"
)

func TestRecombineOperator(t *testing.T) {
t1 := time.Date(2020, time.April, 11, 21, 34, 01, 0, time.UTC)
t2 := time.Date(2020, time.April, 11, 21, 34, 02, 0, time.UTC)
Expand Down Expand Up @@ -56,7 +60,7 @@ func TestRecombineOperator(t *testing.T) {
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "true"
cfg.IsFirstEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand All @@ -68,7 +72,7 @@ func TestRecombineOperator(t *testing.T) {
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "true"
cfg.IsLastEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand All @@ -80,7 +84,7 @@ func TestRecombineOperator(t *testing.T) {
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "true"
cfg.IsFirstEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand All @@ -92,7 +96,7 @@ func TestRecombineOperator(t *testing.T) {
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsLastEntry = "true"
cfg.IsLastEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
return cfg
}(),
Expand Down Expand Up @@ -133,6 +137,53 @@ func TestRecombineOperator(t *testing.T) {
entryWithBody(t2, "test1\ntest2"),
},
},
{
"EntriesNonMatchingForFirstEntry",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "$body == 'test1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
return cfg
}(),
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
},
[]*entry.Entry{
entryWithBody(t1, "test2"),
entryWithBody(t2, "test3"),
entryWithBody(t2, "test4"),
},
},
{
"EntriesMatchingForFirstEntryOneFileOnly",
func() *RecombineOperatorConfig {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "$body == 'file1'"
cfg.OutputIDs = []string{"fake"}
cfg.OverwriteWith = "newest"
return cfg
}(),
[]*entry.Entry{
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
},
[]*entry.Entry{
entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file1\nfile2", map[string]string{"file.path": "file1"}),
entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}),
entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}),
},
},
{
"CombineWithEmptyString",
func() *RecombineOperatorConfig {
Expand Down Expand Up @@ -263,7 +314,7 @@ func TestRecombineOperator(t *testing.T) {
t.Run("FlushesOnShutdown", func(t *testing.T) {
cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "false"
cfg.IsFirstEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
op, err := cfg.Build(testutil.Logger(t))
require.NoError(t, err)
Expand Down Expand Up @@ -335,7 +386,7 @@ func TestTimeout(t *testing.T) {

cfg := NewRecombineOperatorConfig("")
cfg.CombineField = entry.NewBodyField()
cfg.IsFirstEntry = "false"
cfg.IsFirstEntry = MatchAll
cfg.OutputIDs = []string{"fake"}
cfg.ForceFlushTimeout = 100 * time.Millisecond
op, err := cfg.Build(testutil.Logger(t))
Expand Down