diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go
index 55bccfd49..b6e3687bf 100644
--- a/pkg/pipeline/ingest/ingest_grpc.go
+++ b/pkg/pipeline/ingest/ingest_grpc.go
@@ -28,9 +28,9 @@ const (
 )
 
 // Prometheus metrics describing the performance of the eBPF ingest
+// These metrics are internal to the goflow2 library; we update them here manually to align with the IPFIX ingester
 var (
-	flowDecoderCount = flow.DecoderStats.With(
-		prometheus.Labels{"worker": "", "name": decoderName})
+	flowDecoderCount = flow.DecoderStats
 	processDelaySummary = flow.NetFlowTimeStatsSum.With(
 		prometheus.Labels{"version": decoderVersion, "router": ""})
 	flowTrafficBytesSum = flow.MetricPacketSizeSum
@@ -113,7 +113,8 @@ func instrumentGRPC(port int) grpc2.UnaryServerInterceptor {
 		}
 
 		// instruments number of decoded flow messages
-		flowDecoderCount.Inc()
+		flowDecoderCount.With(
+			prometheus.Labels{"worker": "", "name": decoderName}).Inc()
 
 		// instruments number of processed individual flows
 		linesProcessed.Add(float64(len(flowRecords.Entries)))
diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go
index 037b49e9b..7ed4b56d6 100644
--- a/pkg/pipeline/ingest/ingest_kafka.go
+++ b/pkg/pipeline/ingest/ingest_kafka.go
@@ -25,7 +25,7 @@ import (
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
-	"github.com/segmentio/kafka-go"
+	"github.com/prometheus/client_golang/prometheus"
 	kafkago "github.com/segmentio/kafka-go"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
@@ -75,7 +75,15 @@ func (ingestK *ingestKafka) kafkaListener() {
 				log.Errorln(err)
 			}
 			log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value))
-			if len(kafkaMessage.Value) > 0 {
+			messageLen := len(kafkaMessage.Value)
+			if messageLen > 0 {
+				trafficLabels := prometheus.Labels{
+					"type":       ingestK.kafkaParams.Decoder.Type,
+					"remote_ip":  ingestK.kafkaParams.Brokers[0],
+					"local_ip":   "0.0.0.0",
+					"local_port": "0",
+				}
+				flowTrafficBytesSum.With(trafficLabels).Observe(float64(messageLen))
 				ingestK.in <- string(kafkaMessage.Value)
 			}
 		}
@@ -83,6 +91,43 @@ func (ingestK *ingestKafka) kafkaListener() {
 	}()
 }
 
+func processRecordDelay(record config.GenericMap) {
+	TimeFlowEndInterface, ok := record["TimeFlowEnd"]
+	if !ok {
+		flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc()
+		return
+	}
+	TimeFlowEnd, ok := TimeFlowEndInterface.(float64)
+	if !ok {
+		flowErrors.With(prometheus.Labels{"router": "", "error": "Cannot parse TimeFlowEnd"}).Inc()
+		return
+	}
+	delay := time.Since(time.Unix(int64(TimeFlowEnd), 0)).Seconds()
+	processDelaySummary.Observe(delay)
+}
+
+func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records []interface{}) {
+	log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+
+	// Decode batch
+	decoded := ingestK.decoder.Decode(records)
+
+	// Update metrics
+	flowDecoderCount.With(
+		prometheus.Labels{"worker": "", "name": ingestK.kafkaParams.Decoder.Type}).Inc()
+	linesProcessed.Add(float64(len(records)))
+	queueLength.Set(float64(len(out)))
+	ingestK.prevRecords = decoded
+
+	for _, record := range decoded {
+		processRecordDelay(record)
+	}
+
+	// Send batch
+	log.Debugf("prevRecords = %v", ingestK.prevRecords)
+	out <- decoded
+}
+
 // read items from ingestKafka input channel, pool them, and send down the pipeline
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 	var records []interface{}
@@ -96,14 +141,8 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 		case record := <-ingestK.in:
 			records = append(records, record)
 			if len(records) >= ingestK.batchMaxLength {
-				log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
-				decoded := ingestK.decoder.Decode(records)
-				linesProcessed.Add(float64(len(records)))
-				queueLength.Set(float64(len(out)))
-				ingestK.prevRecords = decoded
-				log.Debugf("prevRecords = %v", ingestK.prevRecords)
+				ingestK.processBatch(out, records)
 				records = []interface{}{}
-				out <- decoded
 			}
 		case <-flushRecords.C: // Maximum batch time for each batch
 			// Process batch of records (if not empty)
