Skip to content

Commit

Permalink
storage: Removed SelectSorted method; Simplified interface.
Browse files Browse the repository at this point in the history
I found during work on #5882 that
we do so many repetitions because of this, for not good reason. I think
I found a good balance between convenience and readability with just one method.
Smaller the interface = better.

Also I don't know what TestSelectSorted was testing, but now it's testing sorting.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Mar 12, 2020
1 parent bc703b6 commit a44b670
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 156 deletions.
17 changes: 8 additions & 9 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,10 +656,9 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
var set storage.SeriesSet
var wrn storage.Warnings
params := &storage.SelectParams{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Step: durationToInt64Millis(s.Interval),
params := storage.SelectParams{
TimeRange: &storage.SelectRange{Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End)},
Step: durationToInt64Millis(s.Interval),
}

// We need to make sure we select the timerange selected by the subquery.
Expand All @@ -668,26 +667,26 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
// from end also.
subqOffset := ng.cumulativeSubqueryOffset(path)
offsetMilliseconds := durationMilliseconds(subqOffset)
params.Start = params.Start - offsetMilliseconds
params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds

switch n := node.(type) {
case *parser.VectorSelector:
if evalRange == 0 {
params.Start = params.Start - durationMilliseconds(ng.lookbackDelta)
params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(ng.lookbackDelta)
} else {
params.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(evalRange)
params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(evalRange)
evalRange = 0
}

params.Func = extractFuncFromPath(path)
params.By, params.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
params.End = params.End - offsetMilliseconds
params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds
params.TimeRange.End = params.TimeRange.End - offsetMilliseconds
}

set, wrn, err = querier.Select(params, n.LabelMatchers...)
Expand Down
36 changes: 16 additions & 20 deletions promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,12 @@ type errQuerier struct {
err error
}

func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *errQuerier) Select(storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err
}
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err
}
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }
func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }

// errSeriesSet implements storage.SeriesSet which always returns error.
type errSeriesSet struct {
Expand Down Expand Up @@ -227,30 +224,29 @@ func TestQueryError(t *testing.T) {
// paramCheckerQuerier implements storage.Querier which checks the start and end times
// in params.
type paramCheckerQuerier struct {
start int64
end int64
grouping []string
by bool
selRange int64
function string
start int64
end int64
grouping []string
by bool
selRange int64
function string
seriesSorted bool

t *testing.T
}

func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.SelectSorted(sp, m...)
}
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.Start)
testutil.Equals(q.t, q.end, sp.End)
func (q *paramCheckerQuerier) Select(sp storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.TimeRange.Start)
testutil.Equals(q.t, q.end, sp.TimeRange.End)
testutil.Equals(q.t, q.grouping, sp.Grouping)
testutil.Equals(q.t, q.by, sp.By)
testutil.Equals(q.t, q.selRange, sp.Range)
testutil.Equals(q.t, q.function, sp.Func)
testutil.Equals(q.t, q.seriesSorted, sp.SeriesSorted)

return errSeriesSet{err: nil}, nil, nil
}
func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
func (*paramCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
Expand Down
6 changes: 3 additions & 3 deletions scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")

Expand All @@ -1581,7 +1581,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {

q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
series, _, err = q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err)
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
Expand Down Expand Up @@ -1617,7 +1617,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")
}
Expand Down
25 changes: 12 additions & 13 deletions storage/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,30 +221,29 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
}

// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
if len(q.queriers) != 1 {
// We need to sort for NewMergeSeriesSet to work.
return q.SelectSorted(params, matchers...)
func (q *mergeQuerier) Select(params SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
if len(q.queriers) == 1 {
return q.queriers[0].Select(params, matchers...)
}
return q.queriers[0].Select(params, matchers...)
}

// SelectSorted returns a set of sorted series that matches the given label matchers.
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
seriesSets := make([]SeriesSet, 0, len(q.queriers))
var warnings Warnings

var priErr error = nil
var (
seriesSets = make([]SeriesSet, 0, len(q.queriers))
warnings Warnings
priErr error
)
type queryResult struct {
qr Querier
set SeriesSet
wrn Warnings
selectError error
}
queryResultChan := make(chan *queryResult)

// We need to sort for NewMergeSeriesSet to work.
params.SeriesSorted = true
for _, querier := range q.queriers {
go func(qr Querier) {
set, wrn, err := qr.SelectSorted(params, matchers...)
set, wrn, err := qr.Select(params, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
}(querier)
}
Expand Down
2 changes: 1 addition & 1 deletion storage/fanout/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)

seriesSet, _, err := querier.SelectSorted(nil, matcher)
seriesSet, _, err := querier.Select(storage.SelectParams{}, matcher)
testutil.Ok(t, err)

result := make(map[int64]float64)
Expand Down
26 changes: 20 additions & 6 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ type Queryable interface {
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)

// SelectSorted returns a sorted set of series that matches the given label matchers.
SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
Select(SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)

// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier.
Expand All @@ -76,17 +73,34 @@ type Querier interface {
Close() error
}

// SelectParams specifies parameters passed to data selections.
type SelectParams struct {
// SelectRange specifies time parameters passed to data selections.
type SelectRange struct {
Start int64 // Start time in milliseconds for this select.
End int64 // End time in milliseconds for this select.
}

// SelectParams specifies parameters passed to data selections.
type SelectParams struct {
TimeRange *SelectRange // If non-nil it means certain time to select.

Step int64 // Query step size in milliseconds.
Func string // String representation of surrounding function or aggregation.

Grouping []string // List of label names used in aggregation.
By bool // Indicate whether it is without or by.
Range int64 // Range vector selector range in milliseconds.

SeriesSorted bool // If enabled, selected series has to be sorted.
}

func (s *SelectParams) IsEmpty() bool {
return s.TimeRange == nil &&
s.Step == 0 &&
s.Func == "" &&
len(s.Grouping) == 0 &&
!s.By &&
s.Range == 0 &&
!s.SeriesSorted
}

// QueryableFunc is an adapter to allow the use of ordinary functions as
Expand Down
6 changes: 1 addition & 5 deletions storage/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,7 @@ func NoopQuerier() Querier {
return noopQuerier{}
}

func (noopQuerier) Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
return NoopSeriesSet(), nil, nil
}

func (noopQuerier) SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
func (noopQuerier) Select(SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error) {
return NoopSeriesSet(), nil, nil
}

Expand Down
10 changes: 6 additions & 4 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,25 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
}

