Skip to content

Commit

Permalink
Implement async select for Querier
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
  • Loading branch information
kakkoyun committed May 25, 2020
1 parent 5821155 commit ae8a8b5
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 61 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, reg, stores.Get, component.Query, selectorLset, storeResponseTimeout)
rulesProxy = rules.NewProxy(logger, stores.GetRulesClients)
queryableCreator = query.NewQueryableCreator(logger, proxy)
queryableCreator = query.NewQueryableCreator(logger, reg, proxy)
engine = promql.NewEngine(
promql.EngineOpts{
Logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestEndpoints(t *testing.T) {

now := time.Now()
api := &API{
queryableCreate: query.NewQueryableCreator(nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, nil, db, component.Query, nil)),
queryEngine: promql.NewEngine(promql.EngineOpts{
Logger: nil,
Reg: nil,
Expand Down
80 changes: 80 additions & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package query

import (
"context"
"math"
"sort"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store/storepb"
)

Expand Down Expand Up @@ -657,3 +659,81 @@ func (it *dedupSeriesIterator) Err() error {
}
return it.b.Err()
}

type errSeriesSet struct{ err error }

func (errSeriesSet) Next() bool { return false }

func (errSeriesSet) At() storage.Series { return nil }

func (e errSeriesSet) Err() error { return e.err }

var errPrematurelyClosedPromise = errors.New("promise channel closed before result received")

type asyncSeriesSet struct {
ctx context.Context
promise chan storage.SeriesSet
result storage.SeriesSet
}

func newAsyncSeriesSet(ctx context.Context, gate *gate.Gate, f func() (storage.SeriesSet, storage.Warnings, error)) storage.SeriesSet {
promise := make(chan storage.SeriesSet, 1)
go func() {
defer close(promise)

if err := gate.IsMyTurn(ctx); err != nil {
promise <- errSeriesSet{errors.Wrap(err, "failed to wait for turn")}
}
defer gate.Done()

set, _, err := f()
// TODO(kakkoyun): Handle warnings after Prometheus changes.
if err != nil {
promise <- errSeriesSet{err}
}
promise <- set
}()

return &asyncSeriesSet{ctx: ctx, promise: promise}
}

func (s *asyncSeriesSet) Next() bool {
if s.result == nil {
select {
case <-s.ctx.Done():
return false
case res, ok := <-s.promise:
if !ok {
return false
}
s.result = res
return res.Next()
}
}

return s.result.Next()
}

func (s *asyncSeriesSet) At() storage.Series {
return s.result.At()
}

func (s *asyncSeriesSet) Err() error {
if err := s.ctx.Err(); err != nil {
return err
}

if s.result == nil {
return errPrematurelyClosedPromise
}

return s.result.Err()
}

// TODO(kakkoyun): Uncomment after Prometheus changes.
// func (s *asyncSeriesSet) Warnings() storage.Warnings {
// if s.result != nil {
// return s.result.Warnings()
// }
// return nil
// }
131 changes: 131 additions & 0 deletions pkg/query/iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package query

import (
"context"
"errors"
"testing"

"github.com/prometheus/prometheus/storage"
)

func TestAsyncSeriesSet_Next(t *testing.T) {
type fields struct {
ctx context.Context
promise chan storage.SeriesSet
}

cancelledContext, cancel := context.WithCancel(context.Background())
cancel()

closedChannel := make(chan storage.SeriesSet)
close(closedChannel)

channel := make(chan storage.SeriesSet, 1)
channel <- storage.EmptySeriesSet()
close(channel)

tests := []struct {
name string
fields fields
want bool
}{
{
name: "returns false when context cancelled",
fields: fields{
ctx: cancelledContext,
},
want: false,
},
{
name: "returns false when channel closed",
fields: fields{
ctx: context.TODO(),
promise: closedChannel,
},
want: false,
},
{
name: "proxies call to underlying set",
fields: fields{
ctx: context.TODO(),
promise: channel,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &asyncSeriesSet{
ctx: tt.fields.ctx,
promise: tt.fields.promise,
}
if got := s.Next(); got != tt.want {
t.Errorf("asyncSeriesSet.Next() = %v, want %v", got, tt.want)
}
})
}
}

func TestAsyncSeriesSet_Err(t *testing.T) {
type fields struct {
ctx context.Context
promise chan storage.SeriesSet
}

cancelledContext, cancel := context.WithCancel(context.Background())
cancel()

closedChannel := make(chan storage.SeriesSet)
close(closedChannel)

errRemote := errors.New("remote error")
channel := make(chan storage.SeriesSet, 1)
channel <- errSeriesSet{errRemote}
close(channel)

tests := []struct {
name string
fields fields
want error
}{
{
name: "returns error from context when context cancelled",
fields: fields{
ctx: cancelledContext,
},
want: context.Canceled,
},
{
name: "returns sentinel error when channel closed",
fields: fields{
ctx: context.TODO(),
promise: closedChannel,
},
want: errPrematurelyClosedPromise,
},
{
name: "proxies call to underlying set",
fields: fields{
ctx: context.TODO(),
promise: channel,
},
want: errRemote,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &asyncSeriesSet{
ctx: tt.fields.ctx,
promise: tt.fields.promise,
}
for s.Next() {
}
if err := s.Err(); !errors.Is(err, tt.want) {
t.Errorf("asyncSeriesSet.Err() error = %v, want %v", err, tt.want)
}
})
}
}

0 comments on commit ae8a8b5

Please sign in to comment.