diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 74563154a..1262da4e7 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -165,105 +165,150 @@ parameters: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_recent_op_value + filter: + key: name + value: bandwidth_network_service + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_recent_op_value + filter: + key: name + value: bandwidth_source_destination_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: bandwidth_per_source_subnet type: counter - valuekey: bandwidth_source_subnet_recent_op_value + filter: + key: name + value: bandwidth_source_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_recent_count + filter: + key: name + value: dest_connection_subnet_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_recent_count + filter: + key: name + value: src_connection_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_recent_count + filter: + key: name + value: TCPFlags_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_destination_as type: counter - valuekey: dst_as_connection_recent_count + filter: + key: name + value: dst_as_connection_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: connections_per_source_as type: counter - valuekey: src_as_connection_recent_count + filter: + key: name + value: src_as_connection_count + 
valuekey: recent_count labels: - by - aggregate buckets: [] - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_recent_count + filter: + key: name + value: count_source_destination_subnet + valuekey: recent_count labels: - by - aggregate buckets: [] - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_recent_op_value + filter: + key: name + value: bandwidth_destination_subnet + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_recent_op_value + filter: + key: name + value: bandwidth_namespace + valuekey: recent_op_value labels: - by - aggregate buckets: [] - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_recent_count + filter: + key: name + value: dest_connection_location_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: mice_count type: counter - valuekey: mice_count_recent_count + filter: + key: name + value: mice_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: elephant_count type: counter - valuekey: elephant_count_recent_count + filter: + key: name + value: elephant_count + valuekey: recent_count labels: - by - aggregate buckets: [] - name: service_count type: counter - valuekey: dest_service_recent_count + filter: + key: name + value: dest_service_count + valuekey: recent_count labels: - by - aggregate diff --git a/docs/api.md b/docs/api.md index 14d3e81b3..71cfc3060 100644 --- a/docs/api.md +++ b/docs/api.md @@ -10,6 +10,9 @@ Following is the supported API format for prometheus encode: gauge: single numerical value that can arbitrarily go up and down counter: monotonically increasing counter whose value can only increase histogram: counts samples in configurable buckets + filter: the criterion to filter entries by + key: the key to match and filter by + value: the value to match and 
filter by valuekey: entry key from which to resolve metric value labels: labels to be associated with the metric buckets: histogram buckets diff --git a/docs/confGenerator.md b/docs/confGenerator.md index ce8258e77..eb38b6815 100644 --- a/docs/confGenerator.md +++ b/docs/confGenerator.md @@ -63,11 +63,11 @@ make local-redeploy ``` > Note: Additional information on usage and deployment can be found in flowlogs-pipeline README -> Note: learning from examples and existing metric definitions is very-useful +> Note: Learning from examples and existing metric definitions is very useful. ### Network definition explained -In this section we explain how network definition are structured. This is useful for development of +In this section we explain how network definition is structured. This is useful for development of new network definitions as well as debugging and working with existing network definition. ```shell @@ -101,8 +101,9 @@ encode: (9) metrics: - name: metricName (9.2) type: metricType (9.3) - valuekey: aggregate_name_value (9.4) - labels: (9.5) + filter: {key: myKey, value: myValue} (9.4) + valuekey: value (9.5) + labels: (9.6) - by - aggregate visualization: (10) @@ -133,12 +134,15 @@ this actually moves the data from being log lines into being a metric named (8.2 > For additional details on `extract aggregates` > refer to [README.md](../README.md#aggregates). -(9) Next, the metrics from (8.2) are sent to prometheus (9.1). Make sure that (9.4) value is -set to the metric name from (8.2) with suffix `_value`. +(9) Next, the metrics from (8.2) are sent to prometheus (9.1). The metric name in prometheus will be called as the value of (9.2) with the prefix from the `config.yaml` file. -The type of the prometheus metric will be (9.3) (e.g. gauge). -Prometheus will add labels to the metric based on the (9.5) fields. +The type of the prometheus metric will be (9.3) (e.g. gauge, counter or histogram). 
+The filter field (9.4) determines which aggregates will be taken into account. +The key should be `"name"` and the value should match the aggregate name (8.2). +The value to be used by prometheus is taken from the field defined in (9.5). +For `Gauges`, use `total_value` or `total_count`. For `Counters`, use `recent_op_value` or `recent_count`. +Prometheus will add labels to the metric based on the (9.6) fields. (10) next, using grafana to visualize the metric with name from (9.2) including the prefix and using the prometheus expression from (10.1). diff --git a/network_definitions/bandwidth_per_network_service.yaml b/network_definitions/bandwidth_per_network_service.yaml index 0af3e36c0..668852c30 100644 --- a/network_definitions/bandwidth_per_network_service.yaml +++ b/network_definitions/bandwidth_per_network_service.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: bandwidth_per_network_service type: counter - valuekey: bandwidth_network_service_recent_op_value + filter: {key: name, value: bandwidth_network_service} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_dest_subnet.yaml b/network_definitions/bandwidth_per_src_dest_subnet.yaml index f4ea6b7df..21e93a620 100644 --- a/network_definitions/bandwidth_per_src_dest_subnet.yaml +++ b/network_definitions/bandwidth_per_src_dest_subnet.yaml @@ -34,7 +34,8 @@ encode: metrics: - name: bandwidth_per_source_destination_subnet type: counter - valuekey: bandwidth_source_destination_subnet_recent_op_value + filter: {key: name, value: bandwidth_source_destination_subnet} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/bandwidth_per_src_subnet.yaml b/network_definitions/bandwidth_per_src_subnet.yaml index 12d3117c9..3da3fe254 100644 --- a/network_definitions/bandwidth_per_src_subnet.yaml +++ b/network_definitions/bandwidth_per_src_subnet.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: bandwidth_per_source_subnet type: counter - 
valuekey: bandwidth_source_subnet_recent_op_value + filter: {key: name, value: bandwidth_source_subnet} + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_dest_subnet.yaml b/network_definitions/connection_rate_per_dest_subnet.yaml index e91955d85..27912020f 100644 --- a/network_definitions/connection_rate_per_dest_subnet.yaml +++ b/network_definitions/connection_rate_per_dest_subnet.yaml @@ -32,7 +32,8 @@ encode: metrics: - name: connections_per_destination_subnet type: counter - valuekey: dest_connection_subnet_recent_count + filter: {key: name, value: dest_connection_subnet_count} + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_src_subnet.yaml b/network_definitions/connection_rate_per_src_subnet.yaml index eb11ab5d1..1812dfe62 100644 --- a/network_definitions/connection_rate_per_src_subnet.yaml +++ b/network_definitions/connection_rate_per_src_subnet.yaml @@ -27,7 +27,8 @@ encode: metrics: - name: connections_per_source_subnet type: counter - valuekey: src_connection_count_recent_count + filter: {key: name, value: src_connection_count} + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connection_rate_per_tcp_flags.yaml b/network_definitions/connection_rate_per_tcp_flags.yaml index a7734de7c..af6175147 100644 --- a/network_definitions/connection_rate_per_tcp_flags.yaml +++ b/network_definitions/connection_rate_per_tcp_flags.yaml @@ -21,7 +21,8 @@ encode: metrics: - name: connections_per_tcp_flags type: counter - valuekey: TCPFlags_recent_count + filter: { key: name, value: TCPFlags_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_dst_as.yaml b/network_definitions/connections_per_dst_as.yaml index 73b6644f1..870b62d64 100644 --- a/network_definitions/connections_per_dst_as.yaml +++ b/network_definitions/connections_per_dst_as.yaml @@ -22,7 +22,8 @@ encode: metrics: - 
name: connections_per_destination_as type: counter - valuekey: dst_as_connection_recent_count + filter: { key: name, value: dst_as_connection_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/connections_per_src_as.yaml b/network_definitions/connections_per_src_as.yaml index 2d8d7412e..39bf9acef 100644 --- a/network_definitions/connections_per_src_as.yaml +++ b/network_definitions/connections_per_src_as.yaml @@ -22,7 +22,8 @@ encode: metrics: - name: connections_per_source_as type: counter - valuekey: src_as_connection_recent_count + filter: { key: name, value: src_as_connection_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/count_per_src_dest_subnet.yaml b/network_definitions/count_per_src_dest_subnet.yaml index 67cf68dca..58111aae3 100644 --- a/network_definitions/count_per_src_dest_subnet.yaml +++ b/network_definitions/count_per_src_dest_subnet.yaml @@ -33,7 +33,8 @@ encode: metrics: - name: count_per_source_destination_subnet type: counter - valuekey: count_source_destination_subnet_recent_count + filter: { key: name, value: count_source_destination_subnet } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_dest_subnet.yaml b/network_definitions/egress_bandwidth_per_dest_subnet.yaml index 58d53bba3..e8e334972 100644 --- a/network_definitions/egress_bandwidth_per_dest_subnet.yaml +++ b/network_definitions/egress_bandwidth_per_dest_subnet.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: egress_per_destination_subnet type: counter - valuekey: bandwidth_destination_subnet_recent_op_value + filter: { key: name, value: bandwidth_destination_subnet } + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/egress_bandwidth_per_namespace.yaml b/network_definitions/egress_bandwidth_per_namespace.yaml index dcb0377fc..ae0ab8bef 100644 --- a/network_definitions/egress_bandwidth_per_namespace.yaml +++ 
b/network_definitions/egress_bandwidth_per_namespace.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: egress_per_namespace type: counter - valuekey: bandwidth_namespace_recent_op_value + filter: { key: name, value: bandwidth_namespace } + valuekey: recent_op_value labels: - by - aggregate diff --git a/network_definitions/geo-location_rate_per_dest.yaml b/network_definitions/geo-location_rate_per_dest.yaml index db3426723..c9e2d8fff 100644 --- a/network_definitions/geo-location_rate_per_dest.yaml +++ b/network_definitions/geo-location_rate_per_dest.yaml @@ -28,7 +28,8 @@ encode: metrics: - name: connections_per_destination_location type: counter - valuekey: dest_connection_location_recent_count + filter: { key: name, value: dest_connection_location_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/mice_elephants.yaml b/network_definitions/mice_elephants.yaml index 4907054f0..ddcdf7a47 100644 --- a/network_definitions/mice_elephants.yaml +++ b/network_definitions/mice_elephants.yaml @@ -36,13 +36,15 @@ encode: metrics: - name: mice_count type: counter - valuekey: mice_count_recent_count + filter: { key: name, value: mice_count } + valuekey: recent_count labels: - by - aggregate - name: elephant_count type: counter - valuekey: elephant_count_recent_count + filter: { key: name, value: elephant_count } + valuekey: recent_count labels: - by - aggregate diff --git a/network_definitions/network_services_count.yaml b/network_definitions/network_services_count.yaml index 87a55d6ab..d9b47f9db 100644 --- a/network_definitions/network_services_count.yaml +++ b/network_definitions/network_services_count.yaml @@ -29,7 +29,8 @@ encode: metrics: - name: service_count type: counter - valuekey: dest_service_recent_count + filter: { key: name, value: dest_service_count } + valuekey: recent_count labels: - by - aggregate diff --git a/pkg/api/encode_prom.go b/pkg/api/encode_prom.go index 57670ea17..37c1a2fc8 100644 --- a/pkg/api/encode_prom.go +++ 
b/pkg/api/encode_prom.go @@ -35,11 +35,17 @@ func PromEncodeOperationName(operation string) string { } type PromMetricsItem struct { - Name string `yaml:"name" doc:"the metric name"` - Type string `yaml:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` - ValueKey string `yaml:"valuekey" doc:"entry key from which to resolve metric value"` - Labels []string `yaml:"labels" doc:"labels to be associated with the metric"` - Buckets []float64 `yaml:"buckets" doc:"histogram buckets"` + Name string `yaml:"name" doc:"the metric name"` + Type string `yaml:"type" enum:"PromEncodeOperationEnum" doc:"one of the following:"` + Filter PromMetricsFilter `yaml:"filter" doc:"the criterion to filter entries by"` + ValueKey string `yaml:"valuekey" doc:"entry key from which to resolve metric value"` + Labels []string `yaml:"labels" doc:"labels to be associated with the metric"` + Buckets []float64 `yaml:"buckets" doc:"histogram buckets"` } type PromMetricsItems []PromMetricsItem + +type PromMetricsFilter struct { + Key string `yaml:"key" doc:"the key to match and filter by"` + Value string `yaml:"value" doc:"the value to match and filter by"` +} diff --git a/pkg/confgen/encode.go b/pkg/confgen/encode.go index 33e6fd09b..b32059b43 100644 --- a/pkg/confgen/encode.go +++ b/pkg/confgen/encode.go @@ -41,16 +41,6 @@ func (cg *ConfGen) parseEncode(encode *map[string]interface{}) (*api.PromEncode, return nil, err } - // validate that ValueKey is unique - for _, existingPromMetric := range cg.promMetrics { - for _, newPromMetric := range prom.Metrics { - if existingPromMetric.ValueKey == newPromMetric.ValueKey { - log.Panicf("error in parseEncode: ValueKey %s overlaps, metric encoding ignored !!!! ", newPromMetric.ValueKey) - return nil, err - } - } - } - cg.promMetrics = append(cg.promMetrics, prom.Metrics...) 
return &prom, nil } diff --git a/pkg/pipeline/aggregate_prom_test.go b/pkg/pipeline/aggregate_prom_test.go index 2dc0eddb3..bb3d68af9 100644 --- a/pkg/pipeline/aggregate_prom_test.go +++ b/pkg/pipeline/aggregate_prom_test.go @@ -72,13 +72,15 @@ parameters: metrics: - name: flow_count type: counter - valuekey: bandwidth_count_recent_count + filter: {key: name, value: bandwidth_count} + valuekey: recent_count labels: - service - name: bytes_sum type: counter - valuekey: bandwidth_sum_recent_op_value + filter: {key: name, value: bandwidth_sum} + valuekey: recent_op_value labels: - service diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index a35e87e21..f3ab97797 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -43,8 +43,14 @@ type PromMetric struct { promHist *prometheus.HistogramVec } +type keyValuePair struct { + key string + value string +} + type metricInfo struct { input string + filter keyValuePair labelNames []string PromMetric } @@ -97,14 +103,20 @@ func (e *encodeProm) Encode(metrics []config.GenericMap) []config.GenericMap { return out } -func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap { - log.Debugf("entering EncodeMetric metric = %v", metric) +func (e *encodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap { + log.Debugf("entering EncodeMetric. 
metricRecord = %v", metricRecord) // TODO: We may need different handling for histograms out := make([]config.GenericMap, 0) for metricName, mInfo := range e.metrics { - metricValue, ok := metric[mInfo.input] + val, keyFound := metricRecord[mInfo.filter.key] + shouldKeepRecord := keyFound && val == mInfo.filter.value + if !shouldKeepRecord { + continue + } + + metricValue, ok := metricRecord[mInfo.input] if !ok { - log.Debugf("field %v is missing", metricName) + log.Errorf("field %v is missing", mInfo.input) continue } metricValueString := fmt.Sprintf("%v", metricValue) @@ -116,7 +128,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap log.Debugf("metricName = %v, metricValue = %v, valueFloat = %v", metricName, metricValue, valueFloat) entryLabels := make(map[string]string, len(mInfo.labelNames)) for _, t := range mInfo.labelNames { - entryLabels[t] = fmt.Sprintf("%v", metric[t]) + entryLabels[t] = fmt.Sprintf("%v", metricRecord[t]) } entry := entryInfo{ eInfo: entrySignature{ @@ -135,7 +147,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap cEntry := e.saveEntryInCache(entry, entryLabels) cEntry.PromMetric.metricType = mInfo.PromMetric.metricType - // push the metric to prometheus + // push the metric record to prometheus switch mInfo.PromMetric.metricType { case api.PromEncodeOperationName("Gauge"): mInfo.promGauge.With(entryLabels).Set(valueFloat) @@ -144,7 +156,7 @@ func (e *encodeProm) EncodeMetric(metric config.GenericMap) []config.GenericMap mInfo.promCounter.With(entryLabels).Add(valueFloat) cEntry.PromMetric.promCounter = mInfo.promCounter case api.PromEncodeOperationName("Histogram"): - for _, v := range metric["recentRawValues"].([]float64) { + for _, v := range metricRecord["recentRawValues"].([]float64) { mInfo.promHist.With(entryLabels).Observe(v) } cEntry.PromMetric.promHist = mInfo.promHist @@ -302,7 +314,11 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { continue 
} metrics[mInfo.Name] = metricInfo{ - input: mInfo.ValueKey, + input: mInfo.ValueKey, + filter: keyValuePair{ + key: mInfo.Filter.Key, + value: mInfo.Filter.Value, + }, labelNames: labels, PromMetric: pMetric, } diff --git a/pkg/pipeline/encode/encode_prom_test.go b/pkg/pipeline/encode/encode_prom_test.go index 43094f3bc..f25d786e3 100644 --- a/pkg/pipeline/encode/encode_prom_test.go +++ b/pkg/pipeline/encode/encode_prom_test.go @@ -44,6 +44,7 @@ parameters: metrics: - name: Bytes type: gauge + filter: {key: dstAddr, value: 10.1.2.4} valuekey: bytes labels: - srcAddr @@ -51,6 +52,7 @@ parameters: - srcPort - name: Packets type: counter + filter: {key: dstAddr, value: 10.1.2.4} valuekey: packets labels: - srcAddr @@ -161,7 +163,11 @@ func Test_EncodeAggregate(t *testing.T) { prefix: "test_", metrics: map[string]metricInfo{ "gauge": { - input: "test_aggregate_value", + input: "test_aggregate_value", + filter: keyValuePair{ + key: "name", + value: "test_aggregate", + }, labelNames: []string{"by", "aggregate"}, }, }, diff --git a/pkg/pipeline/extract/aggregate/aggregate.go b/pkg/pipeline/extract/aggregate/aggregate.go index d3fc34b7e..6cc32f524 100644 --- a/pkg/pipeline/extract/aggregate/aggregate.go +++ b/pkg/pipeline/extract/aggregate/aggregate.go @@ -183,7 +183,6 @@ func (aggregate Aggregate) Evaluate(entries []config.GenericMap) error { func (aggregate Aggregate) GetMetrics() []config.GenericMap { var metrics []config.GenericMap for _, group := range aggregate.Groups { - // TODO: remove prefixes when filtering is implemented in prom encode. 
metrics = append(metrics, config.GenericMap{ "name": aggregate.Definition.Name, "operation": aggregate.Definition.Operation, @@ -193,10 +192,9 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap { "total_value": fmt.Sprintf("%f", group.totalValue), "recentRawValues": group.recentRawValues, "total_count": fmt.Sprintf("%d", group.totalCount), - aggregate.Definition.Name + "_recent_op_value": group.recentOpValue, - aggregate.Definition.Name + "_recent_count": group.recentCount, - aggregate.Definition.Name + "_total_value": fmt.Sprintf("%f", group.totalValue), - strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), + "recent_op_value": group.recentOpValue, + "recent_count": group.recentCount, + strings.Join(aggregate.Definition.By, "_"): string(group.normalizedValues), }) // Once reported, we reset the recentXXX fields group.recentRawValues = make([]float64, 0) diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 96108651e..8b8dc5878 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -118,17 +118,16 @@ func GetExtractMockEntry() config.GenericMap { func CreateMockAgg(name, recordKey, by, agg, op string, value float64, count int, rrv []float64, recentOpValue float64, recentCount int) config.GenericMap { valueString := fmt.Sprintf("%f", value) return config.GenericMap{ - "name": name, - "record_key": recordKey, - "by": by, - "aggregate": agg, - by: agg, - "operation": api.AggregateOperation(op), - "total_value": valueString, - fmt.Sprintf("%v_total_value", name): valueString, - "recentRawValues": rrv, - "total_count": fmt.Sprintf("%v", count), - name + "_recent_op_value": recentOpValue, - name + "_recent_count": recentCount, + "name": name, + "record_key": recordKey, + "by": by, + "aggregate": agg, + by: agg, + "operation": api.AggregateOperation(op), + "total_value": valueString, + "recentRawValues": rrv, + "total_count": fmt.Sprintf("%v", count), + "recent_op_value": recentOpValue, + "recent_count": recentCount, } }