Upgrade Prometheus and reflect upstream storage API changes
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
kakkoyun committed May 26, 2020
1 parent 895d37c commit 5242e45
Showing 9 changed files with 66 additions and 51 deletions.
5 changes: 3 additions & 2 deletions go.mod
@@ -23,7 +23,7 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.10.0
github.com/gophercloud/gophercloud v0.11.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
@@ -53,6 +53,7 @@ require (
go.elastic.co/apm/module/apmot v1.5.0
go.uber.org/automaxprocs v1.2.0
golang.org/x/crypto v0.0.0-20200422194213-44a606286825
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/text v0.3.2
@@ -72,7 +73,7 @@ replace (
// Mitigation for: https://github.com/Azure/go-autorest/issues/414
github.com/Azure/go-autorest => github.com/Azure/go-autorest v12.3.0+incompatible
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200522113006-f4dd45609a05 // @f4dd45609a05e8f582cdcd8ef369004d1f9e3c02 (current master after v2.18.0).
github.com/prometheus/prometheus => github.com/kakkoyun/prometheus v1.8.2-0.20200526163230-72e6e58792fc
k8s.io/api => k8s.io/api v0.17.5
k8s.io/apimachinery => k8s.io/apimachinery v0.17.5
k8s.io/client-go => k8s.io/client-go v0.17.5
8 changes: 6 additions & 2 deletions go.sum
@@ -481,6 +481,8 @@ github.com/gophercloud/gophercloud v0.6.0 h1:Xb2lcqZtml1XjgYZxbeayEemq7ASbeTp09m
github.com/gophercloud/gophercloud v0.6.0/go.mod h1:GICNByuaEBibcjmjvI7QvYJSZEbGkcYwAR7EZK2WMqM=
github.com/gophercloud/gophercloud v0.10.0 h1:Et+UGxoD72pK6K+46uOwyVxbtXJ6KBkWAegXBmqlf6c=
github.com/gophercloud/gophercloud v0.10.0/go.mod h1:gmC5oQqMDOMO1t1gq5DquX/yAU808e/4mzjjDA76+Ss=
github.com/gophercloud/gophercloud v0.11.0 h1:pYMP9UZBdQa3lsfIZ1tZor4EbtxiuB6BHhocenkiH/E=
github.com/gophercloud/gophercloud v0.11.0/go.mod h1:gmC5oQqMDOMO1t1gq5DquX/yAU808e/4mzjjDA76+Ss=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20191106031601-ce3c9ade29de h1:F7WD09S8QB4LrkEpka0dFPLSotH11HRpCsLIbIcJ7sU=
github.com/gopherjs/gopherjs v0.0.0-20191106031601-ce3c9ade29de/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
@@ -615,6 +617,8 @@ github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4d
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0=
github.com/kakkoyun/prometheus v1.8.2-0.20200526163230-72e6e58792fc h1:Jhd0fKFm4M+bRAm0ToCQ9owrTRlYAUBTENaPv9zMFS0=
github.com/kakkoyun/prometheus v1.8.2-0.20200526163230-72e6e58792fc/go.mod h1:QKHYbx8sTY1fj75M+lL+LhzDSFM00+dOBlFn5wBi+14=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0/go.mod h1:1NbS8ALrpOvjt0rHPNLyCIeMtbizbir8U//inJ+zuB8=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
@@ -861,8 +865,6 @@ github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLk
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/prometheus v1.8.2-0.20200522113006-f4dd45609a05 h1:Tzn/Jcv/ORgKB59kGKNb95ThgrwuPBTgbdp09V+wM/Q=
github.com/prometheus/prometheus v1.8.2-0.20200522113006-f4dd45609a05/go.mod h1:QKHYbx8sTY1fj75M+lL+LhzDSFM00+dOBlFn5wBi+14=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
@@ -1120,6 +1122,8 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd h1:QPwSajcTUrFriMF1nJ3XzgoqakqQEsnZf9LdXdi2nkI=
golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 h1:eDrdRpKgkcCqKZQwyZRyeFZgfqt37SL7Kv3tok06cKE=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
14 changes: 4 additions & 10 deletions pkg/query/api/v1.go
@@ -527,17 +527,11 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable series")

var (
warnings []error
metrics = []labels.Labels{}
sets []storage.SeriesSet
metrics = []labels.Labels{}
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
s, warns, err := q.Select(false, nil, mset...)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
warnings = append(warnings, warns...)
sets = append(sets, s)
sets = append(sets, q.Select(false, nil, mset...))
}

set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
@@ -547,7 +541,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
if set.Err() != nil {
return nil, nil, &ApiError{errorExec, set.Err()}
}
return metrics, warnings, nil
return metrics, set.Warnings(), nil
}

func Respond(w http.ResponseWriter, data interface{}, warnings []error) {
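
Note: the hunk above follows the new upstream storage API, where Querier.Select returns only a storage.SeriesSet and both the error and any warnings are read from the set itself after iteration. A minimal sketch of that consumption pattern, assuming an illustrative helper name (collectSeries is not part of this commit):

```go
import (
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// collectSeries drains a SeriesSet produced by the new Select signature.
// Err() is only meaningful once Next() has returned false, and Warnings()
// is read from the set rather than returned by Select.
func collectSeries(q storage.Querier, ms ...*labels.Matcher) ([]labels.Labels, storage.Warnings, error) {
	set := q.Select(false, nil, ms...)

	var metrics []labels.Labels
	for set.Next() {
		metrics = append(metrics, set.At().Labels())
	}
	if err := set.Err(); err != nil {
		return nil, nil, err
	}
	return metrics, set.Warnings(), nil
}
```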
35 changes: 18 additions & 17 deletions pkg/query/iter.go
@@ -102,6 +102,11 @@ func (s *promSeriesSet) Err() error {
return s.set.Err()
}

func (s *promSeriesSet) Warnings() storage.Warnings {
// TODO(kakkoyun): Implement me!
return nil
}

// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
type storeSeriesSet struct {
// TODO(bwplotka): Don't buffer all, we have to buffer single series (to sort and dedup chunks), but nothing more.
@@ -427,6 +432,11 @@ func (s *dedupSeriesSet) Err() error {
return s.set.Err()
}

func (s *dedupSeriesSet) Warnings() storage.Warnings {
// TODO(kakkoyun): Implement me!
return nil
}

type seriesWithLabels struct {
storage.Series
lset labels.Labels
@@ -660,14 +670,6 @@ func (it *dedupSeriesIterator) Err() error {
return it.b.Err()
}

type errSeriesSet struct{ err error }

func (errSeriesSet) Next() bool { return false }

func (errSeriesSet) At() storage.Series { return nil }

func (e errSeriesSet) Err() error { return e.err }

var errPrematurelyClosedPromise = errors.New("promise channel closed before result received")

type asyncSeriesSet struct {
@@ -682,14 +684,14 @@ func newAsyncSeriesSet(ctx context.Context, gate *gate.Gate, f func() (storage.S
defer close(promise)

if err := gate.IsMyTurn(ctx); err != nil {
promise <- errSeriesSet{errors.Wrap(err, "failed to wait for turn")}
promise <- storage.ErrSeriesSet(errors.Wrap(err, "failed to wait for turn"))
}
defer gate.Done()

set, _, err := f()
// TODO(kakkoyun): Handle warnings after Prometheus changes.
if err != nil {
promise <- errSeriesSet{err}
promise <- storage.ErrSeriesSet(err)
}
promise <- set
}()
@@ -730,10 +732,9 @@ func (s *asyncSeriesSet) Err() error {
return s.result.Err()
}

// TODO(kakkoyun): Uncomment after Prometheus changes.
// func (s *asyncSeriesSet) Warnings() storage.Warnings {
// if s.result != nil {
// return s.result.Warnings()
// }
// return nil
// }
func (s *asyncSeriesSet) Warnings() storage.Warnings {
if s.result != nil {
return s.result.Warnings()
}
return nil
}
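
The Warnings() stubs added above are what the new storage.SeriesSet interface requires, and the local errSeriesSet type could be dropped because upstream now provides storage.ErrSeriesSet, whose Next() immediately returns false and whose Err() reports the wrapped error. A rough sketch of the shape these set types have to satisfy (the wrapper below is illustrative, not code from this change):

```go
// warnSeriesSet is a hypothetical decorator that attaches extra warnings to
// an inner storage.SeriesSet; it shows the four methods every set needs
// after this upgrade.
type warnSeriesSet struct {
	set   storage.SeriesSet
	extra storage.Warnings
}

func (s warnSeriesSet) Next() bool         { return s.set.Next() }
func (s warnSeriesSet) At() storage.Series { return s.set.At() }
func (s warnSeriesSet) Err() error         { return s.set.Err() }

func (s warnSeriesSet) Warnings() storage.Warnings {
	return append(s.set.Warnings(), s.extra...)
}
```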
12 changes: 11 additions & 1 deletion pkg/query/iter_test.go
@@ -12,6 +12,16 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

type errSeriesSet struct {
ws storage.Warnings
err error
}

func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() storage.Series { return nil }
func (s errSeriesSet) Err() error { return s.err }
func (s errSeriesSet) Warnings() storage.Warnings { return s.ws }

func TestAsyncSeriesSet_Next(t *testing.T) {
type fields struct {
ctx context.Context
@@ -82,7 +92,7 @@ func TestAsyncSeriesSet_Err(t *testing.T) {

errRemote := errors.New("remote error")
channel := make(chan storage.SeriesSet, 1)
channel <- errSeriesSet{errRemote}
channel <- errSeriesSet{err: errRemote}
close(channel)

tests := []struct {
4 changes: 2 additions & 2 deletions pkg/query/querier.go
@@ -167,7 +167,7 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Start: q.mint,
@@ -238,7 +238,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil
}), nil, nil
})
}

// sortDedupLabels re-sorts the set so that the same series with different replica
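
For reference, the signature change above tracks the upstream storage.Querier interface, which after this Prometheus bump looks roughly like the following (paraphrased; label-lookup methods elided — consult the vendored storage package for the authoritative definition):

```go
// Paraphrased sketch of the relevant upstream interfaces.
type Querier interface {
	// Select returns a set of series matching the given matchers.
	// Errors and warnings now travel with the returned set.
	Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) SeriesSet
	Close() error
}

type SeriesSet interface {
	Next() bool
	At() Series
	Err() error
	Warnings() Warnings
}
```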
22 changes: 14 additions & 8 deletions pkg/query/querier_test.go
@@ -514,9 +514,13 @@ func TestQuerier_Select(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
defer func() { testutil.Ok(t, q.Close()) }()

res, w, err := q.Select(false, tcase.hints, tcase.matchers...)
testutil.Ok(t, err)
res := q.Select(false, tcase.hints, tcase.matchers...)
for res.Next() {
}

testutil.Ok(t, res.Err())
if tcase.expectedWarning != "" {
w := res.Warnings()
testutil.Equals(t, 1, len(w))
testutil.Equals(t, tcase.expectedWarning, w[0].Error())
}
@@ -594,13 +598,13 @@ type querierResponseCatcher struct {
warns []storage.Warnings
}

func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
s, w, err := q.Querier.Select(selectSorted, p, m...)
testutil.Ok(q.t, err)

func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet {
s := q.Querier.Select(selectSorted, p, m...)
q.resp = append(q.resp, s)
q.warns = append(q.warns, w)
return storage.NoopSeriesSet(), storage.Warnings{errors.New("response caught")}, nil
for s.Next() {
}
q.warns = append(q.warns, s.Warnings())
return storage.NoopSeriesSet()
}

func (q querierResponseCatcher) Close() error { return nil }
@@ -621,6 +625,8 @@ func (s *mockedSeriesSet) At() storage.Series {
}
func (s *mockedSeriesSet) Err() error { return nil }

func (s *mockedSeriesSet) Warnings() storage.Warnings { return nil }

type mockedSeriesIterator struct {
cur int
samples []sample
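
Because warnings now live on the set, the test doubles above drain each SeriesSet before recording them. A small hypothetical helper that captures that idiom (drainSeriesSet is not part of this change; it assumes the testutil package already imported in these tests):

```go
// drainSeriesSet exhausts a SeriesSet, fails the test on a hard error, and
// returns the warnings accumulated during iteration.
func drainSeriesSet(t *testing.T, set storage.SeriesSet) storage.Warnings {
	t.Helper()

	for set.Next() {
	}
	testutil.Ok(t, set.Err())
	return set.Warnings()
}
```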
3 changes: 1 addition & 2 deletions pkg/receive/tsdb_test.go
@@ -72,8 +72,7 @@ func TestFlushableStorage(t *testing.T) {
defer func() { testutil.Ok(t, querier.Close()) }()

// Sum the values.
seriesSet, _, err := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "thanos", Value: "flush"})
testutil.Ok(t, err)
seriesSet := querier.Select(false, nil, &labels.Matcher{Type: labels.MatchEqual, Name: "thanos", Value: "flush"})
sum := 0.0
for seriesSet.Next() {
series := seriesSet.At().Iterator()
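
The sample-iteration side of the API is untouched by this upgrade: each series still exposes a chunk iterator, and only the Select error handling moves to set.Err(). A compact sketch of the summing loop used in the test above, extracted into an illustrative helper (sumSamples is not part of this commit):

```go
// sumSamples walks every series in the set, adds up all sample values, and
// surfaces the set's deferred error once iteration is done.
func sumSamples(set storage.SeriesSet) (float64, error) {
	sum := 0.0
	for set.Next() {
		it := set.At().Iterator()
		for it.Next() {
			_, v := it.At()
			sum += v
		}
		if err := it.Err(); err != nil {
			return 0, err
		}
	}
	return sum, set.Err()
}
```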
14 changes: 7 additions & 7 deletions pkg/store/tsdb.go
@@ -107,13 +107,10 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
}
defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier series")

set, _, err := q.Select(false, nil, matchers...)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

var respSeries storepb.Series

var (
set = q.Select(false, nil, matchers...)
respSeries storepb.Series
)
for set.Next() {
series := set.At()

@@ -139,6 +136,9 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Error(codes.Aborted, err.Error())
}
}
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
