2 changes: 2 additions & 0 deletions go.mod
@@ -9,6 +9,7 @@ require (
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mitchellh/mapstructure v1.4.3
+    github.com/netobserv/gopipes v0.1.0
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netsampler/goflow2 v1.0.4
github.com/prometheus/client_golang v1.12.1
@@ -74,6 +75,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
+    gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
126 changes: 11 additions & 115 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/config/config.go
@@ -66,6 +66,8 @@ type Ingest struct {

type File struct {
Filename string
+    Loop     bool
+    Chunks   int
}

type Aws struct {
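As an aside, a minimal sketch of an ingest config exercising the new fields, assuming config.Ingest carries the Type and File fields used by the ingesters below. The filename is hypothetical, and note that in this PR the chunk size actually comes from the chunkLines constant in ingest_file.go; the wiring of the Chunks field is not shown in this diff.

// Sketch only; the filename and surrounding wiring are assumptions.
cfg := config.Ingest{
    Type: "file_chunks", // or "file" / "file_loop"
    File: config.File{
        Filename: "/tmp/flows.log", // hypothetical path
        Loop:     false,
        Chunks:   100, // new field; its consumer is not shown in this diff
    },
}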
4 changes: 1 addition & 3 deletions pkg/pipeline/ingest/ingest.go
@@ -17,10 +17,8 @@

package ingest

-type ProcessFunction func(entries []interface{})
-
type Ingester interface {
-    Ingest(ProcessFunction)
+    Ingest(out chan<- []interface{})
}
type IngesterNone struct {
}
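The interface change above replaces the push-style ProcessFunction callback with a channel the caller owns: an ingester now blocks inside Ingest and emits batches on out (the new netobserv/gopipes dependency in go.mod presumably provides the actual stage wiring). A minimal consumer sketch, with hypothetical names, just to show the contract:

// Sketch only: runIngest and handleBatch are illustrative names, not part of this PR.
func runIngest(ing Ingester, handleBatch func([]interface{})) {
    out := make(chan []interface{})
    go ing.Ingest(out) // the ingester writes batches of entries to out
    // The ingesters here never close out, so this loop runs for the
    // life of the pipeline.
    for batch := range out {
        handleBatch(batch) // downstream stage consumes each batch
    }
}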
10 changes: 5 additions & 5 deletions pkg/pipeline/ingest/ingest_collector.go
@@ -91,16 +91,15 @@ func (w *TransportWrapper) Send(_, data []byte) error {
}

// Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2)
-func (ingestC *ingestCollector) Ingest(process ProcessFunction) {
+func (ingestC *ingestCollector) Ingest(out chan<- []interface{}) {
ctx := context.Background()
ingestC.in = make(chan map[string]interface{}, channelSize)

// initialize background listeners (a.k.a. netflow+legacy collector)
ingestC.initCollectorListener(ctx)

// forever process log lines received by collector
-    ingestC.processLogLines(process)
-
+    ingestC.processLogLines(out)
}

func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {
@@ -140,7 +139,7 @@ func (ingestC *ingestCollector) initCollectorListener(ctx context.Context) {

}

-func (ingestC *ingestCollector) processLogLines(process ProcessFunction) {
+func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
var records []interface{}
for {
select {
@@ -153,7 +152,8 @@ func (ingestC *ingestCollector) processLogLines(process ProcessFunction) {
case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
-                process(records)
+                log.Debugf("ingestCollector sending %d entries", len(records))
+                out <- records
}
records = []interface{}{}
}
36 changes: 28 additions & 8 deletions pkg/pipeline/ingest/ingest_file.go
@@ -29,15 +29,19 @@ import (
)

type IngestFile struct {
-    params      config.Ingest
-    exitChan    chan bool
-    PrevRecords []interface{}
+    params       config.Ingest
+    exitChan     chan bool
+    PrevRecords  []interface{}
+    TotalRecords int
}

-const delaySeconds = 10
+const (
+    delaySeconds = 10
+    chunkLines   = 100
+)

// Ingest ingests entries from a file and resends the same data every delaySeconds seconds
-func (ingestF *IngestFile) Ingest(process ProcessFunction) {
+func (ingestF *IngestFile) Ingest(out chan<- []interface{}) {
lines := make([]interface{}, 0)
file, err := os.Open(ingestF.params.File.Filename)
if err != nil {
Expand All @@ -53,11 +57,14 @@ func (ingestF *IngestFile) Ingest(process ProcessFunction) {
log.Debugf("%s", text)
lines = append(lines, text)
}

log.Debugf("Ingesting %d log lines from %s", len(lines), ingestF.params.File.Filename)
switch ingestF.params.Type {
case "file":
ingestF.PrevRecords = lines
-        process(lines)
+        ingestF.TotalRecords = len(lines)
+        log.Debugf("ingestFile sending %d lines", len(lines))
+        out <- lines
case "file_loop":
// loop forever
ticker := time.NewTicker(time.Duration(delaySeconds) * time.Second)
@@ -67,9 +74,22 @@ func (ingestF *IngestFile) Ingest(process ProcessFunction) {
log.Debugf("exiting ingestFile because of signal")
return
case <-ticker.C:
log.Debugf("ingestFile; for loop; before process")
ingestF.PrevRecords = lines
process(lines)
ingestF.TotalRecords += len(lines)
log.Debugf("ingestFile sending %d lines", len(lines))
out <- lines
}
}
case "file_chunks":
// sends the lines in chunks. Useful for testing parallelization
ingestF.TotalRecords = len(lines)
for len(lines) > 0 {
if len(lines) > chunkLines {
out <- lines[:chunkLines]
lines = lines[chunkLines:]
} else {
out <- lines
lines = nil
}
}
}
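Since file_chunks emits fixed batches of chunkLines (100) entries and records the expected total up front, a consumer can drain the channel until the count matches TotalRecords. A hypothetical test-style sketch, where cfg is assumed to select Type "file_chunks":

// Sketch: drain chunked output until every record has arrived.
out := make(chan []interface{})
ingest := &IngestFile{params: cfg} // cfg is a hypothetical config.Ingest
go ingest.Ingest(out)

received := 0
for batch := range out {
    received += len(batch)
    if received == ingest.TotalRecords {
        break // all chunks delivered; the channel is never closed
    }
}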
10 changes: 5 additions & 5 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -46,13 +46,12 @@ const channelSizeKafka = 1000
const defaultBatchReadTimeout = int64(100)

// Ingest ingests entries from kafka topic
-func (ingestK *ingestKafka) Ingest(process ProcessFunction) {
+func (ingestK *ingestKafka) Ingest(out chan<- []interface{}) {
// initialize background listener
ingestK.kafkaListener()

// forever process log lines received by collector
-    ingestK.processLogLines(process)
-
+    ingestK.processLogLines(out)
}

// background thread to read kafka messages; place received items into ingestKafka input channel
@@ -79,7 +78,7 @@ func (ingestK *ingestKafka) kafkaListener() {
}

// read items from ingestKafka input channel, pool them, and send down the pipeline
-func (ingestK *ingestKafka) processLogLines(process ProcessFunction) {
+func (ingestK *ingestKafka) processLogLines(out chan<- []interface{}) {
var records []interface{}
duration := time.Duration(ingestK.kafkaParams.BatchReadTimeout) * time.Millisecond
for {
@@ -92,7 +91,8 @@ func (ingestK *ingestKafka) processLogLines(process ProcessFunction) {
case <-time.After(duration): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
-                process(records)
+                log.Debugf("ingestKafka sending %d records", len(records))
+                out <- records
Review comment (Collaborator):

@KalmanMeth @mariomac do we even need this timer thing, now that we have channels in between? Maybe we can just send the records every time we get one. Why do we need batching at all?

Author reply:

You are right. Sending batches does improve performance a bit, because it minimizes the communication over the channels, but I think it is worth giving the simpler code a try and seeing whether the simpler code and easier testing pay for the overhead.

In a later PR I have fixed the timer and also made it so that, when the batch reaches a given length, it is forwarded without waiting for the timer (a sketch of that pattern follows this diff). We can try removing batches in yet another PR and evaluate the impact in the pipeline benchmark. That would also allow removing the "file_chunks" functionality.

ingestK.prevRecords = records
log.Debugf("prevRecords = %v", ingestK.prevRecords)
}
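For reference, a sketch of the size-or-time batching the author describes above: flush when the batch reaches a limit, otherwise on a fixed ticker, so the deadline does not reset on every received record the way the per-iteration time.After in this loop does. Names and values are illustrative, not the follow-up PR's actual code:

// Sketch: flush on size or on a fixed interval, whichever comes first.
// maxBatchLen and flushInterval are illustrative values.
func batchLoop(in <-chan map[string]interface{}, out chan<- []interface{}) {
    const maxBatchLen = 100
    const flushInterval = 500 * time.Millisecond
    records := make([]interface{}, 0, maxBatchLen)
    ticker := time.NewTicker(flushInterval)
    defer ticker.Stop()
    for {
        select {
        case rec := <-in:
            records = append(records, rec)
            if len(records) >= maxBatchLen {
                out <- records
                records = make([]interface{}, 0, maxBatchLen)
            }
        case <-ticker.C:
            if len(records) > 0 {
                out <- records
                records = make([]interface{}, 0, maxBatchLen)
            }
        }
    }
}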
20 changes: 6 additions & 14 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -95,22 +95,14 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
}

-var receivedEntries []interface{}
-var dummyChan chan bool
-
-func dummyProcessFunction(entries []interface{}) {
-    receivedEntries = entries
-    dummyChan <- true
-}
-
func Test_IngestKafka(t *testing.T) {
-    dummyChan = make(chan bool)
    newIngest := initNewIngestKafka(t, testConfig1)
    ingestKafka := newIngest.(*ingestKafka)
+    ingestOutput := make(chan []interface{})

// run Ingest in a separate thread
go func() {
-        ingestKafka.Ingest(dummyProcessFunction)
+        ingestKafka.Ingest(ingestOutput)
}()
// wait a second for the ingest pipeline to come up
time.Sleep(time.Second)
@@ -126,7 +118,7 @@ func Test_IngestKafka(t *testing.T) {
inChan <- record3

// wait for the data to have been processed
<-dummyChan
receivedEntries := <-ingestOutput

require.Equal(t, 3, len(receivedEntries))
require.Equal(t, record1, receivedEntries[0])
@@ -167,7 +159,7 @@ func (f *fakeKafkaReader) Config() kafkago.ReaderConfig {
}

func Test_KafkaListener(t *testing.T) {
-    dummyChan = make(chan bool)
+    ingestOutput := make(chan []interface{})
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)

@@ -177,11 +169,11 @@ func Test_KafkaListener(t *testing.T) {

// run Ingest in a separate thread
go func() {
-        ingestKafka.Ingest(dummyProcessFunction)
+        ingestKafka.Ingest(ingestOutput)
}()

// wait for the data to have been processed
<-dummyChan
receivedEntries := <-ingestOutput

require.Equal(t, 1, len(receivedEntries))
require.Equal(t, string(fakeRecord), receivedEntries[0])
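A nice side effect of the channel-based tests above: the receive both synchronizes the test and carries the data, which is what lets the package-level receivedEntries/dummyChan pair be deleted. A bare receive does hang the test if the ingester never sends, though; a hedged variant with a timeout guard (not in this PR) could read:

// Sketch: fail fast instead of hanging if nothing is ever ingested.
var receivedEntries []interface{}
select {
case receivedEntries = <-ingestOutput:
case <-time.After(5 * time.Second):
    t.Fatal("timed out waiting for ingested entries")
}
require.Equal(t, 3, len(receivedEntries))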