From ad9fb95fb0a2801c365cd1048c20e481b096fefd Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Tue, 31 May 2022 17:07:28 +0200 Subject: [PATCH] Avoid json encoding on flow ingestion #212 (breaking change) --- pkg/pipeline/ingest/ingest_collector.go | 6 +----- pkg/pipeline/ingest/ingest_collector_test.go | 5 ++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index 00ca5fffe..f5e415dbf 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -20,7 +20,6 @@ package ingest import ( "context" "encoding/binary" - "encoding/json" "fmt" "net" "time" @@ -166,10 +165,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) { log.Debugf("exiting ingestCollector because of signal") return case record := <-ingestC.in: - // TODO: for efficiency, consider forwarding directly as map, - // as this is reverted back from string to map in later pipeline stages - recordAsBytes, _ := json.Marshal(record) - records = append(records, string(recordAsBytes)) + records = append(records, record) if len(records) >= ingestC.batchMaxLength { log.Debugf("ingestCollector sending %d entries", len(records)) linesProcessed.Add(float64(len(records))) diff --git a/pkg/pipeline/ingest/ingest_collector_test.go b/pkg/pipeline/ingest/ingest_collector_test.go index 64189afc2..faabbd008 100644 --- a/pkg/pipeline/ingest/ingest_collector_test.go +++ b/pkg/pipeline/ingest/ingest_collector_test.go @@ -1,7 +1,6 @@ package ingest import ( - "encoding/json" "testing" "time" @@ -32,9 +31,9 @@ func TestIngest(t *testing.T) { received := waitForFlows(t, client, forwarded) require.NotEmpty(t, received) - require.IsType(t, "string", received[0]) flow := map[string]interface{}{} - require.NoError(t, json.Unmarshal([]byte(received[0].(string)), &flow)) + require.IsType(t, flow, received[0]) + flow = received[0].(map[string]interface{}) assert.EqualValues(t, 12345678, flow["TimeFlowStart"]) assert.EqualValues(t, 12345678, flow["TimeFlowEnd"]) assert.Equal(t, "1.2.3.4", flow["SrcAddr"])