From a4a1db83e37698ae41c1f638684801c89feb29df Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 13:36:00 +0200 Subject: [PATCH 01/13] Add filter to prom encoder --- pkg/api/encode_prom.go | 16 +++++++++++----- pkg/pipeline/encode/encode_prom.go | 22 ++++++++++++++++------ 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 57670ea17..37c1a2fc8 100644 --- a/pkg/api/encode_prom.go +++ b/pkg/api/encode_prom.go @@ -35,11 +35,17 @@ func PromEncodeOperationName(operation string) string { } type PromMetricsItem struct { - Name string `yaml:"name" doc:"the metric name"` - Type string `yaml:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` - ValueKey string `yaml:"valuekey" doc:"entry key from which to resolve metric value"` - Labels []string `yaml:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets" doc:"histogram buckets"` + Name string `yaml:"name" doc:"the metric name"` + Type string `yaml:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` + Filter PromMetricsFilter `yaml:"filter" doc:"the criterion to filter entries by"` + ValueKey string `yaml:"valuekey" doc:"entry key from which to resolve metric value"` + Labels []string `yaml:"labels" doc:"labels to be associated with the metric"` + Buckets []float64 `yaml:"buckets" doc:"histogram buckets"` } type PromMetricsItems []PromMetricsItem + +type PromMetricsFilter struct { + Key string `yaml:"key" doc:"the key to match and filter by"` + Value string `yaml:"value" doc:"the value to match and filter by"` +} diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index a35e87e21..47d3c668c 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -44,8 +44,10 @@ type PromMetric struct { } type metricInfo struct { - input string - labelNames []string + input string + filterKey string + filterValue string + labelNames []string PromMetric } @@ -102,9 +104,15 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { + val, keyFound := metric[mInfo.filterKey] + shouldKeepRecord := keyFound && val == mInfo.filterValue + if !shouldKeepRecord { + continue + } + metricValue, ok := metric[mInfo.input] if !ok { - log.Debugf("field %v is missing", metricName) + log.Errorf("field %v is missing", mInfo.input) continue } metricValueString := fmt.Sprintf("%v", metricValue) @@ -302,9 +310,11 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { continue } metrics[mInfo.Name] = metricInfo{ - input: mInfo.ValueKey, - labelNames: labels, - PromMetric: pMetric, + input: mInfo.ValueKey, + filterKey: mInfo.Filter.Key, + filterValue: mInfo.Filter.Value, + labelNames: labels, + PromMetric: pMetric, } } From 157557631b26b0173da5c5eba5bd5fb4c521113f Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 13:47:54 +0200 Subject: [PATCH 02/13] Fix tests --- pkg/pipeline/aggregate_prom_test.go | 2 ++ pkg/pipeline/encode/encode_prom_test.go | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 2dc0eddb3..62f7e528c 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -72,12 +72,14 @@ parameters: metrics: - name: flow_count type: counter + filter: {key: name, value: bandwidth_count} valuekey: bandwidth_count_recent_count labels: - service - name: bytes_sum type: counter + filter: {key: name, value: bandwidth_sum} valuekey: bandwidth_sum_recent_op_value labels: - service diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 43094f3bc..6e24bba80 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -44,6 +44,7 @@ parameters: metrics: - name: Bytes type: gauge + filter: {key: dstAddr, value: 10.1.2.4} valuekey: bytes labels: - srcAddr @@ -51,6 +52,7 @@ parameters: - srcPort - name: Packets type: counter + filter: {key: dstAddr, value: 10.1.2.4} valuekey: packets labels: - srcAddr @@ -161,8 +163,10 @@ func Test_EncodeAggregate(t *testing.T) { prefix: "test_", metrics: map[string]metricInfo{ "gauge": { - input: "test_aggregate_value", - labelNames: []string{"by", "aggregate"}, + input: "test_aggregate_value", + filterKey: "name", + filterValue: "test_aggregate", + labelNames: []string{"by", "aggregate"}, }, }, mList: list.New(), From 0e3f31e32bbed245fcabd65a53a68db6e7ebafea Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 15:03:44 +0200 Subject: [PATCH 03/13] Rename a variable metric->metricRecord I find it confusing to have an argument variable named "metric" and a data member named "metrics" --- pkg/pipeline/encode/encode_prom.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 47d3c668c..3471fb470 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -99,18 +99,18 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { return out } -func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap { - log.Debugf("entering EncodeMetric metric = %v", metric) +func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { + log.Debugf("entering EncodeMetric. metricRecord = %v", metricRecord) // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { - val, keyFound := metric[mInfo.filterKey] + val, keyFound := metricRecord[mInfo.filterKey] shouldKeepRecord := keyFound && val == mInfo.filterValue if !shouldKeepRecord { continue } - metricValue, ok := metric[mInfo.input] + metricValue, ok := metricRecord[mInfo.input] if !ok { log.Errorf("field %v is missing", mInfo.input) continue @@ -124,7 +124,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat) entryLabels := make(map[string]string, len(mInfo.labelNames)) for _, t := range mInfo.labelNames { - entryLabels[t] = fmt.Sprintf("%v", metric[t]) + entryLabels[t] = fmt.Sprintf("%v", metricRecord[t]) } entry := entryInfo{ eInfo: entrySignature{ @@ -143,7 +143,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap cEntry := e.saveEntryInCache(entry, entryLabels) cEntry.PromMetric.metricType = mInfo.PromMetric.metricType - // push the metric to prometheus + // push the metric record to prometheus switch mInfo.PromMetric.metricType { case api.PromEncodeOperationName("Gauge"): mInfo.promGauge.With(entryLabels).Set(valueFloat) @@ -152,7 +152,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap mInfo.promCounter.With(entryLabels).Add(valueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - for _, v := range metric["recentRawValues"].([]float64) { + for _, v := range metricRecord["recentRawValues"].([]float64) { mInfo.promHist.With(entryLabels).Observe(v) } cEntry.PromMetric.promHist = mInfo.promHist From 98c7111682d5b5b48d26be9ec0202ee4d3661617 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 15:12:08 +0200 Subject: [PATCH 04/13] Remove prefix --- pkg/pipeline/aggregate_prom_test.go | 4 ++-- pkg/pipeline/extract/aggregate/aggregate.go | 8 +++----- pkg/test/utils.go | 4 ++-- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 62f7e528c..bb3d68af9 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -73,14 +73,14 @@ parameters: - name: flow_count type: counter filter: {key: name, value: bandwidth_count} - valuekey: bandwidth_count_recent_count + valuekey: recent_count labels: - service - name: bytes_sum type: counter filter: {key: name, value: bandwidth_sum} - valuekey: bandwidth_sum_recent_op_value + valuekey: recent_op_value labels: - service diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index d3fc34b7e..6cc32f524 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -183,7 +183,6 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error { func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { - // TODO: remove prefixes when filtering is implemented in prom encode. metrics = append(metrics, config.GenericMap{ "name": aggregate.Definition.Name, "operation": aggregate.Definition.Operation, @@ -193,10 +192,9 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "total_value": fmt.Sprintf("%f", group.totalValue), "recentRawValues": group.recentRawValues, "total_count": fmt.Sprintf("%d", group.totalCount), - aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, - aggregate.Definition.Name + "_recent_count": group.recentCount, - aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.totalValue), - strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), + "recent_op_value": group.recentOpValue, + "recent_count": group.recentCount, + strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields group.recentRawValues = make([]float64, 0) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 96108651e..beac13c49 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -128,7 +128,7 @@ func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int fmt.Sprintf("%v_total_value", name): valueString, "recentRawValues": rrv, "total_count": fmt.Sprintf("%v", count), - name + "_recent_op_value": recentOpValue, - name + "_recent_count": recentCount, + "recent_op_value": recentOpValue, + "recent_count": recentCount, } } From 3c5ed3c164f06481014a0aa2bd0e654c3f824f96 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 15:17:38 +0200 Subject: [PATCH 05/13] Remove unique ValueKey constraint --- pkg/confgen/encode.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/pkg/confgen/encode.go b/pkg/confgen/encode.go index 33e6fd09b..f2c36fb06 100644 --- a/pkg/confgen/encode.go +++ b/pkg/confgen/encode.go @@ -40,17 +40,7 @@ func (cg *ConfGen) parseEncode(encode *map[string]interface{}) (*api.PromEncode, log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) return nil, err } - - // validate that ValueKey is unique - for _, existingPromMetric := range cg.promMetrics { - for _, newPromMetric := range prom.Metrics { - if existingPromMetric.ValueKey == newPromMetric.ValueKey { - log.Panicf("error in parseEncode: ValueKey %s overlaps, metric encoding ignored !!!! ", newPromMetric.ValueKey) - return nil, err - } - } - } - + cg.promMetrics = append(cg.promMetrics, prom.Metrics...) return &prom, nil } From c02716ef5bed5369f1758f12e2b3854ae480e571 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 15:18:11 +0200 Subject: [PATCH 06/13] Update network_definitions --- .../kubernetes/flowlogs-pipeline.conf.yaml | 75 +++++++++++++++---- .../bandwidth_per_network_service.yaml | 3 +- .../bandwidth_per_src_dest_subnet.yaml | 3 +- .../bandwidth_per_src_subnet.yaml | 3 +- .../connection_rate_per_dest_subnet.yaml | 3 +- .../connection_rate_per_src_subnet.yaml | 3 +- .../connection_rate_per_tcp_flags.yaml | 3 +- .../connections_per_dst_as.yaml | 3 +- .../connections_per_src_as.yaml | 3 +- .../count_per_src_dest_subnet.yaml | 3 +- .../egress_bandwidth_per_dest_subnet.yaml | 3 +- .../egress_bandwidth_per_namespace.yaml | 3 +- .../geo-location_rate_per_dest.yaml | 3 +- network_definitions/mice_elephants.yaml | 6 +- .../network_services_count.yaml | 3 +- 15 files changed, 90 insertions(+), 30 deletions(-) diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 74563154a..1262da4e7 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -165,105 +165,150 @@ parameters: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_recent_op_value + filter: + key: name + value: bandwidth_network_service + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_recent_op_value + filter: + key: name + value: bandwidth_source_destination_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_subnet type: counter - valuekey: bandwidth_source_subnet_recent_op_value + filter: + key: name + value: bandwidth_source_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_recent_count + filter: + key: name + value: dest_connection_subnet_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_recent_count + filter: + key: name + value: src_connection_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_recent_count + filter: + key: name + value: TCPFlags_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_destination_as type: counter - valuekey: dst_as_connection_recent_count + filter: + key: name + value: dst_as_connection_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_as type: counter - valuekey: src_as_connection_recent_count + filter: + key: name + value: src_as_connection_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_recent_count + filter: + key: name + value: count_source_destination_subnet + valuekey: recent_count labels: - by - aggregate buckets: [] - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_recent_op_value + filter: + key: name + value: bandwidth_destination_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_recent_op_value + filter: + key: name + value: bandwidth_namespace + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_recent_count + filter: + key: name + value: dest_connection_location_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: mice_count type: counter - valuekey: mice_count_recent_count + filter: + key: name + value: mice_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: elephant_count type: counter - valuekey: elephant_count_recent_count + filter: + key: name + value: elephant_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: service_count type: counter - valuekey: dest_service_recent_count + filter: + key: name + value: dest_service_count + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml index 0af3e36c0..668852c30 100644 --- a/network_definitions/bandwidth_per_network_service.yaml +++ b/network_definitions/bandwidth_per_network_service.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_recent_op_value + filter: {key: name, value: bandwidth_network_service} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml index f4ea6b7df..21e93a620 100644 --- a/network_definitions/bandwidth_per_src_dest_subnet.yaml +++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml @@ -34,7 +34,8 @@ encode: metrics: - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_recent_op_value + filter: {key: name, value: bandwidth_source_destination_subnet} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml index 12d3117c9..3da3fe254 100644 --- a/network_definitions/bandwidth_per_src_subnet.yaml +++ b/network_definitions/bandwidth_per_src_subnet.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: bandwidth_per_source_subnet type: counter - valuekey: bandwidth_source_subnet_recent_op_value + filter: {key: name, value: bandwidth_source_subnet} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index e91955d85..27912020f 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -32,7 +32,8 @@ encode: metrics: - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_recent_count + filter: {key: name, value: dest_connection_subnet_count} + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml index eb11ab5d1..1812dfe62 100644 --- a/network_definitions/connection_rate_per_src_subnet.yaml +++ b/network_definitions/connection_rate_per_src_subnet.yaml @@ -27,7 +27,8 @@ encode: metrics: - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_recent_count + filter: {key: name, value: src_connection_count} + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_tcp_flags.yaml b/network_definitions/connection_rate_per_tcp_flags.yaml index a7734de7c..af6175147 100644 --- a/network_definitions/connection_rate_per_tcp_flags.yaml +++ b/network_definitions/connection_rate_per_tcp_flags.yaml @@ -21,7 +21,8 @@ encode: metrics: - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_recent_count + filter: { key: name, value: TCPFlags_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_dst_as.yaml b/network_definitions/connections_per_dst_as.yaml index 73b6644f1..870b62d64 100644 --- a/network_definitions/connections_per_dst_as.yaml +++ b/network_definitions/connections_per_dst_as.yaml @@ -22,7 +22,8 @@ encode: metrics: - name: connections_per_destination_as type: counter - valuekey: dst_as_connection_recent_count + filter: { key: name, value: dst_as_connection_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_src_as.yaml b/network_definitions/connections_per_src_as.yaml index 2d8d7412e..39bf9acef 100644 --- a/network_definitions/connections_per_src_as.yaml +++ b/network_definitions/connections_per_src_as.yaml @@ -22,7 +22,8 @@ encode: metrics: - name: connections_per_source_as type: counter - valuekey: src_as_connection_recent_count + filter: { key: name, value: src_as_connection_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml index 67cf68dca..58111aae3 100644 --- a/network_definitions/count_per_src_dest_subnet.yaml +++ b/network_definitions/count_per_src_dest_subnet.yaml @@ -33,7 +33,8 @@ encode: metrics: - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_recent_count + filter: { key: name, value: count_source_destination_subnet } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml index 58d53bba3..e8e334972 100644 --- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml +++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_recent_op_value + filter: { key: name, value: bandwidth_destination_subnet } + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml index dcb0377fc..ae0ab8bef 100644 --- a/network_definitions/egress_bandwidth_per_namespace.yaml +++ b/network_definitions/egress_bandwidth_per_namespace.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_recent_op_value + filter: { key: name, value: bandwidth_namespace } + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml index db3426723..c9e2d8fff 100644 --- a/network_definitions/geo-location_rate_per_dest.yaml +++ b/network_definitions/geo-location_rate_per_dest.yaml @@ -28,7 +28,8 @@ encode: metrics: - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_recent_count + filter: { key: name, value: dest_connection_location_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/mice_elephants.yaml b/network_definitions/mice_elephants.yaml index 4907054f0..ddcdf7a47 100644 --- a/network_definitions/mice_elephants.yaml +++ b/network_definitions/mice_elephants.yaml @@ -36,13 +36,15 @@ encode: metrics: - name: mice_count type: counter - valuekey: mice_count_recent_count + filter: { key: name, value: mice_count } + valuekey: recent_count labels: - by - aggregate - name: elephant_count type: counter - valuekey: elephant_count_recent_count + filter: { key: name, value: elephant_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml index 87a55d6ab..d9b47f9db 100644 --- a/network_definitions/network_services_count.yaml +++ b/network_definitions/network_services_count.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: service_count type: counter - valuekey: dest_service_recent_count + filter: { key: name, value: dest_service_count } + valuekey: recent_count labels: - by - aggregate From 058b781cc1ca5161458ddf8f17fbef132fef3ac3 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 15:18:28 +0200 Subject: [PATCH 07/13] Update api.md --- docs/api.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/api.md b/docs/api.md index 14d3e81b3..71cfc3060 100644 --- a/docs/api.md +++ b/docs/api.md @@ -10,6 +10,9 @@ Following is the supported API format for prometheus encode: gauge: single numerical value that can arbitrarily go up and down counter: monotonically increasing counter whose value can only increase histogram: counts samples in configurable buckets + filter: the criterion to filter entries by + key: the key to match and filter by + value: the value to match and filter by valuekey: entry key from which to resolve metric value labels: labels to be associated with the metric buckets: histogram buckets From 84f9513c66376cf5f5963d5c3f38333b11d3e4be Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 16:43:03 +0200 Subject: [PATCH 08/13] Fix grammer --- docs/confGenerator.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/confGenerator.md b/docs/confGenerator.md index ce8258e77..860d432a7 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -63,11 +63,11 @@ make local-redeploy ``` > Note: Additional information on usage and deployment can be found in flowlogs-pipeline README -> Note: learning from examples and existing metric definitions is very-useful +> Note: Learning from examples and existing metric definitions is very useful. ### Network definition explained -In this section we explain how network definition are structured. This is useful for development of +In this section we explain how network definition is structured. This is useful for development of new network definitions as well as debugging and working with existing network definition. ```shell From 5e5ad8090161bdb8a3f263c8a0db1e1bd70e2c64 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 16:59:16 +0200 Subject: [PATCH 09/13] Update docs --- docs/confGenerator.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/confGenerator.md b/docs/confGenerator.md index 860d432a7..0d09a0f3f 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -101,8 +101,9 @@ encode: (9) metrics: - name: metricName (9.2) type: metricType (9.3) - valuekey: aggregate_name_value (9.4) - labels: (9.5) + filter: {key: myKey, value: myValue} (9.4) + valuekey: value (9.5) + labels: (9.6) - by - aggregate visualization: (10) @@ -133,12 +134,15 @@ this actually moves the data from being log lines into being a metric named (8.2 > For additional details on `extract aggregates` > refer to [README.md](../README.md#aggregates). -(9) Next, the metrics from (8.2) are sent to prometheus (9.1). Make sure that (9.4) value is -set to the metric name from (8.2) with suffix `_value`. +(9) Next, the metrics from (8.2) are sent to prometheus (9.1). The metric name in prometheus will be called as the value of (9.2) with the prefix from the `config.yaml` file. The type of the prometheus metric will be (9.3) (e.g. gauge). -Prometheus will add labels to the metric based on the (9.5) fields. +The filter field (9.4) determines which aggregates will take into account. +The key should be `"name"` and the value should match the aggregate name (8.2) +The value to be used by prometheus is taken from the field defined in (9.5). +For `Gauges`, use `value` and `Counters`, use `recent_op_value`. +Prometheus will add labels to the metric based on the (9.6) fields. (10) next, using grafana to visualize the metric with name from (9.2) including the prefix and using the prometheus expression from (10.1). From ee9acdaded0fb9eff3b983cbdac47e0e5b45875c Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 15 Mar 2022 17:10:15 +0200 Subject: [PATCH 10/13] Make linter happy --- pkg/confgen/encode.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/confgen/encode.go b/pkg/confgen/encode.go index f2c36fb06..b32059b43 100644 --- a/pkg/confgen/encode.go +++ b/pkg/confgen/encode.go @@ -40,7 +40,7 @@ func (cg *ConfGen) parseEncode(encode *map[string]interface{}) (*api.PromEncode, log.Debugf("Unmarshal aggregate.Definitions err: %v ", err) return nil, err } - + cg.promMetrics = append(cg.promMetrics, prom.Metrics...) return &prom, nil } From a0366ed6feebe7030d4993a2e92adda676260813 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 17:37:43 +0200 Subject: [PATCH 11/13] fixup --- pkg/test/utils.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index beac13c49..8b8dc5878 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -118,17 +118,16 @@ func GetExtractMockEntry() config.GenericMap { func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { valueString := fmt.Sprintf("%f", value) return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - fmt.Sprintf("%v_total_value", name): valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), - "recent_op_value": recentOpValue, - "recent_count": recentCount, + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + "recentRawValues": rrv, + "total_count": fmt.Sprintf("%v", count), + "recent_op_value": recentOpValue, + "recent_count": recentCount, } } From 9ad430c11f3d5bdf7ad998fefe25f911b5bffd67 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Sun, 20 Mar 2022 17:49:40 +0200 Subject: [PATCH 12/13] Address Eran's comment: extract key&value to sub-struct --- pkg/pipeline/encode/encode_prom.go | 28 +++++++++++++++---------- pkg/pipeline/encode/encode_prom_test.go | 10 +++++---- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 3471fb470..f3ab97797 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -43,11 +43,15 @@ type PromMetric struct { promHist *prometheus.HistogramVec } +type keyValuePair struct { + key string + value string +} + type metricInfo struct { - input string - filterKey string - filterValue string - labelNames []string + input string + filter keyValuePair + labelNames []string PromMetric } @@ -104,8 +108,8 @@ func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.Gener // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { - val, keyFound := metricRecord[mInfo.filterKey] - shouldKeepRecord := keyFound && val == mInfo.filterValue + val, keyFound := metricRecord[mInfo.filter.key] + shouldKeepRecord := keyFound && val == mInfo.filter.value if !shouldKeepRecord { continue } @@ -310,11 +314,13 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { continue } metrics[mInfo.Name] = metricInfo{ - input: mInfo.ValueKey, - filterKey: mInfo.Filter.Key, - filterValue: mInfo.Filter.Value, - labelNames: labels, - PromMetric: pMetric, + input: mInfo.ValueKey, + filter: keyValuePair{ + key: mInfo.Filter.Key, + value: mInfo.Filter.Value, + }, + labelNames: labels, + PromMetric: pMetric, } } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 6e24bba80..f25d786e3 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -163,10 +163,12 @@ func Test_EncodeAggregate(t *testing.T) { prefix: "test_", metrics: map[string]metricInfo{ "gauge": { - input: "test_aggregate_value", - filterKey: "name", - filterValue: "test_aggregate", - labelNames: []string{"by", "aggregate"}, + input: "test_aggregate_value", + filter: keyValuePair{ + key: "name", + value: "test_aggregate", + }, + labelNames: []string{"by", "aggregate"}, }, }, mList: list.New(), From c97cb0ca84f9187d16ee051ebee24dd47546ad62 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 22 Mar 2022 11:56:52 +0200 Subject: [PATCH 13/13] Update docs --- docs/confGenerator.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/confGenerator.md b/docs/confGenerator.md index 0d09a0f3f..eb38b6815 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -137,11 +137,11 @@ this actually moves the data from being log lines into being a metric named (8.2 (9) Next, the metrics from (8.2) are sent to prometheus (9.1). The metric name in prometheus will be called as the value of (9.2) with the prefix from the `config.yaml` file. -The type of the prometheus metric will be (9.3) (e.g. gauge). +The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram). The filter field (9.4) determines which aggregates will take into account. The key should be `"name"` and the value should match the aggregate name (8.2) The value to be used by prometheus is taken from the field defined in (9.5). -For `Gauges`, use `value` and `Counters`, use `recent_op_value`. +For `Gauges`, use `total_value` or `total_count`. For `Counters`, use `recent_op_value` or `recent_count`. Prometheus will add labels to the metric based on the (9.6) fields. (10) next, using grafana to visualize the metric with name from (9.2) including the