diff --git a/promql/engine.go b/promql/engine.go index 2d35b90b8cd..785960df137 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -656,10 +656,9 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error { var set storage.SeriesSet var wrn storage.Warnings - params := &storage.SelectParams{ - Start: timestamp.FromTime(s.Start), - End: timestamp.FromTime(s.End), - Step: durationToInt64Millis(s.Interval), + params := storage.SelectParams{ + TimeRange: &storage.SelectRange{Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End)}, + Step: durationToInt64Millis(s.Interval), } // We need to make sure we select the timerange selected by the subquery. @@ -668,17 +667,17 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa // from end also. subqOffset := ng.cumulativeSubqueryOffset(path) offsetMilliseconds := durationMilliseconds(subqOffset) - params.Start = params.Start - offsetMilliseconds + params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds switch n := node.(type) { case *parser.VectorSelector: if evalRange == 0 { - params.Start = params.Start - durationMilliseconds(ng.lookbackDelta) + params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(ng.lookbackDelta) } else { params.Range = durationMilliseconds(evalRange) // For all matrix queries we want to ensure that we have (end-start) + range selected // this way we have `range` data before the start time - params.Start = params.Start - durationMilliseconds(evalRange) + params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(evalRange) evalRange = 0 } @@ -686,8 +685,8 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa params.By, params.Grouping = extractGroupsFromPath(path) if n.Offset > 0 { offsetMilliseconds := durationMilliseconds(n.Offset) - params.Start = params.Start - offsetMilliseconds - params.End = params.End - offsetMilliseconds + params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds + params.TimeRange.End = params.TimeRange.End - offsetMilliseconds } set, wrn, err = querier.Select(params, n.LabelMatchers...) diff --git a/promql/engine_test.go b/promql/engine_test.go index e75c27bca67..58c361f4906 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -175,15 +175,12 @@ type errQuerier struct { err error } -func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *errQuerier) Select(storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return errSeriesSet{err: q.err}, nil, q.err } -func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return errSeriesSet{err: q.err}, nil, q.err -} -func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } -func (*errQuerier) Close() error { return nil } +func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } +func (*errQuerier) Close() error { return nil } // errSeriesSet implements storage.SeriesSet which always returns error. type errSeriesSet struct { @@ -227,30 +224,29 @@ func TestQueryError(t *testing.T) { // paramCheckerQuerier implements storage.Querier which checks the start and end times // in params. type paramCheckerQuerier struct { - start int64 - end int64 - grouping []string - by bool - selRange int64 - function string + start int64 + end int64 + grouping []string + by bool + selRange int64 + function string + seriesSorted bool t *testing.T } -func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(sp, m...) -} -func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - testutil.Equals(q.t, q.start, sp.Start) - testutil.Equals(q.t, q.end, sp.End) +func (q *paramCheckerQuerier) Select(sp storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + testutil.Equals(q.t, q.start, sp.TimeRange.Start) + testutil.Equals(q.t, q.end, sp.TimeRange.End) testutil.Equals(q.t, q.grouping, sp.Grouping) testutil.Equals(q.t, q.by, sp.By) testutil.Equals(q.t, q.selRange, sp.Range) testutil.Equals(q.t, q.function, sp.Func) + testutil.Equals(q.t, q.seriesSorted, sp.SeriesSorted) return errSeriesSet{err: nil}, nil, nil } -func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (*paramCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil } func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil } diff --git a/promql/test_test.go b/promql/test_test.go index f03f6d2037a..242f6a560ef 100644 --- a/promql/test_test.go +++ b/promql/test_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/testutil" ) @@ -133,7 +134,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) { } // Get the series for the matcher. - ss, _, err := querier.Select(nil, matchers...) + ss, _, err := querier.Select(storage.SelectParams{}, matchers...) testutil.Ok(t, err) testutil.Assert(t, ss.Next(), "") storageSeries := ss.At() diff --git a/rules/manager.go b/rules/manager.go index 50c4c8b7f54..903e9f67650 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -705,7 +705,7 @@ func (g *Group) RestoreForState(ts time.Time) { matchers = append(matchers, mt) } - sset, err, _ := q.Select(nil, matchers...) + sset, err, _ := q.Select(storage.SelectParams{}, matchers...) if err != nil { level.Error(g.logger).Log("msg", "Failed to restore 'for' state", labels.AlertName, alertRule.Name(), "stage", "Select", "err", err) diff --git a/rules/manager_test.go b/rules/manager_test.go index ac74f9ea0a6..0f1d0583aa4 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) { } func TestStaleness(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() engineOpts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) { } engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ - QueryFunc: EngineQueryFunc(engine, storage), - Appendable: storage, - TSDB: storage, + QueryFunc: EngineQueryFunc(engine, st), + Appendable: st, + TSDB: st, Context: context.Background(), Logger: log.NewNopLogger(), } @@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) { }) // A time series that has two samples and then goes stale. - app := storage.Appender() + app := st.Appender() app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2) app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN)) @@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) { group.Eval(ctx, time.Unix(1, 0)) group.Eval(ctx, time.Unix(2, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(storage.SelectParams{}, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) { } func TestDeletedRuleMarkedStale(t *testing.T) { - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() oldGroup := &Group{ rules: []Rule{ NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}), @@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - Appendable: storage, + Appendable: st, }, } newGroup.CopyState(oldGroup) newGroup.Eval(context.Background(), time.Unix(0, 0)) - querier, err := storage.Querier(context.Background(), 0, 2000) + querier, err := st.Querier(context.Background(), 0, 2000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(storage.SelectParams{}, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) @@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) { expected := map[string]labels.Labels{ "test": labels.FromStrings("name", "value"), } - storage := teststorage.New(t) - defer storage.Close() + st := teststorage.New(t) + defer st.Close() opts := promql.EngineOpts{ Logger: nil, Reg: nil, @@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) { } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ - Appendable: storage, - TSDB: storage, - QueryFunc: EngineQueryFunc(engine, storage), + Appendable: st, + TSDB: st, + QueryFunc: EngineQueryFunc(engine, st), Context: context.Background(), Logger: log.NewNopLogger(), }) @@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) { testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine") } -func countStaleNaN(t *testing.T, storage storage.Storage) int { +func countStaleNaN(t *testing.T, st storage.Storage) int { var c int - querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000) + querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000) testutil.Ok(t, err) defer querier.Close() matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2") testutil.Ok(t, err) - set, _, err := querier.Select(nil, matcher) + set, _, err := querier.Select(storage.SelectParams{}, matcher) testutil.Ok(t, err) samples, err := readSeriesSet(set) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index d05b3c40041..10684ec9260 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1571,7 +1571,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") @@ -1581,7 +1581,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) + series, _, err = q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) testutil.Ok(t, err) testutil.Equals(t, true, series.Next(), "series not found in tsdb") testutil.Equals(t, false, series.Next(), "more than one series found in tsdb") @@ -1617,7 +1617,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) { q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) testutil.Ok(t, err) - series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) testutil.Ok(t, err) testutil.Equals(t, false, series.Next(), "series found in tsdb") } diff --git a/storage/fanout.go b/storage/fanout.go index bad6a8df127..1e0ae73c53c 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -221,20 +221,16 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier { } // Select returns a set of series that matches the given label matchers. -func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - if len(q.queriers) != 1 { - // We need to sort for NewMergeSeriesSet to work. - return q.SelectSorted(params, matchers...) +func (q *mergeQuerier) Select(params SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { + if len(q.queriers) == 1 { + return q.queriers[0].Select(params, matchers...) } - return q.queriers[0].Select(params, matchers...) -} - -// SelectSorted returns a set of sorted series that matches the given label matchers. -func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { - seriesSets := make([]SeriesSet, 0, len(q.queriers)) - var warnings Warnings - var priErr error = nil + var ( + seriesSets = make([]SeriesSet, 0, len(q.queriers)) + warnings Warnings + priErr error + ) type queryResult struct { qr Querier set SeriesSet @@ -242,9 +238,12 @@ func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Ma selectError error } queryResultChan := make(chan *queryResult) + + // We need to sort for NewMergeSeriesSet to work. + params.SeriesSorted = true for _, querier := range q.queriers { go func(qr Querier) { - set, wrn, err := qr.SelectSorted(params, matchers...) + set, wrn, err := qr.Select(params, matchers...) queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} }(querier) } diff --git a/storage/fanout/fanout_test.go b/storage/fanout/fanout_test.go index cede30ccac4..cf557de901c 100644 --- a/storage/fanout/fanout_test.go +++ b/storage/fanout/fanout_test.go @@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) { matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a") testutil.Ok(t, err) - seriesSet, _, err := querier.SelectSorted(nil, matcher) + seriesSet, _, err := querier.Select(storage.SelectParams{}, matcher) testutil.Ok(t, err) result := make(map[int64]float64) diff --git a/storage/interface.go b/storage/interface.go index e85a159e45c..7a6f2d071fb 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -60,10 +60,7 @@ type Queryable interface { // time range. type Querier interface { // Select returns a set of series that matches the given label matchers. - Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) - - // SelectSorted returns a sorted set of series that matches the given label matchers. - SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) + Select(SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) // LabelValues returns all potential values for a label name. // It is not safe to use the strings beyond the lifefime of the querier. @@ -76,10 +73,15 @@ type Querier interface { Close() error } -// SelectParams specifies parameters passed to data selections. -type SelectParams struct { +// SelectRange specifies time parameters passed to data selections. +type SelectRange struct { Start int64 // Start time in milliseconds for this select. End int64 // End time in milliseconds for this select. +} + +// SelectParams specifies parameters passed to data selections. +type SelectParams struct { + TimeRange *SelectRange // If non-nil it means certain time to select. Step int64 // Query step size in milliseconds. Func string // String representation of surrounding function or aggregation. @@ -87,6 +89,19 @@ type SelectParams struct { Grouping []string // List of label names used in aggregation. By bool // Indicate whether it is without or by. Range int64 // Range vector selector range in milliseconds. + + SeriesSorted bool // If enabled, selected series has to be sorted. +} + +// IsEmpty returns true if SelectParams is considered empty. +func (s SelectParams) IsEmpty() bool { + return s.TimeRange == nil && + s.Step == 0 && + s.Func == "" && + len(s.Grouping) == 0 && + !s.By && + s.Range == 0 && + !s.SeriesSorted } // QueryableFunc is an adapter to allow the use of ordinary functions as diff --git a/storage/interface_test.go b/storage/interface_test.go new file mode 100644 index 00000000000..6d208ca1b94 --- /dev/null +++ b/storage/interface_test.go @@ -0,0 +1,24 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "testing" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestSelectParams_IsEmpty(t *testing.T) { + testutil.Assert(t, (SelectParams{}).IsEmpty(), "default empty value is not empty") +} diff --git a/storage/noop.go b/storage/noop.go index a8be634fd6e..149b6531e6d 100644 --- a/storage/noop.go +++ b/storage/noop.go @@ -24,11 +24,7 @@ func NoopQuerier() Querier { return noopQuerier{} } -func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { - return NoopSeriesSet(), nil, nil -} - -func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { +func (noopQuerier) Select(SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) { return NoopSeriesSet(), nil, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 8f07eb0d1e4..2d33acee249 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -79,23 +79,25 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error } // ToQuery builds a Query proto. -func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) { +func ToQuery(from, to int64, matchers []*labels.Matcher, p storage.SelectParams) (*prompb.Query, error) { ms, err := toLabelMatchers(matchers) if err != nil { return nil, err } var rp *prompb.ReadHints - if p != nil { + if !p.IsEmpty() { rp = &prompb.ReadHints{ StepMs: p.Step, Func: p.Func, - StartMs: p.Start, - EndMs: p.End, Grouping: p.Grouping, By: p.By, RangeMs: p.Range, } + if p.TimeRange != nil { + rp.StartMs = p.TimeRange.Start + rp.EndMs = p.TimeRange.End + } } return &prompb.Query{ diff --git a/storage/remote/read.go b/storage/remote/read.go index 3e5c9573c42..9170532bd77 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -57,15 +57,9 @@ type querier struct { client *Client } -// Select implements storage.Querier and uses the given matchers to read series +// Select implements storage.Querier and uses the given matchers to read sorted series // sets from the Client. -func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.SelectSorted(p, matchers...) -} - -// SelectSorted implements storage.Querier and uses the given matchers to read series -// sets from the Client. -func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *querier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { query, err := ToQuery(q.mint, q.maxt, matchers, p) if err != nil { return nil, nil, err @@ -85,7 +79,7 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc } // LabelValues implements storage.Querier and is a noop. -func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) { +func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) { // TODO implement? return nil, nil, nil } @@ -124,7 +118,7 @@ type externalLabelsQuerier struct { // Select adds equality matchers for all external labels to the list of matchers // before calling the wrapped storage.Queryable. The added external labels are // removed from the returned series sets. -func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q externalLabelsQuerier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { m, added := q.addExternalLabels(matchers) s, warnings, err := q.Querier.Select(p, m...) if err != nil { @@ -177,7 +171,7 @@ type requiredMatchersQuerier struct { // Select returns a NoopSeriesSet if the given matchers don't match the label // set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier. -func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q requiredMatchersQuerier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { ms := q.requiredMatchers for _, m := range matchers { for i, r := range ms { diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 28dfa79fb51..4f93562d041 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) { }, } want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) - have, _, err := q.Select(nil, matchers...) + have, _, err := q.Select(storage.SelectParams{}, matchers...) if err != nil { t.Error(err) } @@ -242,7 +242,7 @@ type mockSeriesSet struct { storage.SeriesSet } -func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (mockQuerier) Select(storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return mockSeriesSet{}, nil, nil } @@ -398,7 +398,7 @@ func TestRequiredLabelsQuerierSelect(t *testing.T) { requiredMatchers: test.requiredMatchers, } - have, _, err := q.Select(nil, test.matchers...) + have, _, err := q.Select(storage.SelectParams{}, test.matchers...) if err != nil { t.Error(err) } diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 1febb4004fa..f751e33175f 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" @@ -616,7 +617,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { err = merr.Err() }() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) + ss, ws, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")) if err != nil { return err } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 967f8666514..6c37eaeea1e 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -565,7 +565,7 @@ func TestHeadDeleteSimple(t *testing.T) { for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) testutil.Ok(t, err) - actSeriesSet, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) + actSeriesSet, ws, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -612,7 +612,7 @@ func TestDeleteUntilCurMax(t *testing.T) { // Test the series returns no samples. The series is cleared only after compaction. q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series is not present") @@ -627,7 +627,7 @@ func TestDeleteUntilCurMax(t *testing.T) { testutil.Ok(t, app.Commit()) q, err = NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) - res, ws, err = q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) + res, ws, err = q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) testutil.Assert(t, res.Next(), "series don't exist") @@ -803,7 +803,7 @@ func TestDelete_e2e(t *testing.T) { q, err := NewBlockQuerier(hb, 0, 100000) testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.SelectSorted(nil, del.ms...) + ss, ws, err := q.Select(storage.SelectParams{}, del.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) // Build the mockSeriesSet. @@ -1091,7 +1091,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1119,7 +1119,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { testutil.Ok(t, err) defer q.Close() - ss, ws, err := q.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) + ss, ws, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1403,7 +1403,7 @@ func TestHeadSeriesWithTimeBoundaries(t *testing.T) { seriesCount := 0 samplesCount := 0 - ss, _, err := q.Select(nil, matcher) + ss, _, err := q.Select(storage.SelectParams{}, matcher) testutil.Ok(t, err) for ss.Next() { i := ss.At().Iterator() @@ -1443,7 +1443,7 @@ func TestMemSeriesIsolation(t *testing.T) { testutil.Ok(t, err) defer querier.Close() - ss, _, err := querier.Select(nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + ss, _, err := querier.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) testutil.Ok(t, err) _, seriesSet, err := expandSeriesSet(ss) diff --git a/tsdb/querier.go b/tsdb/querier.go index a5181c3f692..cacce94d730 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -85,23 +85,22 @@ func (q *querier) lvals(qs []storage.Querier, n string) ([]string, storage.Warni return mergeStrings(s1, s2), ws, nil } -func (q *querier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - if len(q.blocks) != 1 { - return q.SelectSorted(p, ms...) - } - // Sorting Head series is slow, and unneeded when only the - // Head is being queried. Sorting blocks is a noop. - return q.blocks[0].Select(p, ms...) -} - -func (q *querier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *querier) Select(p storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(q.blocks) == 0 { return storage.EmptySeriesSet(), nil, nil } + if len(q.blocks) == 1 { + return q.blocks[0].Select(p, ms...) + } + + // Sorting Head series is slow, and unneeded when only the + // Head is being queried. Sorting blocks is a noop. + // Still we have to sort if blocks > 1 as Merged Series requires. + p.SeriesSorted = true ss := make([]storage.SeriesSet, len(q.blocks)) var ws storage.Warnings for i, b := range q.blocks { - s, w, err := b.SelectSorted(p, ms...) + s, w, err := b.Select(p, ms...) ws = append(ws, w...) if err != nil { return nil, ws, err @@ -127,20 +126,16 @@ type verticalQuerier struct { querier } -func (q *verticalQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - return q.sel(p, q.blocks, ms) -} - -func (q *verticalQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *verticalQuerier) Select(p storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { return q.sel(p, q.blocks, ms) } -func (q *verticalQuerier) sel(p *storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { +func (q *verticalQuerier) sel(p storage.SelectParams, qs []storage.Querier, ms []*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { if len(qs) == 0 { return storage.EmptySeriesSet(), nil, nil } if len(qs) == 1 { - return qs[0].SelectSorted(p, ms...) + return qs[0].Select(p, ms...) } l := len(qs) / 2 @@ -195,42 +190,24 @@ type blockQuerier struct { mint, maxt int64 } -func (q *blockQuerier) Select(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeries(q.index, q.tombstones, ms...) - if err != nil { - return nil, nil, err - } +func (q *blockQuerier) Select(p storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { + var base storage.ChunkSeriesSet + var err error - mint := q.mint - maxt := q.maxt - if p != nil { - mint = p.Start - maxt = p.End + if p.SeriesSorted { + base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...) + } else { + base, err = LookupChunkSeries(q.index, q.tombstones, ms...) } - return &blockSeriesSet{ - set: &populatedChunkSeries{ - set: base, - chunks: q.chunks, - mint: mint, - maxt: maxt, - }, - - mint: mint, - maxt: maxt, - }, nil, nil -} - -func (q *blockQuerier) SelectSorted(p *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { - base, err := LookupChunkSeriesSorted(q.index, q.tombstones, ms...) if err != nil { return nil, nil, err } mint := q.mint maxt := q.maxt - if p != nil { - mint = p.Start - maxt = p.End + if p.TimeRange != nil { + mint = p.TimeRange.Start + maxt = p.TimeRange.End } return &blockSeriesSet{ set: &populatedChunkSeries{ diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index 76543520359..b65a54a7536 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -147,12 +147,7 @@ func BenchmarkQuerierSelect(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - var ss storage.SeriesSet - if sorted { - ss, _, err = q.SelectSorted(nil, matcher) - } else { - ss, _, err = q.Select(nil, matcher) - } + ss, _, err := q.Select(storage.SelectParams{SeriesSorted: sorted}, matcher) testutil.Ok(b, err) for ss.Next() { } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7780817375f..003f6f4de05 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -373,7 +373,7 @@ Outer: maxt: c.maxt, } - res, ws, err := querier.Select(nil, c.ms...) + res, ws, err := querier.Select(storage.SelectParams{}, c.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -536,7 +536,7 @@ Outer: maxt: c.maxt, } - res, ws, err := querier.Select(nil, c.ms...) + res, ws, err := querier.Select(storage.SelectParams{}, c.ms...) testutil.Ok(t, err) testutil.Equals(t, 0, len(ws)) @@ -1710,7 +1710,7 @@ func BenchmarkQuerySeek(b *testing.B) { b.ResetTimer() b.ReportAllocs() - ss, ws, err := sq.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + ss, ws, err := sq.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) for ss.Next() { it := ss.At().Iterator() for t := mint; t <= maxt; t++ { @@ -1848,7 +1848,7 @@ func BenchmarkSetMatcher(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - _, ws, err := que.Select(nil, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) + _, ws, err := que.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "test", c.pattern)) testutil.Ok(b, err) testutil.Equals(b, 0, len(ws)) } @@ -2297,7 +2297,7 @@ func benchQuery(b *testing.B, expExpansions int, q storage.Querier, selectors la b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - ss, ws, err := q.Select(nil, selectors...) + ss, ws, err := q.Select(storage.SelectParams{}, selectors...) testutil.Ok(b, err) testutil.Equals(b, 0, len(ws)) var actualExpansions int diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5ea367fa225..155600a29b5 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -531,7 +531,7 @@ func (api *API) series(r *http.Request) apiFuncResult { var sets []storage.SeriesSet var warnings storage.Warnings for _, mset := range matcherSets { - s, wrn, err := q.Select(nil, mset...) //TODO + s, wrn, err := q.Select(storage.SelectParams{}, mset...) warnings = append(warnings, wrn...) if err != nil { return apiFuncResult{nil, &apiError{errorExec, err}, warnings, nil} @@ -1161,10 +1161,11 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { return } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { - // The streaming API provides sorted series. - // TODO(bwplotka): Handle warnings via query log. - set, _, err := querier.SelectSorted(selectParams, filteredMatchers...) + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams storage.SelectParams, filteredMatchers []*labels.Matcher) error { + // The streaming API has to provide the series sorted. + selectParams.SeriesSorted = true + + set, _, err := querier.Select(selectParams, filteredMatchers...) if err != nil { return err } @@ -1195,7 +1196,7 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { Results: make([]*prompb.QueryResult, len(req.Queries)), } for i, query := range req.Queries { - err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error { + err := api.remoteReadQuery(ctx, query, externalLabels, func(querier storage.Querier, selectParams storage.SelectParams, filteredMatchers []*labels.Matcher) error { set, _, err := querier.Select(selectParams, filteredMatchers...) if err != nil { return err @@ -1254,7 +1255,7 @@ func filterExtLabelsFromMatchers(pbMatchers []*prompb.LabelMatcher, externalLabe return filteredMatchers, nil } -func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams *storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { +func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, externalLabels map[string]string, seriesHandleFn func(querier storage.Querier, selectParams storage.SelectParams, filteredMatchers []*labels.Matcher) error) error { filteredMatchers, err := filterExtLabelsFromMatchers(query.Matchers, externalLabels) if err != nil { return err @@ -1265,14 +1266,14 @@ func (api *API) remoteReadQuery(ctx context.Context, query *prompb.Query, extern return err } - var selectParams *storage.SelectParams + var selectParams storage.SelectParams if query.Hints != nil { - selectParams = &storage.SelectParams{ + selectParams.TimeRange = &storage.SelectRange{ Start: query.Hints.StartMs, End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, } + selectParams.Step = query.Hints.StepMs + selectParams.Func = query.Hints.Func } defer func() { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index bfac2d4f1d7..5e41b40d0d9 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -488,14 +488,14 @@ func setupRemote(s storage.Storage) *httptest.Server { return } - var selectParams *storage.SelectParams + var selectParams storage.SelectParams if query.Hints != nil { - selectParams = &storage.SelectParams{ + selectParams.TimeRange = &storage.SelectRange{ Start: query.Hints.StartMs, End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, } + selectParams.Step = query.Hints.StepMs + selectParams.Func = query.Hints.Func } querier, err := s.Querier(r.Context(), query.StartTimestampMs, query.EndTimestampMs) @@ -1615,7 +1615,7 @@ func TestSampledReadEndpoint(t *testing.T) { matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e") testutil.Ok(t, err) - query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{Step: 0, Func: "avg"}) + query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2}, storage.SelectParams{Step: 0, Func: "avg"}) testutil.Ok(t, err) req := &prompb.ReadRequest{Queries: []*prompb.Query{query}} @@ -1714,19 +1714,17 @@ func TestStreamReadEndpoint(t *testing.T) { matcher3, err := labels.NewMatcher(labels.MatchEqual, "foo", "bar1") testutil.Ok(t, err) - query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, &storage.SelectParams{ - Step: 1, - Func: "avg", - Start: 0, - End: 14400001, + query1, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher2}, storage.SelectParams{ + Step: 1, + Func: "avg", + TimeRange: &storage.SelectRange{Start: 0, End: 14400001}, }) testutil.Ok(t, err) - query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, &storage.SelectParams{ - Step: 1, - Func: "avg", - Start: 0, - End: 14400001, + query2, err := remote.ToQuery(0, 14400001, []*labels.Matcher{matcher1, matcher3}, storage.SelectParams{ + Step: 1, + Func: "avg", + TimeRange: &storage.SelectRange{Start: 0, End: 14400001}, }) testutil.Ok(t, err) diff --git a/web/federate.go b/web/federate.go index e358a3dab04..1e3772627e0 100644 --- a/web/federate.go +++ b/web/federate.go @@ -81,9 +81,8 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) { vec := make(promql.Vector, 0, 8000) - params := &storage.SelectParams{ - Start: mint, - End: maxt, + params := storage.SelectParams{ + TimeRange: &storage.SelectRange{Start: mint, End: maxt}, } var sets []storage.SeriesSet