From 0f218fe517c4e39efd9cb05dd8a1bfca1af66177 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 18 Jul 2022 14:31:01 +0200 Subject: [PATCH 1/7] Added missing metrics to kafka ingester --- pkg/pipeline/ingest/ingest_grpc.go | 7 ++-- pkg/pipeline/ingest/ingest_kafka.go | 64 +++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index 55bccfd49..0ea728726 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -28,9 +28,9 @@ const ( ) // Prometheus metrics describing the performance of the eBPF ingest +// This metrics are internal to the goflow2 library, we update them here manually do align with the IPFIX ingester var ( - flowDecoderCount = flow.DecoderStats.With( - prometheus.Labels{"worker": "", "name": decoderName}) + flowDecoderCount = flow.DecoderStats processDelaySummary = flow.NetFlowTimeStatsSum.With( prometheus.Labels{"version": decoderVersion, "router": ""}) flowTrafficBytesSum = flow.MetricPacketSizeSum @@ -113,7 +113,8 @@ func instrumentGRPC(port int) grpc2.UnaryServerInterceptor { } // instruments number of decoded flow messages - flowDecoderCount.Inc() + flowDecoderCount.With( + prometheus.Labels{"worker": "", "name": decoderName}).Inc() // instruments number of processed individual flows linesProcessed.Add(float64(len(flowRecords.Entries))) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 037b49e9b..6371e825c 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -19,13 +19,14 @@ package ingest import ( "errors" + "strconv" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" - "github.com/segmentio/kafka-go" + "github.com/prometheus/client_golang/prometheus" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "golang.org/x/net/context" @@ -75,7 +76,15 @@ func (ingestK *ingestKafka) kafkaListener() { log.Errorln(err) } log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value)) - if len(kafkaMessage.Value) > 0 { + messageLen := len(kafkaMessage.Value) + if messageLen > 0 { + trafficLabels := prometheus.Labels{ + "type": ingestK.kafkaParams.Decoder.Type, + "remote_ip": ingestK.kafkaParams.Brokers[0], + "local_ip": "0.0.0.0", + "local_port": "0", + } + flowTrafficBytesSum.With(trafficLabels).Observe(float64(messageLen)) ingestK.in <- string(kafkaMessage.Value) } } @@ -83,6 +92,39 @@ func (ingestK *ingestKafka) kafkaListener() { } +func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records []interface{}) { + log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) + + // Decode batch + decoded := ingestK.decoder.Decode(records) + + // Update metrics + flowDecoderCount.With( + prometheus.Labels{"worker": "", "name": ingestK.kafkaParams.Decoder.Type}).Inc() + linesProcessed.Add(float64(len(records))) + queueLength.Set(float64(len(out))) + ingestK.prevRecords = decoded + + for _, record := range decoded { + TimeFLowEndString, ok := record["TimeFlowEnd"].(string) + if ok { + TimeFlowEnd, err := strconv.ParseInt(TimeFLowEndString, 10, 64) + if err != nil { + log.Errorf("could not parse TimeFlowEnd") + flowErrors.With(prometheus.Labels{"router": "", "error": err.Error()}).Inc() + } + delay 
:= time.Since(time.Unix(TimeFlowEnd, 0)).Seconds() + processDelaySummary.Observe(delay) + } else { + flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc() + } + } + + // Send batch + log.Debugf("prevRecords = %v", ingestK.prevRecords) + out <- decoded +} + // read items from ingestKafka input channel, pool them, and send down the pipeline func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { var records []interface{} @@ -96,14 +138,8 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { case record := <-ingestK.in: records = append(records, record) if len(records) >= ingestK.batchMaxLength { - log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) - decoded := ingestK.decoder.Decode(records) - linesProcessed.Add(float64(len(records))) - queueLength.Set(float64(len(out))) - ingestK.prevRecords = decoded - log.Debugf("prevRecords = %v", ingestK.prevRecords) + ingestK.processBatch(out, records) records = []interface{}{} - out <- decoded } case <-flushRecords.C: // Maximum batch time for each batch // Process batch of records (if not empty) @@ -114,15 +150,9 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) { records = append(records, record) } } - log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) - decoded := ingestK.decoder.Decode(records) - linesProcessed.Add(float64(len(records))) - queueLength.Set(float64(len(out))) - ingestK.prevRecords = decoded - log.Debugf("prevRecords = %v", ingestK.prevRecords) - out <- decoded + ingestK.processBatch(out, records) + records = []interface{}{} } - records = []interface{}{} } } } From 416445fb111b40658c33d491e484d20372aeffe5 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 18 Jul 2022 16:40:53 +0200 Subject: [PATCH 2/7] Added timeout to kafka ingester tests --- pkg/pipeline/ingest/ingest_kafka_test.go | 11 +++++++---- pkg/test/utils.go | 13 +++++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index a409e8850..6d416de6d 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -18,6 +18,7 @@ package ingest import ( + "context" "testing" "time" @@ -27,7 +28,6 @@ import ( kafkago "github.com/segmentio/kafka-go" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "golang.org/x/net/context" ) const testConfig1 = `--- @@ -189,7 +189,8 @@ func Test_KafkaListener(t *testing.T) { }() // wait for the data to have been processed - receivedEntries := <-ingestOutput + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + require.NoError(t, err) // we remove timestamp for test stability // Timereceived field is tested in the decodeJson tests @@ -216,7 +217,8 @@ func Test_MaxBatchLength(t *testing.T) { }() // wait for the data to have been processed - receivedEntries := <-ingestOutput + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + require.NoError(t, err) require.Equal(t, 10, len(receivedEntries)) } @@ -240,7 +242,8 @@ func Test_BatchTimeout(t *testing.T) { require.Equal(t, 0, len(ingestOutput)) // wait for the data to have been processed - receivedEntries := <-ingestOutput + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + require.NoError(t, err) require.Equal(t, 5, len(receivedEntries)) afterIngest := time.Now() diff --git 
a/pkg/test/utils.go b/pkg/test/utils.go index 431b09b13..7a514f372 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -20,6 +20,7 @@ package test import ( "bytes" "encoding/json" + "errors" "fmt" "io/ioutil" "os" @@ -27,6 +28,7 @@ import ( "reflect" "strings" "testing" + "time" jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -179,3 +181,14 @@ func DumpToTemp(content string) (string, error, func()) { os.Remove(file.Name()) } } + +func WaitFromChannel(in chan []config.GenericMap, timeout int64) ([]config.GenericMap, error) { + duration := time.Duration(timeout) * time.Millisecond + timeoutReached := time.NewTicker(duration) + select { + case record := <-in: + return record, nil + case <-timeoutReached.C: + return nil, errors.New("Timeout reached") + } +} From 61720620ec6abca6c0d06f36efb971b19fa3f73e Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Tue, 19 Jul 2022 11:35:09 +0200 Subject: [PATCH 3/7] Fixed TimeFlowEnd parsing --- pkg/pipeline/ingest/ingest_kafka.go | 32 +++++++++++++++++------------ 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 6371e825c..44dc0b23b 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -19,7 +19,7 @@ package ingest import ( "errors" - "strconv" + "fmt" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -92,6 +92,23 @@ func (ingestK *ingestKafka) kafkaListener() { } +func processRecordDelay(record config.GenericMap) { + TimeFlowEndInterface, ok := record["TimeFlowEnd"] + if !ok { + flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc() + return + } else { + fmt.Println(TimeFlowEndInterface) + } + TimeFlowEnd, ok := TimeFlowEndInterface.(float64) + if !ok { + flowErrors.With(prometheus.Labels{"router": "", "error": "Cannot parse TimeFlowEnd"}).Inc() + return + } + delay := time.Since(time.Unix(int64(TimeFlowEnd), 0)).Seconds() + processDelaySummary.Observe(delay) +} + func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records []interface{}) { log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in)) @@ -106,18 +123,7 @@ func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records ingestK.prevRecords = decoded for _, record := range decoded { - TimeFLowEndString, ok := record["TimeFlowEnd"].(string) - if ok { - TimeFlowEnd, err := strconv.ParseInt(TimeFLowEndString, 10, 64) - if err != nil { - log.Errorf("could not parse TimeFlowEnd") - flowErrors.With(prometheus.Labels{"router": "", "error": err.Error()}).Inc() - } - delay := time.Since(time.Unix(TimeFlowEnd, 0)).Seconds() - processDelaySummary.Observe(delay) - } else { - flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc() - } + processRecordDelay(record) } // Send batch From 37f4bd3b1d270e777eb9b56a8e4d0b178917f734 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Tue, 19 Jul 2022 11:39:02 +0200 Subject: [PATCH 4/7] Fixed typo --- pkg/pipeline/ingest/ingest_grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index 0ea728726..b6e3687bf 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -28,7 +28,7 @@ const ( ) // Prometheus metrics describing the performance of the eBPF ingest -// This metrics are internal to the goflow2 library, we 
update them here manually do align with the IPFIX ingester +// This metrics are internal to the goflow2 library, we update them here manually to align with the IPFIX ingester var ( flowDecoderCount = flow.DecoderStats processDelaySummary = flow.NetFlowTimeStatsSum.With( From 23736561d07412a65caa50146b0489833f2f5cd1 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Thu, 21 Jul 2022 14:57:18 +0200 Subject: [PATCH 5/7] Removed debug trace --- pkg/pipeline/ingest/ingest_kafka.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 44dc0b23b..54b8e97c5 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -19,7 +19,6 @@ package ingest import ( "errors" - "fmt" "time" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -97,8 +96,6 @@ func processRecordDelay(record config.GenericMap) { if !ok { flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc() return - } else { - fmt.Println(TimeFlowEndInterface) } TimeFlowEnd, ok := TimeFlowEndInterface.(float64) if !ok { From ea0c1a03747bbccf1b87376adb789ed5b4f8acfb Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 25 Jul 2022 11:04:08 +0200 Subject: [PATCH 6/7] Changed WaitFromchannel function for a better comprehension --- pkg/pipeline/ingest/ingest_kafka_test.go | 6 +++--- pkg/test/utils.go | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index 6d416de6d..edea01198 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -189,7 +189,7 @@ func Test_KafkaListener(t *testing.T) { }() // wait for the data to have been processed - receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second) require.NoError(t, err) // we remove timestamp for test stability @@ -217,7 +217,7 @@ func Test_MaxBatchLength(t *testing.T) { }() // wait for the data to have been processed - receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second) require.NoError(t, err) require.Equal(t, 10, len(receivedEntries)) @@ -242,7 +242,7 @@ func Test_BatchTimeout(t *testing.T) { require.Equal(t, 0, len(ingestOutput)) // wait for the data to have been processed - receivedEntries, err := test.WaitFromChannel(ingestOutput, 2000) + receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second) require.NoError(t, err) require.Equal(t, 5, len(receivedEntries)) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 7a514f372..bf1680a75 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -182,9 +182,8 @@ func DumpToTemp(content string) (string, error, func()) { } } -func WaitFromChannel(in chan []config.GenericMap, timeout int64) ([]config.GenericMap, error) { - duration := time.Duration(timeout) * time.Millisecond - timeoutReached := time.NewTicker(duration) +func WaitFromChannel(in chan []config.GenericMap, timeout time.Duration) ([]config.GenericMap, error) { + timeoutReached := time.NewTicker(timeout) select { case record := <-in: return record, nil From e4e388b45ff6b3aba8c1e1f85c49df7b56a4f168 Mon Sep 17 00:00:00 2001 From: Olivier Cazade Date: Mon, 25 Jul 2022 11:52:23 +0200 Subject: [PATCH 7/7] Only import kafkago once in kafka ingester --- pkg/pipeline/ingest/ingest_kafka.go | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 54b8e97c5..7ed4b56d6 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -206,9 +206,9 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) { commitInterval = jsonIngestKafka.CommitInterval } - dialer := &kafka.Dialer{ - Timeout: kafka.DefaultDialer.Timeout, - DualStack: kafka.DefaultDialer.DualStack, + dialer := &kafkago.Dialer{ + Timeout: kafkago.DefaultDialer.Timeout, + DualStack: kafkago.DefaultDialer.DualStack, } if jsonIngestKafka.TLS != nil { log.Infof("Using TLS configuration: %v", jsonIngestKafka.TLS)
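Illustrative sketch (not part of the patch series): PATCH 2 introduces test.WaitFromChannel so the Kafka ingester tests no longer block forever on the output channel, and PATCH 6 changes its timeout parameter from a millisecond count to a time.Duration for readability. The standalone Go program below shows the same "receive from a channel or give up after a timeout" pattern under two stated assumptions: it uses a one-shot time.NewTimer (stopped via defer) instead of the time.NewTicker in the committed helper, and it substitutes a plain []string channel for the pipeline's []config.GenericMap so the example can run on its own.

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFromChannel returns the next value received on in, or an error if
// nothing arrives within timeout. Sketch of the pattern used by
// test.WaitFromChannel; the one-shot timer is an assumed variation.
func waitFromChannel(in chan []string, timeout time.Duration) ([]string, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer when the channel wins the race
	select {
	case record := <-in:
		return record, nil
	case <-timer.C:
		return nil, errors.New("timeout reached")
	}
}

func main() {
	out := make(chan []string, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		out <- []string{"flow-record"}
	}()

	records, err := waitFromChannel(out, 2*time.Second)
	fmt.Println(records, err) // [flow-record] <nil>

	_, err = waitFromChannel(out, 50*time.Millisecond)
	fmt.Println(err) // timeout reached: nothing else is sent on the channel
}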