Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Prometheus to latest master and reflect changes #2748

Merged
merged 2 commits into from Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -28,6 +28,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2671](https://github.com/thanos-io/thanos/pull/2671) *breaking* Tools: bucket replicate flag `--resolution` is now in Go duration format.
- [#2671](https://github.com/thanos-io/thanos/pull/2671) Tools: bucket replicate now replicates by default all blocks.
- [#2739](https://github.com/thanos-io/thanos/pull/2739) Changed `bucket tool bucket verify` `--id-whitelist` flag to `--id`.
- [#2748](https://github.com/thanos-io/thanos/pull/2748) Upgrade Prometheus to [@66dfb951c4ca](https://github.com/prometheus/prometheus/commit/66dfb951c4ca2c1dd3f266172a48a925403b13a5) which is after v2.19.0.
- PromQL now allow us to executed concurrent selects.

### Added

Expand Down
19 changes: 8 additions & 11 deletions go.mod
Expand Up @@ -25,7 +25,7 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e
github.com/golang/snappy v0.0.1
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/gophercloud/gophercloud v0.10.0
github.com/gophercloud/gophercloud v0.11.0
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
Expand All @@ -49,7 +49,7 @@ require (
github.com/prometheus/common v0.10.0
github.com/prometheus/prometheus v1.8.2-0.20200609165731-66dfb951c4ca
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/uber/jaeger-client-go v2.23.0+incompatible
github.com/uber/jaeger-client-go v2.23.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
go.elastic.co/apm v1.5.0
go.elastic.co/apm/module/apmot v1.5.0
Expand All @@ -58,13 +58,13 @@ require (
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
golang.org/x/text v0.3.2
google.golang.org/api v0.22.0
google.golang.org/genproto v0.0.0-20200420144010-e5e8543f8aeb
google.golang.org/grpc v1.29.0
google.golang.org/api v0.26.0
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736
google.golang.org/grpc v1.29.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200504163728-5308cda29e3d
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200603094226-e3079894b1e8
)

// We want to replace the client-go version with a specific commit hash,
Expand All @@ -74,10 +74,7 @@ replace (
// Mitigation for: https://github.com/Azure/go-autorest/issues/414
github.com/Azure/go-autorest => github.com/Azure/go-autorest v12.3.0+incompatible
// Make sure Cortex is not forcing us to some other Prometheus version.
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200601152113-3268eac2ddda // Master after v2.18.0 including m-map fixes.
k8s.io/api => k8s.io/api v0.17.5
k8s.io/apimachinery => k8s.io/apimachinery v0.17.5
k8s.io/client-go => k8s.io/client-go v0.17.5
github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200609165731-66dfb951c4ca
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
)
Expand Down
131 changes: 60 additions & 71 deletions go.sum

Large diffs are not rendered by default.

14 changes: 4 additions & 10 deletions pkg/query/api/v1.go
Expand Up @@ -530,17 +530,11 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable series")

var (
warnings []error
metrics = []labels.Labels{}
sets []storage.SeriesSet
metrics = []labels.Labels{}
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
s, warns, err := q.Select(false, nil, mset...)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
warnings = append(warnings, warns...)
sets = append(sets, s)
sets = append(sets, q.Select(false, nil, mset...))
}

set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
Expand All @@ -550,7 +544,7 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
if set.Err() != nil {
return nil, nil, &ApiError{errorExec, set.Err()}
}
return metrics, warnings, nil
return metrics, set.Warnings(), nil
squat marked this conversation as resolved.
Show resolved Hide resolved
}

func Respond(w http.ResponseWriter, data interface{}, warnings []error) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/query/iter.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand All @@ -27,6 +28,8 @@ type promSeriesSet struct {

currLset []storepb.Label
currChunks []storepb.AggrChunk

warns storage.Warnings
}

func (s *promSeriesSet) Next() bool {
Expand Down Expand Up @@ -98,6 +101,10 @@ func (s *promSeriesSet) Err() error {
return s.set.Err()
}

func (s *promSeriesSet) Warnings() storage.Warnings {
return s.warns
}

// storeSeriesSet implements a storepb SeriesSet against a list of storepb.Series.
type storeSeriesSet struct {
// TODO(bwplotka): Don't buffer all, we have to buffer single series (to sort and dedup chunks), but nothing more.
Expand Down Expand Up @@ -427,6 +434,10 @@ func (s *dedupSeriesSet) Err() error {
return s.set.Err()
}

func (s *dedupSeriesSet) Warnings() storage.Warnings {
return s.set.Warnings()
}

type seriesWithLabels struct {
storage.Series
lset labels.Labels
Expand Down
13 changes: 8 additions & 5 deletions pkg/query/querier.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -160,7 +161,7 @@ func aggrsFromFunc(f string) []storepb.Aggr {
return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM}
}

func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
if hints == nil {
hints = &storage.SelectHints{
Start: q.mint,
Expand All @@ -181,7 +182,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match

sms, err := storepb.TranslatePromMatchers(ms...)
if err != nil {
return nil, nil, errors.Wrap(err, "convert matchers")
return storage.ErrSeriesSet(errors.Wrap(err, "convert matchers"))
}

aggrs := aggrsFromFunc(hints.Func)
Expand All @@ -196,7 +197,7 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
PartialResponseDisabled: !q.partialResponse,
SkipChunks: q.skipChunks,
}, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Series()")
return storage.ErrSeriesSet(errors.Wrap(err, "proxy Series()"))
}

var warns storage.Warnings
Expand All @@ -211,7 +212,8 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
}, warns, nil
warns: warns,
}
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
Expand All @@ -221,11 +223,12 @@ func (q *querier) Select(_ bool, hints *storage.SelectHints, ms ...*labels.Match
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER), warns, nil
return newDedupSeriesSet(set, q.replicaLabels, len(aggrs) == 1 && aggrs[0] == storepb.Aggr_COUNTER)
}

// sortDedupLabels re-sorts the set so that the same series with different replica
Expand Down
42 changes: 26 additions & 16 deletions pkg/query/querier_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand Down Expand Up @@ -512,16 +513,17 @@ func TestQuerier_Select(t *testing.T) {
t.Run(fmt.Sprintf("dedup=%v", sc.dedup), func(t *testing.T) {
t.Run("querier.Select", func(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
defer func() { testutil.Ok(t, q.Close()) }()
defer testutil.Ok(t, q.Close())

res := q.Select(false, tcase.hints, tcase.matchers...)

testSelectResponse(t, sc.expected, res)

res, w, err := q.Select(false, tcase.hints, tcase.matchers...)
testutil.Ok(t, err)
if tcase.expectedWarning != "" {
w := res.Warnings()
testutil.Equals(t, 1, len(w))
testutil.Equals(t, tcase.expectedWarning, w[0].Error())
}
testSelectResponse(t, sc.expected, res)

})
// Integration test: Make sure the PromQL would select exactly the same.
t.Run("through PromQL with 100s step", func(t *testing.T) {
Expand All @@ -535,16 +537,18 @@ func TestQuerier_Select(t *testing.T) {
r := q.Exec(context.Background())
testutil.Ok(t, r.Err)

testSelectResponse(t, sc.expected, catcher.resp[0])

warns := catcher.warns()
// We don't care about anything else, all should be recorded.
testutil.Assert(t, len(catcher.warns) == 1, "expected only single warnings")
testutil.Assert(t, len(warns) == 1, "expected only single warnings")
testutil.Assert(t, len(catcher.resp) == 1, "expected only single response, subqueries?")

w := catcher.warns[0]
w := warns[0]
if tcase.expectedWarning != "" {
testutil.Equals(t, 1, len(w))
testutil.Equals(t, tcase.expectedWarning, w[0].Error())
}
testSelectResponse(t, sc.expected, catcher.resp[0])
})
})
}
Expand Down Expand Up @@ -590,21 +594,25 @@ type querierResponseCatcher struct {
storage.Querier
t testing.TB

resp []storage.SeriesSet
warns []storage.Warnings
resp []storage.SeriesSet
}

func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
s, w, err := q.Querier.Select(selectSorted, p, m...)
testutil.Ok(q.t, err)

func (q *querierResponseCatcher) Select(selectSorted bool, p *storage.SelectHints, m ...*labels.Matcher) storage.SeriesSet {
s := q.Querier.Select(selectSorted, p, m...)
q.resp = append(q.resp, s)
q.warns = append(q.warns, w)
return storage.NoopSeriesSet(), storage.Warnings{errors.New("response caught")}, nil
return storage.NoopSeriesSet()
}

func (q querierResponseCatcher) Close() error { return nil }

func (q *querierResponseCatcher) warns() []storage.Warnings {
var warns []storage.Warnings
for _, r := range q.resp {
warns = append(warns, r.Warnings())
}
return warns
}

// TODO(bwplotka): Reuse SeriesSets from chunk iterators from Prometheus.
type mockedSeriesSet struct {
series []series
Expand All @@ -621,6 +629,8 @@ func (s *mockedSeriesSet) At() storage.Series {
}
func (s *mockedSeriesSet) Err() error { return nil }

func (s *mockedSeriesSet) Warnings() storage.Warnings { return nil }

type mockedSeriesIterator struct {
cur int
samples []sample
Expand Down
14 changes: 7 additions & 7 deletions pkg/store/tsdb.go
Expand Up @@ -107,13 +107,10 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
}
defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier series")

set, _, err := q.Select(false, nil, matchers...)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

var respSeries storepb.Series

var (
set = q.Select(false, nil, matchers...)
respSeries storepb.Series
)
for set.Next() {
series := set.At()

Expand All @@ -139,6 +136,9 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Error(codes.Aborted, err.Error())
}
}
if err := set.Err(); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}

Expand Down