From 2cfa7a255bd3f0ada8c21bb8dfb968f3bfbf3eb6 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 13:26:39 +0200 Subject: [PATCH 01/16] Add RawValuesOperation --- docs/api.md | 2 +- pkg/api/extract_aggregate.go | 2 +- pkg/pipeline/aggregate_prom_test.go | 24 +++++----- pkg/pipeline/extract/aggregate/aggregate.go | 45 +++++++++++-------- .../extract/extract_aggregate_test.go | 40 ++++++++--------- 5 files changed, 61 insertions(+), 52 deletions(-) diff --git a/docs/api.md b/docs/api.md index 71cfc3060..21a4de150 100644 --- a/docs/api.md +++ b/docs/api.md @@ -133,6 +133,6 @@ Following is the supported API format for specifying metrics aggregations: aggregates: Name: description of aggregation result By: list of fields on which to aggregate - Operation: sum, min, max, or avg + Operation: sum, min, max, avg or raw_values RecordKey: internal field on which to perform the operation \ No newline at end of file diff --git a/pkg/api/extract_aggregate.go b/pkg/api/extract_aggregate.go index fac8d2b3c..62f685f07 100644 --- a/pkg/api/extract_aggregate.go +++ b/pkg/api/extract_aggregate.go @@ -6,6 +6,6 @@ type AggregateOperation string type AggregateDefinition struct { Name string `yaml:"Name" doc:"description of aggregation result"` By AggregateBy `yaml:"By" doc:"list of fields on which to aggregate"` - Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, or avg"` + Operation AggregateOperation `yaml:"Operation" doc:"sum, min, max, avg or raw_values"` RecordKey string `yaml:"RecordKey" doc:"internal field on which to perform the operation"` } diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index bb3d68af9..62740c103 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -117,10 +117,10 @@ parameters: {"service": "tcp", "bytes": 2.0}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10, 20}, 30, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1, 2}, 3, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, nil, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), @@ -128,8 +128,8 @@ parameters: createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3), // TODO: add the following test once raw_values operation and filters are implemented - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, nil), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, nil), }, }, { @@ -140,18 +140,18 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30}, 30, 1), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4, 5}, 9, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1}, 1, 1), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1, 1}, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, nil), + //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, nil), }, }, } diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 6cc32f524..5714cb52e 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -30,11 +30,12 @@ import ( ) const ( - OperationSum = "sum" - OperationAvg = "avg" - OperationMax = "max" - OperationMin = "min" - OperationCount = "count" + OperationSum = "sum" + OperationAvg = "avg" + OperationMax = "max" + OperationMin = "min" + OperationCount = "count" + OperationRawValues = "raw_values" ) type Labels map[string]string @@ -99,12 +100,15 @@ func (aggregate Aggregate) FilterEntry(entry config.GenericMap) (error, Normaliz return nil, normalizedValues } -func initValue(operation string) float64 { +func getInitValue(operation string) float64 { switch operation { case OperationSum, OperationAvg, OperationMax, OperationCount: return 0 case OperationMin: return math.MaxFloat64 + case OperationRawValues: + // Actually, in OperationRawValues the value is ignored. + return 0 default: log.Panicf("unkown operation %v", operation) return 0 @@ -115,9 +119,12 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu groupState, ok := aggregate.Groups[normalizedValues] if !ok { groupState = &GroupState{normalizedValues: normalizedValues} - initVal := initValue(string(aggregate.Definition.Operation)) + initVal := getInitValue(string(aggregate.Definition.Operation)) groupState.totalValue = initVal groupState.recentOpValue = initVal + if aggregate.Definition.Operation == OperationRawValues { + groupState.recentRawValues = make([]float64, 0) + } aggregate.Groups[normalizedValues] = groupState } @@ -128,14 +135,12 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu if operation == OperationCount { groupState.totalValue = float64(groupState.totalCount + 1) groupState.recentOpValue = float64(groupState.recentCount + 1) - groupState.recentRawValues = append(groupState.recentRawValues, 1) } else { if recordKey != "" { value, ok := entry[recordKey] if ok { valueString := fmt.Sprintf("%v", value) valueFloat64, _ := strconv.ParseFloat(valueString, 64) - groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64) switch operation { case OperationSum: groupState.totalValue += valueFloat64 @@ -149,6 +154,8 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValu case OperationAvg: groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1) groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1) + case OperationRawValues: + groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64) } } } @@ -184,12 +191,13 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { metrics = append(metrics, config.GenericMap{ - "name": aggregate.Definition.Name, - "operation": aggregate.Definition.Operation, - "record_key": aggregate.Definition.RecordKey, - "by": strings.Join(aggregate.Definition.By, ","), - "aggregate": string(group.normalizedValues), - "total_value": fmt.Sprintf("%f", group.totalValue), + "name": aggregate.Definition.Name, + "operation": aggregate.Definition.Operation, + "record_key": aggregate.Definition.RecordKey, + "by": strings.Join(aggregate.Definition.By, ","), + "aggregate": string(group.normalizedValues), + "total_value": fmt.Sprintf("%f", group.totalValue), + // TODO: change to snake_case "recentRawValues": group.recentRawValues, "total_count": fmt.Sprintf("%d", group.totalCount), "recent_op_value": group.recentOpValue, @@ -197,10 +205,11 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields - group.recentRawValues = make([]float64, 0) + if aggregate.Definition.Operation == OperationRawValues { + group.recentRawValues = make([]float64, 0) + } group.recentCount = 0 - initVal := initValue(string(aggregate.Definition.Operation)) - group.recentOpValue = initVal + group.recentOpValue = getInitValue(string(aggregate.Definition.Operation)) } return metrics diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 1648adf50..77fdd5892 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -92,16 +92,16 @@ parameters: {"service": "tcp", "bytes": 2}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, []float64{10.0, 20.0}, 30, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, []float64{1.0, 2.0}, 3, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, []float64{10.0, 20.0}, 20, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, []float64{1.0, 2.0}, 2, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, []float64{10.0, 20.0}, 10, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, []float64{1.0, 2.0}, 1, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, []float64{10.0, 20.0}, 15, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, []float64{1.0, 2.0}, 1.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 30, 2, nil, 30, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 20, 2, nil, 20, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 2, nil, 10, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, nil, 1, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, nil, 15, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, nil, 1.5, 2), }, }, { @@ -112,16 +112,16 @@ parameters: {"service": "tcp", "bytes": 5}, }, expectedAggs: []config.GenericMap{ - test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, []float64{1.0}, 1, 1), - test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, []float64{1.0, 1.0}, 2, 2), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, []float64{4.0, 5.0}, 9, 2), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, []float64{4.0, 5.0}, 5, 2), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, []float64{4.0, 5.0}, 4, 2), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, []float64{30.0}, 30, 1), - test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, []float64{4.0, 5.0}, 4.5, 2), + test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1), + test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "http", aggregate.OperationSum, 60, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "http", aggregate.OperationMax, 30, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_max", "bytes", "service", "tcp", aggregate.OperationMax, 5, 4, nil, 5, 2), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "http", aggregate.OperationMin, 10, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, nil, 4, 2), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, nil, 30, 1), + test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, nil, 4.5, 2), }, }, } From bb6321d19cd8e591a8d6b5951fbf06a3cd2602de Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 14:00:42 +0200 Subject: [PATCH 02/16] Rename variable --- pkg/test/utils.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 8b8dc5878..8a7705a06 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -115,8 +115,8 @@ func GetExtractMockEntry() config.GenericMap { return entry } -func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { - valueString := fmt.Sprintf("%f", value) +func CreateMockAgg(name, recordKey, by, agg, op string, totalValue float64, totalCount int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { + valueString := fmt.Sprintf("%f", totalValue) return config.GenericMap{ "name": name, "record_key": recordKey, @@ -126,7 +126,7 @@ func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int "operation": api.AggregateOperation(op), "total_value": valueString, "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), + "total_count": fmt.Sprintf("%v", totalCount), "recent_op_value": recentOpValue, "recent_count": recentCount, } From 0a2a2b7164fabeaa3c9be2e6b62490774863c8a1 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 14:03:22 +0200 Subject: [PATCH 03/16] Add tests for raw_values operation --- pkg/pipeline/aggregate_prom_test.go | 10 ++++++++++ pkg/pipeline/extract/extract_aggregate_test.go | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 62740c103..ae48f204b 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -62,6 +62,12 @@ parameters: - service operation: count recordkey: + + - name: bandwidth_raw_values + by: + - service + operation: raw_values + recordkey: bytes - name: encode encode: type: prom @@ -121,6 +127,8 @@ parameters: test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 3, 2, nil, 3, 2), test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 2, 2, nil, 2, 2), test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 2, 2, nil, 2, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 2, []float64{10, 20}, 0, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 2, []float64{1, 2}, 0, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), @@ -144,6 +152,8 @@ parameters: test.CreateMockAgg("bandwidth_sum", "bytes", "service", "tcp", aggregate.OperationSum, 12, 4, nil, 9, 2), test.CreateMockAgg("bandwidth_count", "", "service", "http", aggregate.OperationCount, 3, 3, nil, 1, 1), test.CreateMockAgg("bandwidth_count", "", "service", "tcp", aggregate.OperationCount, 4, 4, nil, 2, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 3, []float64{30}, 0, 1), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 4, []float64{4, 5}, 0, 2), }, expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), diff --git a/pkg/pipeline/extract/extract_aggregate_test.go b/pkg/pipeline/extract/extract_aggregate_test.go index 77fdd5892..17c030b58 100644 --- a/pkg/pipeline/extract/extract_aggregate_test.go +++ b/pkg/pipeline/extract/extract_aggregate_test.go @@ -68,6 +68,12 @@ parameters: - service operation: avg recordkey: bytes + + - name: bandwidth_raw_values + by: + - service + operation: raw_values + recordkey: bytes ` var err error @@ -102,6 +108,8 @@ parameters: test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 2, nil, 1, 2), test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 15, 2, nil, 15, 2), test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 1.5, 2, nil, 1.5, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 2, []float64{10, 20}, 0, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 2, []float64{1, 2}, 0, 2), }, }, { @@ -122,6 +130,8 @@ parameters: test.CreateMockAgg("bandwidth_min", "bytes", "service", "tcp", aggregate.OperationMin, 1, 4, nil, 4, 2), test.CreateMockAgg("bandwidth_avg", "bytes", "service", "http", aggregate.OperationAvg, 20, 3, nil, 30, 1), test.CreateMockAgg("bandwidth_avg", "bytes", "service", "tcp", aggregate.OperationAvg, 3, 4, nil, 4.5, 2), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "http", aggregate.OperationRawValues, 0, 3, []float64{30}, 0, 1), + test.CreateMockAgg("bandwidth_raw_values", "bytes", "service", "tcp", aggregate.OperationRawValues, 0, 4, []float64{4, 5}, 0, 2), }, }, } From 52fcd015974d7f10c776c8c27a405d38ea5c6f64 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 14:09:02 +0200 Subject: [PATCH 04/16] Change key to snake_case --- pkg/pipeline/aggregate_prom_test.go | 2 +- pkg/pipeline/encode/encode_prom.go | 2 +- pkg/pipeline/extract/aggregate/aggregate.go | 8 ++--- pkg/test/utils.go | 35 ++++++++++----------- 4 files changed, 23 insertions(+), 24 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index ae48f204b..c9be068f1 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -92,7 +92,7 @@ parameters: # - name: bytes_histogram # type: histogram -# valuekey: recentRawValues +# valuekey: recent_raw_values # labels: # - service ` diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index f3ab97797..d8dbf13a9 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -156,7 +156,7 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener mInfo.promCounter.With(entryLabels).Add(valueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - for _, v := range metricRecord["recentRawValues"].([]float64) { + for _, v := range metricRecord["recent_raw_values"].([]float64) { mInfo.promHist.With(entryLabels).Observe(v) } cEntry.PromMetric.promHist = mInfo.promHist diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 5714cb52e..827db1839 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -198,10 +198,10 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "aggregate": string(group.normalizedValues), "total_value": fmt.Sprintf("%f", group.totalValue), // TODO: change to snake_case - "recentRawValues": group.recentRawValues, - "total_count": fmt.Sprintf("%d", group.totalCount), - "recent_op_value": group.recentOpValue, - "recent_count": group.recentCount, + "recent_raw_values": group.recentRawValues, + "total_count": fmt.Sprintf("%d", group.totalCount), + "recent_op_value": group.recentOpValue, + "recent_count": group.recentCount, strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 8a7705a06..2042e7982 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -104,13 +104,12 @@ func InitConfig(t *testing.T, conf string) *viper.Viper { func GetExtractMockEntry() config.GenericMap { entry := config.GenericMap{ - "srcAddr": "10.1.2.3", - "dstAddr": "10.1.2.4", - "srcPort": "9001", - "dstPort": "39504", - "bytes": "1234", - "packets": "34", - "recentRawValues": []float64{1.1, 2.2}, + "srcAddr": "10.1.2.3", + "dstAddr": "10.1.2.4", + "srcPort": "9001", + "dstPort": "39504", + "bytes": "1234", + "packets": "34", } return entry } @@ -118,16 +117,16 @@ func GetExtractMockEntry() config.GenericMap { func CreateMockAgg(name, recordKey, by, agg, op string, totalValue float64, totalCount int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { valueString := fmt.Sprintf("%f", totalValue) return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", totalCount), - "recent_op_value": recentOpValue, - "recent_count": recentCount, + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + "recent_raw_values": rrv, + "total_count": fmt.Sprintf("%v", totalCount), + "recent_op_value": recentOpValue, + "recent_count": recentCount, } } From 8e19a03f12f9684544f3c1680c7bf50bac0c3fbd Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 15:57:30 +0200 Subject: [PATCH 05/16] Add a hack to make histograms work --- pkg/pipeline/aggregate_prom_test.go | 22 ++++++++++++---------- pkg/pipeline/encode/encode_prom.go | 3 ++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index c9be068f1..895d65066 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -90,11 +90,12 @@ parameters: labels: - service -# - name: bytes_histogram -# type: histogram -# valuekey: recent_raw_values -# labels: -# - service + - name: bytes_histogram + type: histogram + filter: {key: name, value: bandwidth_raw_values} + valuekey: recent_raw_values + labels: + - service ` var err error @@ -135,9 +136,9 @@ parameters: createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3), - // TODO: add the following test once raw_values operation and filters are implemented - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, nil), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, nil), + // TODO: fix the value of the following 2 entries + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, 0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, 0), }, }, { @@ -160,8 +161,9 @@ parameters: createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, nil), - //createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, nil), + // TODO: fix the value of the following 2 entries + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, 0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, 0), }, }, } diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index d8dbf13a9..1fb1257b3 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -121,7 +121,8 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener } metricValueString := fmt.Sprintf("%v", metricValue) valueFloat, err := strconv.ParseFloat(metricValueString, 64) - if err != nil { + // TODO: fix hack + if err != nil && mInfo.input != "recent_raw_values" { log.Debugf("field cannot be converted to float: %v, %s", metricValue, metricValueString) continue } From a74e304a0457592285655b357b81111201562cb4 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Mar 2022 17:46:20 +0300 Subject: [PATCH 06/16] Remove unused field --- pkg/pipeline/encode/encode_prom.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 1fb1257b3..8c373b9df 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -62,7 +62,6 @@ type entrySignature struct { type entryInfo struct { eInfo entrySignature - value float64 } type metricCacheEntry struct { @@ -136,7 +135,6 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener Name: e.prefix + metricName, Labels: entryLabels, }, - value: valueFloat, } entryMap := map[string]interface{}{ // TODO: change to lower case From 53c9aa93b59c08e3da032a814ce145d5d8b78bd9 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Mar 2022 17:50:54 +0300 Subject: [PATCH 07/16] Pass on unparsed value --- pkg/pipeline/encode/encode_prom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 8c373b9df..e18be0847 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -140,7 +140,7 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener // TODO: change to lower case "Name": e.prefix + metricName, "Labels": entryLabels, - "value": valueFloat, + "value": metricValue, } out = append(out, entryMap) From 3bb3995052a51e83f235365e6ac21d44a2be3b5c Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 27 Mar 2022 18:01:45 +0300 Subject: [PATCH 08/16] Fix tests --- pkg/pipeline/aggregate_prom_test.go | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 895d65066..188edeba7 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -func createEncodeOutput(name string, labels map[string]string, value float64) config.GenericMap { +func createEncodeOutput(name string, labels map[string]string, value interface{}) config.GenericMap { gm := config.GenericMap{ "Name": name, "Labels": labels, @@ -134,11 +134,10 @@ parameters: expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 2), createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3), - // TODO: fix the value of the following 2 entries - createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, 0), - createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, 0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 3.0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{10, 20}), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{1, 2}), }, }, { @@ -159,11 +158,10 @@ parameters: expectedEncode: []config.GenericMap{ createEncodeOutput("test_flow_count", map[string]string{"service": "http"}, 1), createEncodeOutput("test_flow_count", map[string]string{"service": "tcp"}, 2), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30), - createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9), - // TODO: fix the value of the following 2 entries - createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, 0), - createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, 0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "http"}, 30.0), + createEncodeOutput("test_bytes_sum", map[string]string{"service": "tcp"}, 9.0), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "http"}, []float64{30}), + createEncodeOutput("test_bytes_histogram", map[string]string{"service": "tcp"}, []float64{4, 5}), }, }, } From 8d5008eb998ff1233b99ce8a2d8aa8f45b5312f4 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 29 Mar 2022 11:00:31 +0300 Subject: [PATCH 09/16] Fix tests --- pkg/pipeline/encode/encode_prom_test.go | 23 +++++++++++------------ pkg/test/utils.go | 8 ++++---- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index f25d786e3..631e388d9 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -108,19 +108,19 @@ func Test_NewEncodeProm(t *testing.T) { gEntryInfo1 := config.GenericMap{ "Name": "test_Bytes", "Labels": entryLabels1, - "value": float64(1234), + "value": 1234, } gEntryInfo2 := config.GenericMap{ "Name": "test_Packets", "Labels": entryLabels2, - "value": float64(34), + "value": 34, } require.Contains(t, output, gEntryInfo1) require.Contains(t, output, gEntryInfo2) gaugeA, err := gInfo.promGauge.GetMetricWith(entryLabels1) require.Equal(t, nil, err) bytesA := testutil.ToFloat64(gaugeA) - require.Equal(t, gEntryInfo1["value"], bytesA) + require.Equal(t, gEntryInfo1["value"], int(bytesA)) // verify entries are in cache; one for the gauge and one for the counter entriesMap := encodeProm.mCache @@ -148,14 +148,13 @@ func Test_NewEncodeProm(t *testing.T) { func Test_EncodeAggregate(t *testing.T) { metrics := []config.GenericMap{{ - "name": "test_aggregate", - "operation": "sum", - "record_key": "IP", - "by": "[dstIP srcIP]", - "aggregate": "20.0.0.2,10.0.0.1", - "value": "7", - "test_aggregate" + "_value": "7", - "count": "1", + "name": "test_aggregate", + "operation": "sum", + "record_key": "IP", + "by": "[dstIP srcIP]", + "aggregate": "20.0.0.2,10.0.0.1", + "value": 7.0, + "count": 1, }} newEncode := &encodeProm{ @@ -163,7 +162,7 @@ func Test_EncodeAggregate(t *testing.T) { prefix: "test_", metrics: map[string]metricInfo{ "gauge": { - input: "test_aggregate_value", + input: "value", filter: keyValuePair{ key: "name", value: "test_aggregate", diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 2042e7982..c8cf35342 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -106,10 +106,10 @@ func GetExtractMockEntry() config.GenericMap { entry := config.GenericMap{ "srcAddr": "10.1.2.3", "dstAddr": "10.1.2.4", - "srcPort": "9001", - "dstPort": "39504", - "bytes": "1234", - "packets": "34", + "srcPort": 9001, + "dstPort": 39504, + "bytes": 1234, + "packets": 34, } return entry } From 0d1bf3cfc6a690414c53ac22e01d1b602cb41170 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 29 Mar 2022 11:25:56 +0300 Subject: [PATCH 10/16] Fix histogram value source --- pkg/pipeline/encode/encode_prom.go | 42 +++++++++++++------------ pkg/pipeline/utils/convert.go | 49 ++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 19 deletions(-) create mode 100644 pkg/pipeline/utils/convert.go diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index e18be0847..645b15faf 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -22,7 +22,6 @@ import ( "fmt" "net/http" "os" - "strconv" "sync" "time" @@ -118,14 +117,6 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener log.Errorf("field %v is missing", mInfo.input) continue } - metricValueString := fmt.Sprintf("%v", metricValue) - valueFloat, err := strconv.ParseFloat(metricValueString, 64) - // TODO: fix hack - if err != nil && mInfo.input != "recent_raw_values" { - log.Debugf("field cannot be converted to float: %v, %s", metricValue, metricValueString) - continue - } - log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat) entryLabels := make(map[string]string, len(mInfo.labelNames)) for _, t := range mInfo.labelNames { entryLabels[t] = fmt.Sprintf("%v", metricRecord[t]) @@ -136,30 +127,43 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener Labels: entryLabels, }, } - entryMap := map[string]interface{}{ - // TODO: change to lower case - "Name": e.prefix + metricName, - "Labels": entryLabels, - "value": metricValue, - } - out = append(out, entryMap) cEntry := e.saveEntryInCache(entry, entryLabels) cEntry.PromMetric.metricType = mInfo.PromMetric.metricType // push the metric record to prometheus switch mInfo.PromMetric.metricType { case api.PromEncodeOperationName("Gauge"): - mInfo.promGauge.With(entryLabels).Set(valueFloat) + metricValueFloat, err := utils.ConvertToFloat64(metricValue) + if err != nil { + log.Errorf("field cannot be converted to float: %v, %v", metricValue, err) + continue + } + mInfo.promGauge.With(entryLabels).Set(metricValueFloat) cEntry.PromMetric.promGauge = mInfo.promGauge case api.PromEncodeOperationName("Counter"): - mInfo.promCounter.With(entryLabels).Add(valueFloat) + metricValueFloat, err := utils.ConvertToFloat64(metricValue) + if err != nil { + log.Errorf("field cannot be converted to float: %v, %v", metricValue, err) + continue + } + mInfo.promCounter.With(entryLabels).Add(metricValueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - for _, v := range metricRecord["recent_raw_values"].([]float64) { + // TODO: Check what happens if not a slice + metricValueSlice := metricValue.([]float64) + for _, v := range metricValueSlice { mInfo.promHist.With(entryLabels).Observe(v) } cEntry.PromMetric.promHist = mInfo.promHist } + + entryMap := map[string]interface{}{ + // TODO: change to lower case + "Name": e.prefix + metricName, + "Labels": entryLabels, + "value": metricValue, + } + out = append(out, entryMap) } return out } diff --git a/pkg/pipeline/utils/convert.go b/pkg/pipeline/utils/convert.go new file mode 100644 index 000000000..e5feeeadf --- /dev/null +++ b/pkg/pipeline/utils/convert.go @@ -0,0 +1,49 @@ +package utils + +import ( + "fmt" + "math" + "reflect" + "strconv" +) + +var floatType = reflect.TypeOf(float64(0)) +var stringType = reflect.TypeOf("") + +// ConvertToFloat64 converts an unknown type to float +// Based on https://stackoverflow.com/a/20767884/2749989 +func ConvertToFloat64(unk interface{}) (float64, error) { + switch i := unk.(type) { + case float64: + return i, nil + case float32: + return float64(i), nil + case int64: + return float64(i), nil + case int32: + return float64(i), nil + case int: + return float64(i), nil + case uint64: + return float64(i), nil + case uint32: + return float64(i), nil + case uint: + return float64(i), nil + case string: + return strconv.ParseFloat(i, 64) + default: + v := reflect.ValueOf(unk) + v = reflect.Indirect(v) + if v.Type().ConvertibleTo(floatType) { + fv := v.Convert(floatType) + return fv.Float(), nil + } else if v.Type().ConvertibleTo(stringType) { + sv := v.Convert(stringType) + s := sv.String() + return strconv.ParseFloat(s, 64) + } else { + return math.NaN(), fmt.Errorf("Can't convert %v to float64", v.Type()) + } + } +} From 0ccd69fc23cae7c2c9bd70dab670e7b23af72eb6 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 29 Mar 2022 11:45:43 +0300 Subject: [PATCH 11/16] Add info to error message --- pkg/pipeline/encode/encode_prom.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 645b15faf..96621bd01 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -135,7 +135,7 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener case api.PromEncodeOperationName("Gauge"): metricValueFloat, err := utils.ConvertToFloat64(metricValue) if err != nil { - log.Errorf("field cannot be converted to float: %v, %v", metricValue, err) + log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue) continue } mInfo.promGauge.With(entryLabels).Set(metricValueFloat) @@ -143,14 +143,17 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener case api.PromEncodeOperationName("Counter"): metricValueFloat, err := utils.ConvertToFloat64(metricValue) if err != nil { - log.Errorf("field cannot be converted to float: %v, %v", metricValue, err) + log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue) continue } mInfo.promCounter.With(entryLabels).Add(metricValueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - // TODO: Check what happens if not a slice - metricValueSlice := metricValue.([]float64) + metricValueSlice, ok := metricValue.([]float64) + if !ok { + log.Errorf("value is not []float64. metric: %v, key: %v, value: %v", metricName, mInfo.input, metricValue) + continue + } for _, v := range metricValueSlice { mInfo.promHist.With(entryLabels).Observe(v) } From 21d109d28aea54d79df9ff73a59820ff9a4cb176 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 29 Mar 2022 11:59:10 +0300 Subject: [PATCH 12/16] Remove a comment --- pkg/pipeline/encode/encode_prom.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 96621bd01..47aff531a 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -103,7 +103,6 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord) - // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { val, keyFound := metricRecord[mInfo.filter.key] From 78a4b981fd0aebb9178e4ce72b48cb7ac089952e Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 29 Mar 2022 16:52:16 +0300 Subject: [PATCH 13/16] Update docs --- docs/confGenerator.md | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/docs/confGenerator.md b/docs/confGenerator.md index eb38b6815..9ad49da64 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -134,15 +134,17 @@ this actually moves the data from being log lines into being a metric named (8.2 > For additional details on `extract aggregates` > refer to [README.md](../README.md#aggregates). -(9) Next, the metrics from (8.2) are sent to prometheus (9.1). +(9) Next, the metrics from (8.2) are sent to prometheus (9.1).
The metric name in prometheus will be called as the value of (9.2) with -the prefix from the `config.yaml` file. -The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram). -The filter field (9.4) determines which aggregates will take into account. -The key should be `"name"` and the value should match the aggregate name (8.2) -The value to be used by prometheus is taken from the field defined in (9.5). -For `Gauges`, use `total_value` or `total_count`. For `Counters`, use `recent_op_value` or `recent_count`. -Prometheus will add labels to the metric based on the (9.6) fields. +the prefix from the `config.yaml` file.
+The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram).
+The filter field (9.4) determines which aggregates will be taken into account.
+The key should be `"name"` and the value should match the aggregate name (8.2).
+The value to be used by prometheus is taken from the field defined in (9.5).
+For `Gauge`, use `total_value` or `total_count`.
+For `Counter`, use `recent_op_value` or `recent_count`.
+For `Histogram`, use `recent_raw_values`.
+Prometheus will add labels to the metric based on the (9.6) fields.
(10) next, using grafana to visualize the metric with name from (9.2) including the prefix and using the prometheus expression from (10.1). From 03842db752cbb394d7ab4a3d625e574c653a9649 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 30 Mar 2022 11:58:11 +0300 Subject: [PATCH 14/16] Add documentation --- README.md | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8472893b8..9f0e65cad 100644 --- a/README.md +++ b/README.md @@ -463,7 +463,7 @@ It is possible to define aggregates per `srcIP` or per `dstIP` of per the tuple to capture the `sum`, `min`, `avg` etc. of the values in the field `value`. For example, configuration record for aggregating field `value` as -average for `srcIP`x`dstIP` tuples will look like this:: +average for `srcIP`x`dstIP` tuples will look like this: ```yaml pipeline: @@ -482,6 +482,31 @@ parameters: RecordKey: "value" ``` +The output fields of the aggregates stage are: +- `name` +- `operation` +- `record_key` +- `by` +- `aggregate` +- `total_value`: the total aggregate value +- `total_count`: the total count +- `recent_raw_values`: a slice with the raw values of the recent batch +- `recent_op_value`: the aggregate value of the recent batch +- `recent_count`: the count of flowlogs in the recent batch + +These fields are used by the next stage (for example `prom` encoder). +The pipeline processes flowlogs in batches. +The output fields with `recent_` prefix are related to the recent batch. +They are needed when exposing metrics in Prometheus using Counters and Histograms. +Prometheus Counters API accepts the amount to be added to the counter and not the final value as in Gauges. +In this case, `recent_op_value` and `recent_count` should be used as the `valuekey`. +The API of Histograms accepts the sample value, so it could be added to the right bucket. +In this case, we are interested in the raw values of the records in the aggregation group. +No aggregate operation is needed and it should be set `raw_values`. The `valuekey` should be set to `recent_raw_values`. + +**Note**: `recent_raw_values` is filled only when the operation is `raw_values`. + + ### Prometheus encoder The prometheus encoder specifies which metrics to export to prometheus and which labels should be associated with those metrics. From 45444521b847faf5efa2b9d3dc1093602cb84056 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 30 Mar 2022 11:58:34 +0300 Subject: [PATCH 15/16] Remove a comment --- pkg/pipeline/extract/aggregate/aggregate.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index 827db1839..35a24b968 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -191,15 +191,14 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { metrics = append(metrics, config.GenericMap{ - "name": aggregate.Definition.Name, - "operation": aggregate.Definition.Operation, - "record_key": aggregate.Definition.RecordKey, - "by": strings.Join(aggregate.Definition.By, ","), - "aggregate": string(group.normalizedValues), - "total_value": fmt.Sprintf("%f", group.totalValue), - // TODO: change to snake_case - "recent_raw_values": group.recentRawValues, + "name": aggregate.Definition.Name, + "operation": aggregate.Definition.Operation, + "record_key": aggregate.Definition.RecordKey, + "by": strings.Join(aggregate.Definition.By, ","), + "aggregate": string(group.normalizedValues), + "total_value": fmt.Sprintf("%f", group.totalValue), "total_count": fmt.Sprintf("%d", group.totalCount), + "recent_raw_values": group.recentRawValues, "recent_op_value": group.recentOpValue, "recent_count": group.recentCount, strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), From d5cfdbfdd1dcd4659006e91485bfa9e3381cc87a Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 31 Mar 2022 16:18:38 +0300 Subject: [PATCH 16/16] Address Eran's comment: fix wording --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9f0e65cad..ae40a828c 100644 --- a/README.md +++ b/README.md @@ -498,9 +498,9 @@ These fields are used by the next stage (for example `prom` encoder). The pipeline processes flowlogs in batches. The output fields with `recent_` prefix are related to the recent batch. They are needed when exposing metrics in Prometheus using Counters and Histograms. -Prometheus Counters API accepts the amount to be added to the counter and not the final value as in Gauges. +Prometheus Counters API accepts the delta amount to be added to the counter and not the total value as in Gauges. In this case, `recent_op_value` and `recent_count` should be used as the `valuekey`. -The API of Histograms accepts the sample value, so it could be added to the right bucket. +The API of Histograms accepts the sample value, so it could be added to the appropriate bucket. In this case, we are interested in the raw values of the records in the aggregation group. No aggregate operation is needed and it should be set `raw_values`. The `valuekey` should be set to `recent_raw_values`.