From 0d7c266237d68b17fee378a8d14aa36f1b0c8c8b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 14 Nov 2022 22:40:08 -0800 Subject: [PATCH] Support vertical sharding for label_join and label_replace functions (#5889) * support vertical sharding for label_join and label_replace functions Signed-off-by: Ben Ye * update changelog Signed-off-by: Ben Ye * fix doc Signed-off-by: Ben Ye Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + pkg/querysharding/analyzer.go | 76 +++++++++++++++++++++--------- pkg/querysharding/analyzer_test.go | 65 ++++++++++++++++++++----- 3 files changed, 108 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6741eb6e84..aa5172239e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt. - [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart. - [#5865](https://github.com/thanos-io/thanos/pull/5865) Compact: Retry on sync metas error. +- [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support sharding vertical sharding `label_replace` and `label_join` functions. ### Changed diff --git a/pkg/querysharding/analyzer.go b/pkg/querysharding/analyzer.go index ff7c1d04f2..dc8b16ca72 100644 --- a/pkg/querysharding/analyzer.go +++ b/pkg/querysharding/analyzer.go @@ -1,23 +1,33 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package querysharding import ( - "fmt" - lru "github.com/hashicorp/golang-lru" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" ) -// QueryAnalyzer is an analyzer which determines -// whether a PromQL Query is shardable and using which labels. - type Analyzer interface { Analyze(string) (QueryAnalysis, error) } +// QueryAnalyzer is an analyzer which determines +// whether a PromQL Query is shardable and using which labels. type QueryAnalyzer struct{} type CachedQueryAnalyzer struct { @@ -25,11 +35,6 @@ type CachedQueryAnalyzer struct { cache *lru.Cache } -var nonShardableFuncs = []string{ - "label_join", - "label_replace", -} - // NewQueryAnalyzer creates a new QueryAnalyzer. func NewQueryAnalyzer() *CachedQueryAnalyzer { // Ignore the error check since it throws error @@ -80,14 +85,18 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return nonShardableQuery(), err } - isShardable := true - var analysis QueryAnalysis + var ( + analysis QueryAnalysis + dynamicLabels []string + ) parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { switch n := node.(type) { case *parser.Call: - if n.Func != nil && contains(n.Func.Name, nonShardableFuncs) { - isShardable = false - return fmt.Errorf("expressions with %s are not shardable", n.Func.Name) + if n.Func != nil { + if n.Func.Name == "label_join" || n.Func.Name == "label_replace" { + dstLabel := stringFromArg(n.Args[1]) + dynamicLabels = append(dynamicLabels, dstLabel) + } } case *parser.BinaryExpr: if n.VectorMatching != nil { @@ -108,19 +117,42 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return nil }) - if !isShardable { - return nonShardableQuery(), nil + // If currently it is shard by, it is still shardable if there is + // any label left after removing the dynamic labels. + // If currently it is shard without, it is still shardable if we + // shard without the union of the labels. + // TODO(yeya24): we can still make dynamic labels shardable if we push + // down the label_replace and label_join computation to the store level. + if len(dynamicLabels) > 0 { + analysis = analysis.scopeToLabels(dynamicLabels, false) } return analysis, nil } -func contains(needle string, haystack []string) bool { - for _, item := range haystack { - if needle == item { - return true +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/functions.go#L1416. +func stringFromArg(e parser.Expr) string { + tmp := unwrapStepInvariantExpr(e) // Unwrap StepInvariant + unwrapParenExpr(&tmp) // Optionally unwrap ParenExpr + return tmp.(*parser.StringLiteral).Val +} + +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2642. +// unwrapParenExpr does the AST equivalent of removing parentheses around a expression. +func unwrapParenExpr(e *parser.Expr) { + for { + if p, ok := (*e).(*parser.ParenExpr); ok { + *e = p.Expr + } else { + break } } +} - return false +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2652. +func unwrapStepInvariantExpr(e parser.Expr) parser.Expr { + if p, ok := e.(*parser.StepInvariantExpr); ok { + return p.Expr + } + return e } diff --git a/pkg/querysharding/analyzer_test.go b/pkg/querysharding/analyzer_test.go index fe14f2f58a..b13e805667 100644 --- a/pkg/querysharding/analyzer_test.go +++ b/pkg/querysharding/analyzer_test.go @@ -32,14 +32,6 @@ func TestAnalyzeQuery(t *testing.T) { name: "outer aggregation with without grouping", expression: "count(sum without (pod) (http_requests_total))", }, - { - name: "aggregate expression with label_replace", - expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, - }, - { - name: "aggregate without expression with label_replace", - expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, - }, { name: "binary expression", expression: `http_requests_total{code="400"} / http_requests_total`, @@ -56,10 +48,6 @@ func TestAnalyzeQuery(t *testing.T) { name: "binary aggregation with different grouping labels", expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`, }, - { - name: "binary expression with vector matching and label_replace", - expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, - }, { name: "multiple binary expressions", expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`, @@ -71,6 +59,14 @@ func TestAnalyzeQuery(t *testing.T) { / on () http_requests_total`, }, + { + name: "aggregate by expression with label_replace, sharding label is dynamic", + expression: `sum by (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + }, + { + name: "aggregate by expression with label_join, sharding label is dynamic", + expression: `sum by (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + }, } shardableByLabels := []testCase{ @@ -142,6 +138,36 @@ sum by (container) ( expression: `sum(rate(node_cpu_seconds_total[3h])) by (cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[3h])) by (cluster_id)`, shardingLabels: []string{"cluster_id"}, }, + { + name: "aggregate by expression with label_replace, sharding label is not dynamic", + expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "aggregate by expression with label_join, sharding label is not dynamic", + expression: `sum by (pod) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "label_join and aggregation on multiple labels. Can be sharded by the static one", + expression: `sum by (pod, dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "binary expression with vector matching and label_replace", + expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, + shardingLabels: []string{"pod"}, + }, + { + name: "nested label joins", + expression: `label_join(sum by (pod) (label_join(metric, "dst_label", ",", "src_label")), "dst_label1", ",", "dst_label")`, + shardingLabels: []string{"pod"}, + }, + { + name: "complex query with label_replace, binary expr and aggregations on dynamic label", + expression: `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[1d:5m])) by (instance, cluster) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[1d:5m])) by (node, cluster), "instance", "$1", "node", "(.*)")) by (instance, cluster)`, + shardingLabels: []string{"cluster"}, + }, } shardableWithoutLabels := []testCase{ @@ -178,6 +204,21 @@ http_requests_total`, expression: "histogram_quantile(0.95, sum(rate(metric[1m])) without (le, cluster))", shardingLabels: []string{"cluster"}, }, + { + name: "aggregate without expression with label_replace, sharding label is not dynamic", + expression: `sum without (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"dst_label"}, + }, + { + name: "aggregate without expression with label_join, sharding label is not dynamic", + expression: `sum without (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"dst_label"}, + }, + { + name: "aggregate without expression with label_replace", + expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"pod", "dst_label"}, + }, } for _, test := range nonShardable {