// ToQuery builds a Query proto.
func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) {
func ToQuery(from, to int64, matchers []*labels.Matcher, p storage.SelectParams) (*prompb.Query, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
}

var rp *prompb.ReadHints
if p != nil {
if !p.IsEmpty() {
rp = &prompb.ReadHints{
StepMs: p.Step,
Func: p.Func,
StartMs: p.Start,
EndMs: p.End,
Grouping: p.Grouping,
By: p.By,
RangeMs: p.Range,
}
if p.TimeRange != nil {
rp.StartMs = p.TimeRange.Start
rp.EndMs = p.TimeRange.End
}
}

return &prompb.Query{
Expand Down
16 changes: 5 additions & 11 deletions storage/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,9 @@ type querier struct {
client *Client
}

// Select implements storage.Querier and uses the given matchers to read series
// Select implements storage.Querier and uses the given matchers to read sorted series
// sets from the Client.
func (q *querier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.SelectSorted(p, matchers...)
}

// SelectSorted implements storage.Querier and uses the given matchers to read series
// sets from the Client.
func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
query, err := ToQuery(q.mint, q.maxt, matchers, p)
if err != nil {
return nil, nil, err
Expand All @@ -85,7 +79,7 @@ func (q *querier) SelectSorted(p *storage.SelectParams, matchers ...*labels.Matc
}

// LabelValues implements storage.Querier and is a noop.
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
func (q *querier) LabelValues(string) ([]string, storage.Warnings, error) {
// TODO implement?
return nil, nil, nil
}
Expand Down Expand Up @@ -124,7 +118,7 @@ type externalLabelsQuerier struct {
// Select adds equality matchers for all external labels to the list of matchers
// before calling the wrapped storage.Queryable. The added external labels are
// removed from the returned series sets.
func (q externalLabelsQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q externalLabelsQuerier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
m, added := q.addExternalLabels(matchers)
s, warnings, err := q.Querier.Select(p, m...)
if err != nil {
Expand Down Expand Up @@ -177,7 +171,7 @@ type requiredMatchersQuerier struct {

// Select returns a NoopSeriesSet if the given matchers don't match the label
// set of the requiredMatchersQuerier. Otherwise it'll call the wrapped querier.
func (q requiredMatchersQuerier) Select(p *storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q requiredMatchersQuerier) Select(p storage.SelectParams, matchers ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
ms := q.requiredMatchers
for _, m := range matchers {
for i, r := range ms {
Expand Down
4 changes: 2 additions & 2 deletions storage/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestExternalLabelsQuerierSelect(t *testing.T) {
},
}
want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels)
have, _, err := q.Select(nil, matchers...)
have, _, err := q.Select(storage.SelectParams{}, matchers...)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -242,7 +242,7 @@ type mockSeriesSet struct {
storage.SeriesSet
}

func (mockQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (mockQuerier) Select(storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return mockSeriesSet{}, nil, nil
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func TestDelete_e2e(t *testing.T) {
q, err := NewBlockQuerier(hb, 0, 100000)
testutil.Ok(t, err)
defer q.Close()
ss, ws, err := q.SelectSorted(nil, del.ms...)
ss, ws, err := q.Select(nil, del.ms...)
testutil.Ok(t, err)
testutil.Equals(t, 0, len(ws))
// Build the mockSeriesSet.
Expand Down
Loading

0 comments on commit a44b670

Please sign in to comment.