7 changes: 4 additions & 3 deletions pkg/pipeline/ingest/ingest_grpc.go
@@ -28,9 +28,9 @@ const (
 )
 
 // Prometheus metrics describing the performance of the eBPF ingest
 // These metrics are internal to the goflow2 library; we update them here manually to align with the IPFIX ingester
 var (
-	flowDecoderCount = flow.DecoderStats.With(
-		prometheus.Labels{"worker": "", "name": decoderName})
+	flowDecoderCount    = flow.DecoderStats
 	processDelaySummary = flow.NetFlowTimeStatsSum.With(
 		prometheus.Labels{"version": decoderVersion, "router": ""})
+	flowTrafficBytesSum = flow.MetricPacketSizeSum
@@ -113,7 +113,8 @@ func instrumentGRPC(port int) grpc2.UnaryServerInterceptor {
 	}
 
 	// instruments number of decoded flow messages
-	flowDecoderCount.Inc()
+	flowDecoderCount.With(
+		prometheus.Labels{"worker": "", "name": decoderName}).Inc()
 
 	// instruments number of processed individual flows
 	linesProcessed.Add(float64(len(flowRecords.Entries)))
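The change above stops binding flowDecoderCount to a fixed label set at initialization and keeps the raw goflow2 counter vector instead, applying labels at increment time so the same vector can be shared with the Kafka ingester, which increments it under its own decoder name. A minimal, self-contained sketch of that pattern with client_golang (the metric name, help text, and label values here are illustrative, not the ones goflow2 registers):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var decodedFlows = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "decoded_flows_total",
		Help: "Flows decoded, by worker and decoder name",
	},
	[]string{"worker", "name"},
)

func main() {
	prometheus.MustRegister(decodedFlows)

	// Each ingest stage binds its own labels at increment time, so one
	// vector serves both the gRPC and the Kafka paths.
	decodedFlows.With(prometheus.Labels{"worker": "", "name": "grpc"}).Inc()
	decodedFlows.With(prometheus.Labels{"worker": "", "name": "kafka"}).Inc()

	fmt.Println(testutil.CollectAndCount(decodedFlows)) // 2 label sets
}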
73 changes: 53 additions & 20 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -25,7 +25,7 @@ import (
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
-	"github.com/segmentio/kafka-go"
+	"github.com/prometheus/client_golang/prometheus"
+	kafkago "github.com/segmentio/kafka-go"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
@@ -75,14 +75,59 @@ func (ingestK *ingestKafka) kafkaListener() {
 				log.Errorln(err)
 			}
 			log.Debugf("string(kafkaMessage) = %s\n", string(kafkaMessage.Value))
-			if len(kafkaMessage.Value) > 0 {
+			messageLen := len(kafkaMessage.Value)
+			if messageLen > 0 {
+				trafficLabels := prometheus.Labels{
+					"type":       ingestK.kafkaParams.Decoder.Type,
+					"remote_ip":  ingestK.kafkaParams.Brokers[0],
+					"local_ip":   "0.0.0.0",
+					"local_port": "0",
+				}
+				flowTrafficBytesSum.With(trafficLabels).Observe(float64(messageLen))
 				ingestK.in <- string(kafkaMessage.Value)
 			}
 		}
 	}()
 
 }
 
+func processRecordDelay(record config.GenericMap) {
+	TimeFlowEndInterface, ok := record["TimeFlowEnd"]
+	if !ok {
+		flowErrors.With(prometheus.Labels{"router": "", "error": "No TimeFlowEnd found"}).Inc()
+		return
+	}
+	TimeFlowEnd, ok := TimeFlowEndInterface.(float64)
+	if !ok {
+		flowErrors.With(prometheus.Labels{"router": "", "error": "Cannot parse TimeFlowEnd"}).Inc()
+		return
+	}
+	delay := time.Since(time.Unix(int64(TimeFlowEnd), 0)).Seconds()
+	processDelaySummary.Observe(delay)
+}
+
+func (ingestK *ingestKafka) processBatch(out chan<- []config.GenericMap, records []interface{}) {
+	log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+
+	// Decode batch
+	decoded := ingestK.decoder.Decode(records)
+
+	// Update metrics
+	flowDecoderCount.With(
+		prometheus.Labels{"worker": "", "name": ingestK.kafkaParams.Decoder.Type}).Inc()
+	linesProcessed.Add(float64(len(records)))
+	queueLength.Set(float64(len(out)))
+	ingestK.prevRecords = decoded
+
+	for _, record := range decoded {
+		processRecordDelay(record)
+	}
+
+	// Send batch
+	log.Debugf("prevRecords = %v", ingestK.prevRecords)
+	out <- decoded
+}
+
 // read items from ingestKafka input channel, pool them, and send down the pipeline
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 	var records []interface{}
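A note on the new processRecordDelay above: the assertion to float64 is deliberate, because encoding/json decodes every JSON number into float64 when the target is interface{}, which is what config.GenericMap values are. A self-contained sketch of the same delay computation under that assumption:

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	// JSON numbers land as float64 in a map[string]interface{}, hence the
	// float64 assertion before converting to a Unix timestamp.
	var record map[string]interface{}
	if err := json.Unmarshal([]byte(`{"TimeFlowEnd": 1650000000}`), &record); err != nil {
		panic(err)
	}

	end, ok := record["TimeFlowEnd"].(float64)
	if !ok {
		fmt.Println("cannot parse TimeFlowEnd")
		return
	}
	delay := time.Since(time.Unix(int64(end), 0)).Seconds()
	fmt.Printf("ingest delay: %.0fs\n", delay)
}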
@@ -96,14 +141,8 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 		case record := <-ingestK.in:
 			records = append(records, record)
 			if len(records) >= ingestK.batchMaxLength {
-				log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
-				decoded := ingestK.decoder.Decode(records)
-				linesProcessed.Add(float64(len(records)))
-				queueLength.Set(float64(len(out)))
-				ingestK.prevRecords = decoded
-				log.Debugf("prevRecords = %v", ingestK.prevRecords)
+				ingestK.processBatch(out, records)
 				records = []interface{}{}
-				out <- decoded
 			}
 		case <-flushRecords.C: // Maximum batch time for each batch
 			// Process batch of records (if not empty)
@@ -114,15 +153,9 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 					records = append(records, record)
 				}
 			}
-			log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
-			decoded := ingestK.decoder.Decode(records)
-			linesProcessed.Add(float64(len(records)))
-			queueLength.Set(float64(len(out)))
-			ingestK.prevRecords = decoded
-			log.Debugf("prevRecords = %v", ingestK.prevRecords)
-			out <- decoded
+				ingestK.processBatch(out, records)
+				records = []interface{}{}
 			}
-			records = []interface{}{}
 		}
 	}
 }
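With this refactor, both flush paths, size-triggered and timer-triggered, go through processBatch, removing the duplicated decode/metrics/send block. The surrounding loop is the usual batch-by-size-or-timeout select; a reduced, runnable sketch of that shape with hypothetical names and limits:

package main

import (
	"fmt"
	"time"
)

// batchLoop collects items and flushes when the batch is full or when the
// ticker fires, mirroring the structure of processLogLines.
func batchLoop(in <-chan string, flush func([]string)) {
	const maxBatch = 10
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	var batch []string
	for {
		select {
		case item := <-in:
			batch = append(batch, item)
			if len(batch) >= maxBatch { // size-triggered flush
				flush(batch)
				batch = nil
			}
		case <-ticker.C: // time-triggered flush
			if len(batch) > 0 {
				flush(batch)
				batch = nil
			}
		}
	}
}

func main() {
	in := make(chan string, 16)
	go batchLoop(in, func(b []string) { fmt.Printf("flushed %d items\n", len(b)) })
	for i := 0; i < 25; i++ {
		in <- fmt.Sprintf("record-%d", i)
	}
	time.Sleep(time.Second) // let the timer flush the remainder
}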
@@ -173,9 +206,9 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
 		commitInterval = jsonIngestKafka.CommitInterval
 	}
 
-	dialer := &kafka.Dialer{
-		Timeout:   kafka.DefaultDialer.Timeout,
-		DualStack: kafka.DefaultDialer.DualStack,
+	dialer := &kafkago.Dialer{
+		Timeout:   kafkago.DefaultDialer.Timeout,
+		DualStack: kafkago.DefaultDialer.DualStack,
 	}
 	if jsonIngestKafka.TLS != nil {
 		log.Infof("Using TLS configuration: %v", jsonIngestKafka.TLS)
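The import rename to kafkago matches how the test file already refers to the package, and the new dialer keeps the library defaults for timeout and dual-stack lookup. For the TLS branch that follows it, kafka-go's Dialer carries TLS settings directly in its TLS field; a sketch under that assumption (the useTLS flag and MinVersion choice are illustrative, not the PR's config handling):

package main

import (
	"crypto/tls"

	kafkago "github.com/segmentio/kafka-go"
)

func newDialer(useTLS bool) *kafkago.Dialer {
	dialer := &kafkago.Dialer{
		Timeout:   kafkago.DefaultDialer.Timeout,
		DualStack: kafkago.DefaultDialer.DualStack,
	}
	if useTLS {
		// kafka-go uses this tls.Config when dialing brokers.
		dialer.TLS = &tls.Config{MinVersion: tls.VersionTLS12}
	}
	return dialer
}

func main() {
	_ = newDialer(true)
}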
11 changes: 7 additions & 4 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -18,6 +18,7 @@
 package ingest
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -27,7 +28,6 @@ import (
 	kafkago "github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"golang.org/x/net/context"
 )
 
 const testConfig1 = `---
Expand Down Expand Up @@ -189,7 +189,8 @@ func Test_KafkaListener(t *testing.T) {
}()

// wait for the data to have been processed
receivedEntries := <-ingestOutput
receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
require.NoError(t, err)

// we remove timestamp for test stability
// Timereceived field is tested in the decodeJson tests
@@ -216,7 +217,8 @@ func Test_MaxBatchLength(t *testing.T) {
 	}()
 
 	// wait for the data to have been processed
-	receivedEntries := <-ingestOutput
+	receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
+	require.NoError(t, err)
 
 	require.Equal(t, 10, len(receivedEntries))
 }
@@ -240,7 +242,8 @@ func Test_BatchTimeout(t *testing.T) {
 
 	require.Equal(t, 0, len(ingestOutput))
 	// wait for the data to have been processed
-	receivedEntries := <-ingestOutput
+	receivedEntries, err := test.WaitFromChannel(ingestOutput, 2*time.Second)
+	require.NoError(t, err)
 	require.Equal(t, 5, len(receivedEntries))
 
 	afterIngest := time.Now()
12 changes: 12 additions & 0 deletions pkg/test/utils.go
@@ -20,13 +20,15 @@ package test
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
 	"os/exec"
 	"reflect"
 	"strings"
 	"testing"
+	"time"
 
 	jsoniter "github.com/json-iterator/go"
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -179,3 +181,13 @@ func DumpToTemp(content string) (string, error, func()) {
 		os.Remove(file.Name())
 	}
 }
+
+func WaitFromChannel(in chan []config.GenericMap, timeout time.Duration) ([]config.GenericMap, error) {
+	timeoutReached := time.NewTicker(timeout)
+	select {
+	case record := <-in:
+		return record, nil
+	case <-timeoutReached.C:
+		return nil, errors.New("Timeout reached")
+	}
+}
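One nit on the new helper: the time.NewTicker is never stopped, so each call leaves the underlying timer running (before Go 1.23, unstopped tickers are not garbage-collected), and Go convention keeps error strings lowercase. A sketch of an equivalent helper built on time.After, which needs no cleanup for a one-shot wait:

package test

import (
	"errors"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// waitFromChannel returns the next batch from in, or an error if nothing
// arrives within timeout. Unlike a ticker, the timer from time.After is
// simply abandoned after a single use.
func waitFromChannel(in chan []config.GenericMap, timeout time.Duration) ([]config.GenericMap, error) {
	select {
	case record := <-in:
		return record, nil
	case <-time.After(timeout):
		return nil, errors.New("timeout reached")
	}
}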