Skip to content

Commit

Permalink
Implement async select for Querier
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed Jun 12, 2020
1 parent 48ee1f8 commit 3439ad6
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 59 deletions.
7 changes: 6 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
maxConcurrentQueries := cmd.Flag("query.max-concurrent", "Maximum number of queries processed concurrently by query node.").
Default("20").Int()

maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

queryReplicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter. Data includes time series, recording rules, and alerting rules.").
Strings()

Expand Down Expand Up @@ -159,6 +162,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*webExternalPrefix,
*webPrefixHeaderName,
*maxConcurrentQueries,
*maxConcurrentSelects,
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*queryReplicaLabels,
Expand Down Expand Up @@ -202,6 +206,7 @@ func runQuery(
webExternalPrefix string,
webPrefixHeaderName string,
maxConcurrentQueries int,
maxConcurrentSelects int,
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
queryReplicaLabels []string,
Expand Down Expand Up @@ -280,7 +285,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy, maxConcurrentSelects, queryTimeout)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down
3 changes: 3 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ Flags:
--query.timeout=2m Maximum time to process query by query node.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per query.
--query.replica-label=QUERY.REPLICA-LABEL ...
Labels to treat as a replica indicator along
which data is deduplicated. Still you will be
Expand Down
5 changes: 3 additions & 2 deletions pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ func TestEndpoints(t *testing.T) {
testutil.Ok(t, app.Commit())

now := time.Now()
timeout := 100 * time.Second
api := &API{
queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil), 2, timeout),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10000,
Timeout: 100 * time.Second,
Timeout: timeout,
}),
now: func() time.Time { return now },
gate: gate.NewGate(4, nil),
Expand Down
37 changes: 37 additions & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,3 +670,40 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

// lazySeriesSet is a storage.SeriesSet wrapper that defers resolution of the
// underlying set until the first call to Next. This lets a caller hand back a
// SeriesSet immediately while the real set is still being produced (e.g. by a
// concurrent select goroutine).
type lazySeriesSet struct {
	// create resolves the wrapped set. The ok result reports whether the
	// freshly created set has at least one element (i.e. whether its first
	// Next call succeeded) — see the create closure in Select, which
	// returns `set, set.Next()`.
	create func() (s storage.SeriesSet, ok bool)

	// set is nil until create has been invoked by Next.
	set storage.SeriesSet
}

// Next advances the set. On first use it resolves the underlying set via
// create; create already performs the first advance, so its ok result is
// returned directly.
func (c *lazySeriesSet) Next() bool {
	if c.set == nil {
		var ok bool
		c.set, ok = c.create()
		return ok
	}
	return c.set.Next()
}

// Err returns the error of the underlying set, or nil while the set has not
// yet been created.
func (c *lazySeriesSet) Err() error {
	if c.set == nil {
		return nil
	}
	return c.set.Err()
}

// At returns the current series of the underlying set, or nil while the set
// has not yet been created.
func (c *lazySeriesSet) At() storage.Series {
	if c.set == nil {
		return nil
	}
	return c.set.At()
}

// Warnings returns the warnings of the underlying set, or nil while the set
// has not yet been created.
func (c *lazySeriesSet) Warnings() storage.Warnings {
	if c.set == nil {
		return nil
	}
	return c.set.Warnings()
}
118 changes: 91 additions & 27 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ import (
"context"
"sort"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand All @@ -27,38 +31,45 @@ import (
type QueryableCreator func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer) QueryableCreator {
func NewQueryableCreator(logger log.Logger, reg prometheus.Registerer, proxy storepb.StoreServer, maxConcurrentSelects int, selectTimeout time.Duration) QueryableCreator {
return func(deduplicate bool, replicaLabels []string, maxResolutionMillis int64, partialResponse, skipChunks bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
logger: logger,
reg: reg,
replicaLabels: replicaLabels,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
skipChunks: skipChunks,
maxConcurrentSelects: maxConcurrentSelects,
selectTimeout: selectTimeout,
}
}
}

type queryable struct {
logger log.Logger
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
logger log.Logger
reg prometheus.Registerer
replicaLabels []string
proxy storepb.StoreServer
deduplicate bool
maxResolutionMillis int64
partialResponse bool
skipChunks bool
maxConcurrentSelects int
selectTimeout time.Duration
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks), nil
return newQuerier(ctx, q.logger, q.reg, mint, maxt, q.replicaLabels, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.maxConcurrentSelects, q.selectTimeout), nil
}

type querier struct {
ctx context.Context
logger log.Logger
reg prometheus.Registerer
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
Expand All @@ -67,20 +78,24 @@ type querier struct {
maxResolutionMillis int64
partialResponse bool
skipChunks bool
gate *gate.Gate
selectTimeout time.Duration
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
// store API endpoints.
func newQuerier(
ctx context.Context,
logger log.Logger,
reg prometheus.Registerer,
mint, maxt int64,
replicaLabels []string,
proxy storepb.StoreServer,
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
skipChunks bool,
partialResponse, skipChunks bool,
maxConcurrentSelects int,
selectTimeout time.Duration,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -92,9 +107,16 @@ func newQuerier(
rl[replicaLabel] = struct{}{}
}
return &querier{
ctx: ctx,
logger: logger,
cancel: cancel,
ctx: ctx,
logger: logger,
reg: reg,
cancel: cancel,
gate: gate.NewGate(
maxConcurrentSelects,
extprom.WrapRegistererWithPrefix("thanos_concurrent_select", reg),
),
selectTimeout: selectTimeout,

mint: mint,
maxt: maxt,
replicaLabels: rl,
Expand Down Expand Up @@ -173,16 +195,58 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
for i, m := range ms {
matchers[i] = m.String()
}
span, ctx := tracing.StartSpan(q.ctx, "querier_select", opentracing.Tags{

// querier has a context but it gets cancelled, as soon as query evaluation is completed, by the engine.
ctx, cancel := context.WithTimeout(context.Background(), q.selectTimeout)
span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
"minTime": hints.Start,
"maxTime": hints.End,
"matchers": "{" + strings.Join(matchers, ",") + "}",
})
defer span.Finish()

promise := make(chan storage.SeriesSet, 1)
go func() {
defer close(promise)

var err error
tracing.DoInSpan(ctx, "querier_select_ismyturn", func(ctx context.Context) {
err = q.gate.IsMyTurn(ctx)
})
if err != nil {
promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
return
}
defer q.gate.Done()

span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn")
defer span.Finish()

set, err := q.selectFn(ctx, hints, ms...)
if err != nil {
promise <- storage.ErrSeriesSet(err)
return
}

promise <- set
}()

return &lazySeriesSet{create: func() (storage.SeriesSet, bool) {
defer cancel()
defer span.Finish()

// Only gets called once, for the first Next() call of the series set.
set, ok := <-promise
if !ok {
return storage.ErrSeriesSet(errors.New("channel closed before a value received")), false
}
return set, set.Next()
}}
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, error) {
sms, err := storepb.TranslatePromMatchers(ms...)
if err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "convert matchers"))
return nil, errors.Wrap(err, "convert matchers")
}

aggrs := aggrsFromFunc(hints.Func)
Expand All @@ -197,7 +261,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return storage.ErrSeriesSet(errors.Wrap(err, "proxy Series()"))
return nil, errors.Wrap(err, "proxy Series()")
}

var warns storage.Warnings
Expand All @@ -213,7 +277,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}
}, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
Expand All @@ -228,7 +292,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER)
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
Expand Down

0 comments on commit 3439ad6

Please sign in to comment.