diff --git a/cmd/flowlogs2metrics/main.go b/cmd/flowlogs2metrics/main.go index b7d3a0cbb..a3a770450 100644 --- a/cmd/flowlogs2metrics/main.go +++ b/cmd/flowlogs2metrics/main.go @@ -157,11 +157,6 @@ func main() { } func run() { - var ( - err error - mainPipeline *pipeline.Pipeline - ) - // Initial log message fmt.Printf("%s starting - version [%s]\n\n", filepath.Base(os.Args[0]), Version) @@ -172,11 +167,7 @@ func run() { utils.SetupElegantExit() // Create new flows pipeline - mainPipeline, err = pipeline.NewPipeline() - if err != nil { - log.Fatalf("failed to initialize pipeline %s", err) - os.Exit(1) - } + mainPipeline := pipeline.NewPipeline() // Starts the flows pipeline mainPipeline.Run() diff --git a/go.mod b/go.mod index efb91dde1..dca2f8ea1 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb github.com/ip2location/ip2location-go/v9 v9.2.0 github.com/json-iterator/go v1.1.12 + github.com/mariomac/go-pipes v0.1.0 github.com/mitchellh/mapstructure v1.4.3 github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c @@ -74,6 +75,7 @@ require ( google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect google.golang.org/grpc v1.43.0 // indirect + gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.66.2 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect diff --git a/go.sum b/go.sum index c545a1f72..d1e77f9b0 100644 --- a/go.sum +++ b/go.sum @@ -587,6 +587,8 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= +github.com/mariomac/go-pipes v0.1.0 h1:iGbKfW0+iNQyvCvDG13kOTXyhzuXjHvijS5kcftMYOI= +github.com/mariomac/go-pipes v0.1.0/go.mod h1:UKxWnzK1YRLR7tY4OD9BjTQMImKl5MW6LviwEbiJjwg= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= @@ -776,8 +778,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4= -github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/segmentio/kafka-go v0.4.28 h1:ATYbyenAlsoFxnV+VpIJMF87bvRuRsX7fezHNfpwkdM= github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -1405,6 +1405,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY= +gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/pipeline/ingest/ingest.go b/pkg/pipeline/ingest/ingest.go index dd149510f..de13fa7a2 100644 --- a/pkg/pipeline/ingest/ingest.go +++ b/pkg/pipeline/ingest/ingest.go @@ -17,10 +17,8 @@ package ingest -type ProcessFunction func(entries []interface{}) - type Ingester interface { - Ingest(ProcessFunction) + Ingest(out chan<- []interface{}) } type IngesterNone struct { } diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index e044c6625..fa89f3b27 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -22,11 +22,12 @@ import ( "encoding/binary" "encoding/json" "fmt" + "net" + "time" + "github.com/netobserv/flowlogs2metrics/pkg/api" "github.com/netobserv/flowlogs2metrics/pkg/config" pUtils "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" - "net" - "time" ms "github.com/mitchellh/mapstructure" goflowFormat "github.com/netsampler/goflow2/format" @@ -91,7 +92,7 @@ func (w *TransportWrapper) Send(_, data []byte) error { } // Ingest ingests entries from a network collector using goflow2 library (https://github.com/netsampler/goflow2) -func (r *ingestCollector) Ingest(process ProcessFunction) { +func (r *ingestCollector) Ingest(out chan<- []interface{}) { ctx := context.Background() r.in = make(chan map[string]interface{}, channelSize) @@ -99,7 +100,7 @@ func (r *ingestCollector) Ingest(process ProcessFunction) { r.initCollectorListener(ctx) // forever process log lines received by collector - r.processLogLines(process) + r.processLogLines(out) } @@ -140,7 +141,7 @@ func (r *ingestCollector) initCollectorListener(ctx context.Context) { } -func (r *ingestCollector) processLogLines(process ProcessFunction) { +func (r *ingestCollector) processLogLines(out chan<- []interface{}) { var records []interface{} for { select { @@ -153,7 +154,8 @@ func (r *ingestCollector) processLogLines(process ProcessFunction) { case <-time.After(time.Millisecond * batchMaxTimeInMilliSecs): // Maximum batch time for each batch // Process batch of records (if not empty) if len(records) > 0 { - process(records) + log.Debugf("ingestCollector sending %d entries", len(records)) + out <- records } records = []interface{}{} } diff --git a/pkg/pipeline/ingest/ingest_file.go b/pkg/pipeline/ingest/ingest_file.go index 43ef62c82..faa210f1f 100644 --- a/pkg/pipeline/ingest/ingest_file.go +++ b/pkg/pipeline/ingest/ingest_file.go @@ -20,11 +20,12 @@ package ingest import ( "bufio" "fmt" + "os" + "time" + "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" log "github.com/sirupsen/logrus" - "os" - "time" ) type IngestFile struct { @@ -36,7 +37,7 @@ type IngestFile struct { const delaySeconds = 10 // Ingest ingests entries from a file and resends the same data every 
delaySeconds seconds -func (r *IngestFile) Ingest(process ProcessFunction) { +func (r *IngestFile) Ingest(out chan<- []interface{}) { lines := make([]interface{}, 0) file, err := os.Open(r.fileName) if err != nil { @@ -56,7 +57,8 @@ switch config.Opt.PipeLine.Ingest.Type { case "file": r.PrevRecords = lines - process(lines) + log.Debugf("ingestFile sending %d lines", len(lines)) + out <- lines case "file_loop": // loop forever ticker := time.NewTicker(time.Duration(delaySeconds) * time.Second) @@ -68,7 +70,8 @@ case <-ticker.C: log.Debugf("ingestFile; for loop; before process") r.PrevRecords = lines - process(lines) + log.Debugf("ingestFile sending %d lines", len(lines)) + out <- lines } } } diff --git a/pkg/pipeline/ingest/ingest_file_chunks.go b/pkg/pipeline/ingest/ingest_file_chunks.go new file mode 100644 index 000000000..0a38b72b7 --- /dev/null +++ b/pkg/pipeline/ingest/ingest_file_chunks.go @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2021 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ingest + +import ( + "bufio" + "fmt" + "os" + + "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" + log "github.com/sirupsen/logrus" +) + +const chunkLines = 100 + +// FileChunks ingests entries from a file and resends them in chunks of a fixed number of lines. +// It might be used to test processing speed in pipelines. +type FileChunks struct { + fileName string + PrevRecords []interface{} + TotalRecords int +} + +func (r *FileChunks) Ingest(out chan<- []interface{}) { + lines := make([]interface{}, 0, chunkLines) + file, err := os.Open(r.fileName) + if err != nil { + log.Fatal(err) + } + defer func() { + _ = file.Close() + }() + + scanner := bufio.NewScanner(file) + nLines := 0 + for scanner.Scan() { + text := scanner.Text() + lines = append(lines, text) + nLines++ + if nLines%chunkLines == 0 { + r.PrevRecords = lines + r.TotalRecords += len(lines) + out <- lines + // allocate a fresh batch: the chunk just sent is still referenced by the + // consumer and by PrevRecords, so its backing array must not be reused + lines = make([]interface{}, 0, chunkLines) + } + } + if len(lines) > 0 { + r.PrevRecords = lines + r.TotalRecords += len(lines) + out <- lines + } +} + +// NewFileChunks creates a new ingester that sends entries in chunks of a fixed number of lines. 
+func NewFileChunks() (Ingester, error) { + log.Debugf("entering NewFileChunks") + if config.Opt.PipeLine.Ingest.File.Filename == "" { + return nil, fmt.Errorf("ingest filename not specified") + } + + log.Infof("input file name = %s", config.Opt.PipeLine.Ingest.File.Filename) + + ch := make(chan bool, 1) + utils.RegisterExitChannel(ch) + return &FileChunks{ + fileName: config.Opt.PipeLine.Ingest.File.Filename, + }, nil +} diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 4cb472123..56527add1 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -20,13 +20,14 @@ package ingest import ( "encoding/json" "errors" + "time" + "github.com/netobserv/flowlogs2metrics/pkg/api" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "golang.org/x/net/context" - "time" ) type kafkaReadMessage interface { @@ -46,12 +47,12 @@ const channelSizeKafka = 1000 const defaultBatchReadTimeout = int64(100) // Ingest ingests entries from kafka topic -func (r *ingestKafka) Ingest(process ProcessFunction) { +func (r *ingestKafka) Ingest(out chan<- []interface{}) { // initialize background listener r.kafkaListener() // forever process log lines received by collector - r.processLogLines(process) + r.processLogLines(out) } @@ -79,7 +80,7 @@ } // read items from ingestKafka input channel, pool them, and send down the pipeline -func (r *ingestKafka) processLogLines(process ProcessFunction) { +func (r *ingestKafka) processLogLines(out chan<- []interface{}) { var records []interface{} duration := time.Duration(r.kafkaParams.BatchReadTimeout) * time.Millisecond for { @@ -92,7 +93,8 @@ case <-time.After(duration): // Maximum batch time for each batch // Process batch of records (if not empty) if len(records) > 0 { - process(records) + log.Debugf("ingestKafka sending %d records", len(records)) + out <- records r.prevRecords = records log.Debugf("prevRecords = %v", r.prevRecords) } diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index f0fc3a03d..560730320 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -18,6 +18,9 @@ package ingest import ( + "testing" + "time" + jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/test" @@ -25,8 +28,6 @@ "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/net/context" - "testing" - "time" ) const testConfig1 = `--- @@ -114,22 +115,15 @@ func Test_NewIngestKafka2(t *testing.T) { require.Equal(t, defaultBatchReadTimeout, ingestKafka.kafkaParams.BatchReadTimeout) } -var receivedEntries []interface{} -var dummyChan chan bool - -func dummyProcessFunction(entries []interface{}) { - receivedEntries = entries - dummyChan <- true -} - func Test_IngestKafka(t *testing.T) { - dummyChan = make(chan bool) newIngest := initNewIngestKafka(t, testConfig1) ingestKafka := newIngest.(*ingestKafka) + ingestOutput := make(chan []interface{}) + defer close(ingestOutput) // run Ingest in a separate thread go func() { - ingestKafka.Ingest(dummyProcessFunction) + ingestKafka.Ingest(ingestOutput) }() // wait a second for the ingest pipeline to come up time.Sleep(time.Second) @@ 
-145,7 +139,7 @@ func Test_IngestKafka(t *testing.T) { inChan <- record3 // wait for the data to have been processed - <-dummyChan + receivedEntries := <-ingestOutput require.Equal(t, 3, len(receivedEntries)) require.Equal(t, record1, receivedEntries[0]) @@ -186,7 +180,7 @@ } func Test_KafkaListener(t *testing.T) { - dummyChan = make(chan bool) + ingestOutput := make(chan []interface{}) newIngest := initNewIngestKafka(t, testConfig1) ingestKafka := newIngest.(*ingestKafka) @@ -196,11 +190,11 @@ // run Ingest in a separate thread go func() { - ingestKafka.Ingest(dummyProcessFunction) + ingestKafka.Ingest(ingestOutput) }() // wait for the data to have been processed - <-dummyChan + receivedEntries := <-ingestOutput require.Equal(t, 1, len(receivedEntries)) require.Equal(t, string(fakeRecord), receivedEntries[0]) diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 8ae8bcbc1..2fd3f4171 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -19,7 +19,9 @@ package pipeline import ( "fmt" + "github.com/heptiolabs/healthcheck" + "github.com/mariomac/go-pipes/pkg/node" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/encode" @@ -36,13 +38,19 @@ // Pipeline manager type Pipeline struct { - Ingester ingest.Ingester - Decoder decode.Decoder - Transformers []transform.Transformer - Writer write.Writer - Extractor extract.Extractor - Encoder encode.Encoder - IsRunning bool + // TODO: remove the fields below, which currently are only accessed from tests + ingester ingest.Ingester + decoder decode.Decoder + writer write.Writer + + IsRunning bool + + transformers []transform.Transformer + // list of nodes that need to be started in order to start the pipeline + start []*node.Init + // list of nodes whose finalization needs to be checked before considering the whole + // pipeline as finalized + terminal []*node.Terminal } func getIngester() (ingest.Ingester, error) { @@ -52,6 +60,8 @@ switch config.Opt.PipeLine.Ingest.Type { case "file", "file_loop": ingester, err = ingest.NewIngestFile() + case "file_chunks": + ingester, err = ingest.NewFileChunks() case "collector": ingester, err = ingest.NewIngestCollector() case "kafka": @@ -92,6 +102,10 @@ writer, _ = write.NewWriteNone() case "loki": writer, _ = write.NewWriteLoki() + case "prom": + writer, _ = write.NewPrometheus() + case "kafka": + writer, _ = write.NewKafka() default: panic("`write` not defined; if no writer needed, specify `none`") } @@ -116,12 +130,8 @@ var encoder encode.Encoder var err error switch config.Opt.PipeLine.Encode.Type { - case "prom": - encoder, _ = encode.NewEncodeProm() case "json": encoder, _ = encode.NewEncodeJson() - case "kafka": - encoder, _ = encode.NewEncodeKafka() case "none": encoder, _ = encode.NewEncodeNone() default: @@ -131,7 +141,7 @@ } // NewPipeline defines the pipeline elements -func NewPipeline() (*Pipeline, error) { +func NewPipeline() *Pipeline { log.Debugf("entering NewPipeline") ingester, _ := getIngester() decoder, _ := getDecoder() @@ -139,41 +149,66 @@ writer, _ := getWriter() extractor, _ := getExtractor() encoder, _ := getEncoder() - - p := 
&Pipeline{ - Ingester: ingester, - Decoder: decoder, - Transformers: transformers, - Extractor: extractor, - Encoder: encoder, - Writer: writer, + // TODO: enable encoders and extracts when we enable the "flexible" pipeline + _, _ = extractor, encoder + + ingests := node.AsInit(ingester.Ingest) + decodes := node.AsMiddle(decodeLoop(decoder)) + transforms := node.AsMiddle(transformLoop(transformers)) + writes := node.AsTerminal(writeLoop(writer)) + //encodes := node.AsMiddle(encoder.Encode) + //extracts := node.AsMiddle(extractor.Extract) + + ingests.SendsTo(decodes) + decodes.SendsTo(transforms) + transforms.SendsTo(writes /*, extracts*/) + //extracts.SendsTo(encodes.Encode) + + return &Pipeline{ + transformers: transformers, + start: []*node.Init{ingests}, + terminal: []*node.Terminal{writes}, + ingester: ingester, + decoder: decoder, + writer: writer, } - - return p, nil } func (p *Pipeline) Run() { p.IsRunning = true - p.Ingester.Ingest(p.Process) + p.process() p.IsRunning = false } -// Process is called by the Ingester function -func (p Pipeline) Process(entries []interface{}) { +// process builds the processing graph and waits until it finishes +func (p Pipeline) process() { log.Debugf("entering pipeline.Process") - log.Debugf("number of entries = %d", len(entries)) - decoded := p.Decoder.Decode(entries) - transformed := make([]config.GenericMap, 0) - var flowEntry config.GenericMap - for _, entry := range decoded { - flowEntry = transform.ExecuteTransforms(p.Transformers, entry) - transformed = append(transformed, flowEntry) + + // starting the graph + for _, s := range p.start { + s.Start() } - _ = p.Writer.Write(transformed) + // block execution until all the terminal stages of the graph have ended + for _, t := range p.terminal { + <-t.Done() + } +} - extracted := p.Extractor.Extract(transformed) - _ = p.Encoder.Encode(extracted) +func transformLoop( + transformers []transform.Transformer, +) func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { + return func(in <-chan []config.GenericMap, out chan<- []config.GenericMap) { + for entries := range in { + transformed := make([]config.GenericMap, 0, len(entries)) + for _, entry := range entries { + // TODO: for consistent configurability, each transformer could be a node by itself + transformed = append(transformed, + transform.ExecuteTransforms(transformers, entry)) + } + out <- transformed + } + } } func (p *Pipeline) IsReady() healthcheck.Check { @@ -193,3 +228,19 @@ func (p *Pipeline) IsAlive() healthcheck.Check { return nil } } + +func decodeLoop(d decode.Decoder) func(in <-chan []interface{}, out chan<- []config.GenericMap) { + return func(in <-chan []interface{}, out chan<- []config.GenericMap) { + for i := range in { + out <- d.Decode(i) + } + } +} + +func writeLoop(w write.Writer) func(in <-chan []config.GenericMap) { + return func(in <-chan []config.GenericMap) { + for i := range in { + w.Write(i) + } + } +} diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go index 2c15a1c14..63ddc7156 100644 --- a/pkg/pipeline/pipeline_test.go +++ b/pkg/pipeline/pipeline_test.go @@ -18,7 +18,14 @@ package pipeline import ( - "github.com/json-iterator/go" + "testing" + "time" + + "github.com/mariomac/go-pipes/pkg/node" + + "github.com/sirupsen/logrus" + + jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode" "github.com/netobserv/flowlogs2metrics/pkg/pipeline/ingest" @@ -26,20 +33,32 @@ 
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/write" "github.com/netobserv/flowlogs2metrics/pkg/test" "github.com/stretchr/testify/require" - "testing" ) func Test_transformToLoki(t *testing.T) { - var transformed []config.GenericMap - input := config.GenericMap{"key": "value"} - transform, err := transform.NewTransformNone() - require.NoError(t, err) - transformed = append(transformed, transform.Transform(input)) - config.Opt.PipeLine.Write.Loki = "{}" + input := []config.GenericMap{{"key": "value"}} + transNone, err := transform.NewTransformNone() + require.NoError(t, err) loki, err := write.NewWriteLoki() - loki.Write(transformed) require.NoError(t, err) + + start := node.AsInit(func(out chan<- []config.GenericMap) { + out <- input + }) + nodeTrans := node.AsMiddle(transformLoop([]transform.Transformer{transNone})) + nodeLoki := node.AsTerminal(loki.Write) + start.SendsTo(nodeTrans) + nodeTrans.SendsTo(nodeLoki) + + start.Start() + + select { + case <-nodeLoki.Done(): + // ok! + case <-time.After(2 * time.Second): + require.Fail(t, "timeout while waiting for pipeline to end") + } } const configTemplate = `--- @@ -54,18 +73,19 @@ pipeline: transform: - type: generic generic: - - input: Bytes - output: fl2m_bytes - - input: DstAddr - output: fl2m_dstAddr - - input: DstPort - output: fl2m_dstPort - - input: Packets - output: fl2m_packets - - input: SrcAddr - output: fl2m_srcAddr - - input: SrcPort - output: fl2m_srcPort + rules: + - input: Bytes + output: fl2m_bytes + - input: DstAddr + output: fl2m_dstAddr + - input: DstPort + output: fl2m_dstPort + - input: Packets + output: fl2m_packets + - input: SrcAddr + output: fl2m_srcAddr + - input: SrcPort + output: fl2m_srcPort extract: type: none encode: @@ -75,10 +95,53 @@ pipeline: ` func Test_SimplePipeline(t *testing.T) { + loadGlobalConfig(t) + + mainPipeline := NewPipeline() + + // The file ingester reads the entire file, pushes it down the pipeline, and then exits + // So we don't need to run it in a separate go-routine + mainPipeline.Run() + + // What is there left to check? Check length of saved data of each stage in private structure. 
+ ingester := mainPipeline.ingester.(*ingest.IngestFile) + decoder := mainPipeline.decoder.(*decode.DecodeJson) + writer := mainPipeline.writer.(*write.WriteNone) + require.Equal(t, 5103, len(ingester.PrevRecords)) + require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords)) + require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords)) + + // checking that the processing is done for at least the first line of the logs + require.Equal(t, ingester.PrevRecords[0], decoder.PrevRecords[0]) + // values checked from the first line of the ../../hack/examples/ocp-ipfix-flowlogs.json file + require.Equal(t, config.GenericMap{ + "fl2m_bytes": float64(20800), + "fl2m_dstAddr": "10.130.2.2", + "fl2m_dstPort": float64(36936), + "fl2m_packets": float64(400), + "fl2m_srcAddr": "10.130.2.13", + "fl2m_srcPort": float64(3100), + }, writer.PrevRecords[0]) +} + +func BenchmarkPipeline(b *testing.B) { + logrus.StandardLogger().SetLevel(logrus.ErrorLevel) + t := &testing.T{} + loadGlobalConfig(t) + config.Opt.PipeLine.Ingest.Type = "file_chunks" + if t.Failed() { + b.Fatalf("unexpected error loading config") + } + for n := 0; n < b.N; n++ { + b.StopTimer() + p := NewPipeline() + b.StartTimer() + p.Run() + } +} + +func loadGlobalConfig(t *testing.T) { var json = jsoniter.ConfigCompatibleWithStandardLibrary - var mainPipeline *Pipeline - var err error - var b []byte v := test.InitConfig(t, configTemplate) config.Opt.PipeLine.Ingest.Type = "file" config.Opt.PipeLine.Decode.Type = "json" @@ -87,21 +150,8 @@ func Test_SimplePipeline(t *testing.T) { config.Opt.PipeLine.Write.Type = "none" config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json" - val := v.Get("pipeline.transform\n") - b, err = json.Marshal(&val) + val := v.Get("pipeline.transform") + b, err := json.Marshal(&val) require.NoError(t, err) config.Opt.PipeLine.Transform = string(b) - - mainPipeline, err = NewPipeline() - require.NoError(t, err) - - // The file ingester reads the entire file, pushes it down the pipeline, and then exits - // So we don't need to run it in a separate go-routine - mainPipeline.Run() - // What is there left to check? Check length of saved data of each stage in private structure. 
- ingester := mainPipeline.Ingester.(*ingest.IngestFile) - decoder := mainPipeline.Decoder.(*decode.DecodeJson) - writer := mainPipeline.Writer.(*write.WriteNone) - require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords)) - require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords)) } diff --git a/pkg/pipeline/write/write.go b/pkg/pipeline/write/write.go index c00050f02..8a06035ea 100644 --- a/pkg/pipeline/write/write.go +++ b/pkg/pipeline/write/write.go @@ -23,20 +23,19 @@ import ( ) type Writer interface { - Write(in []config.GenericMap) []config.GenericMap + Write(in []config.GenericMap) } type WriteNone struct { PrevRecords []config.GenericMap } // Write writes entries -func (t *WriteNone) Write(in []config.GenericMap) []config.GenericMap { +func (t *WriteNone) Write(in []config.GenericMap) { log.Debugf("entering Write none, in = %v", in) t.PrevRecords = in - return in } // NewWriteNone create a new write -func NewWriteNone() (Writer, error) { +func NewWriteNone() (*WriteNone, error) { return &WriteNone{}, nil } diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/write/write_kafka.go similarity index 88% rename from pkg/pipeline/encode/encode_kafka.go rename to pkg/pipeline/write/write_kafka.go index b8c4eb818..c64f45fe2 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/write/write_kafka.go @@ -15,16 +15,17 @@ * */ -package encode +package write import ( "encoding/json" + "time" + "github.com/netobserv/flowlogs2metrics/pkg/api" "github.com/netobserv/flowlogs2metrics/pkg/config" kafkago "github.com/segmentio/kafka-go" log "github.com/sirupsen/logrus" "golang.org/x/net/context" - "time" ) const ( @@ -36,17 +37,16 @@ type kafkaWriteMessage interface { WriteMessages(ctx context.Context, msgs ...kafkago.Message) error } -type encodeKafka struct { +type Kafka struct { kafkaParams api.EncodeKafka kafkaWriter kafkaWriteMessage } // Encode writes entries to kafka topic -func (r *encodeKafka) Encode(in []config.GenericMap) []interface{} { - log.Debugf("entering encodeKafka Encode, #items = %d", len(in)) +func (r *Kafka) Write(in []config.GenericMap) { + log.Debugf("entering Kafka Write, #items = %d", len(in)) var msgs []kafkago.Message msgs = make([]kafkago.Message, 0) - out := make([]interface{}, 0) for _, entry := range in { var entryByteArray []byte entryByteArray, _ = json.Marshal(entry) @@ -54,17 +54,15 @@ Value: entryByteArray, } msgs = append(msgs, msg) - out = append(out, entry) } err := r.kafkaWriter.WriteMessages(context.Background(), msgs...) 
if err != nil { - log.Errorf("encodeKafka error: %v", err) + log.Errorf("Kafka error: %v", err) } - return out } -// NewEncodeKafka create a new writer to kafka -func NewEncodeKafka() (Encoder, error) { +// NewKafka creates a new writer to kafka +func NewKafka() (Writer, error) { log.Debugf("entering NewIngestKafka") encodeKafkaString := config.Opt.PipeLine.Encode.Kafka log.Debugf("encodeKafkaString = %s", encodeKafkaString) @@ -111,7 +109,7 @@ BatchBytes: jsonEncodeKafka.BatchBytes, } - return &encodeKafka{ + return &Kafka{ kafkaParams: jsonEncodeKafka, kafkaWriter: &kafkaWriter, }, nil diff --git a/pkg/pipeline/encode/encode_kafka_test.go b/pkg/pipeline/write/write_kafka_test.go similarity index 84% rename from pkg/pipeline/encode/encode_kafka_test.go rename to pkg/pipeline/write/write_kafka_test.go index ee71d3e8d..a1231fadc 100644 --- a/pkg/pipeline/encode/encode_kafka_test.go +++ b/pkg/pipeline/write/write_kafka_test.go @@ -15,10 +15,12 @@ * */ -package encode +package write import ( "encoding/json" + "testing" + jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/test" @@ -26,13 +28,12 @@ "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/net/context" - "testing" ) const testKafkaConfig = `--- log-level: debug pipeline: - encode: + write: type: kafka kafka: address: 1.2.3.4:9092 @@ -41,6 +42,7 @@ type fakeKafkaWriter struct { mock.Mock + receivedData []interface{} } var receivedData []interface{} @@ -50,33 +52,33 @@ func (f *fakeKafkaWriter) WriteMessages(ctx context.Context, msg ...kafkago.Mess return nil } -func initNewEncodeKafka(t *testing.T) Encoder { +func initNewKafkaWriter(t *testing.T) Writer { v := test.InitConfig(t, testKafkaConfig) - val := v.Get("pipeline.encode.kafka") + val := v.Get("pipeline.write.kafka") var json = jsoniter.ConfigCompatibleWithStandardLibrary b, err := json.Marshal(&val) require.NoError(t, err) config.Opt.PipeLine.Encode.Kafka = string(b) - newEncode, err := NewEncodeKafka() + kw, err := NewKafka() require.NoError(t, err) - return newEncode + return kw } func Test_EncodeKafka(t *testing.T) { - newEncode := initNewEncodeKafka(t) - encodeKafka := newEncode.(*encodeKafka) + kw := initNewKafkaWriter(t) + encodeKafka := kw.(*Kafka) require.Equal(t, "1.2.3.4:9092", encodeKafka.kafkaParams.Address) require.Equal(t, "topic1", encodeKafka.kafkaParams.Topic) - fw := fakeKafkaWriter{} + fw := fakeKafkaWriter{receivedData: []interface{}{}} encodeKafka.kafkaWriter = &fw - receivedData = make([]interface{}, 0) entry1 := test.GetExtractMockEntry() entry2 := test.GetIngestMockEntry(false) in := []config.GenericMap{entry1, entry2} - newEncode.Encode(in) + kw.Write(in) + var expectedOutputString1 []byte var expectedOutputString2 []byte expectedOutputString1, _ = json.Marshal(entry1) @@ -89,5 +91,5 @@ Value: expectedOutputString2, }, } - require.Equal(t, expectedOutput, receivedData[0]) + require.Equal(t, expectedOutput, fw.receivedData[0]) } diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index 2e18bf222..bc901aeab 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -20,12 +20,13 @@ package write import ( "encoding/json" "fmt" - "github.com/netobserv/flowlogs2metrics/pkg/api" - pUtils "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" "math" "strings" "time" + 
"github.com/netobserv/flowlogs2metrics/pkg/api" + pUtils "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils" + "github.com/netobserv/flowlogs2metrics/pkg/config" logAdapter "github.com/go-kit/kit/log/logrus" @@ -203,13 +204,11 @@ func getFloat64(timestamp interface{}) (ft float64, ok bool) { } // Write writes a flow before being stored -func (l *Loki) Write(entries []config.GenericMap) []config.GenericMap { +func (l *Loki) Write(entries []config.GenericMap) { log.Debugf("entering Loki Write") for _, entry := range entries { l.in <- entry } - - return entries } func (l *Loki) processRecords() { diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/write/write_prom.go similarity index 91% rename from pkg/pipeline/encode/encode_prom.go rename to pkg/pipeline/write/write_prom.go index 6ee8f08df..03bc21e22 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/write/write_prom.go @@ -15,7 +15,7 @@ * */ -package encode +package write import ( "container/list" @@ -67,21 +67,19 @@ type metricCacheEntry struct { PromMetric } -type metricCache map[string]*metricCacheEntry - -type encodeProm struct { +type Prometheus struct { mu sync.Mutex port string prefix string metrics map[string]metricInfo expiryTime int64 mList *list.List - mCache metricCache + mCache map[string]*metricCacheEntry exitChan chan bool } -// Encode encodes a metric before being stored -func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} { +// Write encodes a metric before being stored +func (e *Prometheus) Write(metrics []config.GenericMap) { log.Debugf("entering encodeProm Encode") e.mu.Lock() defer e.mu.Unlock() @@ -94,10 +92,9 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []interface{} { log.Debugf("out = %v", out) log.Debugf("cache = %v", e.mCache) log.Debugf("list = %v", e.mList) - return out } -func (e *encodeProm) EncodeMetric(metric config.GenericMap) []interface{} { +func (e *Prometheus) EncodeMetric(metric config.GenericMap) []interface{} { log.Debugf("entering EncodeMetric metric = %v", metric) // TODO: We may need different handling for histograms out := make([]interface{}, 0) @@ -150,12 +147,19 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []interface{} { } func generateCacheKey(sig *entrySignature) string { - eInfoString := fmt.Sprintf("%s%v", sig.Name, sig.Labels) + // standard json package makes sure that labels are sorted in the + // same order even if the internal map structure is different + keyBytes, err := json.Marshal(sig) + if err != nil { + log.Fatalf("unexpected error marshalling JSON. 
This is a bug!: %s", err.Error()) + } + + eInfoString := string(keyBytes) log.Debugf("generateCacheKey: eInfoString = %s", eInfoString) return eInfoString } -func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry { +func (e *Prometheus) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry { // save item in cache; use eInfo as key to the cache var cEntry *metricCacheEntry nowInSecs := time.Now().Unix() @@ -182,7 +186,7 @@ func (e *encodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]st return cEntry } -func (e *encodeProm) cleanupExpiredEntriesLoop() { +func (e *Prometheus) cleanupExpiredEntriesLoop() { ticker := time.NewTicker(time.Duration(e.expiryTime) * time.Second) for { select { @@ -196,7 +200,7 @@ func (e *encodeProm) cleanupExpiredEntriesLoop() { } // cleanupExpiredEntries - any entry that has expired should be removed from the prometheus reporting and cache -func (e *encodeProm) cleanupExpiredEntries() { +func (e *Prometheus) cleanupExpiredEntries() { log.Debugf("entering cleanupExpiredEntries") e.mu.Lock() defer e.mu.Unlock() @@ -234,7 +238,7 @@ func (e *encodeProm) cleanupExpiredEntries() { } // startPrometheusInterface listens for prometheus resource usage requests -func startPrometheusInterface(w *encodeProm) { +func startPrometheusInterface(w *Prometheus) { log.Debugf("entering startPrometheusInterface") log.Infof("startPrometheusInterface: port num = %s", w.port) @@ -249,7 +253,7 @@ func startPrometheusInterface(w *encodeProm) { } } -func NewEncodeProm() (Encoder, error) { +func NewPrometheus() (Writer, error) { encodePromString := config.Opt.PipeLine.Encode.Prom log.Debugf("promEncodeString = %s", encodePromString) var jsonEncodeProm api.PromEncode @@ -315,13 +319,13 @@ func NewEncodeProm() (Encoder, error) { utils.RegisterExitChannel(ch) log.Debugf("metrics = %v", metrics) - w := &encodeProm{ + w := &Prometheus{ port: fmt.Sprintf(":%v", portNum), prefix: promPrefix, metrics: metrics, expiryTime: expiryTime, mList: list.New(), - mCache: make(metricCache), + mCache: map[string]*metricCacheEntry{}, exitChan: ch, } go startPrometheusInterface(w) diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/write/write_prom_test.go similarity index 73% rename from pkg/pipeline/encode/encode_prom_test.go rename to pkg/pipeline/write/write_prom_test.go index 82c9a557d..d4e676259 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/write/write_prom_test.go @@ -15,24 +15,27 @@ * */ -package encode +package write import ( "container/list" + "github.com/netobserv/flowlogs2metrics/pkg/testutils" + "github.com/prometheus/client_golang/prometheus" + "testing" + "time" + jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs2metrics/pkg/config" "github.com/netobserv/flowlogs2metrics/pkg/test" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" - "time" ) const testConfig = `--- log-level: debug pipeline: - encode: + write: type: prom prom: port: 9103 @@ -59,28 +62,28 @@ pipeline: labels: ` -func initNewEncodeProm(t *testing.T) Encoder { +func initNewPromWriter(t *testing.T) Writer { v := test.InitConfig(t, testConfig) - val := v.Get("pipeline.encode.prom") + val := v.Get("pipeline.write.prom") var json = jsoniter.ConfigCompatibleWithStandardLibrary b, err := json.Marshal(&val) require.Equal(t, err, nil) config.Opt.PipeLine.Encode.Prom = string(b) - newEncode, err 
:= NewEncodeProm() + newEncode, err := NewPrometheus() require.Equal(t, err, nil) return newEncode } func Test_NewEncodeProm(t *testing.T) { - newEncode := initNewEncodeProm(t) - encodeProm := newEncode.(*encodeProm) - require.Equal(t, ":9103", encodeProm.port) - require.Equal(t, "test_", encodeProm.prefix) - require.Equal(t, 3, len(encodeProm.metrics)) - require.Equal(t, int64(1), encodeProm.expiryTime) - - metrics := encodeProm.metrics + newWrite := initNewPromWriter(t) + promWriter := newWrite.(*Prometheus) + require.Equal(t, ":9103", promWriter.port) + require.Equal(t, "test_", promWriter.prefix) + require.Equal(t, 3, len(promWriter.metrics)) + require.Equal(t, int64(1), promWriter.expiryTime) + + metrics := promWriter.metrics assert.Contains(t, metrics, "Bytes") gInfo := metrics["Bytes"] require.Equal(t, gInfo.input, "bytes") @@ -94,7 +97,7 @@ func Test_NewEncodeProm(t *testing.T) { require.Equal(t, cInfo.labelNames, expectedList) entry := test.GetExtractMockEntry() input := []config.GenericMap{entry} - output := encodeProm.Encode(input) + promWriter.Write(input) entryLabels1 := make(map[string]string, 3) entryLabels2 := make(map[string]string, 3) @@ -118,30 +121,35 @@ func Test_NewEncodeProm(t *testing.T) { }, value: float64(34), } - require.Contains(t, output, gEntryInfo1) - require.Contains(t, output, gEntryInfo2) + + testutils.Eventually(t, 5*time.Second, func(t require.TestingT) { + promWriter.mu.Lock() + defer promWriter.mu.Unlock() + require.Contains(t, promWriter.mCache, generateCacheKey(&gEntryInfo1.eInfo)) + require.Contains(t, promWriter.mCache, generateCacheKey(&gEntryInfo2.eInfo)) + }) gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1) require.Equal(t, nil, err) bytesA := testutil.ToFloat64(gaugeA) require.Equal(t, gEntryInfo1.value, bytesA) // verify entries are in cache; one for the gauge and one for the counter - entriesMap := encodeProm.mCache + entriesMap := promWriter.mCache require.Equal(t, 2, len(entriesMap)) eInfoBytes := generateCacheKey(&gEntryInfo1.eInfo) - encodeProm.mu.Lock() - _, found := encodeProm.mCache[string(eInfoBytes)] - encodeProm.mu.Unlock() + promWriter.mu.Lock() + _, found := promWriter.mCache[eInfoBytes] + promWriter.mu.Unlock() require.Equal(t, true, found) // wait a couple seconds so that the entry will expire time.Sleep(2 * time.Second) - encodeProm.cleanupExpiredEntries() - entriesMap = encodeProm.mCache - encodeProm.mu.Lock() + promWriter.cleanupExpiredEntries() + entriesMap = promWriter.mCache + promWriter.mu.Lock() require.Equal(t, 0, len(entriesMap)) - encodeProm.mu.Unlock() + promWriter.mu.Unlock() } func Test_EncodeAggregate(t *testing.T) { @@ -156,7 +164,7 @@ func Test_EncodeAggregate(t *testing.T) { "count": "1", }} - newEncode := &encodeProm{ + promWriter := &Prometheus{ port: ":0000", prefix: "test_", metrics: map[string]metricInfo{ @@ -166,10 +174,10 @@ func Test_EncodeAggregate(t *testing.T) { }, }, mList: list.New(), - mCache: make(metricCache), + mCache: map[string]*metricCacheEntry{}, } - output := newEncode.Encode(metrics) + promWriter.Write(metrics) gEntryInfo1 := entryInfo{ eInfo: entrySignature{ @@ -182,6 +190,13 @@ func Test_EncodeAggregate(t *testing.T) { value: float64(7), } - expectedOutput := []interface{}{gEntryInfo1} - require.Equal(t, output, expectedOutput) + expectedLabels := prometheus.Labels{ + "by": "[dstIP srcIP]", + "aggregate": "20.0.0.2,10.0.0.1", + } + cacheKey := generateCacheKey(&gEntryInfo1.eInfo) + require.Contains(t, promWriter.mCache, cacheKey) + cachedEntry := promWriter.mCache[cacheKey] + 
assert.Equal(t, cacheKey, cachedEntry.key) + assert.Equal(t, expectedLabels, cachedEntry.labels) } diff --git a/pkg/pipeline/write/write_stdout.go b/pkg/pipeline/write/write_stdout.go index 3609d2026..0b71b97f6 100644 --- a/pkg/pipeline/write/write_stdout.go +++ b/pkg/pipeline/write/write_stdout.go @@ -19,23 +19,22 @@ package write import ( "fmt" + "time" + "github.com/netobserv/flowlogs2metrics/pkg/config" log "github.com/sirupsen/logrus" - "time" ) type writeStdout struct { } // Write writes a flow before being stored -func (t *writeStdout) Write(in []config.GenericMap) []config.GenericMap { +func (t *writeStdout) Write(in []config.GenericMap) { log.Debugf("entering writeStdout Write") log.Debugf("writeStdout: number of entries = %d", len(in)) for _, v := range in { fmt.Printf("%s: %v\n", time.Now().Format(time.StampMilli), v) } - - return in } // NewWriteStdout create a new write diff --git a/pkg/pipeline/write/write_stdout_test.go b/pkg/pipeline/write/write_stdout_test.go index 10e226c4d..a5810eb98 100644 --- a/pkg/pipeline/write/write_stdout_test.go +++ b/pkg/pipeline/write/write_stdout_test.go @@ -18,19 +18,32 @@ package write import ( + "bufio" + "os" + "testing" + "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) func Test_WriteStdout(t *testing.T) { - ws := writeStdout{} + ws, err := NewWriteStdout() + require.NoError(t, err) + + // Intercept standard output + oldStdout := os.Stdout + r, w, _ := os.Pipe() + defer w.Close() + defer r.Close() + os.Stdout = w + ws.Write([]config.GenericMap{{"key": "test"}}) -} -func Test_NewWriteStdout(t *testing.T) { - writer, err := NewWriteStdout() - require.Nil(t, err) - require.Equal(t, writer, &writeStdout{}) + // read last line from standard output + line, err := bufio.NewReader(r).ReadString('\n') + require.NoError(t, err) + os.Stdout = oldStdout + assert.Contains(t, line, "map[key:test]") } diff --git a/pkg/pipeline/write/write_test.go b/pkg/pipeline/write/write_test.go index 9daa95529..fbeec0e70 100644 --- a/pkg/pipeline/write/write_test.go +++ b/pkg/pipeline/write/write_test.go @@ -18,19 +18,16 @@ package write import ( + "testing" + "github.com/netobserv/flowlogs2metrics/pkg/config" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" ) func Test_Write(t *testing.T) { - wn := WriteNone{} + wn, err := NewWriteNone() + require.NoError(t, err) wn.Write([]config.GenericMap{{"key": "test"}}) -} - -func Test_NewWriteNone(t *testing.T) { - writer, err := NewWriteNone() - require.Nil(t, err) - require.Equal(t, writer, &WriteNone{}) - + assert.Equal(t, []config.GenericMap{{"key": "test"}}, wn.PrevRecords) } diff --git a/pkg/test/utils.go b/pkg/test/utils.go index a83d964a2..2807cf5e7 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -52,7 +52,7 @@ func InitConfig(t *testing.T, conf string) *viper.Viper { v.SetConfigType("yaml") r := bytes.NewReader(yamlConfig) err := v.ReadConfig(r) - require.Equal(t, err, nil) + require.NoError(t, err) return v } diff --git a/pkg/testutils/eventually.go b/pkg/testutils/eventually.go new file mode 100644 index 000000000..f98ac6095 --- /dev/null +++ b/pkg/testutils/eventually.go @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Eventually retries a test until it succeeds. If the timeout is reached, the test fails +// with the same failure as its last execution. +func Eventually(t *testing.T, timeout time.Duration, testFunc func(_ require.TestingT)) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + success := make(chan interface{}) + errorCh := make(chan error) + failCh := make(chan error) + + go func() { + for ctx.Err() == nil { + result := testResult{failed: false, errorCh: errorCh, failCh: failCh} + // Executing the function to test + testFunc(&result) + // If the function didn't report a failure and didn't reach the timeout + if !result.HasFailed() && ctx.Err() == nil { + success <- 1 + break + } + } + }() + + // Wait for success or timeout + var err, fail error + for { + select { + case <-success: + return + case err = <-errorCh: + case fail = <-failCh: + case <-ctx.Done(): + if err != nil { + t.Error(err) + } else if fail != nil { + t.Error(fail) + } else { + t.Error("timeout while waiting for test to complete") + } + return + } + } +} + +// helper type for Eventually +type testResult struct { + sync.RWMutex + failed bool + errorCh chan<- error + failCh chan<- error +} + +func (te *testResult) Errorf(format string, args ...interface{}) { + te.Lock() + te.failed = true + te.Unlock() + te.errorCh <- fmt.Errorf(format, args...) +} + +func (te *testResult) FailNow() { + te.Lock() + te.failed = true + te.Unlock() + te.failCh <- errors.New("test failed") +} + +func (te *testResult) HasFailed() bool { + te.RLock() + defer te.RUnlock() + return te.failed +} diff --git a/pkg/testutils/eventually_test.go b/pkg/testutils/eventually_test.go new file mode 100644 index 000000000..43823e413 --- /dev/null +++ b/pkg/testutils/eventually_test.go @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package testutils + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEventually_Error(t *testing.T) { + t.Skip("We skip this test, as it is expected to fail") + Eventually(t, 10*time.Millisecond, func(t require.TestingT) { + require.True(t, false) + }) +} + +func TestEventually_Fail(t *testing.T) { + t.Skip("We skip this test, as it is expected to fail") + Eventually(t, 10*time.Millisecond, func(t require.TestingT) { + t.FailNow() + }) +} + +func TestEventually_Timeout(t *testing.T) { + t.Skip("We skip this test, as it is expected to fail") + Eventually(t, 10*time.Millisecond, func(t require.TestingT) { + time.Sleep(5 * time.Second) + }) +} + +func TestEventually_Success(t *testing.T) { + num := 3 + Eventually(t, 5*time.Second, func(t require.TestingT) { + require.Equal(t, 0, num) + num-- + }) +}
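
For reviewers unfamiliar with `github.com/mariomac/go-pipes`: the rewrite above replaces the `ProcessFunction` callback chain with a graph of stages, each running in its own goroutine and connected by channels. The sketch below isolates the wiring pattern used in `pkg/pipeline/pipeline.go`. It is illustrative only: it assumes the v0.1.0 API surface exactly as exercised by this patch (`node.AsInit`, `node.AsMiddle`, `node.AsTerminal`, `SendsTo`, `Start`, `Done`), and it assumes the library closes each stage's input channel once the upstream stage returns, which is what the `for ... range in` loops in `decodeLoop`, `transformLoop` and `writeLoop` rely on. The `[]int` payload type is an arbitrary stand-in for the real `[]interface{}` / `[]config.GenericMap` batches.

```go
package main

import (
	"fmt"

	"github.com/mariomac/go-pipes/pkg/node"
)

func main() {
	// Init stage (cf. ingester.Ingest): produces batches, then returns.
	ingest := node.AsInit(func(out chan<- []int) {
		out <- []int{1, 2, 3}
		out <- []int{4, 5}
	})
	// Middle stage (cf. decodeLoop/transformLoop): consumes until the
	// upstream channel is closed, forwarding one output batch per input batch.
	double := node.AsMiddle(func(in <-chan []int, out chan<- []int) {
		for batch := range in {
			doubled := make([]int, 0, len(batch))
			for _, v := range batch {
				doubled = append(doubled, v*2)
			}
			out <- doubled
		}
	})
	// Terminal stage (cf. writeLoop): its Done() channel unblocks when the
	// input channel is drained and the function returns.
	sink := node.AsTerminal(func(in <-chan []int) {
		for batch := range in {
			fmt.Println(batch)
		}
	})

	// Wire the graph, then start it; this mirrors Pipeline.process(), which
	// starts every Init node and waits on every Terminal node's Done().
	ingest.SendsTo(double)
	double.SendsTo(sink)
	ingest.Start()
	<-sink.Done()
}
```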
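The new `BenchmarkPipeline` (together with the `file_chunks` ingester) measures the throughput of this graph end to end. Using only standard `go test` flags, it can be run as:

```sh
go test -run '^$' -bench BenchmarkPipeline ./pkg/pipeline/
```

`-run '^$'` matches no unit tests, so only the benchmark executes; pipeline construction is excluded from the timing by the `StopTimer`/`StartTimer` pair inside the benchmark loop.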