2 changes: 2 additions & 0 deletions pkg/api/ingest_kafka.go
@@ -25,4 +25,6 @@ type IngestKafka struct {
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
}
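The two new fields are plain YAML/JSON parameters of the ingest stage. As a minimal sketch of how they would be read (the broker address, topic name, and the local `ingestKafkaConf` mirror of `api.IngestKafka` are illustrative assumptions, not part of this PR):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Local mirror of the relevant api.IngestKafka fields, so the sketch is
// self-contained; the real type lives in pkg/api.
type ingestKafkaConf struct {
	Brokers        []string `yaml:"brokers"`
	Topic          string   `yaml:"topic"`
	BatchMaxLen    int      `yaml:"batchMaxLen,omitempty"`
	CommitInterval int64    `yaml:"commitInterval,omitempty"`
}

func main() {
	// Hypothetical ingest stage configuration exercising the two new knobs.
	raw := `
brokers: ["kafka-broker:9092"]
topic: network-flows
batchMaxLen: 1000
commitInterval: 1000
`
	var conf ingestKafkaConf
	if err := yaml.Unmarshal([]byte(raw), &conf); err != nil {
		panic(err)
	}
	fmt.Printf("batchMaxLen=%d commitInterval=%dms\n", conf.BatchMaxLen, conf.CommitInterval)
}
```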
62 changes: 46 additions & 16 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -36,16 +36,19 @@ type kafkaReadMessage interface {
}

type ingestKafka struct {
kafkaParams api.IngestKafka
kafkaReader kafkaReadMessage
decoder decode.Decoder
in chan string
exitChan <-chan struct{}
prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
kafkaParams api.IngestKafka
kafkaReader kafkaReadMessage
decoder decode.Decoder
in chan string
exitChan <-chan struct{}
prevRecords []config.GenericMap // copy of most recently sent records; for testing and debugging
batchMaxLength int
}

const channelSizeKafka = 1000
const defaultBatchReadTimeout = int64(100)
const defaultBatchReadTimeout = int64(1000)
const defaultKafkaBatchMaxLength = 500
const defaultKafkaCommitInterval = 500

// Ingest ingests entries from kafka topic
func (ingestK *ingestKafka) Ingest(out chan<- []config.GenericMap) {
@@ -83,21 +86,36 @@ func (ingestK *ingestKafka) kafkaListener() {
func (ingestK *ingestKafka) processLogLines(out chan<- []config.GenericMap) {
var records []interface{}
duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
flushRecords := time.NewTicker(duration)
for {
select {
case <-ingestK.exitChan:
log.Debugf("exiting ingestKafka because of signal")
return
case record := <-ingestK.in:
records = append(records, record)
case <-time.After(duration): // Maximum batch time for each batch
if len(records) >= ingestK.batchMaxLength {
log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
decoded := ingestK.decoder.Decode(records)
out <- decoded
ingestK.prevRecords = decoded
log.Debugf("prevRecords = %v", ingestK.prevRecords)
records = []interface{}{}
}
case <-flushRecords.C: // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
log.Debugf("ingestKafka sending %d records", len(records))
if len(ingestK.in) > 0 {
for len(records) < ingestK.batchMaxLength && len(ingestK.in) > 0 {
record := <-ingestK.in
records = append(records, record)
}
}
log.Debugf("ingestKafka sending %d records, %d entries waiting", len(records), len(ingestK.in))
decoded := ingestK.decoder.Decode(records)
out <- decoded
ingestK.prevRecords = decoded
log.Debugf("prevRecords = %v", ingestK.prevRecords)
out <- decoded
}
records = []interface{}{}
}
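In the updated loop a batch is forwarded either when it reaches batchMaxLength or when the flushRecords ticker fires (after draining up to batchMaxLength of the entries already waiting on the channel). A rough, self-contained sketch of that size-or-timeout pattern follows; it is simplified, uses illustrative names rather than the pipeline's API, and omits the decoder and the drain-on-timer step:

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop collects items from in and emits slices of at most maxLen items,
// flushing any partial batch every flushEvery. It stops when in is closed.
func batchLoop(in <-chan string, out chan<- []string, maxLen int, flushEvery time.Duration) {
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	var batch []string
	flush := func() {
		if len(batch) > 0 {
			out <- batch
			batch = nil
		}
	}
	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush()
				close(out)
				return
			}
			batch = append(batch, item)
			if len(batch) >= maxLen {
				flush() // size-triggered flush, like batchMaxLength
			}
		case <-ticker.C:
			flush() // time-triggered flush, like BatchReadTimeout
		}
	}
}

func main() {
	in := make(chan string, 16)
	out := make(chan []string)
	go batchLoop(in, out, 10, 100*time.Millisecond)
	for i := 0; i < 25; i++ {
		in <- fmt.Sprintf("flow-%d", i)
	}
	close(in)
	for b := range out {
		fmt.Println("batch of", len(b))
	}
}
```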
@@ -145,12 +163,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
}
log.Infof("BatchReadTimeout = %d", jsonIngestKafka.BatchReadTimeout)

commitInterval := int64(defaultKafkaCommitInterval)
if jsonIngestKafka.CommitInterval != 0 {
commitInterval = jsonIngestKafka.CommitInterval
}

kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
Brokers: jsonIngestKafka.Brokers,
Topic: jsonIngestKafka.Topic,
GroupID: jsonIngestKafka.GroupId,
GroupBalancers: groupBalancers,
StartOffset: startOffset,
CommitInterval: time.Duration(commitInterval) * time.Millisecond,
})
if kafkaReader == nil {
errMsg := "NewIngestKafka: failed to create kafka-go reader"
@@ -164,12 +188,18 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
return nil, err
}

bml := defaultKafkaBatchMaxLength
if jsonIngestKafka.BatchMaxLen != 0 {
bml = jsonIngestKafka.BatchMaxLen
}

return &ingestKafka{
kafkaParams: jsonIngestKafka,
kafkaReader: kafkaReader,
decoder: decoder,
exitChan: utils.ExitChannel(),
in: make(chan string, channelSizeKafka),
prevRecords: make([]config.GenericMap, 0),
kafkaParams: jsonIngestKafka,
kafkaReader: kafkaReader,
decoder: decoder,
exitChan: utils.ExitChannel(),
in: make(chan string, channelSizeKafka),
prevRecords: make([]config.GenericMap, 0),
batchMaxLength: bml,
}, nil
}
69 changes: 63 additions & 6 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -64,6 +64,8 @@ parameters:
groupBalancers: ["rackAffinity"]
decoder:
type: json
batchMaxLen: 1000
commitInterval: 1000
`

func initNewIngestKafka(t *testing.T, configTemplate string) Ingester {
@@ -85,6 +87,8 @@ func Test_NewIngestKafka1(t *testing.T) {
require.Equal(t, "FirstOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 2, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, int64(300), ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(500), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(500)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func Test_NewIngestKafka2(t *testing.T) {
Expand All @@ -97,6 +101,8 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, "LastOffset", ingestKafka.kafkaParams.StartOffset)
require.Equal(t, 1, len(ingestKafka.kafkaReader.Config().GroupBalancers))
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
require.Equal(t, int(1000), ingestKafka.batchMaxLength)
require.Equal(t, time.Duration(1000)*time.Millisecond, ingestKafka.kafkaReader.Config().CommitInterval)
}

func removeTimestamp(receivedEntries []config.GenericMap) {
@@ -141,17 +147,16 @@ func Test_IngestKafka(t *testing.T) {
}

type fakeKafkaReader struct {
readToDo int
mock.Mock
}

var fakeRecord = []byte(`{"Bytes":20801,"DstAddr":"10.130.2.1","DstPort":36936,"Packets":401,"SrcAddr":"10.130.2.13","SrcPort":3100}`)

var performedRead = false

// ReadMessage runs in the kafka client thread, which blocks until data is available.
// If data is always available, we have an infinite loop. So we return data only once.
// If data is always available, we have an infinite loop. So we return data only a specified number of times.
func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
if performedRead {
if f.readToDo == 0 {
// block indefinitely
c := make(chan struct{})
<-c
@@ -160,7 +165,7 @@ func (f *fakeKafkaReader) ReadMessage(ctx context.Context) (kafkago.Message, error) {
Topic: "topic1",
Value: fakeRecord,
}
performedRead = true
f.readToDo -= 1
return message, nil
}

@@ -174,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
ingestKafka := newIngest.(*ingestKafka)

// change the ReadMessage function to the mock-up
fr := fakeKafkaReader{}
fr := fakeKafkaReader{readToDo: 1}
ingestKafka.kafkaReader = &fr

// run Ingest in a separate thread
@@ -192,3 +197,55 @@
require.Equal(t, 1, len(receivedEntries))
require.Equal(t, test.DeserializeJSONToMap(t, string(fakeRecord)), receivedEntries[0])
}

func Test_MaxBatchLength(t *testing.T) {
ingestOutput := make(chan []config.GenericMap)
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)

// change the ReadMessage function to the mock-up
fr := fakeKafkaReader{readToDo: 15}
ingestKafka.kafkaReader = &fr
ingestKafka.batchMaxLength = 10
ingestKafka.kafkaParams.BatchReadTimeout = 10000

// run Ingest in a separate thread
go func() {
ingestKafka.Ingest(ingestOutput)
}()

// wait for the data to have been processed
receivedEntries := <-ingestOutput

require.Equal(t, 10, len(receivedEntries))
}

func Test_BatchTimeout(t *testing.T) {
ingestOutput := make(chan []config.GenericMap)
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)

// change the ReadMessage function to the mock-up
fr := fakeKafkaReader{readToDo: 5}
ingestKafka.kafkaReader = &fr
ingestKafka.batchMaxLength = 1000
ingestKafka.kafkaParams.BatchReadTimeout = 100

beforeIngest := time.Now()
// run Ingest in a separate thread
go func() {
ingestKafka.Ingest(ingestOutput)
}()

require.Equal(t, 0, len(ingestOutput))
// wait for the data to have been processed
receivedEntries := <-ingestOutput
require.Equal(t, 5, len(receivedEntries))

afterIngest := time.Now()

// We check that the entries are emitted by the timer-driven flush
// Elapsed time must be above the timer value, but not by much; 20ms is our margin here
require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
}