Skip to content

Commit

Permalink
storage: Removed SelectSorted method; Simplified interface.
Browse files Browse the repository at this point in the history
I found during work on #5882 that
we do so many repetitions because of this, for not good reason. I think
I found a good balance between convenience and readability with just one method.
Smaller the interface = better.

Also I don't know what TestSelectSorted was testing, but now it's testing sorting.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Mar 12, 2020
1 parent bc703b6 commit 24b970a
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 193 deletions.
17 changes: 8 additions & 9 deletions promql/engine.go
Expand Up @@ -656,10 +656,9 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
var set storage.SeriesSet
var wrn storage.Warnings
params := &storage.SelectParams{
Start: timestamp.FromTime(s.Start),
End: timestamp.FromTime(s.End),
Step: durationToInt64Millis(s.Interval),
params := storage.SelectParams{
TimeRange: &storage.SelectRange{Start: timestamp.FromTime(s.Start), End: timestamp.FromTime(s.End)},
Step: durationToInt64Millis(s.Interval),
}

// We need to make sure we select the timerange selected by the subquery.
Expand All @@ -668,26 +667,26 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *pa
// from end also.
subqOffset := ng.cumulativeSubqueryOffset(path)
offsetMilliseconds := durationMilliseconds(subqOffset)
params.Start = params.Start - offsetMilliseconds
params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds

switch n := node.(type) {
case *parser.VectorSelector:
if evalRange == 0 {
params.Start = params.Start - durationMilliseconds(ng.lookbackDelta)
params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(ng.lookbackDelta)
} else {
params.Range = durationMilliseconds(evalRange)
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(evalRange)
params.TimeRange.Start = params.TimeRange.Start - durationMilliseconds(evalRange)
evalRange = 0
}

params.Func = extractFuncFromPath(path)
params.By, params.Grouping = extractGroupsFromPath(path)
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
params.End = params.End - offsetMilliseconds
params.TimeRange.Start = params.TimeRange.Start - offsetMilliseconds
params.TimeRange.End = params.TimeRange.End - offsetMilliseconds
}

set, wrn, err = querier.Select(params, n.LabelMatchers...)
Expand Down
36 changes: 16 additions & 20 deletions promql/engine_test.go
Expand Up @@ -175,15 +175,12 @@ type errQuerier struct {
err error
}

func (q *errQuerier) Select(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *errQuerier) Select(storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err
}
func (q *errQuerier) SelectSorted(*storage.SelectParams, ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return errSeriesSet{err: q.err}, nil, q.err
}
func (*errQuerier) LabelValues(name string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }
func (*errQuerier) LabelValues(string) ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (*errQuerier) Close() error { return nil }

// errSeriesSet implements storage.SeriesSet which always returns error.
type errSeriesSet struct {
Expand Down Expand Up @@ -227,30 +224,29 @@ func TestQueryError(t *testing.T) {
// paramCheckerQuerier implements storage.Querier which checks the start and end times
// in params.
type paramCheckerQuerier struct {
start int64
end int64
grouping []string
by bool
selRange int64
function string
start int64
end int64
grouping []string
by bool
selRange int64
function string
seriesSorted bool

t *testing.T
}

func (q *paramCheckerQuerier) Select(sp *storage.SelectParams, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
return q.SelectSorted(sp, m...)
}
func (q *paramCheckerQuerier) SelectSorted(sp *storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.Start)
testutil.Equals(q.t, q.end, sp.End)
func (q *paramCheckerQuerier) Select(sp storage.SelectParams, _ ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
testutil.Equals(q.t, q.start, sp.TimeRange.Start)
testutil.Equals(q.t, q.end, sp.TimeRange.End)
testutil.Equals(q.t, q.grouping, sp.Grouping)
testutil.Equals(q.t, q.by, sp.By)
testutil.Equals(q.t, q.selRange, sp.Range)
testutil.Equals(q.t, q.function, sp.Func)
testutil.Equals(q.t, q.seriesSorted, sp.SeriesSorted)

return errSeriesSet{err: nil}, nil, nil
}
func (*paramCheckerQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
func (*paramCheckerQuerier) LabelValues(string) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (*paramCheckerQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
Expand Down
3 changes: 2 additions & 1 deletion promql/test_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

"github.com/prometheus/prometheus/util/testutil"
)
Expand Down Expand Up @@ -133,7 +134,7 @@ func TestLazyLoader_WithSamplesTill(t *testing.T) {
}

// Get the series for the matcher.
ss, _, err := querier.Select(nil, matchers...)
ss, _, err := querier.Select(storage.SelectParams{}, matchers...)
testutil.Ok(t, err)
testutil.Assert(t, ss.Next(), "")
storageSeries := ss.At()
Expand Down
2 changes: 1 addition & 1 deletion rules/manager.go
Expand Up @@ -705,7 +705,7 @@ func (g *Group) RestoreForState(ts time.Time) {
matchers = append(matchers, mt)
}

sset, err, _ := q.Select(nil, matchers...)
sset, err, _ := q.Select(storage.SelectParams{}, matchers...)
if err != nil {
level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(), "stage", "Select", "err", err)
Expand Down
42 changes: 21 additions & 21 deletions rules/manager_test.go
Expand Up @@ -512,8 +512,8 @@ func TestForStateRestore(t *testing.T) {
}

func TestStaleness(t *testing.T) {
storage := teststorage.New(t)
defer storage.Close()
st := teststorage.New(t)
defer st.Close()
engineOpts := promql.EngineOpts{
Logger: nil,
Reg: nil,
Expand All @@ -522,9 +522,9 @@ func TestStaleness(t *testing.T) {
}
engine := promql.NewEngine(engineOpts)
opts := &ManagerOptions{
QueryFunc: EngineQueryFunc(engine, storage),
Appendable: storage,
TSDB: storage,
QueryFunc: EngineQueryFunc(engine, st),
Appendable: st,
TSDB: st,
Context: context.Background(),
Logger: log.NewNopLogger(),
}
Expand All @@ -541,7 +541,7 @@ func TestStaleness(t *testing.T) {
})

// A time series that has two samples and then goes stale.
app := storage.Appender()
app := st.Appender()
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 0, 1)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1000, 2)
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2000, math.Float64frombits(value.StaleNaN))
Expand All @@ -556,14 +556,14 @@ func TestStaleness(t *testing.T) {
group.Eval(ctx, time.Unix(1, 0))
group.Eval(ctx, time.Unix(2, 0))

querier, err := storage.Querier(context.Background(), 0, 2000)
querier, err := st.Querier(context.Background(), 0, 2000)
testutil.Ok(t, err)
defer querier.Close()

matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a_plus_one")
testutil.Ok(t, err)

set, _, err := querier.Select(nil, matcher)
set, _, err := querier.Select(storage.SelectParams{}, matcher)
testutil.Ok(t, err)

samples, err := readSeriesSet(set)
Expand Down Expand Up @@ -658,8 +658,8 @@ func TestCopyState(t *testing.T) {
}

func TestDeletedRuleMarkedStale(t *testing.T) {
storage := teststorage.New(t)
defer storage.Close()
st := teststorage.New(t)
defer st.Close()
oldGroup := &Group{
rules: []Rule{
NewRecordingRule("rule1", nil, labels.Labels{{Name: "l1", Value: "v1"}}),
Expand All @@ -672,21 +672,21 @@ func TestDeletedRuleMarkedStale(t *testing.T) {
rules: []Rule{},
seriesInPreviousEval: []map[string]labels.Labels{},
opts: &ManagerOptions{
Appendable: storage,
Appendable: st,
},
}
newGroup.CopyState(oldGroup)

newGroup.Eval(context.Background(), time.Unix(0, 0))

querier, err := storage.Querier(context.Background(), 0, 2000)
querier, err := st.Querier(context.Background(), 0, 2000)
testutil.Ok(t, err)
defer querier.Close()

matcher, err := labels.NewMatcher(labels.MatchEqual, "l1", "v1")
testutil.Ok(t, err)

set, _, err := querier.Select(nil, matcher)
set, _, err := querier.Select(storage.SelectParams{}, matcher)
testutil.Ok(t, err)

samples, err := readSeriesSet(set)
Expand All @@ -704,8 +704,8 @@ func TestUpdate(t *testing.T) {
expected := map[string]labels.Labels{
"test": labels.FromStrings("name", "value"),
}
storage := teststorage.New(t)
defer storage.Close()
st := teststorage.New(t)
defer st.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
Expand All @@ -714,9 +714,9 @@ func TestUpdate(t *testing.T) {
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
TSDB: storage,
QueryFunc: EngineQueryFunc(engine, storage),
Appendable: st,
TSDB: st,
QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(),
Logger: log.NewNopLogger(),
})
Expand Down Expand Up @@ -1096,16 +1096,16 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
testutil.Equals(t, 0, countStaleNaN(t, storage), "invalid count of staleness markers after stopping the engine")
}

func countStaleNaN(t *testing.T, storage storage.Storage) int {
func countStaleNaN(t *testing.T, st storage.Storage) int {
var c int
querier, err := storage.Querier(context.Background(), 0, time.Now().Unix()*1000)
querier, err := st.Querier(context.Background(), 0, time.Now().Unix()*1000)
testutil.Ok(t, err)
defer querier.Close()

matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2")
testutil.Ok(t, err)

set, _, err := querier.Select(nil, matcher)
set, _, err := querier.Select(storage.SelectParams{}, matcher)
testutil.Ok(t, err)

samples, err := readSeriesSet(set)
Expand Down
6 changes: 3 additions & 3 deletions scrape/scrape_test.go
Expand Up @@ -1571,7 +1571,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")

Expand All @@ -1581,7 +1581,7 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {

q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
series, _, err = q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err)
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
Expand Down Expand Up @@ -1617,7 +1617,7 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {

q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
series, _, err := q.Select(storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")
}
Expand Down
25 changes: 12 additions & 13 deletions storage/fanout.go
Expand Up @@ -221,30 +221,29 @@ func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
}

// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
if len(q.queriers) != 1 {
// We need to sort for NewMergeSeriesSet to work.
return q.SelectSorted(params, matchers...)
func (q *mergeQuerier) Select(params SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
if len(q.queriers) == 1 {
return q.queriers[0].Select(params, matchers...)
}
return q.queriers[0].Select(params, matchers...)
}

// SelectSorted returns a set of sorted series that matches the given label matchers.
func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
seriesSets := make([]SeriesSet, 0, len(q.queriers))
var warnings Warnings

var priErr error = nil
var (
seriesSets = make([]SeriesSet, 0, len(q.queriers))
warnings Warnings
priErr error
)
type queryResult struct {
qr Querier
set SeriesSet
wrn Warnings
selectError error
}
queryResultChan := make(chan *queryResult)

// We need to sort for NewMergeSeriesSet to work.
params.SeriesSorted = true
for _, querier := range q.queriers {
go func(qr Querier) {
set, wrn, err := qr.SelectSorted(params, matchers...)
set, wrn, err := qr.Select(params, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
}(querier)
}
Expand Down
2 changes: 1 addition & 1 deletion storage/fanout/fanout_test.go
Expand Up @@ -79,7 +79,7 @@ func TestSelectSorted(t *testing.T) {
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "a")
testutil.Ok(t, err)

seriesSet, _, err := querier.SelectSorted(nil, matcher)
seriesSet, _, err := querier.Select(storage.SelectParams{}, matcher)
testutil.Ok(t, err)

result := make(map[int64]float64)
Expand Down
27 changes: 21 additions & 6 deletions storage/interface.go
Expand Up @@ -60,10 +60,7 @@ type Queryable interface {
// time range.
type Querier interface {
// Select returns a set of series that matches the given label matchers.
Select(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)

// SelectSorted returns a sorted set of series that matches the given label matchers.
SelectSorted(*SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)
Select(SelectParams, ...*labels.Matcher) (SeriesSet, Warnings, error)

// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifefime of the querier.
Expand All @@ -76,17 +73,35 @@ type Querier interface {
Close() error
}

// SelectParams specifies parameters passed to data selections.
type SelectParams struct {
// SelectRange specifies time parameters passed to data selections.
type SelectRange struct {
Start int64 // Start time in milliseconds for this select.
End int64 // End time in milliseconds for this select.
}

// SelectParams specifies parameters passed to data selections.
type SelectParams struct {
TimeRange *SelectRange // If non-nil it means certain time to select.

Step int64 // Query step size in milliseconds.
Func string // String representation of surrounding function or aggregation.

Grouping []string // List of label names used in aggregation.
By bool // Indicate whether it is without or by.
Range int64 // Range vector selector range in milliseconds.

SeriesSorted bool // If enabled, selected series has to be sorted.
}

// IsEmpty returns true if SelectParams is considered empty.
func (s SelectParams) IsEmpty() bool {
return s.TimeRange == nil &&
s.Step == 0 &&
s.Func == "" &&
len(s.Grouping) == 0 &&
!s.By &&
s.Range == 0 &&
!s.SeriesSorted
}

// QueryableFunc is an adapter to allow the use of ordinary functions as
Expand Down

0 comments on commit 24b970a

Please sign in to comment.