Skip to content

Commit

Permalink
Support vertical sharding for label_join and label_replace functions (t…
Browse files Browse the repository at this point in the history
…hanos-io#5889)

* support vertical sharding for label_join and label_replace functions

Signed-off-by: Ben Ye <benye@amazon.com>

* update changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* fix doc

Signed-off-by: Ben Ye <benye@amazon.com>

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored and Nathaniel Graham committed May 18, 2023
1 parent 0fe9f54 commit 0d7c266
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt.
- [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart.
- [#5865](https://github.com/thanos-io/thanos/pull/5865) Compact: Retry on sync metas error.
- [#5889](https://github.com/thanos-io/thanos/pull/5889) Query Frontend: Support sharding vertical sharding `label_replace` and `label_join` functions.

### Changed

Expand Down
76 changes: 54 additions & 22 deletions pkg/querysharding/analyzer.go
@@ -1,35 +1,40 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querysharding

import (
"fmt"

lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
)

// QueryAnalyzer is an analyzer which determines
// whether a PromQL Query is shardable and using which labels.

type Analyzer interface {
Analyze(string) (QueryAnalysis, error)
}

// QueryAnalyzer is an analyzer which determines
// whether a PromQL Query is shardable and using which labels.
type QueryAnalyzer struct{}

type CachedQueryAnalyzer struct {
analyzer *QueryAnalyzer
cache *lru.Cache
}

var nonShardableFuncs = []string{
"label_join",
"label_replace",
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() *CachedQueryAnalyzer {
// Ignore the error check since it throws error
Expand Down Expand Up @@ -80,14 +85,18 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nonShardableQuery(), err
}

isShardable := true
var analysis QueryAnalysis
var (
analysis QueryAnalysis
dynamicLabels []string
)
parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error {
switch n := node.(type) {
case *parser.Call:
if n.Func != nil && contains(n.Func.Name, nonShardableFuncs) {
isShardable = false
return fmt.Errorf("expressions with %s are not shardable", n.Func.Name)
if n.Func != nil {
if n.Func.Name == "label_join" || n.Func.Name == "label_replace" {
dstLabel := stringFromArg(n.Args[1])
dynamicLabels = append(dynamicLabels, dstLabel)
}
}
case *parser.BinaryExpr:
if n.VectorMatching != nil {
Expand All @@ -108,19 +117,42 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nil
})

if !isShardable {
return nonShardableQuery(), nil
// If currently it is shard by, it is still shardable if there is
// any label left after removing the dynamic labels.
// If currently it is shard without, it is still shardable if we
// shard without the union of the labels.
// TODO(yeya24): we can still make dynamic labels shardable if we push
// down the label_replace and label_join computation to the store level.
if len(dynamicLabels) > 0 {
analysis = analysis.scopeToLabels(dynamicLabels, false)
}

return analysis, nil
}

func contains(needle string, haystack []string) bool {
for _, item := range haystack {
if needle == item {
return true
// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/functions.go#L1416.
func stringFromArg(e parser.Expr) string {
tmp := unwrapStepInvariantExpr(e) // Unwrap StepInvariant
unwrapParenExpr(&tmp) // Optionally unwrap ParenExpr
return tmp.(*parser.StringLiteral).Val
}

// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2642.
// unwrapParenExpr does the AST equivalent of removing parentheses around a expression.
func unwrapParenExpr(e *parser.Expr) {
for {
if p, ok := (*e).(*parser.ParenExpr); ok {
*e = p.Expr
} else {
break
}
}
}

return false
// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2652.
func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
if p, ok := e.(*parser.StepInvariantExpr); ok {
return p.Expr
}
return e
}
65 changes: 53 additions & 12 deletions pkg/querysharding/analyzer_test.go
Expand Up @@ -32,14 +32,6 @@ func TestAnalyzeQuery(t *testing.T) {
name: "outer aggregation with without grouping",
expression: "count(sum without (pod) (http_requests_total))",
},
{
name: "aggregate expression with label_replace",
expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "aggregate without expression with label_replace",
expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "binary expression",
expression: `http_requests_total{code="400"} / http_requests_total`,
Expand All @@ -56,10 +48,6 @@ func TestAnalyzeQuery(t *testing.T) {
name: "binary aggregation with different grouping labels",
expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`,
},
{
name: "binary expression with vector matching and label_replace",
expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`,
},
{
name: "multiple binary expressions",
expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`,
Expand All @@ -71,6 +59,14 @@ func TestAnalyzeQuery(t *testing.T) {
/ on ()
http_requests_total`,
},
{
name: "aggregate by expression with label_replace, sharding label is dynamic",
expression: `sum by (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
},
{
name: "aggregate by expression with label_join, sharding label is dynamic",
expression: `sum by (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
},
}

shardableByLabels := []testCase{
Expand Down Expand Up @@ -142,6 +138,36 @@ sum by (container) (
expression: `sum(rate(node_cpu_seconds_total[3h])) by (cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[3h])) by (cluster_id)`,
shardingLabels: []string{"cluster_id"},
},
{
name: "aggregate by expression with label_replace, sharding label is not dynamic",
expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"pod"},
},
{
name: "aggregate by expression with label_join, sharding label is not dynamic",
expression: `sum by (pod) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"pod"},
},
{
name: "label_join and aggregation on multiple labels. Can be sharded by the static one",
expression: `sum by (pod, dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"pod"},
},
{
name: "binary expression with vector matching and label_replace",
expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`,
shardingLabels: []string{"pod"},
},
{
name: "nested label joins",
expression: `label_join(sum by (pod) (label_join(metric, "dst_label", ",", "src_label")), "dst_label1", ",", "dst_label")`,
shardingLabels: []string{"pod"},
},
{
name: "complex query with label_replace, binary expr and aggregations on dynamic label",
expression: `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[1d:5m])) by (instance, cluster) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[1d:5m])) by (node, cluster), "instance", "$1", "node", "(.*)")) by (instance, cluster)`,
shardingLabels: []string{"cluster"},
},
}

shardableWithoutLabels := []testCase{
Expand Down Expand Up @@ -178,6 +204,21 @@ http_requests_total`,
expression: "histogram_quantile(0.95, sum(rate(metric[1m])) without (le, cluster))",
shardingLabels: []string{"cluster"},
},
{
name: "aggregate without expression with label_replace, sharding label is not dynamic",
expression: `sum without (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"dst_label"},
},
{
name: "aggregate without expression with label_join, sharding label is not dynamic",
expression: `sum without (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`,
shardingLabels: []string{"dst_label"},
},
{
name: "aggregate without expression with label_replace",
expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`,
shardingLabels: []string{"pod", "dst_label"},
},
}

for _, test := range nonShardable {
Expand Down

0 comments on commit 0d7c266

Please sign in to comment.