From 5b91c566203b83b59f3542df3466424494b9165b Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Thu, 24 Feb 2022 15:53:44 +0100
Subject: [PATCH 1/5] Fix basic pipeline test

---
 go.mod                        |  1 +
 go.sum                        |  4 ++--
 pkg/pipeline/pipeline_test.go | 15 ++++++++++++++-
 pkg/test/utils.go             |  2 +-
 4 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/go.mod b/go.mod
index efb91dde1..88b8ea1ce 100644
--- a/go.mod
+++ b/go.mod
@@ -74,6 +74,7 @@ require (
     google.golang.org/appengine v1.6.7 // indirect
     google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
     google.golang.org/grpc v1.43.0 // indirect
+    gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
     gopkg.in/inf.v0 v0.9.1 // indirect
     gopkg.in/ini.v1 v1.66.2 // indirect
     gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
diff --git a/go.sum b/go.sum
index c545a1f72..f73737533 100644
--- a/go.sum
+++ b/go.sum
@@ -776,8 +776,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
 github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
-github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
-github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
 github.com/segmentio/kafka-go v0.4.28 h1:ATYbyenAlsoFxnV+VpIJMF87bvRuRsX7fezHNfpwkdM=
 github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
@@ -1405,6 +1403,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY=
+gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index 2c15a1c14..45d7a346e 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -54,6 +54,7 @@ pipeline:
   transform:
     - type: generic
       generic:
+        rules:
         - input: Bytes
           output: fl2m_bytes
         - input: DstAddr
@@ -87,7 +88,7 @@ func Test_SimplePipeline(t *testing.T) {
     config.Opt.PipeLine.Write.Type = "none"
     config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"
 
-    val := v.Get("pipeline.transform\n")
+    val := v.Get("pipeline.transform")
     b, err = json.Marshal(&val)
     require.NoError(t, err)
     config.Opt.PipeLine.Transform = string(b)
@@ -104,4 +105,16 @@ func Test_SimplePipeline(t *testing.T) {
     writer := mainPipeline.Writer.(*write.WriteNone)
     require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords))
     require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords))
+
+    // checking that the processing is done for at least the first line of the logs
+    require.Equal(t, ingester.PrevRecords[0], decoder.PrevRecords[0])
+    // values checked from the first line of the ../../hack/examples/ocp-ipfix-flowlogs.json file
+    require.Equal(t, config.GenericMap{
+        "fl2m_bytes":   float64(20800),
+        "fl2m_dstAddr": "10.130.2.2",
+        "fl2m_dstPort": float64(36936),
+        "fl2m_packets": float64(400),
+        "fl2m_srcAddr": "10.130.2.13",
+        "fl2m_srcPort": float64(3100),
+    }, writer.PrevRecords[0])
 }
diff --git a/pkg/test/utils.go b/pkg/test/utils.go
index a83d964a2..2807cf5e7 100644
--- a/pkg/test/utils.go
+++ b/pkg/test/utils.go
@@ -52,7 +52,7 @@ func InitConfig(t *testing.T, conf string) *viper.Viper {
     v.SetConfigType("yaml")
     r := bytes.NewReader(yamlConfig)
     err := v.ReadConfig(r)
-    require.Equal(t, err, nil)
+    require.NoError(t, err)
     return v
 }

From 8c6cf3136675570fb2d7e2a0f525d265fd40e40e Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Thu, 24 Feb 2022 16:15:12 +0100
Subject: [PATCH 2/5] Basic pipeline benchmark

---
 pkg/pipeline/pipeline_test.go | 55 +++++++++++++++++++++++------------
 1 file changed, 36 insertions(+), 19 deletions(-)

diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index 45d7a346e..2bf9ef101 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -18,7 +18,9 @@
 package pipeline
 
 import (
-    "github.com/json-iterator/go"
+    "testing"
+
+    jsoniter "github.com/json-iterator/go"
     "github.com/netobserv/flowlogs2metrics/pkg/config"
     "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
     "github.com/netobserv/flowlogs2metrics/pkg/pipeline/ingest"
@@ -26,7 +28,6 @@ import (
     "github.com/netobserv/flowlogs2metrics/pkg/pipeline/write"
     "github.com/netobserv/flowlogs2metrics/pkg/test"
     "github.com/stretchr/testify/require"
-    "testing"
 )
 
 func Test_transformToLoki(t *testing.T) {
@@ -76,24 +77,9 @@ pipeline:
 `
 
 func Test_SimplePipeline(t *testing.T) {
-    var json = jsoniter.ConfigCompatibleWithStandardLibrary
-    var mainPipeline *Pipeline
-    var err error
-    var b []byte
-    v := test.InitConfig(t, configTemplate)
-    config.Opt.PipeLine.Ingest.Type = "file"
-    config.Opt.PipeLine.Decode.Type = "json"
-    config.Opt.PipeLine.Extract.Type = "none"
-    config.Opt.PipeLine.Encode.Type = "none"
-    config.Opt.PipeLine.Write.Type = "none"
-    config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"
-
-    val := v.Get("pipeline.transform")
-    b, err = json.Marshal(&val)
-    require.NoError(t, err)
-    config.Opt.PipeLine.Transform = string(b)
+    loadGlobalConfig(t)
 
-    mainPipeline, err = NewPipeline()
+    mainPipeline, err := NewPipeline()
     require.NoError(t, err)
 
     // The file ingester reads the entire file, pushes it down the pipeline, and then exits
@@ -118,3 +104,34 @@ func Test_SimplePipeline(t *testing.T) {
         "fl2m_srcPort": float64(3100),
     }, writer.PrevRecords[0])
 }
+
+func BenchmarkPipeline(b *testing.B) {
+    t := &testing.T{}
+    loadGlobalConfig(t)
+    if t.Failed() {
+        b.Fatalf("unexpected error loading config")
+    }
+    for n := 0; n < b.N; n++ {
+        p, err := NewPipeline()
+        if err != nil {
+            t.Fatalf("unexpected error %s", err)
+        }
+        p.Run()
+    }
+}
+
+func loadGlobalConfig(t *testing.T) {
+    var json = jsoniter.ConfigCompatibleWithStandardLibrary
+    v := test.InitConfig(t, configTemplate)
+    config.Opt.PipeLine.Ingest.Type = "file"
+    config.Opt.PipeLine.Decode.Type = "json"
+    config.Opt.PipeLine.Extract.Type = "none"
+    config.Opt.PipeLine.Encode.Type = "none"
+    config.Opt.PipeLine.Write.Type = "none"
+    config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"
+
+    val := v.Get("pipeline.transform")
+    b, err := json.Marshal(&val)
+    require.NoError(t, err)
+    config.Opt.PipeLine.Transform = string(b)
+}

From a795735d15b98966596d842177404dab7bcac6d7 Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Thu, 24 Feb 2022 16:16:25 +0100
Subject: [PATCH 3/5] indenting transform rules

---
 pkg/pipeline/pipeline_test.go | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index 2bf9ef101..dbe34388f 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -56,18 +56,18 @@ pipeline:
     - type: generic
       generic:
         rules:
-        - input: Bytes
-          output: fl2m_bytes
-        - input: DstAddr
-          output: fl2m_dstAddr
-        - input: DstPort
-          output: fl2m_dstPort
-        - input: Packets
-          output: fl2m_packets
-        - input: SrcAddr
-          output: fl2m_srcAddr
-        - input: SrcPort
-          output: fl2m_srcPort
+          - input: Bytes
+            output: fl2m_bytes
+          - input: DstAddr
+            output: fl2m_dstAddr
+          - input: DstPort
+            output: fl2m_dstPort
+          - input: Packets
+            output: fl2m_packets
+          - input: SrcAddr
+            output: fl2m_srcAddr
+          - input: SrcPort
+            output: fl2m_srcPort
   extract:
     type: none
   encode:

From c3b2324953f5fbf30725889adb0942242e7368ba Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Thu, 24 Feb 2022 16:56:22 +0100
Subject: [PATCH 4/5] omit pipeline creation from benchmarks

---
 pkg/pipeline/pipeline_test.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index dbe34388f..035cba4f5 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -112,10 +112,12 @@ func BenchmarkPipeline(b *testing.B) {
         b.Fatalf("unexpected error loading config")
     }
     for n := 0; n < b.N; n++ {
+        b.StopTimer()
         p, err := NewPipeline()
         if err != nil {
             t.Fatalf("unexpected error %s", err)
         }
+        b.StartTimer()
         p.Run()
     }
 }

From 391cc491bf970a9fc354ff4d9af43db5bd68c2b8 Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Thu, 24 Feb 2022 20:22:10 +0100
Subject: [PATCH 5/5] Added FileChunks ingester to prepare it for parallel
 testing

---
 pkg/pipeline/decode/decode_json.go        |  1 +
 pkg/pipeline/ingest/ingest_file_chunks.go | 85 +++++++++++++++++++++++
 pkg/pipeline/pipeline.go                  |  3 +
 pkg/pipeline/pipeline_test.go             |  5 ++
 4 files changed, 94 insertions(+)
 create mode 100644 pkg/pipeline/ingest/ingest_file_chunks.go

diff --git a/pkg/pipeline/decode/decode_json.go b/pkg/pipeline/decode/decode_json.go
index 553636301..dfd2f3f31 100644
--- a/pkg/pipeline/decode/decode_json.go
+++ b/pkg/pipeline/decode/decode_json.go
@@ -19,6 +19,7 @@ package decode
 
 import (
     "encoding/json"
+
     "github.com/netobserv/flowlogs2metrics/pkg/config"
     log "github.com/sirupsen/logrus"
 )
diff --git a/pkg/pipeline/ingest/ingest_file_chunks.go b/pkg/pipeline/ingest/ingest_file_chunks.go
new file mode 100644
index 000000000..04f4092b9
--- /dev/null
+++ b/pkg/pipeline/ingest/ingest_file_chunks.go
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2021 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package ingest
+
+import (
+    "bufio"
+    "fmt"
+    "os"
+
+    "github.com/netobserv/flowlogs2metrics/pkg/config"
+    "github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
+    log "github.com/sirupsen/logrus"
+)
+
+const chunkLines = 100
+
+// FileChunks ingest entries from a file and resends them in chunks of fixed number of lines.
+// It might be used to test processing speed in pipelines.
+type FileChunks struct {
+    fileName     string
+    PrevRecords  []interface{}
+    TotalRecords int
+}
+
+func (r *FileChunks) Ingest(process ProcessFunction) {
+    lines := make([]interface{}, 0, chunkLines)
+    file, err := os.Open(r.fileName)
+    if err != nil {
+        log.Fatal(err)
+    }
+    defer func() {
+        _ = file.Close()
+    }()
+
+    scanner := bufio.NewScanner(file)
+    nLines := 0
+    for scanner.Scan() {
+        text := scanner.Text()
+        lines = append(lines, text)
+        nLines++
+        if nLines%chunkLines == 0 {
+            r.PrevRecords = lines
+            r.TotalRecords += len(lines)
+            process(lines)
+            // reset slice length without deallocating/reallocating memory
+            lines = lines[:0]
+        }
+    }
+    if len(lines) > 0 {
+        r.PrevRecords = lines
+        r.TotalRecords += len(lines)
+        process(lines)
+    }
+}
+
+// NewFileChunks create a new ingester that sends entries in chunks of fixed number of lines.
+func NewFileChunks() (Ingester, error) {
+    log.Debugf("entering NewIngestFile")
+    if config.Opt.PipeLine.Ingest.File.Filename == "" {
+        return nil, fmt.Errorf("ingest filename not specified")
+    }
+
+    log.Infof("input file name = %s", config.Opt.PipeLine.Ingest.File.Filename)
+
+    ch := make(chan bool, 1)
+    utils.RegisterExitChannel(ch)
+    return &FileChunks{
+        fileName: config.Opt.PipeLine.Ingest.File.Filename,
+    }, nil
+}
diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go
index 8ae8bcbc1..a4b151ac9 100644
--- a/pkg/pipeline/pipeline.go
+++ b/pkg/pipeline/pipeline.go
@@ -19,6 +19,7 @@ package pipeline
 
 import (
     "fmt"
+
     "github.com/heptiolabs/healthcheck"
     "github.com/netobserv/flowlogs2metrics/pkg/config"
     "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
@@ -52,6 +53,8 @@ func getIngester() (ingest.Ingester, error) {
     switch config.Opt.PipeLine.Ingest.Type {
     case "file", "file_loop":
         ingester, err = ingest.NewIngestFile()
+    case "file_chunks":
+        ingester, err = ingest.NewFileChunks()
     case "collector":
         ingester, err = ingest.NewIngestCollector()
     case "kafka":
diff --git a/pkg/pipeline/pipeline_test.go b/pkg/pipeline/pipeline_test.go
index 035cba4f5..4d3a66c16 100644
--- a/pkg/pipeline/pipeline_test.go
+++ b/pkg/pipeline/pipeline_test.go
@@ -20,6 +20,8 @@ package pipeline
 import (
     "testing"
 
+    "github.com/sirupsen/logrus"
+
     jsoniter "github.com/json-iterator/go"
     "github.com/netobserv/flowlogs2metrics/pkg/config"
     "github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
@@ -89,6 +91,7 @@ func Test_SimplePipeline(t *testing.T) {
     ingester := mainPipeline.Ingester.(*ingest.IngestFile)
     decoder := mainPipeline.Decoder.(*decode.DecodeJson)
     writer := mainPipeline.Writer.(*write.WriteNone)
+    require.Equal(t, 5103, len(ingester.PrevRecords))
     require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords))
     require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords))
 
@@ -106,8 +109,10 @@ func Test_SimplePipeline(t *testing.T) {
 }
 
 func BenchmarkPipeline(b *testing.B) {
+    logrus.StandardLogger().SetLevel(logrus.ErrorLevel)
     t := &testing.T{}
     loadGlobalConfig(t)
+    config.Opt.PipeLine.Ingest.Type = "file_chunks"
     if t.Failed() {
         b.Fatalf("unexpected error loading config")
     }