From 00be20f49f67cefc1ea50c377756befd1e3674ae Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Fri, 25 Feb 2022 14:48:57 +0100 Subject: [PATCH 1/5] feat(operator/recombine): do not combine logs before first_entry matches line Signed-off-by: Dominik Rosiek --- operator/transformer/recombine/recombine.go | 31 ++++++++---- .../transformer/recombine/recombine_test.go | 49 +++++++++++++++++++ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/operator/transformer/recombine/recombine.go b/operator/transformer/recombine/recombine.go index a7cefbc0..0c99847e 100644 --- a/operator/transformer/recombine/recombine.go +++ b/operator/transformer/recombine/recombine.go @@ -120,6 +120,7 @@ func (c *RecombineOperatorConfig) Build(logger *zap.SugaredLogger) (operator.Ope ticker: time.NewTicker(c.ForceFlushTimeout), chClose: make(chan struct{}), sourceIdentifier: c.SourceIdentifier, + firstLineMatched: make(map[string]bool), }, nil } @@ -139,6 +140,9 @@ type RecombineOperator struct { chClose chan struct{} sourceIdentifier entry.Field + // firstLineMatched keeps information if any entry matched first line + firstLineMatched map[string]bool + sync.Mutex batchMap map[string][]*entry.Entry } @@ -220,17 +224,26 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error { s = DefaultSourceIdentifier } - // This is the first entry in the next batch - if matches && r.matchIndicatesFirst() { - // Flush the existing batch - err := r.flushSource(s) - if err != nil { - return err + // For first entry verification + if r.matchIndicatesFirst() { + // If entry matches, set firstLineMatched for source to true + if !r.firstLineMatched[s] && matches { + r.firstLineMatched[s] = true } - // Add the current log to the new batch - r.addToBatch(ctx, e, s) - return nil + // This is the first entry in the next batch + // or any entry if there wasn't any matching entry before + if matches || !r.firstLineMatched[s] { + // Flush the existing batch + err := r.flushSource(s) + if err != nil { + return err + } + + // Add the current log to the new batch + r.addToBatch(ctx, e, s) + return nil + } } // This is the last entry in a complete batch diff --git a/operator/transformer/recombine/recombine_test.go b/operator/transformer/recombine/recombine_test.go index 2bf2e6a3..c9d560aa 100644 --- a/operator/transformer/recombine/recombine_test.go +++ b/operator/transformer/recombine/recombine_test.go @@ -133,6 +133,55 @@ func TestRecombineOperator(t *testing.T) { entryWithBody(t2, "test1\ntest2"), }, }, + { + "EntriesNonMatchingForFirstEntry", + func() *RecombineOperatorConfig { + cfg := NewRecombineOperatorConfig("") + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "$body == 'test1'" + cfg.OutputIDs = []string{"fake"} + cfg.OverwriteWith = "newest" + return cfg + }(), + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t2, "test3"), + entryWithBody(t2, "test4"), + entryWithBody(t2, "test5"), + }, + []*entry.Entry{ + entryWithBody(t1, "test2"), + entryWithBody(t2, "test3"), + entryWithBody(t2, "test4"), + }, + }, + { + "EntriesMatchingForFirstEntryOneFileOnly", + func() *RecombineOperatorConfig { + cfg := NewRecombineOperatorConfig("") + cfg.CombineField = entry.NewBodyField() + cfg.IsFirstEntry = "$body == 'file1'" + cfg.OutputIDs = []string{"fake"} + cfg.OverwriteWith = "newest" + return cfg + }(), + []*entry.Entry{ + entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "file3", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}), + entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}), + entryWithBodyAttr(t2, "file4", map[string]string{"file.path": "file2"}), + }, + []*entry.Entry{ + entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t2, "file1\nfile2", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}), + entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}), + }, + }, { "CombineWithEmptyString", func() *RecombineOperatorConfig { From ec19817a22bbbba8d8f095c3089936646912f333 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 28 Feb 2022 08:18:17 +0100 Subject: [PATCH 2/5] refactor(operator/recombine): improved flushing entries if no matches for first entry Signed-off-by: Dominik Rosiek --- operator/transformer/recombine/recombine.go | 44 ++++++++----------- .../transformer/recombine/recombine_test.go | 6 +-- 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/operator/transformer/recombine/recombine.go b/operator/transformer/recombine/recombine.go index 0c99847e..49abd713 100644 --- a/operator/transformer/recombine/recombine.go +++ b/operator/transformer/recombine/recombine.go @@ -120,7 +120,6 @@ func (c *RecombineOperatorConfig) Build(logger *zap.SugaredLogger) (operator.Ope ticker: time.NewTicker(c.ForceFlushTimeout), chClose: make(chan struct{}), sourceIdentifier: c.SourceIdentifier, - firstLineMatched: make(map[string]bool), }, nil } @@ -140,9 +139,6 @@ type RecombineOperator struct { chClose chan struct{} sourceIdentifier entry.Field - // firstLineMatched keeps information if any entry matched first line - firstLineMatched map[string]bool - sync.Mutex batchMap map[string][]*entry.Entry } @@ -224,30 +220,28 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error { s = DefaultSourceIdentifier } - // For first entry verification - if r.matchIndicatesFirst() { - // If entry matches, set firstLineMatched for source to true - if !r.firstLineMatched[s] && matches { - r.firstLineMatched[s] = true - } - - // This is the first entry in the next batch - // or any entry if there wasn't any matching entry before - if matches || !r.firstLineMatched[s] { - // Flush the existing batch - err := r.flushSource(s) - if err != nil { - return err - } - - // Add the current log to the new batch - r.addToBatch(ctx, e, s) - return nil + switch { + // This is the first entry in the next batch + case matches && r.matchIndicatesFirst(): + // Flush the existing batch + err := r.flushSource(s) + if err != nil { + return err } - } + // Add the current log to the new batch + r.addToBatch(ctx, e, s) + return nil // This is the last entry in a complete batch - if matches && r.matchIndicatesLast() { + case matches && r.matchIndicatesLast(): + r.addToBatch(ctx, e, s) + err := r.flushSource(s) + if err != nil { + return err + } + return nil + // When matching on first entry, never batch partial first. Just emit immediately + case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0: r.addToBatch(ctx, e, s) err := r.flushSource(s) if err != nil { diff --git a/operator/transformer/recombine/recombine_test.go b/operator/transformer/recombine/recombine_test.go index c9d560aa..1129b85b 100644 --- a/operator/transformer/recombine/recombine_test.go +++ b/operator/transformer/recombine/recombine_test.go @@ -147,7 +147,6 @@ func TestRecombineOperator(t *testing.T) { entryWithBody(t1, "test2"), entryWithBody(t2, "test3"), entryWithBody(t2, "test4"), - entryWithBody(t2, "test5"), }, []*entry.Entry{ entryWithBody(t1, "test2"), @@ -173,7 +172,6 @@ func TestRecombineOperator(t *testing.T) { entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), entryWithBodyAttr(t2, "file2", map[string]string{"file.path": "file2"}), entryWithBodyAttr(t2, "file3", map[string]string{"file.path": "file2"}), - entryWithBodyAttr(t2, "file4", map[string]string{"file.path": "file2"}), }, []*entry.Entry{ entryWithBodyAttr(t1, "file1\nfile3", map[string]string{"file.path": "file1"}), @@ -312,7 +310,7 @@ func TestRecombineOperator(t *testing.T) { t.Run("FlushesOnShutdown", func(t *testing.T) { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "false" + cfg.IsFirstEntry = "true" cfg.OutputIDs = []string{"fake"} op, err := cfg.Build(testutil.Logger(t)) require.NoError(t, err) @@ -384,7 +382,7 @@ func TestTimeout(t *testing.T) { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "false" + cfg.IsFirstEntry = "true" cfg.OutputIDs = []string{"fake"} cfg.ForceFlushTimeout = 100 * time.Millisecond op, err := cfg.Build(testutil.Logger(t)) From 5d18534a0fc80c02025ab4e2dd33ae6802a90e3c Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Mon, 28 Feb 2022 10:42:28 +0100 Subject: [PATCH 3/5] refactor(recombine): fix lint Signed-off-by: Dominik Rosiek --- operator/transformer/recombine/recombine_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/operator/transformer/recombine/recombine_test.go b/operator/transformer/recombine/recombine_test.go index 1129b85b..26cc5c6e 100644 --- a/operator/transformer/recombine/recombine_test.go +++ b/operator/transformer/recombine/recombine_test.go @@ -26,6 +26,10 @@ import ( "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) +const ( + MatchAll string = "true" +) + func TestRecombineOperator(t *testing.T) { t1 := time.Date(2020, time.April, 11, 21, 34, 01, 0, time.UTC) t2 := time.Date(2020, time.April, 11, 21, 34, 02, 0, time.UTC) @@ -56,7 +60,7 @@ func TestRecombineOperator(t *testing.T) { func() *RecombineOperatorConfig { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "true" + cfg.IsFirstEntry = MatchAll cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -68,7 +72,7 @@ func TestRecombineOperator(t *testing.T) { func() *RecombineOperatorConfig { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsLastEntry = "true" + cfg.IsLastEntry = MatchAll cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -80,7 +84,7 @@ func TestRecombineOperator(t *testing.T) { func() *RecombineOperatorConfig { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "true" + cfg.IsFirstEntry = MatchAll cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -92,7 +96,7 @@ func TestRecombineOperator(t *testing.T) { func() *RecombineOperatorConfig { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsLastEntry = "true" + cfg.IsLastEntry = MatchAll cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -310,7 +314,7 @@ func TestRecombineOperator(t *testing.T) { t.Run("FlushesOnShutdown", func(t *testing.T) { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "true" + cfg.IsFirstEntry = MatchAll cfg.OutputIDs = []string{"fake"} op, err := cfg.Build(testutil.Logger(t)) require.NoError(t, err) @@ -382,7 +386,7 @@ func TestTimeout(t *testing.T) { cfg := NewRecombineOperatorConfig("") cfg.CombineField = entry.NewBodyField() - cfg.IsFirstEntry = "true" + cfg.IsFirstEntry = MatchAll cfg.OutputIDs = []string{"fake"} cfg.ForceFlushTimeout = 100 * time.Millisecond op, err := cfg.Build(testutil.Logger(t)) From 46c250ee4ddff84a9a1dfbe30482c41544bb645c Mon Sep 17 00:00:00 2001 From: Dominik Rosiek <58699848+sumo-drosiek@users.noreply.github.com> Date: Tue, 8 Mar 2022 07:54:38 +0100 Subject: [PATCH 4/5] Update operator/transformer/recombine/recombine.go Co-authored-by: Daniel Jaglowski --- operator/transformer/recombine/recombine.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/operator/transformer/recombine/recombine.go b/operator/transformer/recombine/recombine.go index 49abd713..94dd2e62 100644 --- a/operator/transformer/recombine/recombine.go +++ b/operator/transformer/recombine/recombine.go @@ -234,14 +234,11 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error { return nil // This is the last entry in a complete batch case matches && r.matchIndicatesLast(): - r.addToBatch(ctx, e, s) - err := r.flushSource(s) - if err != nil { - return err - } - return nil + fallthrough // When matching on first entry, never batch partial first. Just emit immediately case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0: + r.addToBatch(ctx, e, s) + return r.flushSource(s) r.addToBatch(ctx, e, s) err := r.flushSource(s) if err != nil { From 98a740e0026eb89aa40ee54bb45dac125a4828b0 Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Tue, 8 Mar 2022 07:56:59 +0100 Subject: [PATCH 5/5] refactor(recombine): clean up unused code Signed-off-by: Dominik Rosiek --- operator/transformer/recombine/recombine.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/operator/transformer/recombine/recombine.go b/operator/transformer/recombine/recombine.go index 94dd2e62..9b13928c 100644 --- a/operator/transformer/recombine/recombine.go +++ b/operator/transformer/recombine/recombine.go @@ -239,12 +239,6 @@ func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error { case !matches && r.matchIndicatesFirst() && len(r.batchMap[s]) == 0: r.addToBatch(ctx, e, s) return r.flushSource(s) - r.addToBatch(ctx, e, s) - err := r.flushSource(s) - if err != nil { - return err - } - return nil } // This is neither the first entry of a new log,