Query: Allow passing a storeMatcher[] to select matching stores when debugging the querier #2931

Merged: 1 commit, Aug 1, 2020
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -24,7 +24,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page
- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page.
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
- [#2926](https://github.com/thanos-io/thanos/pull/2926) API: Add new blocks HTTP API to serve blocks metadata. The status endpoints (`/api/v1/status/flags`, `/api/v1/status/runtimeinfo` and `/api/v1/status/buildinfo`) are now available on all components with an HTTP API.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver fails when the initial upload fails.
@@ -34,6 +34,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total`.
- [#2915](https://github.com/thanos-io/thanos/pull/2915) Receive,Ruler: Enable TSDB directory locking by default. Add a new flag (`--tsdb.no-lockfile`) to override behavior.
- [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel.
- [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatch[]` parameter to select matching stores when debugging the querier. See [documentation](https://thanos.io/components/query.md/#store-filtering).

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

20 changes: 20 additions & 0 deletions docs/components/query.md
@@ -222,6 +222,26 @@ Thanos Querier has the ability to perform concurrent select request per query. I
The maximum number of concurrent requests being made per query is controlled by the `query.max-concurrent-select` flag.
Keep in mind that the maximum number of concurrent queries handled by the querier is controlled by `query.max-concurrent`. Please consider the implications of the combined values while tuning the querier.

### Store filtering

It's possible to provide a set of matchers to the Querier API to select specific stores to be used during the query via the `storeMatch[]` parameter. This is useful when debugging a slow or broken store.
It uses the same format as the matchers of [Prometheus' federate API](https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers).
Note that at the moment the querier only supports the `__address__` label, which contains the address of the store as it is shown on the `/stores` endpoint of the UI.

Example:
```
- targets:
- prometheus-foo.thanos-sidecar:10901
- prometheus-bar.thanos-sidecar:10901
```

```
http://localhost:10901/api/v1/query?query=up&dedup=true&partial_response=true&storeMatch[]={__address__=~"prometheus-foo.*"}
```

This will only return metrics from `prometheus-foo.thanos-sidecar:10901`.
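
Because the matcher value contains characters such as `{`, `=` and `"` that need URL encoding, it is usually easier to build the query string programmatically. A minimal sketch in Go, reusing the illustrative querier address and matcher from the example above (not part of this change):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Build the query string; url.Values takes care of encoding the
	// storeMatch[] matcher so it can be placed safely in a URL.
	v := url.Values{}
	v.Set("query", "up")
	v.Set("dedup", "true")
	v.Set("partial_response", "true")
	v.Add("storeMatch[]", `{__address__=~"prometheus-foo.*"}`)

	fmt.Println("http://localhost:10901/api/v1/query?" + v.Encode())
}
```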


## Expose UI on a sub-path

It is possible to expose thanos-query UI and optionally API on a sub-path.
51 changes: 46 additions & 5 deletions pkg/api/query/v1.go
@@ -166,6 +166,27 @@ func (qapi *QueryAPI) parseReplicaLabelsParam(r *http.Request) (replicaLabels []
return replicaLabels, nil
}

func (qapi *QueryAPI) parseStoreMatchersParam(r *http.Request) (storeMatchers [][]storepb.LabelMatcher, _ *api.ApiError) {
const storeMatcherParam = "storeMatch[]"
if err := r.ParseForm(); err != nil {
return nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

for _, s := range r.Form[storeMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
stm, err := storepb.TranslatePromMatchers(matchers...)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Wrap(err, "convert store matchers")}
}
storeMatchers = append(storeMatchers, stm)
}

return storeMatchers, nil
}
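
As a hedged illustration of what this parser sees (standard library only, not part of the diff): repeating `storeMatch[]` in the query string yields one entry per matcher set in `r.Form`, and each entry is translated into its own group of store matchers.

```go
package main

import (
	"fmt"
	"net/http/httptest"
)

func main() {
	// Two storeMatch[] values form two independent matcher groups; a store is
	// selected if any group matches (see storeMatchMetadata in pkg/store/proxy.go).
	target := "/api/v1/query?query=up" +
		"&storeMatch[]=%7B__address__%3D~%22prometheus-foo.*%22%7D" +
		"&storeMatch[]=%7B__address__%3D~%22prometheus-bar.*%22%7D"

	r := httptest.NewRequest("GET", target, nil)
	if err := r.ParseForm(); err != nil {
		panic(err)
	}
	for i, s := range r.Form["storeMatch[]"] {
		fmt.Printf("matcher set %d: %s\n", i, s)
	}
}
```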

func (qapi *QueryAPI) parseDownsamplingParamMillis(r *http.Request, defaultVal time.Duration) (maxResolutionMillis int64, _ *api.ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution := 0 * time.Second
@@ -236,6 +257,11 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
@@ -250,7 +276,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
qry, err := qapi.queryEngine.NewInstantQuery(qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
@@ -335,6 +361,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

// If no max_source_resolution is specified fit at least 5 samples between steps.
maxSourceResolution, apiErr := qapi.parseDownsamplingParamMillis(r, step/5)
if apiErr != nil {
@@ -351,7 +382,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
defer span.Finish()

qry, err := qapi.queryEngine.NewRangeQuery(
qapi.queryableCreate(enableDedup, replicaLabels, maxSourceResolution, enablePartialResponse, false),
qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, maxSourceResolution, enablePartialResponse, false),
r.FormValue("query"),
start,
end,
@@ -399,7 +430,7 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := qapi.queryableCreate(true, nil, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
@@ -474,12 +505,17 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, apiErr
}

storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(enableDedup, replicaLabels, math.MaxInt64, enablePartialResponse, true).
q, err := qapi.queryableCreate(enableDedup, replicaLabels, storeMatchers, math.MaxInt64, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
@@ -537,7 +573,12 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
storeMatchers, apiErr := qapi.parseStoreMatchersParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeMatchers, 0, enablePartialResponse, false).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
15 changes: 12 additions & 3 deletions pkg/query/querier.go
@@ -17,6 +17,7 @@ import (
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
@@ -27,17 +28,18 @@ import (
// replicaLabels at query time.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behavior of proxy.
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable
type QueryableCreator func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
keeper := gate.NewKeeper(reg)

return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return func(deduplicate bool, replicaLabels []string, storeMatchers [][]storepb.LabelMatcher, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
@@ -54,6 +56,7 @@ type queryable struct {
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
storeMatchers [][]storepb.LabelMatcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
@@ -66,7 +69,7 @@ type queryable struct {

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.storeMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateKeeper.NewGate(q.maxConcurrentSelects), q.selectTimeout), nil
}

type querier struct {
@@ -76,6 +79,7 @@ type querier struct {
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
storeMatchers [][]storepb.LabelMatcher
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
@@ -93,6 +97,7 @@ func newQuerier(
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
storeMatchers [][]storepb.LabelMatcher,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
@@ -120,6 +125,7 @@ func newQuerier(
mint: mint,
maxt: maxt,
replicaLabels: rl,
storeMatchers: storeMatchers,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
@@ -253,6 +259,9 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

aggrs := aggrsFromFunc(hints.Func)

// TODO: Pass it using the SeriesRequest instead of relying on the context.
ctx = context.WithValue(ctx, store.StoreMatcherKey, q.storeMatchers)

resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
10 changes: 5 additions & 5 deletions pkg/query/querier_test.go
@@ -44,7 +44,7 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
queryableCreator := NewQueryableCreator(nil, nil, testProxy, 2, 5*time.Second)

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, nil, oneHourMillis, false, false)
queryable := queryableCreator(false, nil, nil, oneHourMillis, false, false)

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
@@ -72,7 +72,7 @@ func TestQuerier_DownsampledData(t *testing.T) {
}

timeout := 10 * time.Second
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, 9999999, false, false)
q := NewQueryableCreator(nil, nil, testProxy, 2, timeout)(false, nil, nil, 9999999, false, false)
engine := promql.NewEngine(
promql.EngineOpts{
MaxSamples: math.MaxInt32,
@@ -510,7 +510,7 @@ func TestQuerier_Select(t *testing.T) {
{dedup: true, expected: []series{tcase.expectedAfterDedup}},
} {
g := gate.New(2)
q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
q := newQuerier(context.Background(), nil, nil, tcase.mint, tcase.maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
@@ -680,7 +680,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, false, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
@@ -754,7 +754,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, s, true, 0, true, false, g, timeout)
q := newQuerier(context.Background(), logger, nil, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, g, timeout)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
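
For completeness, a hedged sketch (not part of this PR) of a test in the style of `TestQueryableCreator_MaxResolution` that exercises the new `storeMatchers` argument; the matcher values are invented, and the proxy is left nil because `Select` is never called:

```go
func TestQueryableCreator_StoreMatchers(t *testing.T) {
	queryableCreator := NewQueryableCreator(nil, nil, nil, 2, 5*time.Second)

	// A single matcher group: keep only stores whose __address__ matches the regex.
	storeMatchers := [][]storepb.LabelMatcher{{
		{Type: storepb.LabelMatcher_RE, Name: "__address__", Value: "prometheus-foo.*"},
	}}

	queryable := queryableCreator(false, nil, storeMatchers, 0, false, false)

	q, err := queryable.Querier(context.Background(), 0, 42)
	testutil.Ok(t, err)
	t.Cleanup(func() { testutil.Ok(t, q.Close()) })

	// The matchers are carried on the querier; selectFn later attaches them to
	// the request context under store.StoreMatcherKey.
	testutil.Equals(t, storeMatchers, q.(*querier).storeMatchers)
}
```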
41 changes: 39 additions & 2 deletions pkg/store/proxy.go
@@ -31,6 +31,11 @@ import (
"google.golang.org/grpc/status"
)

type ctxKey int

// StoreMatcherKey is the context key for the store's allow list.
const StoreMatcherKey = ctxKey(0)

// Client holds meta information about a store.
type Client interface {
// Client to access the store.
@@ -255,8 +260,14 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
// NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error.
var ok bool
tracing.DoInSpan(gctx, "store_matches", func(ctx context.Context) {
storeMatcher := [][]storepb.LabelMatcher{}
if ctxVal := srv.Context().Value(StoreMatcherKey); ctxVal != nil {
if value, ok := ctxVal.([][]storepb.LabelMatcher); ok {
storeMatcher = value
}
}
// We can skip error, we already translated matchers once.
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...)
ok, _ = storeMatches(st, r.MinTime, r.MaxTime, storeMatcher, r.Matchers...)
})
if !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out", st))
@@ -498,14 +509,40 @@ func (s *streamSeriesSet) Err() error {

// storeMatches returns true if the given store may hold data for the given label
// matchers.
func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher) (bool, error) {
func storeMatches(s Client, mint, maxt int64, storeMatcher [][]storepb.LabelMatcher, matchers ...storepb.LabelMatcher) (bool, error) {
storeMinTime, storeMaxTime := s.TimeRange()
if mint > storeMaxTime || maxt < storeMinTime {
return false, nil
}
match, err := storeMatchMetadata(s, storeMatcher)
if err != nil || !match {
return match, err
}
return labelSetsMatch(s.LabelSets(), matchers)
}

// storeMatchMetadata returns true if the store's metadata labels match the storeMatcher.
func storeMatchMetadata(s Client, storeMatcher [][]storepb.LabelMatcher) (bool, error) {
clientLabels := generateMetadataClientLabels(s)
if len(storeMatcher) == 0 {
return true, nil
}
res := false
for _, stm := range storeMatcher {
stmMatch, err := labelSetMatches(clientLabels, stm)
if err != nil {
return false, err
}
res = res || stmMatch
}
return res, nil
}

func generateMetadataClientLabels(s Client) storepb.LabelSet {
l := storepb.Label{Name: "__address__", Value: s.Addr()}
Review comment (Member): Looks like shallow function to me ):

Reply (Contributor, author): It's in a separate function because I wanted to separate the matching from the generation of labels. It also makes the function easier to extend with new labels or be used elsewhere.
return storepb.LabelSet{Labels: []storepb.Label{l}}
}
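
To make the extensibility point from the reply above concrete, here is a hedged sketch of how this helper could later expose more metadata for matching. Only `__address__` is supported by this change; the extension is purely hypothetical and reuses the surrounding file's `Client` and `storepb` types:

```go
// Hypothetical future version of generateMetadataClientLabels, not part of this PR.
func generateMetadataClientLabels(s Client) storepb.LabelSet {
	lbls := []storepb.Label{
		{Name: "__address__", Value: s.Addr()},
		// Additional metadata labels could be appended here, e.g. one derived
		// from s.TimeRange() or from the store's external labels, without
		// changing the matching logic in storeMatchMetadata.
	}
	return storepb.LabelSet{Labels: lbls}
}
```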

// labelSetsMatch returns false if none of the label sets match the matchers.
func labelSetsMatch(lss []storepb.LabelSet, matchers []storepb.LabelMatcher) (bool, error) {
if len(lss) == 0 {
Expand Down