Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,6 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/segmentio/kafka-go v0.4.28 h1:ATYbyenAlsoFxnV+VpIJMF87bvRuRsX7fezHNfpwkdM=
github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
Expand Down Expand Up @@ -1405,6 +1403,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY=
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/decode/decode_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package decode

import (
"encoding/json"

"github.com/netobserv/flowlogs2metrics/pkg/config"
log "github.com/sirupsen/logrus"
)
Expand Down
85 changes: 85 additions & 0 deletions pkg/pipeline/ingest/ingest_file_chunks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2021 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package ingest

import (
"bufio"
"fmt"
"os"

"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
)

// chunkLines is the number of lines forwarded downstream in each chunk.
const chunkLines = 100

// FileChunks ingests entries from a file and resends them in chunks of fixed number of lines.
// It might be used to test processing speed in pipelines.
type FileChunks struct {
	fileName string // path of the input file to read

	// PrevRecords holds the last chunk of lines that was forwarded downstream.
	PrevRecords []interface{}
	// TotalRecords accumulates the number of lines forwarded so far.
	TotalRecords int
}

// Ingest reads the configured file line by line and forwards its contents
// downstream in chunks of up to chunkLines lines each. The last forwarded
// chunk is recorded in PrevRecords and a running count of all forwarded
// lines is kept in TotalRecords.
func (r *FileChunks) Ingest(process ProcessFunction) {
	file, err := os.Open(r.fileName)
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		_ = file.Close()
	}()

	lines := make([]interface{}, 0, chunkLines)
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines = append(lines, scanner.Text())
		if len(lines) == chunkLines {
			r.forward(process, lines)
			// Allocate a fresh slice instead of reslicing lines[:0]: the
			// previous backing array is still referenced by PrevRecords (and
			// possibly retained by the downstream stage), so reusing it would
			// overwrite the already-forwarded chunk.
			lines = make([]interface{}, 0, chunkLines)
		}
	}
	if err := scanner.Err(); err != nil {
		// Report scan failures (e.g. a line longer than the scanner buffer)
		// instead of silently treating a truncated read as end-of-file.
		log.Errorf("error reading %s: %v", r.fileName, err)
	}
	if len(lines) > 0 {
		r.forward(process, lines)
	}
}

// forward records the chunk as the last one sent, updates the running total,
// and pushes the chunk down the pipeline.
func (r *FileChunks) forward(process ProcessFunction, lines []interface{}) {
	r.PrevRecords = lines
	r.TotalRecords += len(lines)
	process(lines)
}

// NewFileChunks creates a new ingester that sends entries in chunks of a
// fixed number of lines, reading from the file configured in
// config.Opt.PipeLine.Ingest.File.Filename. It returns an error when no
// input file name has been configured.
func NewFileChunks() (Ingester, error) {
	// was logging "entering NewIngestFile", copy-pasted from the file ingester
	log.Debugf("entering NewFileChunks")
	if config.Opt.PipeLine.Ingest.File.Filename == "" {
		return nil, fmt.Errorf("ingest filename not specified")
	}

	log.Infof("input file name = %s", config.Opt.PipeLine.Ingest.File.Filename)

	// NOTE(review): this exit channel is registered but never read by
	// FileChunks (Ingest finishes when the file is exhausted) — presumably
	// kept for symmetry with the other ingesters; confirm whether it can be
	// removed.
	ch := make(chan bool, 1)
	utils.RegisterExitChannel(ch)
	return &FileChunks{
		fileName: config.Opt.PipeLine.Ingest.File.Filename,
	}, nil
}
3 changes: 3 additions & 0 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline

