Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix querysharding labels analysis #5880

Merged
merged 4 commits into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5844](https://github.com/thanos-io/thanos/pull/5844) Query Frontend: Fixes @ modifier time range when splitting queries by interval.
- [#5854](https://github.com/thanos-io/thanos/pull/5854) Query Frontend: Handles `lookback_delta` param in query frontend.
- [#5230](https://github.com/thanos-io/thanos/pull/5230) Rule: Stateless ruler support restoring `for` state from query API servers. The query API servers should be able to access the remote write storage.
- [#5880](https://github.com/thanos-io/thanos/pull/5880) Query Frontend: Fixes some edge cases of query sharding analysis.

### Added

Expand Down
19 changes: 5 additions & 14 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,13 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger)
if err != nil {
return nil, err
}
queryInstantTripperware, err := newInstantQueryTripperware(
queryInstantTripperware := newInstantQueryTripperware(
config.NumShards,
queryRangeLimits,
queryInstantCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_instant"}, reg),
config.ForwardHeaders,
)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), queryInstantTripperware(next), reg)
}, nil
Expand Down Expand Up @@ -191,10 +188,7 @@ func newQueryRangeTripperware(
}

if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
queryRangeMiddleware = append(
queryRangeMiddleware,
PromQLShardingMiddleware(analyzer, numShards, limits, codec, reg),
Expand Down Expand Up @@ -332,14 +326,11 @@ func newInstantQueryTripperware(
codec queryrange.Codec,
reg prometheus.Registerer,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
) queryrange.Tripperware {
instantQueryMiddlewares := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
if numShards > 0 {
analyzer, err := querysharding.NewQueryAnalyzer()
if err != nil {
return nil, errors.Wrap(err, "create query analyzer")
}
analyzer := querysharding.NewQueryAnalyzer()
instantQueryMiddlewares = append(
instantQueryMiddlewares,
queryrange.InstrumentMiddleware("sharding", m),
Expand All @@ -352,7 +343,7 @@ func newInstantQueryTripperware(
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
}, nil
}
}

// shouldCache controls what kind of Thanos request should be cached.
Expand Down
28 changes: 16 additions & 12 deletions pkg/querysharding/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,6 @@ func nonShardableQuery() QueryAnalysis {
}
}

func newShardableByLabels(labels []string, by bool) QueryAnalysis {
labels = without(labels, excludedLabels)

return QueryAnalysis{
shardBy: by,
shardingLabels: labels,
}
}

func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
labels = without(labels, excludedLabels)

Expand All @@ -39,16 +30,29 @@ func (q *QueryAnalysis) scopeToLabels(labels []string, by bool) QueryAnalysis {
}
}

if by {
if q.shardBy && by {
return QueryAnalysis{
shardBy: true,
shardingLabels: intersect(q.shardingLabels, labels),
}
}

if !q.shardBy && !by {
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
}
}

// If one side shards by labels and the other shards without labels,
// keep the "by" labels that are not present in the "without" label set.
labelsBy, labelsWithout := q.shardingLabels, labels
if !q.shardBy {
labelsBy, labelsWithout = labelsWithout, labelsBy
}
return QueryAnalysis{
shardBy: false,
shardingLabels: union(q.shardingLabels, labels),
shardBy: true,
shardingLabels: without(labelsBy, labelsWithout),
}
}

Expand Down
43 changes: 6 additions & 37 deletions pkg/querysharding/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,14 @@ var nonShardableFuncs = []string{
}

// NewQueryAnalyzer creates a new QueryAnalyzer.
func NewQueryAnalyzer() (*CachedQueryAnalyzer, error) {
cache, err := lru.New(256)
if err != nil {
return nil, err
}

func NewQueryAnalyzer() *CachedQueryAnalyzer {
// Ignore the returned error: lru.New only returns an error
// when size is <= 0, and the size here is the constant 256.
cache, _ := lru.New(256)
return &CachedQueryAnalyzer{
analyzer: &QueryAnalyzer{},
cache: cache,
}, nil
}
}

type cachedValue struct {
Expand Down Expand Up @@ -70,10 +68,7 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
// Analyze uses the following algorithm:
// - if a query has subqueries, such as label_join or label_replace,
// or has functions which cannot be sharded, then treat the query as non shardable.
// - if the query's root expression has grouping labels,
// then treat the query as shardable by those labels.
// - if the query's root expression has no grouping labels,
// then walk the query and find the least common labelset
// - Walk the query and find the least common labelset
// used in grouping expressions. If non-empty, treat the query
// as shardable by those labels.
// - otherwise, treat the query as non-shardable.
Expand Down Expand Up @@ -117,35 +112,9 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) {
return nonShardableQuery(), nil
}

rootAnalysis := analyzeRootExpression(expr)
if rootAnalysis.IsShardable() && rootAnalysis.shardBy {
return rootAnalysis, nil
}

return analysis, nil
}

func analyzeRootExpression(node parser.Node) QueryAnalysis {
switch n := node.(type) {
case *parser.BinaryExpr:
if n.VectorMatching != nil && n.VectorMatching.On {
shardingLabels := without(n.VectorMatching.MatchingLabels, []string{"le"})
return newShardableByLabels(shardingLabels, n.VectorMatching.On)
} else {
return nonShardableQuery()
}
case *parser.AggregateExpr:
if len(n.Grouping) == 0 {
return nonShardableQuery()
}

shardingLabels := without(n.Grouping, []string{"le"})
return newShardableByLabels(shardingLabels, !n.Without)
}

return nonShardableQuery()
}

func contains(needle string, haystack []string) bool {
for _, item := range haystack {
if needle == item {
Expand Down
16 changes: 9 additions & 7 deletions pkg/querysharding/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ sum by (container) (
{
name: "multiple binary expressions with grouping",
expression: `(http_requests_total{code="400"} + on (pod) http_requests_total{code="500"}) / on (cluster, pod) http_requests_total`,
shardingLabels: []string{"cluster", "pod"},
shardingLabels: []string{"pod"},
},
{
name: "histogram quantile",
Expand All @@ -137,6 +137,11 @@ sum by (container) (
expression: "increase(sum(http_requests_total) by (pod, cluster) [1h:1m])",
shardingLabels: []string{"cluster", "pod"},
},
{
name: "ignore vector matching with 2 aggregations",
expression: `sum(rate(node_cpu_seconds_total[3h])) by (cluster_id, mode) / ignoring(mode) group_left sum(rate(node_cpu_seconds_total[3h])) by (cluster_id)`,
shardingLabels: []string{"cluster_id"},
},
}

shardableWithoutLabels := []testCase{
Expand Down Expand Up @@ -177,8 +182,7 @@ http_requests_total`,

for _, test := range nonShardable {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.False(t, analysis.IsShardable())
Expand All @@ -187,8 +191,7 @@ http_requests_total`,

for _, test := range shardableByLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand All @@ -202,8 +205,7 @@ http_requests_total`,

for _, test := range shardableWithoutLabels {
t.Run(test.name, func(t *testing.T) {
analyzer, err := NewQueryAnalyzer()
require.NoError(t, err)
analyzer := NewQueryAnalyzer()
analysis, err := analyzer.Analyze(test.expression)
require.NoError(t, err)
require.True(t, analysis.IsShardable())
Expand Down