2 changes: 0 additions & 2 deletions go.sum
@@ -699,8 +699,6 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/netobserv/gopipes v0.1.1 h1:f8zJsvnMgRFRa2B+1siwRtW0Y4dqeBROmkcI/HgT1gE=
github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ=
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2 h1:K7SjoqEfzpMfIjHV85Lg8UDMvZu8rPfrsgKRoo7W30o=
9 changes: 9 additions & 0 deletions pkg/api/transform_network.go
@@ -36,6 +36,15 @@ func (tn *TransformNetwork) GetServiceFiles() (string, string) {
return p, s
}

const (
OpAddRegexIf = "add_regex_if"
OpAddIf = "add_if"
OpAddSubnet = "add_subnet"
OpAddLocation = "add_location"
OpAddService = "add_service"
OpAddKubernetes = "add_kubernetes"
)

type TransformNetworkOperationEnum struct {
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"`
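Note on the new constants: they mirror the yaml tags of TransformNetworkOperationEnum, so transform code can dispatch on well-known operation names instead of repeating string literals. A minimal sketch of such a dispatch follows; the switch shape and the applyOp helper are assumptions for illustration, not part of this change.

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

// applyOp is illustrative only: it routes an operation name, using the new
// constants, to a description of what the transform would do.
func applyOp(op string) {
	switch op {
	case api.OpAddSubnet:
		fmt.Println("derive a subnet from an input field")
	case api.OpAddLocation:
		fmt.Println("enrich with a geo location")
	case api.OpAddKubernetes:
		fmt.Println("enrich with kubernetes metadata")
	default:
		fmt.Printf("unsupported operation: %s\n", op)
	}
}

func main() {
	applyOp(api.OpAddSubnet)
}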
8 changes: 8 additions & 0 deletions pkg/config/config.go
@@ -20,6 +20,7 @@ package config
import (
"bytes"
"encoding/json"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/sirupsen/logrus"
@@ -38,6 +39,7 @@ type ConfigFileStruct struct {
Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"`
Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"`
MetricsSettings MetricsSettings `yaml:"metricsSettings,omitempty" json:"metricsSettings,omitempty"`
PerfSettings PerfSettings `yaml:"perfSettings,omitempty" json:"perfSettings,omitempty"`
}

type Health struct {
@@ -60,6 +62,12 @@ type MetricsSettings struct {
NoPanic bool `yaml:"noPanic,omitempty" json:"noPanic,omitempty"`
}

// PerfSettings allows tuning of internal, performance-related parameters
type PerfSettings struct {
BatcherMaxLen int `yaml:"batcherMaxLen,omitempty" json:"batcherMaxLen,omitempty"`
BatcherTimeout time.Duration `yaml:"batcherMaxTimeout,omitempty" json:"batcherMaxTimeout,omitempty"`
}

type Stage struct {
Name string `yaml:"name" json:"name"`
Follows string `yaml:"follows,omitempty" json:"follows,omitempty"`
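The new PerfSettings block exposes internal batching knobs. Per the struct tags, they surface in YAML as perfSettings.batcherMaxLen and perfSettings.batcherMaxTimeout; programmatically they can be set as in this sketch, with values borrowed from TestConnTrack below (they are the test's values, not recommendations):

package main

import (
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	var cfg config.ConfigFileStruct
	// Flush buffered flow logs either when 200k have accumulated or after
	// 2 seconds, whichever comes first.
	cfg.PerfSettings.BatcherMaxLen = 200_000
	cfg.PerfSettings.BatcherTimeout = 2 * time.Second
}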
4 changes: 3 additions & 1 deletion pkg/pipeline/aggregate_prom_test.go
@@ -178,7 +178,9 @@ parameters:
// we use ElementsMatch() rather than Equals()
require.ElementsMatch(t, tt.expectedAggs, actualAggs)

promEncode.Encode(actualAggs)
for _, aa := range actualAggs {
promEncode.Encode(aa)
}
exposed := test.ReadExposedMetrics(t)

for _, expected := range tt.expectedEncode {
41 changes: 30 additions & 11 deletions pkg/pipeline/conntrack_integ_test.go
@@ -20,9 +20,11 @@ package pipeline
import (
"bufio"
"os"
"sync/atomic"
"testing"
"time"

test2 "github.com/mariomac/guara/pkg/test"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
@@ -95,24 +97,31 @@ func TestConnTrack(t *testing.T) {
var err error
v, cfg := test.InitConfig(t, testConfigConntrack)
require.NotNil(t, v)
cfg.PerfSettings.BatcherMaxLen = 200_000
cfg.PerfSettings.BatcherTimeout = 2 * time.Second

mainPipeline, err = NewPipeline(cfg)
require.NoError(t, err)

go mainPipeline.Run()

in := mainPipeline.pipelineStages[0].Ingester.(*ingest.IngestFake).In
ingest := mainPipeline.pipelineStages[0].Ingester.(*ingest.IngestFake)
in := ingest.In
writer := mainPipeline.pipelineStages[2].Writer.(*write.WriteFake)

ingestFile(t, in, "../../hack/examples/ocp-ipfix-flowlogs.json")
writer.Wait()
writer.ResetWait()
sentLines := ingestFile(t, in, "../../hack/examples/ocp-ipfix-flowlogs.json")

// Wait for all the lines to be ingested
test2.Eventually(t, 15*time.Second, func(t require.TestingT) {
require.EqualValues(t, atomic.LoadInt64(&ingest.Count), sentLines,
"sent: %d. got: %d", sentLines, ingest.Count)
}, test2.Interval(10*time.Millisecond))

// Wait a moment to let the connections expire
time.Sleep(2 * time.Second)
// Send an empty list to the pipeline to let the connection tracking stage output end connection records
in <- []config.GenericMap{}
writer.Wait()

// Send something to the pipeline to let the connection tracking stage output end connection records
in <- config.GenericMap{"DstAddr": "1.2.3.4"}

// Verify that the output records contain an expected end connection record.
expected := config.GenericMap{
@@ -131,10 +140,14 @@
"_RecordType": "endConnection",
"numFlowLogs": 5.0,
}
require.Containsf(t, writer.AllRecords, expected, "The output records don't include the expected record %v", expected)
// Wait for the record to be eventually forwarded to the writer
test2.Eventually(t, 15*time.Second, func(t require.TestingT) {
require.Containsf(t, writer.AllRecords(), expected,
"The output records don't include the expected record %v", expected)
})
}

func ingestFile(t *testing.T, in chan<- []config.GenericMap, filepath string) {
func ingestFile(t *testing.T, in chan<- config.GenericMap, filepath string) int {
t.Helper()
file, err := os.Open(filepath)
require.NoError(t, err)
@@ -147,8 +160,14 @@ func ingestFile(t *testing.T, in chan<- []config.GenericMap, filepath string) {
text := scanner.Text()
lines = append(lines, []byte(text))
}
submittedLines := 0
decoder, err := decode.NewDecodeJson()
require.NoError(t, err)
decoded := decoder.Decode(lines)
in <- decoded
for _, line := range lines {
line, err := decoder.Decode(line)
require.NoError(t, err)
in <- line
submittedLines++
}
return submittedLines
}
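The test now polls with test2.Eventually instead of blocking on the writer, keyed on an atomic counter in the fake ingester. A minimal sketch of that counting shape follows; the In and Count field names match the usage above, but the rest is assumed and is not the real ingest.IngestFake.

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// fakeIngester illustrates the pattern: every forwarded record bumps Count
// atomically, so a test can poll until Count reaches the number of lines sent.
type fakeIngester struct {
	In    chan config.GenericMap
	Count int64
}

func (f *fakeIngester) run(out chan<- config.GenericMap) {
	for rec := range f.In {
		out <- rec
		atomic.AddInt64(&f.Count, 1)
	}
}

func main() {
	f := &fakeIngester{In: make(chan config.GenericMap, 1)}
	out := make(chan config.GenericMap, 1)
	f.In <- config.GenericMap{"DstAddr": "1.2.3.4"}
	close(f.In)
	f.run(out)
	fmt.Println(atomic.LoadInt64(&f.Count), <-out)
}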
2 changes: 1 addition & 1 deletion pkg/pipeline/decode/decode.go
@@ -25,7 +25,7 @@ import (
)

type Decoder interface {
Decode(in [][]byte) []config.GenericMap
Decode(in []byte) (config.GenericMap, error)
}

func GetDecoder(params api.Decoder) (Decoder, error) {
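With Decode now operating on a single record, batching and error policy move to the caller. A short sketch of such a caller follows; the skip-on-error policy mirrors what the old JSON decoder did internally, but this loop itself is an assumption, not code from the PR.

package main

import (
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
	"github.com/sirupsen/logrus"
)

// decodeAll drains a batch through the per-record Decoder contract.
func decodeAll(d decode.Decoder, raw [][]byte) []config.GenericMap {
	out := make([]config.GenericMap, 0, len(raw))
	for _, line := range raw {
		rec, err := d.Decode(line)
		if err != nil {
			// One bad record no longer poisons the batch; log and move on.
			logrus.WithError(err).Debug("skipping undecodable record")
			continue
		}
		out = append(out, rec)
	}
	return out
}

func main() {
	d, err := decode.NewDecodeJson()
	if err != nil {
		logrus.WithError(err).Fatal("creating decoder")
	}
	recs := decodeAll(d, [][]byte{[]byte(`{"SrcAddr":"10.0.0.1"}`), []byte(`not json`)})
	logrus.Infof("decoded %d records", len(recs))
}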
40 changes: 17 additions & 23 deletions pkg/pipeline/decode/decode_json.go
@@ -29,32 +29,26 @@ type DecodeJson struct {
}

// Decode decodes one input line into a single flow entry
func (c *DecodeJson) Decode(in [][]byte) []config.GenericMap {
out := make([]config.GenericMap, 0)
for _, line := range in {
if log.IsLevelEnabled(log.DebugLevel) {
log.Debugf("decodeJson: line = %v", string(line))
}
var decodedLine map[string]interface{}
err := json.Unmarshal(line, &decodedLine)
if err != nil {
log.Errorf("decodeJson Decode: error unmarshalling a line: %v", err)
log.Errorf("line = %s", line)
func (c *DecodeJson) Decode(line []byte) (config.GenericMap, error) {

if log.IsLevelEnabled(log.DebugLevel) {
log.Debugf("decodeJson: line = %v", string(line))
}
var decodedLine map[string]interface{}
if err := json.Unmarshal(line, &decodedLine); err != nil {
return nil, err
}
decodedLine2 := make(config.GenericMap, len(decodedLine))
// flows directly ingested by flp-transformer won't have this field, so we need to add it
// here. If the received line already contains the field, it will be overridden later
decodedLine2["TimeReceived"] = time.Now().Unix()
for k, v := range decodedLine {
if v == nil {
continue
}
decodedLine2 := make(config.GenericMap, len(decodedLine))
// flows directly ingested by flp-transformer won't have this field, so we need to add it
// here. If the received line already contains the field, it will be overridden later
decodedLine2["TimeReceived"] = time.Now().Unix()
for k, v := range decodedLine {
if v == nil {
continue
}
decodedLine2[k] = v
}
out = append(out, decodedLine2)
decodedLine2[k] = v
}
return out
return decodedLine2, nil
}

// NewDecodeJson create a new decode
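A usage sketch of the TimeReceived defaulting described in the comments above; the input lines are hypothetical:

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
)

func main() {
	d, err := decode.NewDecodeJson()
	if err != nil {
		panic(err)
	}
	// No TimeReceived in the input: Decode stamps time.Now().Unix().
	rec, err := d.Decode([]byte(`{"SrcAddr":"10.0.0.1"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(rec["TimeReceived"])

	// TimeReceived present in the input: the provided value wins, because the
	// copy loop overwrites the default.
	rec, err = d.Decode([]byte(`{"SrcAddr":"10.0.0.1","TimeReceived":12345}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(rec["TimeReceived"]) // 12345
}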
45 changes: 16 additions & 29 deletions pkg/pipeline/decode/decode_json_test.go
@@ -20,7 +20,6 @@ package decode
import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/stretchr/testify/require"
)

@@ -33,41 +32,29 @@ func initNewDecodeJson(t *testing.T) Decoder {
func TestDecodeJson(t *testing.T) {
newDecode := initNewDecodeJson(t)
decodeJson := newDecode.(*DecodeJson)
inputString1 := "{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"
inputString3 := "{}"
inputStringErr := "{\"varInt\": 14, \"varString\",\"testString2\", \"varBool\":true}"
var in [][]byte
var out []config.GenericMap
out = decodeJson.Decode(in)
require.Equal(t, 0, len(out))
in = append(in, []byte(inputString1))
in = append(in, []byte(inputString2))
in = append(in, []byte(inputString3))
in = append(in, []byte(inputStringErr))
out = decodeJson.Decode(in)
require.Equal(t, len(out), 3)
require.Equal(t, float64(12), out[0]["varInt"])
require.Equal(t, "testString", out[0]["varString"])
require.Equal(t, false, out[0]["varBool"])

out, err := decodeJson.Decode([]byte(
"{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"))
require.NoError(t, err)
require.Equal(t, float64(12), out["varInt"])
require.Equal(t, "testString", out["varString"])
require.Equal(t, false, out["varBool"])
// TimeReceived is added if it does not exist
require.NotZero(t, out[0]["TimeReceived"])
require.NotZero(t, out["TimeReceived"])

out, err = decodeJson.Decode([]byte(
"{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"))
require.NoError(t, err)
// TimeReceived is kept if it already existed
require.EqualValues(t, 12345, out[1]["TimeReceived"])
require.EqualValues(t, 12345, out["TimeReceived"])

// TODO: Check for more complicated json structures
}

func TestDecodeJsonTimestamps(t *testing.T) {
newDecode := initNewDecodeJson(t)
decodeJson := newDecode.(*DecodeJson)
inputString1 := "{\"unixTime\": 1645104030 }"
var in [][]byte
var out []config.GenericMap
out = decodeJson.Decode(in)
require.Equal(t, 0, len(out))
in = append(in, []byte(inputString1))
out = decodeJson.Decode(in)
require.Equal(t, len(out), 1)
require.Equal(t, float64(1645104030), out[0]["unixTime"])
out, err := decodeJson.Decode([]byte("{\"unixTime\": 1645104030 }"))
require.NoError(t, err)
require.Equal(t, float64(1645104030), out["unixTime"])
}
30 changes: 6 additions & 24 deletions pkg/pipeline/decode/decode_protobuf.go
@@ -7,12 +7,9 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

var pflog = logrus.WithField("component", "Protobuf")

// Protobuf decodes protobuf flow record definitions, as forwarded by
// ingest.NetObservAgent, into a GenericMap that follows the same naming conventions
// as the IPFIX flows from ingest.IngestCollector
@@ -25,30 +22,15 @@ func NewProtobuf() (*Protobuf, error) {

// Decode decodes a raw protobuf flow and returns it as a single GenericMap
func (p *Protobuf) Decode(rawFlows [][]byte) []config.GenericMap {
flows := make([]config.GenericMap, 0, len(rawFlows))
for _, pbRaw := range rawFlows {
record := pbflow.Record{}
if err := proto.Unmarshal(pbRaw, &record); err != nil {
pflog.WithError(err).Debug("can't unmarshall received protobuf flow. Ignoring")
continue
}
flows = append(flows, pbFlowToMap(&record))
func (p *Protobuf) Decode(rawFlow []byte) (config.GenericMap, error) {
record := pbflow.Record{}
if err := proto.Unmarshal(rawFlow, &record); err != nil {
return nil, fmt.Errorf("unmarshaling ProtoBuf record: %w", err)
}
return flows
}

// PBRecordsAsMaps transform all the flows in a pbflow.Records entry into a slice
// of GenericMaps
func PBRecordsAsMaps(flow *pbflow.Records) []config.GenericMap {
out := make([]config.GenericMap, 0, len(flow.Entries))
for _, entry := range flow.Entries {
out = append(out, pbFlowToMap(entry))
}
return out
return PBFlowToMap(&record), nil
}

func pbFlowToMap(flow *pbflow.Record) config.GenericMap {
func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
if flow == nil {
return config.GenericMap{}
}
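With PBRecordsAsMaps removed, converting a whole pbflow.Records batch is left to callers of the now-exported PBFlowToMap. A minimal sketch of that caller-side loop follows; the recordsToMaps helper is illustrative, not part of this change.

package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
	"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

// recordsToMaps stands in for the removed PBRecordsAsMaps helper: callers now
// iterate the entries themselves and convert each one with PBFlowToMap.
func recordsToMaps(records *pbflow.Records) []config.GenericMap {
	out := make([]config.GenericMap, 0, len(records.Entries))
	for _, entry := range records.Entries {
		out = append(out, decode.PBFlowToMap(entry))
	}
	return out
}

func main() {
	maps := recordsToMaps(&pbflow.Records{Entries: []*pbflow.Record{{Interface: "eth0"}}})
	fmt.Println(len(maps))
}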
21 changes: 10 additions & 11 deletions pkg/pipeline/decode/decode_protobuf_test.go
@@ -45,10 +45,10 @@ func TestDecodeProtobuf(t *testing.T) {
rawPB, err := proto.Marshal(&flow)
require.NoError(t, err)

out := decoder.Decode([][]byte{rawPB})
require.Len(t, out, 1)
assert.NotZero(t, out[0]["TimeReceived"])
delete(out[0], "TimeReceived")
out, err := decoder.Decode(rawPB)
require.NoError(t, err)
assert.NotZero(t, out["TimeReceived"])
delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{
"FlowDirection": 1,
"Bytes": uint64(456),
@@ -64,10 +64,10 @@
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interface": "eth0",
}, out[0])
}, out)
}

func TestPBRecordsAsMaps(t *testing.T) {
func TestPBFlowToMap(t *testing.T) {
someTime := time.Now()
flow := &pbflow.Record{
Interface: "eth0",
@@ -96,10 +96,9 @@
},
}

out := PBRecordsAsMaps(&pbflow.Records{Entries: []*pbflow.Record{flow}})
require.Len(t, out, 1)
assert.NotZero(t, out[0]["TimeReceived"])
delete(out[0], "TimeReceived")
out := PBFlowToMap(flow)
assert.NotZero(t, out["TimeReceived"])
delete(out, "TimeReceived")
assert.Equal(t, config.GenericMap{
"FlowDirection": 1,
"Bytes": uint64(456),
Expand All @@ -115,6 +114,6 @@ func TestPBRecordsAsMaps(t *testing.T) {
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interface": "eth0",
}, out[0])
}, out)

}