diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml
index 7bb7f28c5..74563154a 100644
--- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml
+++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml
@@ -165,105 +165,105 @@ parameters:
       metrics:
         - name: bandwidth_per_network_service
           type: counter
-          valuekey: bandwidth_network_service_value
+          valuekey: bandwidth_network_service_recent_op_value
           labels:
             - by
             - aggregate
           buckets: []
         - name: bandwidth_per_source_destination_subnet
           type: counter
-          valuekey: bandwidth_source_destination_subnet_value
+          valuekey: bandwidth_source_destination_subnet_recent_op_value
           labels:
             - by
            - aggregate
           buckets: []
         - name: bandwidth_per_source_subnet
           type: counter
-          valuekey: bandwidth_source_subnet_value
+          valuekey: bandwidth_source_subnet_recent_op_value
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_destination_subnet
           type: counter
-          valuekey: dest_connection_subnet_count_value
+          valuekey: dest_connection_subnet_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_source_subnet
           type: counter
-          valuekey: src_connection_count_value
+          valuekey: src_connection_count_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_tcp_flags
           type: counter
-          valuekey: TCPFlags_count_value
+          valuekey: TCPFlags_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_destination_as
           type: counter
-          valuekey: dst_as_connection_count_value
+          valuekey: dst_as_connection_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_source_as
           type: counter
-          valuekey: src_as_connection_count_value
+          valuekey: src_as_connection_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: count_per_source_destination_subnet
           type: counter
-          valuekey: count_source_destination_subnet_value
+          valuekey: count_source_destination_subnet_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: egress_per_destination_subnet
           type: counter
-          valuekey: bandwidth_destination_subnet_value
+          valuekey: bandwidth_destination_subnet_recent_op_value
           labels:
             - by
             - aggregate
           buckets: []
         - name: egress_per_namespace
           type: counter
-          valuekey: bandwidth_namespace_value
+          valuekey: bandwidth_namespace_recent_op_value
           labels:
             - by
             - aggregate
           buckets: []
         - name: connections_per_destination_location
           type: counter
-          valuekey: dest_connection_location_count_value
+          valuekey: dest_connection_location_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: mice_count
           type: counter
-          valuekey: mice_count_value
+          valuekey: mice_count_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: elephant_count
           type: counter
-          valuekey: elephant_count_value
+          valuekey: elephant_count_recent_count
           labels:
             - by
             - aggregate
           buckets: []
         - name: service_count
           type: counter
-          valuekey: dest_service_count_value
+          valuekey: dest_service_recent_count
           labels:
             - by
             - aggregate
diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml
index 2e9a89a9c..0af3e36c0 100644
--- a/network_definitions/bandwidth_per_network_service.yaml
+++ b/network_definitions/bandwidth_per_network_service.yaml
@@ -29,7 +29,7 @@ encode:
     metrics:
       - name: bandwidth_per_network_service
         type: counter
-        valuekey: bandwidth_network_service_value
+        valuekey: bandwidth_network_service_recent_op_value
         labels:
           - by
           - aggregate
diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml
index 69bb5cb8f..f4ea6b7df 100644
--- a/network_definitions/bandwidth_per_src_dest_subnet.yaml
+++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml
@@ -34,7 +34,7 @@ encode:
     metrics:
       - name: bandwidth_per_source_destination_subnet
         type: counter
-        valuekey: bandwidth_source_destination_subnet_value
+        valuekey: bandwidth_source_destination_subnet_recent_op_value
         labels:
           - by
           - aggregate
diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml
index 8435b2e1b..12d3117c9 100644
--- a/network_definitions/bandwidth_per_src_subnet.yaml
+++ b/network_definitions/bandwidth_per_src_subnet.yaml
@@ -29,7 +29,7 @@ encode:
     metrics:
      - name: bandwidth_per_source_subnet
         type: counter
-        valuekey: bandwidth_source_subnet_value
+        valuekey: bandwidth_source_subnet_recent_op_value
         labels:
           - by
           - aggregate
diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml
index 08e87613d..e91955d85 100644
--- a/network_definitions/connection_rate_per_dest_subnet.yaml
+++ b/network_definitions/connection_rate_per_dest_subnet.yaml
@@ -32,7 +32,7 @@ encode:
     metrics:
       - name: connections_per_destination_subnet
         type: counter
-        valuekey: dest_connection_subnet_count_value
+        valuekey: dest_connection_subnet_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml
index cf95b6465..eb11ab5d1 100644
--- a/network_definitions/connection_rate_per_src_subnet.yaml
+++ b/network_definitions/connection_rate_per_src_subnet.yaml
@@ -27,7 +27,7 @@ encode:
     metrics:
       - name: connections_per_source_subnet
         type: counter
-        valuekey: src_connection_count_value
+        valuekey: src_connection_count_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/connection_rate_per_tcp_flags.yaml b/network_definitions/connection_rate_per_tcp_flags.yaml
index 713332e9f..a7734de7c 100644
--- a/network_definitions/connection_rate_per_tcp_flags.yaml
+++ b/network_definitions/connection_rate_per_tcp_flags.yaml
@@ -21,7 +21,7 @@ encode:
     metrics:
       - name: connections_per_tcp_flags
         type: counter
-        valuekey: TCPFlags_count_value
+        valuekey: TCPFlags_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/connections_per_dst_as.yaml b/network_definitions/connections_per_dst_as.yaml
index 4960c719b..73b6644f1 100644
--- a/network_definitions/connections_per_dst_as.yaml
+++ b/network_definitions/connections_per_dst_as.yaml
@@ -22,7 +22,7 @@ encode:
     metrics:
       - name: connections_per_destination_as
         type: counter
-        valuekey: dst_as_connection_count_value
+        valuekey: dst_as_connection_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/connections_per_src_as.yaml b/network_definitions/connections_per_src_as.yaml
index 7d761658a..2d8d7412e 100644
--- a/network_definitions/connections_per_src_as.yaml
+++ b/network_definitions/connections_per_src_as.yaml
@@ -22,7 +22,7 @@ encode:
     metrics:
       - name: connections_per_source_as
         type: counter
-        valuekey: src_as_connection_count_value
+        valuekey: src_as_connection_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml
index d30a1f5b1..67cf68dca 100644
--- a/network_definitions/count_per_src_dest_subnet.yaml
+++ b/network_definitions/count_per_src_dest_subnet.yaml
@@ -33,7 +33,7 @@ encode:
     metrics:
       - name: count_per_source_destination_subnet
         type: counter
-        valuekey: count_source_destination_subnet_value
+        valuekey: count_source_destination_subnet_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml
index cad589a27..58d53bba3 100644
--- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml
+++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml
@@ -29,7 +29,7 @@ encode:
     metrics:
       - name: egress_per_destination_subnet
         type: counter
-        valuekey: bandwidth_destination_subnet_value
+        valuekey: bandwidth_destination_subnet_recent_op_value
         labels:
           - by
           - aggregate
diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml
index 2b6fabd42..dcb0377fc 100644
--- a/network_definitions/egress_bandwidth_per_namespace.yaml
+++ b/network_definitions/egress_bandwidth_per_namespace.yaml
@@ -29,7 +29,7 @@ encode:
     metrics:
       - name: egress_per_namespace
         type: counter
-        valuekey: bandwidth_namespace_value
+        valuekey: bandwidth_namespace_recent_op_value
         labels:
           - by
           - aggregate
diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml
index 2502f4b66..db3426723 100644
--- a/network_definitions/geo-location_rate_per_dest.yaml
+++ b/network_definitions/geo-location_rate_per_dest.yaml
@@ -28,7 +28,7 @@ encode:
     metrics:
       - name: connections_per_destination_location
         type: counter
-        valuekey: dest_connection_location_count_value
+        valuekey: dest_connection_location_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/mice_elephants.yaml b/network_definitions/mice_elephants.yaml
index dd31e8975..4907054f0 100644
--- a/network_definitions/mice_elephants.yaml
+++ b/network_definitions/mice_elephants.yaml
@@ -36,13 +36,13 @@ encode:
     metrics:
       - name: mice_count
         type: counter
-        valuekey: mice_count_value
+        valuekey: mice_count_recent_count
         labels:
           - by
           - aggregate
       - name: elephant_count
         type: counter
-        valuekey: elephant_count_value
+        valuekey: elephant_count_recent_count
         labels:
           - by
           - aggregate
diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml
index 94def0563..87a55d6ab 100644
--- a/network_definitions/network_services_count.yaml
+++ b/network_definitions/network_services_count.yaml
@@ -29,7 +29,7 @@ encode:
     metrics:
       - name: service_count
         type: counter
-        valuekey: dest_service_count_value
+        valuekey: dest_service_recent_count
         labels:
           - by
           - aggregate
diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go
new file mode 100644
index 000000000..2dc0eddb3
--- /dev/null
+++ b/pkg/pipeline/aggregate_prom_test.go
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2022 IBM, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package pipeline
+
+import (
+	"testing"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/encode"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate"
+	"github.com/netobserv/flowlogs-pipeline/pkg/test"
+	"github.com/stretchr/testify/require"
+)
+
+func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap {
+	gm := config.GenericMap{
+		"Name":   name,
+		"Labels": labels,
+		"value":  value,
+	}
+	return gm
+}
+
+// Test_Extract_Encode tests the integration between extract_aggregate and encode_prom.
+// The test sends flows in 2 batches. Each batch is passed through the extractor and the encoder.
+// The output of each stage is verified.
+// The output of the 2nd batch depends on the 1st batch.
+func Test_Extract_Encode(t *testing.T) {
+	// Setup
+	yamlConfig := `
+pipeline:
+  - name: extract
+  - name: encode
+parameters:
+  - name: extract
+    extract:
+      type: aggregates
+      aggregates:
+        - name: bandwidth_sum
+          by:
+            - service
+          operation: sum
+          recordkey: bytes
+
+        - name: bandwidth_count
+          by:
+            - service
+          operation: count
+          recordkey:
+  - name: encode
+    encode:
+      type: prom
+      prom:
+        port: 9103
+        prefix: test_
+        expirytime: 1
+        metrics:
+          - name: flow_count
+            type: counter
+            valuekey: bandwidth_count_recent_count
+            labels:
+              - service
+
+          - name: bytes_sum
+            type: counter
+            valuekey: bandwidth_sum_recent_op_value
+            labels:
+              - service
+
+#          - name: bytes_histogram
+#            type: histogram
+#            valuekey: recentRawValues
+#            labels:
+#              - service
+`
+	var err error
+
+	v := test.InitConfig(t, yamlConfig)
+	require.NotNil(t, v)
+
+	extractAggregate, err := extract.NewExtractAggregate(config.Parameters[0])
+	require.NoError(t, err)
+
+	promEncode, err := encode.NewEncodeProm(config.Parameters[1])
+	require.NoError(t, err)
+
+	// Test cases
+	tests := []struct {
+		name           string
+		inputBatch     []config.GenericMap
+		expectedAggs   []config.GenericMap
+		expectedEncode []config.GenericMap
+	}{
+		{
+			name: "batch1",
+			inputBatch: []config.GenericMap{
+				{"service": "http", "bytes": 10.0},
+				{"service": "http", "bytes": 20.0},
+				{"service": "tcp", "bytes": 1.0},
+				{"service": "tcp", "bytes": 2.0},
+			},
+			expectedAggs: []config.GenericMap{
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2),
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2),
+				test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2),
+				test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2),
+			},
+			expectedEncode: []config.GenericMap{
+				createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2),
+				createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2),
+				createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30),
+				createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3),
+				// TODO: add the following test once raw_values operation and filters are implemented
+				//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}),
+				//createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}),
+			},
+		},
+		{
+			name: "batch2",
+			inputBatch: []config.GenericMap{
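+				// batch2 reuses the aggregator state from batch1: total values keep
+				// accumulating, while the "recent" values cover this batch only
+				// (e.g., http bandwidth_sum totals 60 = 30 from each batch).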
{"service": "http", "bytes": 30}, + {"service": "tcp", "bytes": 4}, + {"service": "tcp", "bytes": 5}, + }, + expectedAggs: []config.GenericMap{ + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), + }, + expectedEncode: []config.GenericMap{ + createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), + createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualAggs := extractAggregate.Extract(tt.inputBatch) + // Since the order of the elements in the returned slice from Extract() and Encode() is non-deterministic, + // we use ElementsMatch() rather than Equals() + require.ElementsMatch(t, tt.expectedAggs, actualAggs) + + actualEncode := promEncode.Encode(actualAggs) + require.ElementsMatch(t, tt.expectedEncode, actualEncode) + }) + } +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 2f2a6f612..a35e87e21 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -126,6 +126,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap value: valueFloat, } entryMap := map[string]interface{}{ + // TODO: change to lower case "Name": e.prefix + metricName, "Labels": entryLabels, "value": valueFloat, @@ -140,9 +141,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap mInfo.promGauge.With(entryLabels).Set(valueFloat) cEntry.PromMetric.promGauge = mInfo.promGauge case api.PromEncodeOperationName("Counter"): - for _, v := range metric["recentRawValues"].([]float64) { - mInfo.promCounter.With(entryLabels).Add(v) - } + mInfo.promCounter.With(entryLabels).Add(valueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): for _, v := range metric["recentRawValues"].([]float64) { diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 2fda222d1..d3fc34b7e 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -47,9 +47,11 @@ type Aggregate struct { type GroupState struct { normalizedValues NormalizedValues - RecentRawValues []float64 - value float64 - count int + recentRawValues []float64 + recentOpValue float64 + recentCount int + totalValue float64 + totalCount int } func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) { @@ -97,17 +99,25 @@ func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, Normaliz return nil, normalizedValues } +func initValue(operation string) float64 { + switch operation { + case OperationSum, OperationAvg, OperationMax, OperationCount: + return 0 + 
+	case OperationMin:
+		return math.MaxFloat64
+	default:
+		log.Panicf("unknown operation %v", operation)
+		return 0
+	}
+}
+
 func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
 
 	groupState, ok := aggregate.Groups[normalizedValues]
 	if !ok {
 		groupState = &GroupState{normalizedValues: normalizedValues}
-		switch string(aggregate.Definition.Operation) {
-		case OperationSum, OperationMax:
-			groupState.value = 0
-		case OperationMin:
-			groupState.value = math.MaxFloat64
-		default:
-		}
+		initVal := initValue(string(aggregate.Definition.Operation))
+		groupState.totalValue = initVal
+		groupState.recentOpValue = initVal
 		aggregate.Groups[normalizedValues] = groupState
 	}
 
@@ -116,31 +126,37 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu
 	operation := aggregate.Definition.Operation
 
 	if operation == OperationCount {
-		groupState.value = float64(groupState.count + 1)
-		groupState.RecentRawValues = append(groupState.RecentRawValues, 1)
+		groupState.totalValue = float64(groupState.totalCount + 1)
+		groupState.recentOpValue = float64(groupState.recentCount + 1)
+		groupState.recentRawValues = append(groupState.recentRawValues, 1)
 	} else {
 		if recordKey != "" {
 			value, ok := entry[recordKey]
 			if ok {
 				valueString := fmt.Sprintf("%v", value)
 				valueFloat64, _ := strconv.ParseFloat(valueString, 64)
-				groupState.RecentRawValues = append(groupState.RecentRawValues, valueFloat64)
+				groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
 				switch operation {
 				case OperationSum:
-					groupState.value += valueFloat64
+					groupState.totalValue += valueFloat64
+					groupState.recentOpValue += valueFloat64
 				case OperationMax:
-					groupState.value = math.Max(groupState.value, valueFloat64)
+					groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
+					groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
 				case OperationMin:
-					groupState.value = math.Min(groupState.value, valueFloat64)
+					groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
+					groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
 				case OperationAvg:
-					groupState.value = (groupState.value*float64(groupState.count) + valueFloat64) / float64(groupState.count+1)
+					groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
+					groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
 				}
 			}
 		}
 	}
 
 	// update count
-	groupState.count += 1
+	groupState.totalCount += 1
+	groupState.recentCount += 1
 
 	return nil
 }
@@ -167,20 +183,26 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error {
 
 func (aggregate Aggregate) GetMetrics() []config.GenericMap {
 	var metrics []config.GenericMap
 	for _, group := range aggregate.Groups {
+		// TODO: remove prefixes when filtering is implemented in prom encode.
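+		// Each group reports both its lifetime totals and the "recent" values
+		// accumulated since the previous report; the recent fields are reset below.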
 		metrics = append(metrics, config.GenericMap{
-			"name":            aggregate.Definition.Name,
-			"operation":       aggregate.Definition.Operation,
-			"record_key":      aggregate.Definition.RecordKey,
-			"by":              strings.Join(aggregate.Definition.By, ","),
-			"aggregate":       string(group.normalizedValues),
-			"value":           fmt.Sprintf("%f", group.value),
-			"recentRawValues": group.RecentRawValues,
-			"count":           fmt.Sprintf("%d", group.count),
-			aggregate.Definition.Name + "_value":       fmt.Sprintf("%f", group.value),
-			strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues),
+			"name":            aggregate.Definition.Name,
+			"operation":       aggregate.Definition.Operation,
+			"record_key":      aggregate.Definition.RecordKey,
+			"by":              strings.Join(aggregate.Definition.By, ","),
+			"aggregate":       string(group.normalizedValues),
+			"total_value":     fmt.Sprintf("%f", group.totalValue),
+			"recentRawValues": group.recentRawValues,
+			"total_count":     fmt.Sprintf("%d", group.totalCount),
+			aggregate.Definition.Name + "_recent_op_value": group.recentOpValue,
+			aggregate.Definition.Name + "_recent_count":    group.recentCount,
+			aggregate.Definition.Name + "_total_value":     fmt.Sprintf("%f", group.totalValue),
+			strings.Join(aggregate.Definition.By, "_"):     string(group.normalizedValues),
 		})
-		// Once reported, we reset the raw values accumulation
-		group.RecentRawValues = make([]float64, 0)
+		// Once reported, we reset the recentXXX fields
+		group.recentRawValues = make([]float64, 0)
+		group.recentCount = 0
+		initVal := initValue(string(aggregate.Definition.Operation))
+		group.recentOpValue = initVal
 	}
 
 	return metrics
diff --git a/pkg/pipeline/extract/aggregate/aggregate_test.go b/pkg/pipeline/extract/aggregate/aggregate_test.go
index 78566fe54..81a47ef0a 100644
--- a/pkg/pipeline/extract/aggregate/aggregate_test.go
+++ b/pkg/pipeline/extract/aggregate/aggregate_test.go
@@ -116,8 +116,8 @@ func Test_Evaluate(t *testing.T) {
 	err := aggregate.Evaluate(entries)
 
 	require.Equal(t, err, nil)
-	require.Equal(t, aggregate.Groups[normalizedValues].count, 2)
-	require.Equal(t, aggregate.Groups[normalizedValues].value, float64(7))
+	require.Equal(t, aggregate.Groups[normalizedValues].totalCount, 2)
+	require.Equal(t, aggregate.Groups[normalizedValues].totalValue, float64(7))
 }
 
 func Test_GetMetrics(t *testing.T) {
@@ -132,6 +132,6 @@ func Test_GetMetrics(t *testing.T) {
 
 	require.Equal(t, len(metrics), 1)
 	require.Equal(t, metrics[0]["name"], aggregate.Definition.Name)
-	valueFloat64, _ := strconv.ParseFloat(fmt.Sprintf("%s", metrics[0]["value"]), 64)
+	valueFloat64, _ := strconv.ParseFloat(fmt.Sprintf("%s", metrics[0]["total_value"]), 64)
 	require.Equal(t, valueFloat64, float64(7))
 }
diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go
index b32e749f2..1648adf50 100644
--- a/pkg/pipeline/extract/extract_aggregate_test.go
+++ b/pkg/pipeline/extract/extract_aggregate_test.go
@@ -18,32 +18,14 @@
 package extract
 
 import (
-	"fmt"
 	"testing"
 
-	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/extract/aggregate"
 	"github.com/netobserv/flowlogs-pipeline/pkg/test"
 	"github.com/stretchr/testify/require"
 )
 
-func createAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64) config.GenericMap {
-	valueString := fmt.Sprintf("%f", value)
-	return config.GenericMap{
-		"name":       name,
-		"record_key": recordKey,
-		"by":         by,
-		"aggregate":  agg,
-		by:           agg,
-		"operation":  api.AggregateOperation(op),
-		"value":      valueString,
-		fmt.Sprintf("%v_value", name): valueString,
-		"recentRawValues":             rrv,
-		"count":                       fmt.Sprintf("%v", count),
-	}
-}
-
 // This tests extract_aggregate as a whole. It can be thought of as an integration test between extract_aggregate.go and
 // aggregate.go and aggregates.go. The test sends flows in 2 batches and verifies the extractor's output after each
 // batch. The output of the 2nd batch depends on the 1st batch.
@@ -110,16 +92,16 @@ parameters:
 				{"service": "tcp", "bytes": 2},
 			},
 			expectedAggs: []config.GenericMap{
-				createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}),
-				createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}),
-				createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}),
-				createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}),
-				createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}),
-				createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}),
-				createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}),
-				createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}),
-				createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}),
-				createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}),
+				test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2),
+				test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2),
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2),
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2),
+				test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2),
+				test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2),
+				test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2),
+				test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2),
+				test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2),
+				test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2),
 			},
 		},
 		{
@@ -130,16 +112,16 @@ parameters:
 				{"service": "tcp", "bytes": 5},
 			},
 			expectedAggs: []config.GenericMap{
-				createAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}),
-				createAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}),
-				createAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}),
-				createAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}),
-				createAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}),
-				createAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}),
-				createAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}),
-				createAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}),
-				createAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}),
-				createAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}),
+				test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1),
+				test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2),
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1),
+				test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2),
+				test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1),
+				test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2),
+				test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1),
+				test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2),
+				test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1),
+				test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2),
 			},
 		},
 	}
diff --git a/pkg/test/utils.go b/pkg/test/utils.go
index 063f971f4..96108651e 100644
--- a/pkg/test/utils.go
+++ b/pkg/test/utils.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 
 	jsoniter "github.com/json-iterator/go"
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/spf13/viper"
 	"github.com/stretchr/testify/require"
@@ -113,3 +114,21 @@ func GetExtractMockEntry() config.GenericMap {
 	}
 	return entry
 }
+
+func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap {
+	valueString := fmt.Sprintf("%f", value)
+	return config.GenericMap{
+		"name":        name,
+		"record_key":  recordKey,
+		"by":          by,
+		"aggregate":   agg,
+		by:            agg,
+		"operation":   api.AggregateOperation(op),
+		"total_value": valueString,
+		fmt.Sprintf("%v_total_value", name): valueString,
+		"recentRawValues":                   rrv,
+		"total_count":                       fmt.Sprintf("%v", count),
+		name + "_recent_op_value":           recentOpValue,
+		name + "_recent_count":              recentCount,
+	}
+}