Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/pipeline/decode/decode_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package decode

import (
"encoding/json"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
Expand All @@ -42,6 +43,9 @@ func (c *DecodeJson) Decode(in []interface{}) []config.GenericMap {
continue
}
decodedLine2 := make(config.GenericMap, len(decodedLine))
// flows directly ingested by flp-transformer won't have this field, so we need to add it
// here. If the received line already contains the field, it will be overridden later
decodedLine2["TimeReceived"] = time.Now().Unix()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be we should add a check to ensure there is no TimeReceived field?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, if there is no TimeReceived field, it will be added to the current payload. If there is an actual TimeReceived field, it will be added after this, overriding the value that we set here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it what we want? To override TimeReceived if the field is already here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's ok as a quick fix, but we can think about it and iterate from there.

Maybe it would make more sense to have it in the ingester stage rather than in the json decoder ? it looks more like an ingester responsibility to tell when something is received.
And I agree with Olivier that, if the field is already present, it shouldn't be overwritten. E.g. if we have:

(ovs) -> ipfix-ingester -> kafka-write -> kafka-read -> loki

the received time should be the one of ipfix-ingester .. or maybe at the opposite, on loki side?

As we use it to measure the collection latency (by comparing to FlowEndTime), maybe it makes actually more sense to deal with it directly in the loki write, after all.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explained it wrong. The timereceived field is never overridden if its already there (please check the tests)

@jotak currently we set the value into the ipfix or protobuf ingesters but we added a new scenario: the eBPF agent can send it directly to the flowlogs-pipeline-transformer instance directly as a JSON via kafka.

In that case, we are bridging the ingesters and the receive time needs to be set in the transformer instance.

But if we agree that the actual ingest time should be now right before Loki writing, this could be removed and we wouldn't need to consider this special case.

for k, v := range decodedLine {
if v == nil {
continue
Expand Down
8 changes: 6 additions & 2 deletions pkg/pipeline/decode/decode_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDecodeJson(t *testing.T) {
newDecode := initNewDecodeJson(t)
decodeJson := newDecode.(*DecodeJson)
inputString1 := "{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true}"
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"
inputString3 := "{}"
inputStringErr := "{\"varInt\": 14, \"varString\",\"testString2\", \"varBool\":true}"
var in []interface{}
Expand All @@ -49,7 +49,11 @@ func TestDecodeJson(t *testing.T) {
require.Equal(t, len(out), 3)
require.Equal(t, float64(12), out[0]["varInt"])
require.Equal(t, "testString", out[0]["varString"])
require.Equal(t, bool(false), out[0]["varBool"])
require.Equal(t, false, out[0]["varBool"])
// TimeReceived is added if it does not exist
require.NotZero(t, out[0]["TimeReceived"])
// TimeReceived is kept if it already existed
require.EqualValues(t, 12345, out[1]["TimeReceived"])

// TODO: Check for more complicated json structures
}
Expand Down