@@ -114,15 +153,9 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 						records = append(records, record)
 					}
 				}
-				log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
-				decoded := ingestK.decoder.Decode(records)
-				linesProcessed.Add(float64(len(records)))
-				queueLength.Set(float64(len(out)))
-				ingestK.prevRecords = decoded
-				log.Debugf("prevRecords = %v", ingestK.prevRecords)
-				out <- decoded
+				ingestK.processBatch(out, records)
+				records = []interface{}{}
 			}
-			records = []interface{}{}
 		}
 	}
 }
@@ -173,9 +206,9 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
 		commitInterval = jsonIngestKafka.CommitInterval
 	}
 
-	dialer := &kafka.Dialer{
-		Timeout:   kafka.DefaultDialer.Timeout,
-		DualStack: kafka.DefaultDialer.DualStack,
+	dialer := &kafkago.Dialer{
+		Timeout:   kafkago.DefaultDialer.Timeout,
+		DualStack: kafkago.DefaultDialer.DualStack,
 	}
 	if jsonIngestKafka.TLS != nil {
 		log.Infof("Using TLS configuration: %v", jsonIngestKafka.TLS)
diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go
index a409e8850..edea01198 100644
--- a/pkg/pipeline/ingest/ingest_kafka_test.go
+++ b/pkg/pipeline/ingest/ingest_kafka_test.go
@@ -18,6 +18,7 @@ package ingest
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -27,7 +28,6 @@ import (
 	kafkago "github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"golang.org/x/net/context"
 )
 
 const testConfig1 = `---
@@ -189,7 +189,8 @@ func Test_KafkaListener(t *testing.T) {
 	}()
 
 	// wait for the data to have been processed
-	receivedEntries := <-ingestOutput
+	receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
+	require.NoError(t, err)
 
 	// we remove timestamp for test stability
 	// Timereceived field is tested in the decodeJson tests
@@ -216,7 +217,8 @@ func Test_MaxBatchLength(t *testing.T) {
 	}()
 
 	// wait for the data to have been processed
-	receivedEntries := <-ingestOutput
+	receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
+	require.NoError(t, err)
 
 	require.Equal(t, 10, len(receivedEntries))
 }
@@ -240,7 +242,8 @@ func Test_BatchTimeout(t *testing.T) {
 	require.Equal(t, 0, len(ingestOutput))
 
 	// wait for the data to have been processed
-	receivedEntries := <-ingestOutput
+	receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
+	require.NoError(t, err)
 	require.Equal(t, 5, len(receivedEntries))
 	afterIngest := time.Now()
 
diff --git a/pkg/test/utils.go b/pkg/test/utils.go
index 431b09b13..bf1680a75 100644
--- a/pkg/test/utils.go
+++ b/pkg/test/utils.go
@@ -20,6 +20,7 @@ package test
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -27,6 +28,7 @@ import (
 	"reflect"
 	"strings"
"testing" + "time" jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -179,3 +181,13 @@ func DumpToTemp(content string) (string, error, func()) { os.Remove(file.Name()) } } + +func WaitFromChannel(in chan []config.GenericMap, timeout time.Duration) ([]config.GenericMap, error) { + timeoutReached := time.NewTicker(timeout) + select { + case record := <-in: + return record, nil + case <-timeoutReached.C: + return nil, errors.New("Timeout reached") + } +}