diff --git a/pkg/api/ingest_kafka.go b/pkg/api/ingest_kafka.go
index 87b69f7f5..efba8c36f 100644
--- a/pkg/api/ingest_kafka.go
+++ b/pkg/api/ingest_kafka.go
@@ -25,4 +25,6 @@ type IngestKafka struct {
 	StartOffset      string  `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
 	BatchReadTimeout int64   `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
 	Decoder          Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
+	BatchMaxLen      int     `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
+	CommitInterval   int64   `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
 }
diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go
index b9b96f650..26f2624be 100644
--- a/pkg/pipeline/ingest/ingest_kafka.go
+++ b/pkg/pipeline/ingest/ingest_kafka.go
@@ -36,16 +36,19 @@ type kafkaReadMessage interface {
 }
 
 type ingestKafka struct {
-	kafkaParams api.IngestKafka
-	kafkaReader kafkaReadMessage
-	decoder     decode.Decoder
-	in          chan string
-	exitChan    <-chan struct{}
-	prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
+	kafkaParams    api.IngestKafka
+	kafkaReader    kafkaReadMessage
+	decoder        decode.Decoder
+	in             chan string
+	exitChan       <-chan struct{}
+	prevRecords    []config.GenericMap // copy of most recently sent records; for testing and debugging
+	batchMaxLength int
 }
 
 const channelSizeKafka = 1000
-const defaultBatchReadTimeout = int64(100)
+const defaultBatchReadTimeout = int64(1000)
+const defaultKafkaBatchMaxLength = 500
+const defaultKafkaCommitInterval = 500
 
 // Ingest ingests entries from kafka topic
 func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
@@ -83,6 +86,7 @@ func (ingestK *ingestKafka) kafkaListener() {
 func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 	var records []interface{}
 	duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
+	flushRecords := time.NewTicker(duration)
 	for {
 		select {
 		case <-ingestK.exitChan:
@@ -90,14 +94,28 @@ func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
 			return
 		case record := <-ingestK.in:
 			records = append(records, record)
-		case <-time.After(duration): // Maximum batch time for each batch
+			if len(records) >= ingestK.batchMaxLength {
+				log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
+				decoded := ingestK.decoder.Decode(records)
+				out <- decoded
+				ingestK.prevRecords = decoded
+				log.Debugf("prevRecords = %v", ingestK.prevRecords)
+				records = []interface{}{}
+			}
+		case <-flushRecords.C: // Maximum batch time for each batch
 			// Process batch of records (if not empty)
 			if len(records) > 0 {
-				log.Debugf("ingestKafka sending %d records", len(records))
+				if len(ingestK.in) > 0 {
+					for len(records) < ingestK.batchMaxLength && len(ingestK.in) > 0 {
+						record := <-ingestK.in
+						records = append(records, record)
+					}
+				}
+				log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
 				decoded := ingestK.decoder.Decode(records)
-				out <- decoded
 				ingestK.prevRecords = decoded
 				log.Debugf("prevRecords = %v", ingestK.prevRecords)
+				out <- decoded
 			}
 			records = []interface{}{}
 		}
@@ -145,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
 	}
 	log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)
 
+	commitInterval := int64(defaultKafkaCommitInterval)
+	if jsonIngestKafka.CommitInterval != 0 {
+		commitInterval = jsonIngestKafka.CommitInterval
+	}
+
 	kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
 		Brokers:        jsonIngestKafka.Brokers,
 		Topic:          jsonIngestKafka.Topic,
 		GroupID:        jsonIngestKafka.GroupId,
 		GroupBalancers: groupBalancers,
 		StartOffset:    startOffset,
+		CommitInterval: time.Duration(commitInterval) * time.Millisecond,
 	})
 	if kafkaReader == nil {
 		errMsg := "NewIngestKafka: failed to create kafka-go reader"
@@ -164,12 +188,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
 		return nil, err
 	}
 
+	bml := defaultKafkaBatchMaxLength
+	if jsonIngestKafka.BatchMaxLen != 0 {
+		bml = jsonIngestKafka.BatchMaxLen
+	}
+
 	return &ingestKafka{
-		kafkaParams: jsonIngestKafka,
-		kafkaReader: kafkaReader,
-		decoder:     decoder,
-		exitChan:    utils.ExitChannel(),
-		in:          make(chan string, channelSizeKafka),
-		prevRecords: make([]config.GenericMap, 0),
+		kafkaParams:    jsonIngestKafka,
+		kafkaReader:    kafkaReader,
+		decoder:        decoder,
+		exitChan:       utils.ExitChannel(),
+		in:             make(chan string, channelSizeKafka),
+		prevRecords:    make([]config.GenericMap, 0),
+		batchMaxLength: bml,
 	}, nil
 }
diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go
index f0a33a78b..872708c94 100644
--- a/pkg/pipeline/ingest/ingest_kafka_test.go
+++ b/pkg/pipeline/ingest/ingest_kafka_test.go
@@ -64,6 +64,8 @@ parameters:
        groupBalancers: ["rackAffinity"]
        decoder:
          type: json
+       batchMaxLen: 1000
+       commitInterval: 1000
 `
 
 func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
@@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
 	require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
 	require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
 	require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
+	require.Equal(t, int(500), ingestKafka.batchMaxLength)
+	require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }
 
 func Test_NewIngestKafka2(t *testing.T) {
@@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
 	require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
 	require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
 	require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
+	require.Equal(t, int(1000), ingestKafka.batchMaxLength)
+	require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
 }
 
 func removeTimestamp(receivedEntries []config.GenericMap) {
@@ -141,17 +147,16 @@ func Test_IngestKafka(t *testing.T) {
 }
 
 type fakeKafkaReader struct {
+	readToDo int
 	mock.Mock
 }
 
 var fakeRecord = []byte(`{"Bytes":20801,"DstAddr":"10.130.2.1","DstPort":36936,"Packets":401,"SrcAddr":"10.130.2.13","SrcPort":3100}`)
 
-var performedRead = false
-
 // ReadMessage runs in the kafka client thread, which blocks until data is available.
-// If data is always available, we have an infinite loop. So we return data only once.
+// If data is always available, we have an infinite loop. So we return data only a specified number of times.
 func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
-	if performedRead {
+	if f.readToDo == 0 {
 		// block indefinitely
 		c := make(chan struct{})
 		<-c
@@ -160,7 +165,7 @@ func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, err
 		Topic: "topic1",
 		Value: fakeRecord,
 	}
 
-	performedRead = true
+	f.readToDo -= 1
 	return message, nil
 }
@@ -174,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
 	ingestKafka := newIngest.(*ingestKafka)
 
 	// change the ReadMessage function to the mock-up
-	fr := fakeKafkaReader{}
+	fr := fakeKafkaReader{readToDo: 1}
 	ingestKafka.kafkaReader = &fr
 
 	// run Ingest in a separate thread
@@ -192,3 +197,55 @@ func Test_KafkaListener(t *testing.T) {
 	require.Equal(t, 1, len(receivedEntries))
 	require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
 }
+
+func Test_MaxBatchLength(t *testing.T) {
+	ingestOutput := make(chan []config.GenericMap)
+	newIngest := initNewIngestKafka(t, testConfig1)
+	ingestKafka := newIngest.(*ingestKafka)
+
+	// change the ReadMessage function to the mock-up
+	fr := fakeKafkaReader{readToDo: 15}
+	ingestKafka.kafkaReader = &fr
+	ingestKafka.batchMaxLength = 10
+	ingestKafka.kafkaParams.BatchReadTimeout = 10000
+
+	// run Ingest in a separate thread
+	go func() {
+		ingestKafka.Ingest(ingestOutput)
+	}()
+
+	// wait for the data to have been processed
+	receivedEntries := <-ingestOutput
+
+	require.Equal(t, 10, len(receivedEntries))
+}
+
+func Test_BatchTimeout(t *testing.T) {
+	ingestOutput := make(chan []config.GenericMap)
+	newIngest := initNewIngestKafka(t, testConfig1)
+	ingestKafka := newIngest.(*ingestKafka)
+
+	// change the ReadMessage function to the mock-up
+	fr := fakeKafkaReader{readToDo: 5}
+	ingestKafka.kafkaReader = &fr
+	ingestKafka.batchMaxLength = 1000
+	ingestKafka.kafkaParams.BatchReadTimeout = 100
+
+	beforeIngest := time.Now()
+	// run Ingest in a separate thread
+	go func() {
+		ingestKafka.Ingest(ingestOutput)
+	}()
+
+	require.Equal(t, 0, len(ingestOutput))
+	// wait for the data to have been processed
+	receivedEntries := <-ingestOutput
+	require.Equal(t, 5, len(receivedEntries))
+
+	afterIngest := time.Now()
+
+	// Check that the entries were flushed by the timer: the elapsed time must be
+	// above the timer value, but not by much; 20ms is our margin here
+	require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
+	require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
}
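
Note on the batching change: processLogLines now forwards a batch as soon as it reaches batchMaxLength, and otherwise when the flushRecords ticker fires, at which point the input channel is first drained up to the batch limit. Replacing time.After (re-armed on every loop iteration, so a busy topic could keep postponing the flush and let batches grow unbounded) with a persistent ticker also gives a steady flush cadence. The following standalone Go sketch is illustrative only and not part of this patch; the names batchMaxLength and batchReadTimeout are borrowed from the change, everything else is assumed for the example.

// batch_sketch.go: flush-on-size-or-timeout pattern, mirroring processLogLines in isolation.
package main

import (
	"fmt"
	"time"
)

func batchLoop(in <-chan string, out chan<- []string, exit <-chan struct{}, batchMaxLength int, batchReadTimeout time.Duration) {
	var records []string
	flush := time.NewTicker(batchReadTimeout)
	defer flush.Stop()
	for {
		select {
		case <-exit:
			return
		case r := <-in:
			records = append(records, r)
			// Forward immediately once the batch is full.
			if len(records) >= batchMaxLength {
				out <- records
				records = nil
			}
		case <-flush.C:
			// On the timer, drain whatever is already queued (up to the batch
			// limit) and forward the batch if it is not empty.
			for len(records) < batchMaxLength && len(in) > 0 {
				records = append(records, <-in)
			}
			if len(records) > 0 {
				out <- records
				records = nil
			}
		}
	}
}

func main() {
	in := make(chan string, 100)
	out := make(chan []string)
	exit := make(chan struct{})
	go batchLoop(in, out, exit, 10, 100*time.Millisecond)
	for i := 0; i < 15; i++ {
		in <- fmt.Sprintf("record-%d", i)
	}
	fmt.Println(len(<-out)) // expected 10: flushed by the size cap
	fmt.Println(len(<-out)) // expected 5: flushed by the ticker
	close(exit)
}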