11 changes: 1 addition & 10 deletions cmd/flowlogs2metrics/main.go
@@ -157,11 +157,6 @@ func main() {
}

func run() {
var (
err error
mainPipeline *pipeline.Pipeline
)

// Initial log message
fmt.Printf("%s starting - version [%s]\n\n", filepath.Base(os.Args[0]), Version)

@@ -172,11 +167,7 @@ func run() {
utils.SetupElegantExit()

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline()
if err != nil {
log.Fatalf("failed to initialize pipeline %s", err)
os.Exit(1)
}
mainPipeline := pipeline.NewPipeline()

// Starts the flows pipeline
mainPipeline.Run()
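One detail behind the deleted error block: logrus's Fatalf logs and then calls os.Exit(1) itself, so the explicit os.Exit(1) that followed it was unreachable dead code even before this change. A minimal standalone illustration:

```go
package main

import log "github.com/sirupsen/logrus"

func main() {
	// Fatalf logs the message and then terminates the process with
	// exit status 1, so any statement placed after it never runs.
	log.Fatalf("failed to initialize pipeline %s", "example error")
}
```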
2 changes: 2 additions & 0 deletions go.mod
@@ -8,6 +8,7 @@ require (
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mariomac/go-pipes v0.1.0
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c
@@ -74,6 +75,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
6 changes: 4 additions & 2 deletions go.sum
@@ -587,6 +587,8 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mariomac/go-pipes v0.1.0 h1:iGbKfW0+iNQyvCvDG13kOTXyhzuXjHvijS5kcftMYOI=
github.com/mariomac/go-pipes v0.1.0/go.mod h1:UKxWnzK1YRLR7tY4OD9BjTQMImKl5MW6LviwEbiJjwg=
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
@@ -776,8 +778,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.28 h1:ATYbyenAlsoFxnV+VpIJMF87bvRuRsX7fezHNfpwkdM=
github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
@@ -1405,6 +1405,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4 changes: 1 addition & 3 deletions pkg/pipeline/ingest/ingest.go
@@ -17,10 +17,8 @@

package ingest

type ProcessFunction func(entries []interface{})

type Ingester interface {
Ingest(ProcessFunction)
Ingest(out chan<- []interface{})
}
type IngesterNone struct {
}
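The contract change in a nutshell: an ingester no longer invokes a per-batch callback but writes batches to the channel it is handed. A self-contained sketch (hypothetical stand-in types, not part of this PR) of how an implementation and its consumer connect:

```go
package main

import "fmt"

// Ingester matches the new contract: implementations push batches to out.
type Ingester interface {
	Ingest(out chan<- []interface{})
}

// staticIngester is a stand-in that emits fixed batches, then closes the
// channel to signal completion (the PR's real ingesters loop forever instead).
type staticIngester struct{ batches [][]interface{} }

func (s *staticIngester) Ingest(out chan<- []interface{}) {
	for _, b := range s.batches {
		out <- b
	}
	close(out)
}

func main() {
	var ing Ingester = &staticIngester{batches: [][]interface{}{{"a", "b"}, {"c"}}}
	ch := make(chan []interface{})
	go ing.Ingest(ch)
	for batch := range ch {
		fmt.Printf("received batch of %d entries\n", len(batch))
	}
}
```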
14 changes: 8 additions & 6 deletions pkg/pipeline/ingest/ingest_collector.go
@@ -22,11 +22,12 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"net"
"time"

"github.com/netobserv/flowlogs2metrics/pkg/api"
"github.com/netobserv/flowlogs2metrics/pkg/config"
pUtils "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
"net"
"time"

ms "github.com/mitchellh/mapstructure"
goflowFormat "github.com/netsampler/goflow2/format"
@@ -91,15 +92,15 @@ func (w *TransportWrapper) Send(_, data []byte) error {
}

// Ingest ingests entries from a network collector using the goflow2 library (https://github.com/netsampler/goflow2)
func (r *ingestCollector) Ingest(process ProcessFunction) {
func (r *ingestCollector) Ingest(out chan<- []interface{}) {
ctx := context.Background()
r.in = make(chan map[string]interface{}, channelSize)

// initialize background listeners (a.k.a. netflow+legacy collector)
r.initCollectorListener(ctx)

// forever process log lines received by collector
r.processLogLines(process)
r.processLogLines(out)

}

@@ -140,7 +141,7 @@ func (r *ingestCollector) initCollectorListener(ctx context.Context) {

}

func (r *ingestCollector) processLogLines(process ProcessFunction) {
func (r *ingestCollector) processLogLines(out chan<- []interface{}) {
var records []interface{}
for {
select {
@@ -153,7 +154,7 @@ func (r *ingestCollector) processLogLines(process ProcessFunction) {
case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
process(records)
log.Debugf("ingestCollector sending %d entries", len(records))
out <- records
}
records = []interface{}{}
}
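The loop above is a batch-or-timeout pattern: records accumulate until the timer fires, then the whole slice is handed downstream in a single send. A runnable generic sketch (timeout value assumed; not the PR's exact code):

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop mirrors processLogLines: accumulate until the timer fires,
// then emit the whole batch in one channel send.
func batchLoop(in <-chan interface{}, out chan<- []interface{}, maxWait time.Duration) {
	var records []interface{}
	for {
		select {
		case rec := <-in:
			records = append(records, rec)
		case <-time.After(maxWait):
			if len(records) > 0 {
				out <- records
				records = nil // the sent batch now belongs to the consumer
			}
		}
	}
}

func main() {
	in := make(chan interface{}, 10)
	out := make(chan []interface{})
	go batchLoop(in, out, 50*time.Millisecond)
	in <- "flow-1"
	in <- "flow-2"
	fmt.Printf("batch of %d entries\n", len(<-out)) // typically: batch of 2 entries
}
```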
13 changes: 8 additions & 5 deletions pkg/pipeline/ingest/ingest_file.go
@@ -20,11 +20,12 @@ package ingest
import (
"bufio"
"fmt"
"os"
"time"

"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
"os"
"time"
)

type IngestFile struct {
@@ -36,7 +37,7 @@ type IngestFile struct {
const delaySeconds = 10

// Ingest ingests entries from a file and resends the same data every delaySeconds seconds
func (r *IngestFile) Ingest(process ProcessFunction) {
func (r *IngestFile) Ingest(out chan<- []interface{}) {
lines := make([]interface{}, 0)
file, err := os.Open(r.fileName)
if err != nil {
@@ -56,7 +57,7 @@ func (r *IngestFile) Ingest(out chan<- []interface{}) {
switch config.Opt.PipeLine.Ingest.Type {
case "file":
r.PrevRecords = lines
process(lines)
log.Debugf("ingestFile sending %d lines", len(lines))
out <- lines
case "file_loop":
// loop forever
ticker := time.NewTicker(time.Duration(delaySeconds) * time.Second)
@@ -68,7 +70,8 @@ func (r *IngestFile) Ingest(out chan<- []interface{}) {
case <-ticker.C:
log.Debugf("ingestFile; for loop; before process")
r.PrevRecords = lines
process(lines)
log.Debugf("ingestFile sending %d lines", len(lines))
out <- lines
}
}
}
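In file_loop mode the ingester re-sends the cached lines on a ticker until the registered exit channel fires. A self-contained sketch of that ticker-plus-exit pattern (shortened interval, hypothetical names):

```go
package main

import (
	"fmt"
	"time"
)

// loopSend re-emits the same cached batch on every tick until exit fires.
func loopSend(lines []interface{}, out chan<- []interface{}, exit <-chan bool) {
	ticker := time.NewTicker(100 * time.Millisecond) // stand-in for delaySeconds
	defer ticker.Stop()
	for {
		select {
		case <-exit:
			return
		case <-ticker.C:
			out <- lines
		}
	}
}

func main() {
	out := make(chan []interface{})
	exit := make(chan bool)
	go loopSend([]interface{}{"line-1", "line-2"}, out, exit)
	fmt.Println(len(<-out)) // 2
	fmt.Println(len(<-out)) // 2 again: the same batch is resent each tick
	close(exit)
}
```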
85 changes: 85 additions & 0 deletions pkg/pipeline/ingest/ingest_file_chunks.go
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2021 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
	"bufio"
	"fmt"
	"os"

	"github.com/netobserv/flowlogs2metrics/pkg/config"
	"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
	log "github.com/sirupsen/logrus"
)

const chunkLines = 100

// FileChunks ingests entries from a file and resends them in chunks of a fixed number of lines.
// It can be used to test processing speed in pipelines.
type FileChunks struct {
	fileName     string
	PrevRecords  []interface{}
	TotalRecords int
}

func (r *FileChunks) Ingest(out chan<- []interface{}) {
	lines := make([]interface{}, 0, chunkLines)
	file, err := os.Open(r.fileName)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		_ = file.Close()
	}()

	scanner := bufio.NewScanner(file)
	nLines := 0
	for scanner.Scan() {
		text := scanner.Text()
		lines = append(lines, text)
		nLines++
		if nLines%chunkLines == 0 {
			r.PrevRecords = lines
			r.TotalRecords += len(lines)
			out <- lines
			// allocate a fresh slice for the next chunk: reusing the backing
			// array via lines[:0] would let later appends overwrite the chunk
			// that was just sent on the channel
			lines = make([]interface{}, 0, chunkLines)
		}
	}
	if len(lines) > 0 {
		r.PrevRecords = lines
		r.TotalRecords += len(lines)
		out <- lines
	}
}

// NewFileChunks creates a new ingester that sends entries in chunks of a fixed number of lines.
func NewFileChunks() (Ingester, error) {
	log.Debugf("entering NewFileChunks")
	if config.Opt.PipeLine.Ingest.File.Filename == "" {
		return nil, fmt.Errorf("ingest filename not specified")
	}

	log.Infof("input file name = %s", config.Opt.PipeLine.Ingest.File.Filename)

	ch := make(chan bool, 1)
	utils.RegisterExitChannel(ch)
	return &FileChunks{
		fileName: config.Opt.PipeLine.Ingest.File.Filename,
	}, nil
}
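A caveat that explains why the chunk reset above allocates a fresh slice: a slice sent over a channel still shares its backing array with the sender, so resetting with lines[:0] and appending again would overwrite data the consumer may not have read yet. A minimal reproduction of that hazard:

```go
package main

import "fmt"

func main() {
	ch := make(chan []interface{}, 1)
	lines := make([]interface{}, 0, 2)

	lines = append(lines, "first")
	ch <- lines                     // receiver now shares the backing array
	lines = lines[:0]               // the buggy reset pattern
	lines = append(lines, "second") // overwrites the element just sent

	fmt.Println((<-ch)[0]) // prints "second", not "first"
}
```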
12 changes: 7 additions & 5 deletions pkg/pipeline/ingest/ingest_kafka.go
@@ -20,13 +20,14 @@ package ingest
import (
"encoding/json"
"errors"
"time"

"github.com/netobserv/flowlogs2metrics/pkg/api"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
kafkago "github.com/segmentio/kafka-go"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"time"
)

type kafkaReadMessage interface {
@@ -46,12 +47,12 @@ const channelSizeKafka = 1000
const defaultBatchReadTimeout = int64(100)

// Ingest ingests entries from a kafka topic
func (r *ingestKafka) Ingest(process ProcessFunction) {
func (r *ingestKafka) Ingest(out chan<- []interface{}) {
// initialize background listener
r.kafkaListener()

// forever process log lines received by collector
r.processLogLines(process)
r.processLogLines(out)

}

@@ -79,7 +80,7 @@ func (r *ingestKafka) kafkaListener() {
}

// read items from ingestKafka input channel, pool them, and send down the pipeline
func (r *ingestKafka) processLogLines(process ProcessFunction) {
func (r *ingestKafka) processLogLines(out chan<- []interface{}) {
var records []interface{}
duration := time.Duration(r.kafkaParams.BatchReadTimeout) * time.Millisecond
for {
@@ -92,7 +93,8 @@ func (r *ingestKafka) processLogLines(out chan<- []interface{}) {
case <-time.After(duration): // Maximum batch time for each batch
// Process batch of records (if not empty)
if len(records) > 0 {
process(records)
log.Debugf("ingestKafka sending %d records", len(records))
out <- records
r.prevRecords = records
log.Debugf("prevRecords = %v", r.prevRecords)
}
26 changes: 10 additions & 16 deletions pkg/pipeline/ingest/ingest_kafka_test.go
@@ -18,15 +18,16 @@
package ingest

import (
"testing"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/test"
kafkago "github.com/segmentio/kafka-go"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"testing"
"time"
)

const testConfig1 = `---
@@ -114,22 +115,15 @@ func Test_NewIngestKafka2(t *testing.T) {
require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout)
}

var receivedEntries []interface{}
var dummyChan chan bool

func dummyProcessFunction(entries []interface{}) {
receivedEntries = entries
dummyChan <- true
}

func Test_IngestKafka(t *testing.T) {
dummyChan = make(chan bool)
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)
ingestOutput := make(chan []interface{})
defer close(ingestOutput)

// run Ingest in a separate thread
go func() {
ingestKafka.Ingest(dummyProcessFunction)
ingestKafka.Ingest(ingestOutput)
}()
// wait a second for the ingest pipeline to come up
time.Sleep(time.Second)
@@ -145,7 +139,7 @@ func Test_IngestKafka(t *testing.T) {
inChan <- record3

// wait for the data to have been processed
<-dummyChan
receivedEntries := <-ingestOutput

require.Equal(t, 3, len(receivedEntries))
require.Equal(t, record1, receivedEntries[0])
@@ -186,7 +180,7 @@ func (f *fakeKafkaReader) Config() kafkago.ReaderConfig {
}

func Test_KafkaListener(t *testing.T) {
dummyChan = make(chan bool)
ingestOutput := make(chan []interface{})
newIngest := initNewIngestKafka(t, testConfig1)
ingestKafka := newIngest.(*ingestKafka)

@@ -196,11 +190,11 @@ func Test_KafkaListener(t *testing.T) {

// run Ingest in a separate thread
go func() {
ingestKafka.Ingest(dummyProcessFunction)
ingestKafka.Ingest(ingestOutput)
}()

// wait for the data to have been processed
<-dummyChan
receivedEntries := <-ingestOutput

require.Equal(t, 1, len(receivedEntries))
require.Equal(t, string(fakeRecord), receivedEntries[0])
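With the callback helper gone, the tests read straight from the ingest channel. One further hardening worth considering (a sketch, not part of this PR): guard the read with a timeout so a stalled ingester fails the test instead of hanging the suite.

```go
package ingest

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// requireBatch reads one batch from out or fails the test after timeout.
// receivedEntries := requireBatch(t, ingestOutput, 5*time.Second) would
// replace the bare <-ingestOutput reads above.
func requireBatch(t *testing.T, out <-chan []interface{}, timeout time.Duration) []interface{} {
	t.Helper()
	select {
	case batch := <-out:
		return batch
	case <-time.After(timeout):
		require.Fail(t, "timed out waiting for ingested entries")
		return nil
	}
}
```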