import (
"fmt"

"github.com/heptiolabs/healthcheck"
"github.com/netobserv/flowlogs2metrics/pkg/config"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
Expand Down Expand Up @@ -52,6 +53,8 @@ func getIngester() (ingest.Ingester, error) {
switch config.Opt.PipeLine.Ingest.Type {
case "file", "file_loop":
ingester, err = ingest.NewIngestFile()
case "file_chunks":
ingester, err = ingest.NewFileChunks()
case "collector":
ingester, err = ingest.NewIngestCollector()
case "kafka":
Expand Down
99 changes: 68 additions & 31 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package pipeline

import (
"github.com/json-iterator/go"
"testing"

"github.com/sirupsen/logrus"

jsoniter "github.com/json-iterator/go"
"github.com/netobserv/flowlogs2metrics/pkg/config"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GoLand editor will re-order these imports into alphabetical order.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In netobserv we usually sort the imports with goimports, which is independent of the IDE (many team members use VScode). It can be configured to be used from Goland/IDEA too:

image

But if this is an inconvenience for you I can adapt to use the default Goland. WDYT @jotak @jpinsonneau @OlivierCazade ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also using goimports (configured in vscode)

"github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/ingest"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/transform"
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/write"
"github.com/netobserv/flowlogs2metrics/pkg/test"
"github.com/stretchr/testify/require"
"testing"
)

func Test_transformToLoki(t *testing.T) {
Expand Down Expand Up @@ -54,18 +57,19 @@ pipeline:
transform:
- type: generic
generic:
- input: Bytes
output: fl2m_bytes
- input: DstAddr
output: fl2m_dstAddr
- input: DstPort
output: fl2m_dstPort
- input: Packets
output: fl2m_packets
- input: SrcAddr
output: fl2m_srcAddr
- input: SrcPort
output: fl2m_srcPort
rules:
- input: Bytes
output: fl2m_bytes
- input: DstAddr
output: fl2m_dstAddr
- input: DstPort
output: fl2m_dstPort
- input: Packets
output: fl2m_packets
- input: SrcAddr
output: fl2m_srcAddr
- input: SrcPort
output: fl2m_srcPort
extract:
type: none
encode:
Expand All @@ -75,24 +79,9 @@ pipeline:
`

func Test_SimplePipeline(t *testing.T) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var mainPipeline *Pipeline
var err error
var b []byte
v := test.InitConfig(t, configTemplate)
config.Opt.PipeLine.Ingest.Type = "file"
config.Opt.PipeLine.Decode.Type = "json"
config.Opt.PipeLine.Extract.Type = "none"
config.Opt.PipeLine.Encode.Type = "none"
config.Opt.PipeLine.Write.Type = "none"
config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"
loadGlobalConfig(t)

val := v.Get("pipeline.transform\n")
b, err = json.Marshal(&val)
require.NoError(t, err)
config.Opt.PipeLine.Transform = string(b)

mainPipeline, err = NewPipeline()
mainPipeline, err := NewPipeline()
require.NoError(t, err)

// The file ingester reads the entire file, pushes it down the pipeline, and then exits
Expand All @@ -102,6 +91,54 @@ func Test_SimplePipeline(t *testing.T) {
ingester := mainPipeline.Ingester.(*ingest.IngestFile)
decoder := mainPipeline.Decoder.(*decode.DecodeJson)
writer := mainPipeline.Writer.(*write.WriteNone)
require.Equal(t, 5103, len(ingester.PrevRecords))
require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords))
require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords))

// checking that the processing is done for at least the first line of the logs
require.Equal(t, ingester.PrevRecords[0], decoder.PrevRecords[0])
// values checked from the first line of the ../../hack/examples/ocp-ipfix-flowlogs.json file
require.Equal(t, config.GenericMap{
"fl2m_bytes": float64(20800),
"fl2m_dstAddr": "10.130.2.2",
"fl2m_dstPort": float64(36936),
"fl2m_packets": float64(400),
"fl2m_srcAddr": "10.130.2.13",
"fl2m_srcPort": float64(3100),
}, writer.PrevRecords[0])
}

func BenchmarkPipeline(b *testing.B) {
logrus.StandardLogger().SetLevel(logrus.ErrorLevel)
t := &testing.T{}
loadGlobalConfig(t)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariomac can you look on https://github.com/netobserv/flowlogs-pipeline/blob/main/Makefile#L88 --- maybe we can somehow improve this >>> ???? I started to create some benchmark area for the project and I totally agree we need to improve that

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mariomac In any event, can we agree to create dedicated go files just for benchmark and split from the rest of the tests so we can run those stand-alone??

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a look but it doesn't seem to work for me... on each invocation I got:

panic: unexpected call to os.Exit(0) during test

github.com/netobserv/flowlogs2metrics/cmd/flowlogs2metrics.run()
	/vagrant/code/flowlogs2metrics/cmd/flowlogs2metrics/main.go:190 +0x1f2

So I decided to create also a very pipeline-specific test to compare the part we are evaluating to change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With respect to your second question, we could do that if you prefer it. Anyway benchmarks are not run by default even if they are in the same file as the tests. If you mean skipping tests when you run benchmarks, you can add the -test.run=^$ argument to the go test command to skip any test.

But I'm fine if you feel it's better organizing everyting in the same benchmarks file.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eranra I see what's happening with the benchmark in benchmark_test.go. It basically runs many times the whole command, so it would measure basically the performance of starting the FL2M process and finishing it, not the actual FL2M processing performance.

Go benchmarks are more a sort of "micro-benchmarks" aimed to test some parts of the code, and that's why they are usually located in the test files of the components that they are benchmarking. For example, in the benchmark of this PR, it just tests the time of sending and processing a file of ~5000 flows with a very simple dummy pipeline (no real ingest, no real writing...), but it allows us measuring the impact of the sequential vs parallel pipeline mechanism.

I'd suggest to (in another PR to not loose the focus of our current task) remove the current benchmark_test.go and prepare some benchmark that:

  1. starts the FL2M service with a real ingester
  2. spins up a client that is able to send real IPFIX flows (we did one using a VM library in our goflow-kube)
  3. Measures how many flows it's able to process in a given amount of time. E.g. the last stage of the pipeline could be just a counter.

In the future, this could be improved e.g. spinning parallel clients

config.Opt.PipeLine.Ingest.Type = "file_chunks"
if t.Failed() {
b.Fatalf("unexpected error loading config")
}
for n := 0; n < b.N; n++ {
b.StopTimer()
p, err := NewPipeline()
if err != nil {
t.Fatalf("unexpected error %s", err)
}
b.StartTimer()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the timing information used?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StopTimer and StartTimer are used to exclude from the benchmark some parts of the code that you don't want to measure. In this case we don't want to measure the Pipeline creation time but just the amount of metrics that you can forward.

p.Run()
}
}

func loadGlobalConfig(t *testing.T) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
v := test.InitConfig(t, configTemplate)
config.Opt.PipeLine.Ingest.Type = "file"
config.Opt.PipeLine.Decode.Type = "json"
config.Opt.PipeLine.Extract.Type = "none"
config.Opt.PipeLine.Encode.Type = "none"
config.Opt.PipeLine.Write.Type = "none"
config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"

val := v.Get("pipeline.transform")
b, err := json.Marshal(&val)
require.NoError(t, err)
config.Opt.PipeLine.Transform = string(b)
}
2 changes: 1 addition & 1 deletion pkg/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func InitConfig(t *testing.T, conf string) *viper.Viper {
v.SetConfigType("yaml")
r := bytes.NewReader(yamlConfig)
err := v.ReadConfig(r)
require.Equal(t, err, nil)
require.NoError(t, err)
return v
}

Expand Down