Skip to content

Commit

Permalink
Avoid json encoding on flow ingestion netobserv#212 (breaking change)
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Jun 16, 2022
1 parent 4549b72 commit ad9fb95
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 8 deletions.
6 changes: 1 addition & 5 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package ingest
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"time"
Expand Down Expand Up @@ -166,10 +165,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
log.Debugf("exiting ingestCollector because of signal")
return
case record := <-ingestC.in:
// TODO: for efficiency, consider forwarding directly as map,
// as this is reverted back from string to map in later pipeline stages
recordAsBytes, _ := json.Marshal(record)
records = append(records, string(recordAsBytes))
records = append(records, record)
if len(records) >= ingestC.batchMaxLength {
log.Debugf("ingestCollector sending %d entries", len(records))
linesProcessed.Add(float64(len(records)))
Expand Down
5 changes: 2 additions & 3 deletions pkg/pipeline/ingest/ingest_collector_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ingest

import (
"encoding/json"
"testing"
"time"

Expand Down Expand Up @@ -32,9 +31,9 @@ func TestIngest(t *testing.T) {

received := waitForFlows(t, client, forwarded)
require.NotEmpty(t, received)
require.IsType(t, "string", received[0])
flow := map[string]interface{}{}
require.NoError(t, json.Unmarshal([]byte(received[0].(string)), &flow))
require.IsType(t, flow, received[0])
flow = received[0].(map[string]interface{})
assert.EqualValues(t, 12345678, flow["TimeFlowStart"])
assert.EqualValues(t, 12345678, flow["TimeFlowEnd"])
assert.Equal(t, "1.2.3.4", flow["SrcAddr"])
Expand Down

0 comments on commit ad9fb95

Please sign in to comment.