From 9ef90c07770af27d17932920747c14117442739e Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 13:54:10 +0200 Subject: [PATCH 01/10] Add recent_op_value and recent_count --- pkg/pipeline/extract/aggregate/aggregate.go | 38 +++++++++++++--- .../extract/extract_aggregate_test.go | 44 ++++++++++--------- 2 files changed, 55 insertions(+), 27 deletions(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 2fda222d1..7252918f0 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -48,6 +48,8 @@ type Aggregate struct { type GroupState struct { normalizedValues NormalizedValues RecentRawValues []float64 + recentOpValue float64 + recentCount int value float64 count int } @@ -97,17 +99,27 @@ func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, Normaliz return nil, normalizedValues } +func initValue(operation string) (float64, error) { + switch operation { + case OperationSum, OperationAvg, OperationMax, OperationCount: + return 0, nil + case OperationMin: + return math.MaxFloat64, nil + default: + return 0, fmt.Errorf("unkown operation %v", operation) + } +} + func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error { groupState, ok := aggregate.Groups[normalizedValues] if !ok { groupState = &GroupState{normalizedValues: normalizedValues} - switch string(aggregate.Definition.Operation) { - case OperationSum, OperationMax: - groupState.value = 0 - case OperationMin: - groupState.value = math.MaxFloat64 - default: + initVal, err := initValue(string(aggregate.Definition.Operation)) + if err != nil { + return err } + groupState.value = initVal + groupState.recentOpValue = initVal aggregate.Groups[normalizedValues] = groupState } @@ -117,6 +129,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu if operation == OperationCount { groupState.value = float64(groupState.count + 1) + groupState.recentOpValue = float64(groupState.recentCount + 1) groupState.RecentRawValues = append(groupState.RecentRawValues, 1) } else { if recordKey != "" { @@ -128,12 +141,16 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu switch operation { case OperationSum: groupState.value += valueFloat64 + groupState.recentOpValue += valueFloat64 case OperationMax: groupState.value = math.Max(groupState.value, valueFloat64) + groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64) case OperationMin: groupState.value = math.Min(groupState.value, valueFloat64) + groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64) case OperationAvg: groupState.value = (groupState.value*float64(groupState.count) + valueFloat64) / float64(groupState.count+1) + groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1) } } } @@ -141,6 +158,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu // update count groupState.count += 1 + groupState.recentCount += 1 return nil } @@ -176,11 +194,19 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "value": fmt.Sprintf("%f", group.value), "recentRawValues": group.RecentRawValues, "count": fmt.Sprintf("%d", group.count), + "recent_op_value": group.recentOpValue, + "recent_count": group.recentCount, aggregate.Definition.Name + "_value": fmt.Sprintf("%f", group.value), strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the raw values accumulation group.RecentRawValues = make([]float64, 0) + group.recentCount = 0 + initVal, err := initValue(string(aggregate.Definition.Operation)) + if err != nil { + log.Errorf("Error: %v", err) + } + group.recentOpValue = initVal } return metrics diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index b32e749f2..b292d55db 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -28,7 +28,7 @@ import ( "github.com/stretchr/testify/require" ) -func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64) config.GenericMap { +func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { valueString := fmt.Sprintf("%f", value) return config.GenericMap{ "name": name, @@ -41,6 +41,8 @@ func createAgg(name, recordKey, by, agg, op string, value float64, count int, rr fmt.Sprintf("%v_value", name): valueString, "recentRawValues": rrv, "count": fmt.Sprintf("%v", count), + "recent_op_value": recentOpValue, + "recent_count": recentCount, } } @@ -110,16 +112,16 @@ parameters: {"service": "tcp", "bytes": 2}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}), - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}), - createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}), - createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}), - createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}), - createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}), - createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}), - createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2), + createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2), + createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2), + createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2), + createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2), + createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2), + createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2), }, }, { @@ -130,16 +132,16 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}), - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}), - createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}), - createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}), - createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}), - createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}), - createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}), - createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2), + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2), + createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1), + createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2), + createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1), + createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2), + createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1), + createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2), }, }, } From ffde58d5cf0c0757c2a0d0c5e523b221ef1fcec0 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 13:58:51 +0200 Subject: [PATCH 02/10] Update Counter's source --- pkg/pipeline/encode/encode_prom.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 2f2a6f612..4d168cbff 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -140,9 +140,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap mInfo.promGauge.With(entryLabels).Set(valueFloat) cEntry.PromMetric.promGauge = mInfo.promGauge case api.PromEncodeOperationName("Counter"): - for _, v := range metric["recentRawValues"].([]float64) { - mInfo.promCounter.With(entryLabels).Add(v) - } + mInfo.promCounter.With(entryLabels).Add(valueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): for _, v := range metric["recentRawValues"].([]float64) { From 0131f68ee9937551f4c3df08ae73d4b61d26794e Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 15:27:54 +0200 Subject: [PATCH 03/10] Add integration test between extractor and prom encoder --- pkg/pipeline/aggregate_prom_test.go | 188 ++++++++++++++++++++ pkg/pipeline/encode/encode_prom.go | 1 + pkg/pipeline/extract/aggregate/aggregate.go | 25 +-- 3 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 pkg/pipeline/aggregate_prom_test.go diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go new file mode 100644 index 000000000..be61f4319 --- /dev/null +++ b/pkg/pipeline/aggregate_prom_test.go @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package pipeline + +import ( + "fmt" + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" + "github.com/netobserv/flowlogs-pipeline/pkg/test" + "github.com/stretchr/testify/require" +) + +func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { + valueString := fmt.Sprintf("%f", value) + return config.GenericMap{ + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "value": valueString, + fmt.Sprintf("%v_value", name): valueString, + "recentRawValues": rrv, + "count": fmt.Sprintf("%v", count), + fmt.Sprintf("%v_recent_op_value", name): recentOpValue, + fmt.Sprintf("%v_recent_count", name): recentCount, + } +} + +func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap { + gm := config.GenericMap{ + "Name": name, + "Labels": labels, + "value": value, + } + return gm +} + +// Test_Extract_Encode tests the integration between extract_aggregate and encode_prom. +// The test sends flows in 2 batches. Each batch is passed through the extractor and the encoder. +// The output of each stage is verified. +// The output of the 2nd batch depends on the 1st batch. +func Test_Extract_Encode(t *testing.T) { + // Setup + yamlConfig := ` +pipeline: + - name: extract + - name: encode +parameters: + - name: extract + extract: + type: aggregates + aggregates: + - name: bandwidth_sum + by: + - service + operation: sum + recordkey: bytes + + - name: bandwidth_count + by: + - service + operation: count + recordkey: + - name: encode + encode: + type: prom + prom: + port: 9103 + prefix: test_ + expirytime: 1 + metrics: + - name: flow_count + type: counter + valuekey: bandwidth_count_recent_count + labels: + - service + + - name: bytes_sum + type: counter + valuekey: bandwidth_sum_recent_op_value + labels: + - service + +# - name: bytes_histogram +# type: histogram +# valuekey: recentRawValues +# labels: +# - service +` + var err error + + v := test.InitConfig(t, yamlConfig) + require.NotNil(t, v) + + extractAggregate, err := extract.NewExtractAggregate(config.Parameters[0]) + require.NoError(t, err) + + promEncode, err := encode.NewEncodeProm(config.Parameters[1]) + require.Equal(t, err, nil) + + // Test cases + tests := []struct { + name string + inputBatch []config.GenericMap + expectedAggs []config.GenericMap + expectedEncode []config.GenericMap + }{ + { + name: "batch1", + inputBatch: []config.GenericMap{ + {"service": "http", "bytes": 10.0}, + {"service": "http", "bytes": 20.0}, + {"service": "tcp", "bytes": 1.0}, + {"service": "tcp", "bytes": 2.0}, + }, + expectedAggs: []config.GenericMap{ + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + }, + expectedEncode: []config.GenericMap{ + createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), + createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3), + // TODO: add the following test once raw_values operation and filters are implemented + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}), + }, + }, + { + name: "batch2", + inputBatch: []config.GenericMap{ + {"service": "http", "bytes": 30}, + {"service": "tcp", "bytes": 4}, + {"service": "tcp", "bytes": 5}, + }, + expectedAggs: []config.GenericMap{ + createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), + createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), + createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), + createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), + }, + expectedEncode: []config.GenericMap{ + createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), + createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualAggs := extractAggregate.Extract(tt.inputBatch) + // Since the order of the elements in the returned slice from Extract() and Encode() is non-deterministic, + // we use ElementsMatch() rather than Equals() + require.ElementsMatch(t, tt.expectedAggs, actualAggs) + + actualEncode := promEncode.Encode(actualAggs) + require.ElementsMatch(t, tt.expectedEncode, actualEncode) + }) + } +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 4d168cbff..a35e87e21 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -126,6 +126,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap value: valueFloat, } entryMap := map[string]interface{}{ + // TODO: change to lower case "Name": e.prefix + metricName, "Labels": entryLabels, "value": valueFloat, diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 7252918f0..2281566c5 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -185,19 +185,20 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error { func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { + // TODO: remove prefixes when filtering is implemented in prom encode. metrics = append(metrics, config.GenericMap{ - "name": aggregate.Definition.Name, - "operation": aggregate.Definition.Operation, - "record_key": aggregate.Definition.RecordKey, - "by": strings.Join(aggregate.Definition.By, ","), - "aggregate": string(group.normalizedValues), - "value": fmt.Sprintf("%f", group.value), - "recentRawValues": group.RecentRawValues, - "count": fmt.Sprintf("%d", group.count), - "recent_op_value": group.recentOpValue, - "recent_count": group.recentCount, - aggregate.Definition.Name + "_value": fmt.Sprintf("%f", group.value), - strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), + "name": aggregate.Definition.Name, + "operation": aggregate.Definition.Operation, + "record_key": aggregate.Definition.RecordKey, + "by": strings.Join(aggregate.Definition.By, ","), + "aggregate": string(group.normalizedValues), + "value": fmt.Sprintf("%f", group.value), + "recentRawValues": group.RecentRawValues, + "count": fmt.Sprintf("%d", group.count), + aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, + aggregate.Definition.Name + "_recent_count": group.recentCount, + aggregate.Definition.Name + "_value": fmt.Sprintf("%f", group.value), + strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the raw values accumulation group.RecentRawValues = make([]float64, 0) From cad6c49652baff6f5e5e43a89e5d471a9002346e Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 16:37:35 +0200 Subject: [PATCH 04/10] rename fields --- pkg/pipeline/aggregate_prom_test.go | 6 ++--- pkg/pipeline/extract/aggregate/aggregate.go | 6 ++--- .../extract/aggregate/aggregate_test.go | 2 +- .../extract/extract_aggregate_test.go | 24 +++++++++---------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index be61f4319..03aaf7fba 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -38,10 +38,10 @@ func createAgg(name, recordKey, by, agg, op string, value float64, count int, rr "aggregate": agg, by: agg, "operation": api.AggregateOperation(op), - "value": valueString, - fmt.Sprintf("%v_value", name): valueString, + "total_value": valueString, + fmt.Sprintf("%v_total_value", name): valueString, "recentRawValues": rrv, - "count": fmt.Sprintf("%v", count), + "total_count": fmt.Sprintf("%v", count), fmt.Sprintf("%v_recent_op_value", name): recentOpValue, fmt.Sprintf("%v_recent_count", name): recentCount, } diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 2281566c5..567014ecb 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -192,12 +192,12 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "record_key": aggregate.Definition.RecordKey, "by": strings.Join(aggregate.Definition.By, ","), "aggregate": string(group.normalizedValues), - "value": fmt.Sprintf("%f", group.value), + "total_value": fmt.Sprintf("%f", group.value), "recentRawValues": group.RecentRawValues, - "count": fmt.Sprintf("%d", group.count), + "total_count": fmt.Sprintf("%d", group.count), aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, aggregate.Definition.Name + "_recent_count": group.recentCount, - aggregate.Definition.Name + "_value": fmt.Sprintf("%f", group.value), + aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.value), strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the raw values accumulation diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go index 78566fe54..b50ed81cd 100644 --- a/pkg/pipeline/extract/aggregate/aggregate_test.go +++ b/pkg/pipeline/extract/aggregate/aggregate_test.go @@ -132,6 +132,6 @@ func Test_GetMetrics(t *testing.T) { require.Equal(t, len(metrics), 1) require.Equal(t, metrics[0]["name"], aggregate.Definition.Name) - valueFloat64, _ := strconv.ParseFloat(fmt.Sprintf("%s", metrics[0]["value"]), 64) + valueFloat64, _ := strconv.ParseFloat(fmt.Sprintf("%s", metrics[0]["total_value"]), 64) require.Equal(t, valueFloat64, float64(7)) } diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index b292d55db..ca9a53049 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -31,18 +31,18 @@ import ( func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { valueString := fmt.Sprintf("%f", value) return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "value": valueString, - fmt.Sprintf("%v_value", name): valueString, - "recentRawValues": rrv, - "count": fmt.Sprintf("%v", count), - "recent_op_value": recentOpValue, - "recent_count": recentCount, + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + fmt.Sprintf("%v_total_value", name): valueString, + "recentRawValues": rrv, + "total_count": fmt.Sprintf("%v", count), + name + "_recent_op_value": recentOpValue, + name + "_recent_count": recentCount, } } From 2c8e3fb0226e8f6c8ea1ffd9d57428ecaa390b4e Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 16:40:57 +0200 Subject: [PATCH 05/10] Update a comment --- pkg/pipeline/extract/aggregate/aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 567014ecb..4eae81710 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -200,7 +200,7 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.value), strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) - // Once reported, we reset the raw values accumulation + // Once reported, we reset the recentXXX fields group.RecentRawValues = make([]float64, 0) group.recentCount = 0 initVal, err := initValue(string(aggregate.Definition.Operation)) From 14db1674a50682c11dedf57c071ec32de709a0af Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Mon, 14 Mar 2022 16:47:39 +0200 Subject: [PATCH 06/10] Update network_definitions --- .../kubernetes/flowlogs-pipeline.conf.yaml | 30 +++++++++---------- .../bandwidth_per_network_service.yaml | 2 +- .../bandwidth_per_src_dest_subnet.yaml | 2 +- .../bandwidth_per_src_subnet.yaml | 2 +- .../connection_rate_per_dest_subnet.yaml | 2 +- .../connection_rate_per_src_subnet.yaml | 2 +- .../connection_rate_per_tcp_flags.yaml | 2 +- .../connections_per_dst_as.yaml | 2 +- .../connections_per_src_as.yaml | 2 +- .../count_per_src_dest_subnet.yaml | 2 +- .../egress_bandwidth_per_dest_subnet.yaml | 2 +- .../egress_bandwidth_per_namespace.yaml | 2 +- .../geo-location_rate_per_dest.yaml | 2 +- network_definitions/mice_elephants.yaml | 4 +-- .../network_services_count.yaml | 2 +- 15 files changed, 30 insertions(+), 30 deletions(-) diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 7bb7f28c5..74563154a 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -165,105 +165,105 @@ parameters: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_value + valuekey: bandwidth_network_service_recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_value + valuekey: bandwidth_source_destination_subnet_recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_subnet type: counter - valuekey: bandwidth_source_subnet_value + valuekey: bandwidth_source_subnet_recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_count_value + valuekey: dest_connection_subnet_recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_value + valuekey: src_connection_count_recent_count labels: - by - aggregate buckets: [] - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_count_value + valuekey: TCPFlags_recent_count labels: - by - aggregate buckets: [] - name: connections_per_destination_as type: counter - valuekey: dst_as_connection_count_value + valuekey: dst_as_connection_recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_as type: counter - valuekey: src_as_connection_count_value + valuekey: src_as_connection_recent_count labels: - by - aggregate buckets: [] - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_value + valuekey: count_source_destination_subnet_recent_count labels: - by - aggregate buckets: [] - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_value + valuekey: bandwidth_destination_subnet_recent_op_value labels: - by - aggregate buckets: [] - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_value + valuekey: bandwidth_namespace_recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_count_value + valuekey: dest_connection_location_recent_count labels: - by - aggregate buckets: [] - name: mice_count type: counter - valuekey: mice_count_value + valuekey: mice_count_recent_count labels: - by - aggregate buckets: [] - name: elephant_count type: counter - valuekey: elephant_count_value + valuekey: elephant_count_recent_count labels: - by - aggregate buckets: [] - name: service_count type: counter - valuekey: dest_service_count_value + valuekey: dest_service_recent_count labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml index 2e9a89a9c..0af3e36c0 100644 --- a/network_definitions/bandwidth_per_network_service.yaml +++ b/network_definitions/bandwidth_per_network_service.yaml @@ -29,7 +29,7 @@ encode: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_value + valuekey: bandwidth_network_service_recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml index 69bb5cb8f..f4ea6b7df 100644 --- a/network_definitions/bandwidth_per_src_dest_subnet.yaml +++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml @@ -34,7 +34,7 @@ encode: metrics: - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_value + valuekey: bandwidth_source_destination_subnet_recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml index 8435b2e1b..12d3117c9 100644 --- a/network_definitions/bandwidth_per_src_subnet.yaml +++ b/network_definitions/bandwidth_per_src_subnet.yaml @@ -29,7 +29,7 @@ encode: metrics: - name: bandwidth_per_source_subnet type: counter - valuekey: bandwidth_source_subnet_value + valuekey: bandwidth_source_subnet_recent_op_value labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index 08e87613d..e91955d85 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -32,7 +32,7 @@ encode: metrics: - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_count_value + valuekey: dest_connection_subnet_recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml index cf95b6465..eb11ab5d1 100644 --- a/network_definitions/connection_rate_per_src_subnet.yaml +++ b/network_definitions/connection_rate_per_src_subnet.yaml @@ -27,7 +27,7 @@ encode: metrics: - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_value + valuekey: src_connection_count_recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_tcp_flags.yaml b/network_definitions/connection_rate_per_tcp_flags.yaml index 713332e9f..a7734de7c 100644 --- a/network_definitions/connection_rate_per_tcp_flags.yaml +++ b/network_definitions/connection_rate_per_tcp_flags.yaml @@ -21,7 +21,7 @@ encode: metrics: - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_count_value + valuekey: TCPFlags_recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_dst_as.yaml b/network_definitions/connections_per_dst_as.yaml index 4960c719b..73b6644f1 100644 --- a/network_definitions/connections_per_dst_as.yaml +++ b/network_definitions/connections_per_dst_as.yaml @@ -22,7 +22,7 @@ encode: metrics: - name: connections_per_destination_as type: counter - valuekey: dst_as_connection_count_value + valuekey: dst_as_connection_recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_src_as.yaml b/network_definitions/connections_per_src_as.yaml index 7d761658a..2d8d7412e 100644 --- a/network_definitions/connections_per_src_as.yaml +++ b/network_definitions/connections_per_src_as.yaml @@ -22,7 +22,7 @@ encode: metrics: - name: connections_per_source_as type: counter - valuekey: src_as_connection_count_value + valuekey: src_as_connection_recent_count labels: - by - aggregate diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml index d30a1f5b1..67cf68dca 100644 --- a/network_definitions/count_per_src_dest_subnet.yaml +++ b/network_definitions/count_per_src_dest_subnet.yaml @@ -33,7 +33,7 @@ encode: metrics: - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_value + valuekey: count_source_destination_subnet_recent_count labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml index cad589a27..58d53bba3 100644 --- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml +++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml @@ -29,7 +29,7 @@ encode: metrics: - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_value + valuekey: bandwidth_destination_subnet_recent_op_value labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml index 2b6fabd42..dcb0377fc 100644 --- a/network_definitions/egress_bandwidth_per_namespace.yaml +++ b/network_definitions/egress_bandwidth_per_namespace.yaml @@ -29,7 +29,7 @@ encode: metrics: - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_value + valuekey: bandwidth_namespace_recent_op_value labels: - by - aggregate diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml index 2502f4b66..db3426723 100644 --- a/network_definitions/geo-location_rate_per_dest.yaml +++ b/network_definitions/geo-location_rate_per_dest.yaml @@ -28,7 +28,7 @@ encode: metrics: - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_count_value + valuekey: dest_connection_location_recent_count labels: - by - aggregate diff --git a/network_definitions/mice_elephants.yaml b/network_definitions/mice_elephants.yaml index dd31e8975..4907054f0 100644 --- a/network_definitions/mice_elephants.yaml +++ b/network_definitions/mice_elephants.yaml @@ -36,13 +36,13 @@ encode: metrics: - name: mice_count type: counter - valuekey: mice_count_value + valuekey: mice_count_recent_count labels: - by - aggregate - name: elephant_count type: counter - valuekey: elephant_count_value + valuekey: elephant_count_recent_count labels: - by - aggregate diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml index 94def0563..87a55d6ab 100644 --- a/network_definitions/network_services_count.yaml +++ b/network_definitions/network_services_count.yaml @@ -29,7 +29,7 @@ encode: metrics: - name: service_count type: counter - valuekey: dest_service_count_value + valuekey: dest_service_recent_count labels: - by - aggregate From 17460dac62003b51fae136268e28f0d39612981a Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 11:17:55 +0200 Subject: [PATCH 07/10] Address Eran's comment: Panic in initValue() on unknown operation --- pkg/pipeline/extract/aggregate/aggregate.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 4eae81710..ed0a0b520 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -99,14 +99,15 @@ func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, Normaliz return nil, normalizedValues } -func initValue(operation string) (float64, error) { +func initValue(operation string) float64 { switch operation { case OperationSum, OperationAvg, OperationMax, OperationCount: - return 0, nil + return 0 case OperationMin: - return math.MaxFloat64, nil + return math.MaxFloat64 default: - return 0, fmt.Errorf("unkown operation %v", operation) + log.Panicf("unkown operation %v", operation) + return 0 } } @@ -114,10 +115,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu groupState, ok := aggregate.Groups[normalizedValues] if !ok { groupState = &GroupState{normalizedValues: normalizedValues} - initVal, err := initValue(string(aggregate.Definition.Operation)) - if err != nil { - return err - } + initVal := initValue(string(aggregate.Definition.Operation)) groupState.value = initVal groupState.recentOpValue = initVal aggregate.Groups[normalizedValues] = groupState @@ -203,10 +201,7 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { // Once reported, we reset the recentXXX fields group.RecentRawValues = make([]float64, 0) group.recentCount = 0 - initVal, err := initValue(string(aggregate.Definition.Operation)) - if err != nil { - log.Errorf("Error: %v", err) - } + initVal := initValue(string(aggregate.Definition.Operation)) group.recentOpValue = initVal } From 6f53a6d8c8c54e21e37a50dbae4d92711c58c6be Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 11:33:59 +0200 Subject: [PATCH 08/10] Address Eran's comment: add prefix "total" to count and value --- pkg/pipeline/extract/aggregate/aggregate.go | 24 +++++++++---------- .../extract/aggregate/aggregate_test.go | 4 ++-- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index ed0a0b520..61746561a 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -50,8 +50,8 @@ type GroupState struct { RecentRawValues []float64 recentOpValue float64 recentCount int - value float64 - count int + totalValue float64 + totalCount int } func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) { @@ -116,7 +116,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu if !ok { groupState = &GroupState{normalizedValues: normalizedValues} initVal := initValue(string(aggregate.Definition.Operation)) - groupState.value = initVal + groupState.totalValue = initVal groupState.recentOpValue = initVal aggregate.Groups[normalizedValues] = groupState } @@ -126,7 +126,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu operation := aggregate.Definition.Operation if operation == OperationCount { - groupState.value = float64(groupState.count + 1) + groupState.totalValue = float64(groupState.totalCount + 1) groupState.recentOpValue = float64(groupState.recentCount + 1) groupState.RecentRawValues = append(groupState.RecentRawValues, 1) } else { @@ -138,16 +138,16 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu groupState.RecentRawValues = append(groupState.RecentRawValues, valueFloat64) switch operation { case OperationSum: - groupState.value += valueFloat64 + groupState.totalValue += valueFloat64 groupState.recentOpValue += valueFloat64 case OperationMax: - groupState.value = math.Max(groupState.value, valueFloat64) + groupState.totalValue = math.Max(groupState.totalValue, valueFloat64) groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64) case OperationMin: - groupState.value = math.Min(groupState.value, valueFloat64) + groupState.totalValue = math.Min(groupState.totalValue, valueFloat64) groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64) case OperationAvg: - groupState.value = (groupState.value*float64(groupState.count) + valueFloat64) / float64(groupState.count+1) + groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1) groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1) } } @@ -155,7 +155,7 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu } // update count - groupState.count += 1 + groupState.totalCount += 1 groupState.recentCount += 1 return nil @@ -190,12 +190,12 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "record_key": aggregate.Definition.RecordKey, "by": strings.Join(aggregate.Definition.By, ","), "aggregate": string(group.normalizedValues), - "total_value": fmt.Sprintf("%f", group.value), + "total_value": fmt.Sprintf("%f", group.totalValue), "recentRawValues": group.RecentRawValues, - "total_count": fmt.Sprintf("%d", group.count), + "total_count": fmt.Sprintf("%d", group.totalCount), aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, aggregate.Definition.Name + "_recent_count": group.recentCount, - aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.value), + aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.totalValue), strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go index b50ed81cd..81a47ef0a 100644 --- a/pkg/pipeline/extract/aggregate/aggregate_test.go +++ b/pkg/pipeline/extract/aggregate/aggregate_test.go @@ -116,8 +116,8 @@ func Test_Evaluate(t *testing.T) { err := aggregate.Evaluate(entries) require.Equal(t, err, nil) - require.Equal(t, aggregate.Groups[normalizedValues].count, 2) - require.Equal(t, aggregate.Groups[normalizedValues].value, float64(7)) + require.Equal(t, aggregate.Groups[normalizedValues].totalCount, 2) + require.Equal(t, aggregate.Groups[normalizedValues].totalValue, float64(7)) } func Test_GetMetrics(t *testing.T) { From 07abc65719b0c41dd17453774a0606a4e160c5f5 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 15:30:16 +0200 Subject: [PATCH 09/10] Address Eran's comment: change struct member to be unexported --- pkg/pipeline/extract/aggregate/aggregate.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 61746561a..d3fc34b7e 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -47,7 +47,7 @@ type Aggregate struct { type GroupState struct { normalizedValues NormalizedValues - RecentRawValues []float64 + recentRawValues []float64 recentOpValue float64 recentCount int totalValue float64 @@ -128,14 +128,14 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu if operation == OperationCount { groupState.totalValue = float64(groupState.totalCount + 1) groupState.recentOpValue = float64(groupState.recentCount + 1) - groupState.RecentRawValues = append(groupState.RecentRawValues, 1) + groupState.recentRawValues = append(groupState.recentRawValues, 1) } else { if recordKey != "" { value, ok := entry[recordKey] if ok { valueString := fmt.Sprintf("%v", value) valueFloat64, _ := strconv.ParseFloat(valueString, 64) - groupState.RecentRawValues = append(groupState.RecentRawValues, valueFloat64) + groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64) switch operation { case OperationSum: groupState.totalValue += valueFloat64 @@ -191,7 +191,7 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "by": strings.Join(aggregate.Definition.By, ","), "aggregate": string(group.normalizedValues), "total_value": fmt.Sprintf("%f", group.totalValue), - "recentRawValues": group.RecentRawValues, + "recentRawValues": group.recentRawValues, "total_count": fmt.Sprintf("%d", group.totalCount), aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, aggregate.Definition.Name + "_recent_count": group.recentCount, @@ -199,7 +199,7 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields - group.RecentRawValues = make([]float64, 0) + group.recentRawValues = make([]float64, 0) group.recentCount = 0 initVal := initValue(string(aggregate.Definition.Operation)) group.recentOpValue = initVal From 9a07363d13af111dd978bef066b78c40360fe08c Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 15:48:38 +0200 Subject: [PATCH 10/10] Address Eran's comment: rename and move common test func to test/utils --- pkg/pipeline/aggregate_prom_test.go | 36 +++-------- .../extract/extract_aggregate_test.go | 60 +++++++------------ pkg/test/utils.go | 19 ++++++ 3 files changed, 47 insertions(+), 68 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 03aaf7fba..2dc0eddb3 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -17,10 +17,8 @@ package pipeline import ( - "fmt" "testing" - "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract" @@ -29,24 +27,6 @@ import ( "github.com/stretchr/testify/require" ) -func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { - valueString := fmt.Sprintf("%f", value) - return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - fmt.Sprintf("%v_total_value", name): valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), - fmt.Sprintf("%v_recent_op_value", name): recentOpValue, - fmt.Sprintf("%v_recent_count", name): recentCount, - } -} - func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap { gm := config.GenericMap{ "Name": name, @@ -135,10 +115,10 @@ parameters: {"service": "tcp", "bytes": 2.0}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2), - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), @@ -158,10 +138,10 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index ca9a53049..1648adf50 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -18,34 +18,14 @@ package extract import ( - "fmt" "testing" - "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate" "github.com/netobserv/flowlogs-pipeline/pkg/test" "github.com/stretchr/testify/require" ) -func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { - valueString := fmt.Sprintf("%f", value) - return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - fmt.Sprintf("%v_total_value", name): valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), - name + "_recent_op_value": recentOpValue, - name + "_recent_count": recentCount, - } -} - // This tests extract_aggregate as a whole. It can be thought of as an integration test between extract_aggregate.go and // aggregate.go and aggregates.go. The test sends flows in 2 batches and verifies the extractor's output after each // batch. The output of the 2nd batch depends on the 1st batch. @@ -112,16 +92,16 @@ parameters: {"service": "tcp", "bytes": 2}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2), - createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2), - createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2), - createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2), - createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2), - createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2), - createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2), }, }, { @@ -132,16 +112,16 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1), - createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2), - createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1), - createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2), - createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1), - createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2), - createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1), - createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2), - createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1), - createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2), }, }, } diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 063f971f4..96108651e 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -24,6 +24,7 @@ import ( "testing" jsoniter "github.com/json-iterator/go" + "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" "github.com/spf13/viper" "github.com/stretchr/testify/require" @@ -113,3 +114,21 @@ func GetExtractMockEntry() config.GenericMap { } return entry } + +func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { + valueString := fmt.Sprintf("%f", value) + return config.GenericMap{ + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + fmt.Sprintf("%v_total_value", name): valueString, + "recentRawValues": rrv, + "total_count": fmt.Sprintf("%v", count), + name + "_recent_op_value": recentOpValue, + name + "_recent_count": recentCount, + } +}