From 2b8219218595eee59cf427548d26f763d2e55819 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Feb 2022 14:55:12 +0200 Subject: [PATCH 01/12] Add a test for extract_aggregate --- .../extract/extract_aggregate_test.go | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 pkg/pipeline/extract/extract_aggregate_test.go diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go new file mode 100644 index 000000000..7eca324df --- /dev/null +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2021 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package extract + +import ( + jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" + "testing" +) + +func Test_Extract(t *testing.T) { + yamlConfig := ` +aggregates: +- name: bandwidth + by: + - service + operation: sum + recordkey: bytes +` + var err error + yamlData := make(map[string]interface{}) + err = yaml.Unmarshal([]byte(yamlConfig), &yamlData) + require.NoError(t, err) + var json = jsoniter.ConfigCompatibleWithStandardLibrary + jsonBytes, err := json.Marshal(yamlData["aggregates"]) + require.NoError(t, err) + config.Opt.PipeLine.Extract.Aggregates = string(jsonBytes) + + extractAggregate, err := NewExtractAggregate() + require.NoError(t, err) + + input1 := []config.GenericMap{ + {"service": "http", "bytes": 10.0}, + {"service": "http", "bytes": 20.0}, + {"service": "tcp", "bytes": 1.0}, + {"service": "tcp", "bytes": 2.0}, + } + expectedAggs1 := []config.GenericMap{ + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "30.000000", + "bandwidth_value": "30.000000", + "recentRawValues": []float64{10.0, 20.0}, + "count": "2", + }, + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "3.000000", + "bandwidth_value": "3.000000", + "recentRawValues": []float64{1.0, 2.0}, + "count": "2", + }, + } + actualAggs1 := extractAggregate.Extract(input1) + require.Equal(t, expectedAggs1, actualAggs1) + + input2 := []config.GenericMap{ + {"service": "http", "bytes": 30.0}, + {"service": "tcp", "bytes": 4.0}, + {"service": "tcp", "bytes": 5.0}, + } + expectedAggs2 := []config.GenericMap{ + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "60.000000", + "bandwidth_value": "60.000000", + "recentRawValues": []float64{30.0}, + "count": "3", + }, + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "12.000000", + "bandwidth_value": "12.000000", + "recentRawValues": []float64{4.0, 5.0}, + "count": "4", + }, + } + actualAggs2 := extractAggregate.Extract(input2) + require.Equal(t, expectedAggs2, actualAggs2) + +} From 6c20df8eae34d2a5493d156a6365125d1809e42b Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Feb 2022 16:19:22 +0200 Subject: [PATCH 02/12] Add test for count operation --- .../extract/extract_aggregate_test.go | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 7eca324df..bb10a557d 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -34,6 +34,11 @@ aggregates: - service operation: sum recordkey: bytes +- name: bandwidth_count + by: + - service + operation: count + recordkey: "" ` var err error yamlData := make(map[string]interface{}) @@ -78,6 +83,30 @@ aggregates: "recentRawValues": []float64{1.0, 2.0}, "count": "2", }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "2.000000", + "bandwidth_count_value": "2.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "2", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "2.000000", + "bandwidth_count_value": "2.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "2", + }, } actualAggs1 := extractAggregate.Extract(input1) require.Equal(t, expectedAggs1, actualAggs1) @@ -112,6 +141,30 @@ aggregates: "recentRawValues": []float64{4.0, 5.0}, "count": "4", }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "3.000000", + "bandwidth_count_value": "3.000000", + "recentRawValues": []float64{1.0}, + "count": "3", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "4.000000", + "bandwidth_count_value": "4.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "4", + }, } actualAggs2 := extractAggregate.Extract(input2) require.Equal(t, expectedAggs2, actualAggs2) From 09b8d5d100ea2b49c1a3a76f4c91615142d2380a Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Feb 2022 16:29:07 +0200 Subject: [PATCH 03/12] Make test agnostic to order --- pkg/pipeline/extract/extract_aggregate_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index bb10a557d..e4de19b69 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -109,7 +109,7 @@ aggregates: }, } actualAggs1 := extractAggregate.Extract(input1) - require.Equal(t, expectedAggs1, actualAggs1) + require.ElementsMatch(t, expectedAggs1, actualAggs1) input2 := []config.GenericMap{ {"service": "http", "bytes": 30.0}, @@ -167,6 +167,6 @@ aggregates: }, } actualAggs2 := extractAggregate.Extract(input2) - require.Equal(t, expectedAggs2, actualAggs2) + require.ElementsMatch(t, expectedAggs2, actualAggs2) } From cc564c42e72ca10ab9f1a1653fafc388337941d3 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Feb 2022 16:53:58 +0200 Subject: [PATCH 04/12] Add -coverpkg flag to coverage tests --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 9ac3b856e..314f7fdba 100644 --- a/Makefile +++ b/Makefile @@ -81,7 +81,7 @@ clean: ## Clean # note: to review coverage execute: go tool cover -html=/tmp/coverage.out .PHONY: test test: validate_go ## Test - go test -p 1 -race -covermode=atomic -coverprofile=/tmp/coverage.out ./... + go test -p 1 -race -coverpkg=./... -covermode=atomic -coverprofile=/tmp/coverage.out ./... # note: to review profile execute: go tool pprof -web /tmp/flowlogs-pipeline-cpu-profile.out (make sure graphviz is installed) .PHONY: benchmarks From f6ebc9a78c51bb3e3a38074bc238e86ce1070db1 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Feb 2022 18:06:18 +0200 Subject: [PATCH 05/12] Update year in copyright comment --- pkg/pipeline/extract/extract_aggregate_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index e4de19b69..927c6a737 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -1,5 +1,5 @@ /* - * Copyright (C) 2021 IBM, Inc. + * Copyright (C) 2022 IBM, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From e63d36bb535f7b3016bba61d93292dcae18294e2 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 10:16:32 +0200 Subject: [PATCH 06/12] Refactor test to use test cases --- .../extract/extract_aggregate_test.go | 240 ++++++++++-------- 1 file changed, 130 insertions(+), 110 deletions(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 927c6a737..794a027bd 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -26,7 +26,11 @@ import ( "testing" ) +// This tests extract_aggregate as a whole. It can be thought of as an integration test between extract_aggregate.go and +// aggregate.go and aggregates.go. The test sends flows in 2 batches and verifies the extractor's output after each +// batch. The output of the 2nd batch depends on the 1st batch. func Test_Extract(t *testing.T) { + // Setup yamlConfig := ` aggregates: - name: bandwidth @@ -52,121 +56,137 @@ aggregates: extractAggregate, err := NewExtractAggregate() require.NoError(t, err) - input1 := []config.GenericMap{ - {"service": "http", "bytes": 10.0}, - {"service": "http", "bytes": 20.0}, - {"service": "tcp", "bytes": 1.0}, - {"service": "tcp", "bytes": 2.0}, - } - expectedAggs1 := []config.GenericMap{ - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "30.000000", - "bandwidth_value": "30.000000", - "recentRawValues": []float64{10.0, 20.0}, - "count": "2", - }, - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "3.000000", - "bandwidth_value": "3.000000", - "recentRawValues": []float64{1.0, 2.0}, - "count": "2", - }, + // Test cases + tests := []struct { + name string + inputBatch []config.GenericMap + expectedAggs []config.GenericMap + }{ { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "2.000000", - "bandwidth_count_value": "2.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "2", + name: "batch1", + inputBatch: []config.GenericMap{ + {"service": "http", "bytes": 10.0}, + {"service": "http", "bytes": 20.0}, + {"service": "tcp", "bytes": 1.0}, + {"service": "tcp", "bytes": 2.0}, + }, + expectedAggs: []config.GenericMap{ + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "30.000000", + "bandwidth_value": "30.000000", + "recentRawValues": []float64{10.0, 20.0}, + "count": "2", + }, + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "3.000000", + "bandwidth_value": "3.000000", + "recentRawValues": []float64{1.0, 2.0}, + "count": "2", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "2.000000", + "bandwidth_count_value": "2.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "2", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "2.000000", + "bandwidth_count_value": "2.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "2", + }, + }, }, { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "2.000000", - "bandwidth_count_value": "2.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "2", + name: "batch2", + inputBatch: []config.GenericMap{ + {"service": "http", "bytes": 30.0}, + {"service": "tcp", "bytes": 4.0}, + {"service": "tcp", "bytes": 5.0}, + }, + expectedAggs: []config.GenericMap{ + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "60.000000", + "bandwidth_value": "60.000000", + "recentRawValues": []float64{30.0}, + "count": "3", + }, + { + "name": "bandwidth", + "record_key": "bytes", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationSum), + "value": "12.000000", + "bandwidth_value": "12.000000", + "recentRawValues": []float64{4.0, 5.0}, + "count": "4", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "http", + "service": "http", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "3.000000", + "bandwidth_count_value": "3.000000", + "recentRawValues": []float64{1.0}, + "count": "3", + }, + { + "name": "bandwidth_count", + "record_key": "", + "by": "service", + "aggregate": "tcp", + "service": "tcp", + "operation": aggregate.Operation(aggregate.OperationCount), + "value": "4.000000", + "bandwidth_count_value": "4.000000", + "recentRawValues": []float64{1.0, 1.0}, + "count": "4", + }, + }, }, } - actualAggs1 := extractAggregate.Extract(input1) - require.ElementsMatch(t, expectedAggs1, actualAggs1) - input2 := []config.GenericMap{ - {"service": "http", "bytes": 30.0}, - {"service": "tcp", "bytes": 4.0}, - {"service": "tcp", "bytes": 5.0}, + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualAggs := extractAggregate.Extract(tt.inputBatch) + // Since the order of the elements in the returned slice from Extract() is non-deterministic, we use + // ElementsMatch() rather than Equals() + require.ElementsMatch(t, tt.expectedAggs, actualAggs) + }) } - expectedAggs2 := []config.GenericMap{ - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "60.000000", - "bandwidth_value": "60.000000", - "recentRawValues": []float64{30.0}, - "count": "3", - }, - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "12.000000", - "bandwidth_value": "12.000000", - "recentRawValues": []float64{4.0, 5.0}, - "count": "4", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "3.000000", - "bandwidth_count_value": "3.000000", - "recentRawValues": []float64{1.0}, - "count": "3", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "4.000000", - "bandwidth_count_value": "4.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "4", - }, - } - actualAggs2 := extractAggregate.Extract(input2) - require.ElementsMatch(t, expectedAggs2, actualAggs2) - } From 00bb41afd4509db85b5eb247aa9e662db71c3dca Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 10:51:03 +0200 Subject: [PATCH 07/12] Refactor test to use util function --- .../extract/extract_aggregate_test.go | 131 ++++-------------- 1 file changed, 30 insertions(+), 101 deletions(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 794a027bd..06c39238c 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -18,6 +18,7 @@ package extract import ( + "fmt" jsoniter "github.com/json-iterator/go" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" @@ -26,6 +27,22 @@ import ( "testing" ) +func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64) config.GenericMap { + valueString := fmt.Sprintf("%f", value) + return config.GenericMap{ + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": aggregate.Operation(op), + "value": valueString, + fmt.Sprintf("%v_value", name): valueString, + "recentRawValues": rrv, + "count": fmt.Sprintf("%v", count), + } +} + // This tests extract_aggregate as a whole. It can be thought of as an integration test between extract_aggregate.go and // aggregate.go and aggregates.go. The test sends flows in 2 batches and verifies the extractor's output after each // batch. The output of the 2nd batch depends on the 1st batch. @@ -33,16 +50,16 @@ func Test_Extract(t *testing.T) { // Setup yamlConfig := ` aggregates: -- name: bandwidth - by: - - service - operation: sum - recordkey: bytes - name: bandwidth_count by: - service operation: count recordkey: "" +- name: bandwidth_sum + by: + - service + operation: sum + recordkey: bytes ` var err error yamlData := make(map[string]interface{}) @@ -71,54 +88,10 @@ aggregates: {"service": "tcp", "bytes": 2.0}, }, expectedAggs: []config.GenericMap{ - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "30.000000", - "bandwidth_value": "30.000000", - "recentRawValues": []float64{10.0, 20.0}, - "count": "2", - }, - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "3.000000", - "bandwidth_value": "3.000000", - "recentRawValues": []float64{1.0, 2.0}, - "count": "2", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "2.000000", - "bandwidth_count_value": "2.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "2", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "2.000000", - "bandwidth_count_value": "2.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "2", - }, + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), }, }, { @@ -129,54 +102,10 @@ aggregates: {"service": "tcp", "bytes": 5.0}, }, expectedAggs: []config.GenericMap{ - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "60.000000", - "bandwidth_value": "60.000000", - "recentRawValues": []float64{30.0}, - "count": "3", - }, - { - "name": "bandwidth", - "record_key": "bytes", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationSum), - "value": "12.000000", - "bandwidth_value": "12.000000", - "recentRawValues": []float64{4.0, 5.0}, - "count": "4", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "http", - "service": "http", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "3.000000", - "bandwidth_count_value": "3.000000", - "recentRawValues": []float64{1.0}, - "count": "3", - }, - { - "name": "bandwidth_count", - "record_key": "", - "by": "service", - "aggregate": "tcp", - "service": "tcp", - "operation": aggregate.Operation(aggregate.OperationCount), - "value": "4.000000", - "bandwidth_count_value": "4.000000", - "recentRawValues": []float64{1.0, 1.0}, - "count": "4", - }, + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}), }, }, } From 04af3029cbbabd8ff8b4c18a6540bf68f12c627d Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 11:03:10 +0200 Subject: [PATCH 08/12] Add test for max --- .../extract/extract_aggregate_test.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 06c39238c..d6f640a04 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -55,11 +55,18 @@ aggregates: - service operation: count recordkey: "" + - name: bandwidth_sum by: - service operation: sum recordkey: bytes + +- name: bandwidth_max + by: + - service + operation: max + recordkey: bytes ` var err error yamlData := make(map[string]interface{}) @@ -88,10 +95,12 @@ aggregates: {"service": "tcp", "bytes": 2.0}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}), createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}), + createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}), + createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}), }, }, { @@ -102,10 +111,12 @@ aggregates: {"service": "tcp", "bytes": 5.0}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}), createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}), createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}), + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}), + createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}), + createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}), }, }, } From 6cc8c7165b971aacf5eed081cd767916b0d48278 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 11:07:16 +0200 Subject: [PATCH 09/12] Add test for min --- pkg/pipeline/extract/extract_aggregate_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index d6f640a04..cb664a3cf 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -67,6 +67,12 @@ aggregates: - service operation: max recordkey: bytes + +- name: bandwidth_min + by: + - service + operation: min + recordkey: bytes ` var err error yamlData := make(map[string]interface{}) @@ -101,6 +107,8 @@ aggregates: createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}), createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}), createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}), + createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}), + createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}), }, }, { @@ -117,6 +125,8 @@ aggregates: createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}), createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}), createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}), + createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}), + createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}), }, }, } From b7e27a8f286e197d368c3ebd8abe80b6316becec Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 11:10:43 +0200 Subject: [PATCH 10/12] Add test for avg --- pkg/pipeline/extract/extract_aggregate_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index cb664a3cf..21970d3d8 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -73,6 +73,12 @@ aggregates: - service operation: min recordkey: bytes + +- name: bandwidth_avg + by: + - service + operation: avg + recordkey: bytes ` var err error yamlData := make(map[string]interface{}) @@ -109,6 +115,8 @@ aggregates: createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}), createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}), createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}), + createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}), + createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}), }, }, { @@ -127,6 +135,8 @@ aggregates: createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}), createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}), createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}), + createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}), + createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}), }, }, } From f2e2ba8fd49c1c750686438d5dc64ac6d36229ad Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 11:44:08 +0200 Subject: [PATCH 11/12] Change type of bytes float->int --- pkg/pipeline/extract/extract_aggregate_test.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 21970d3d8..fc7a99d08 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -101,10 +101,10 @@ aggregates: { name: "batch1", inputBatch: []config.GenericMap{ - {"service": "http", "bytes": 10.0}, - {"service": "http", "bytes": 20.0}, - {"service": "tcp", "bytes": 1.0}, - {"service": "tcp", "bytes": 2.0}, + {"service": "http", "bytes": 10}, + {"service": "http", "bytes": 20}, + {"service": "tcp", "bytes": 1}, + {"service": "tcp", "bytes": 2}, }, expectedAggs: []config.GenericMap{ createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), @@ -122,9 +122,8 @@ aggregates: { name: "batch2", inputBatch: []config.GenericMap{ - {"service": "http", "bytes": 30.0}, - {"service": "tcp", "bytes": 4.0}, - {"service": "tcp", "bytes": 5.0}, + {"service": "http", "bytes": 30}, + {"service": "tcp", "bytes": 4}, }, expectedAggs: []config.GenericMap{ createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}), From 7d74fcefe893f5995ae5d609bb1c6cb3e88368d5 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 28 Feb 2022 12:06:40 +0200 Subject: [PATCH 12/12] Fix test --- pkg/pipeline/extract/extract_aggregate_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index fc7a99d08..5982b35e9 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -124,6 +124,7 @@ aggregates: inputBatch: []config.GenericMap{ {"service": "http", "bytes": 30}, {"service": "tcp", "bytes": 4}, + {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}),