Skip to content

Commit

Permalink
sidecar: Added support for streaming, chunked remote read.
Browse files Browse the repository at this point in the history
Fixes: #488

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jul 3, 2019
1 parent 38a9da0 commit 7bd205f
Show file tree
Hide file tree
Showing 23 changed files with 763 additions and 2,721 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ test-deps:
$(foreach ver,$(PROM_VERSIONS),$(call fetch_go_bin_version,github.com/prometheus/prometheus/cmd/prometheus,$(ver)))
$(call fetch_go_bin_version,github.com/prometheus/alertmanager/cmd/alertmanager,$(ALERTMANAGER_VERSION))
$(call fetch_go_bin_version,github.com/minio/minio,$(MINIO_SERVER_VERSION))
$(call fetch_go_bin_version,github.com/bplotka/prometheus/cmd/prometheus,v2.10.0-rr1))

# vet vets the code.
.PHONY: vet
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ func runQuery(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewQueryUI(logger, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewQueryUI(logger, reg, stores, flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, reg, engine, queryableCreator, enableAutodownsampling, enablePartialResponse)

Expand Down
4 changes: 2 additions & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,9 @@ func runRule(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewRuleUI(logger, reg, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, ruleMgrs)
api := v1.NewAPI(logger, reg, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

mux := http.NewServeMux()
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
module github.com/improbable-eng/thanos

// v2.11.0-rc.0-rr-streaming
replace github.com/prometheus/prometheus => github.com/bwplotka/prometheus v0.0.0-20190703201424-23210975d267

require (
cloud.google.com/go v0.34.0
github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c
github.com/NYTimes/gziphandler v1.1.1
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/cespare/xxhash v1.1.0
github.com/fatih/structtag v1.0.0
Expand Down Expand Up @@ -32,15 +34,14 @@ require (
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
github.com/prometheus/common v0.4.0
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/prometheus/prometheus v2.9.2+incompatible
github.com/prometheus/tsdb v0.8.0
github.com/prometheus/tsdb v0.9.1
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
go.uber.org/atomic v1.4.0 // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
Expand Down
167 changes: 117 additions & 50 deletions go.sum

Large diffs are not rendered by default.

19 changes: 13 additions & 6 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
"sync"
"time"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/query"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -105,7 +105,9 @@ type API struct {
rangeQueryDuration prometheus.Histogram
enableAutodownsampling bool
enablePartialResponse bool
now func() time.Time
reg prometheus.Registerer

now func() time.Time
}

// NewAPI returns an initialized API type.
Expand Down Expand Up @@ -144,6 +146,7 @@ func NewAPI(
rangeQueryDuration: rangeQueryDuration,
enableAutodownsampling: enableAutodownsampling,
enablePartialResponse: enablePartialResponse,
reg: reg,

now: time.Now,
}
Expand All @@ -162,7 +165,11 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
w.WriteHeader(http.StatusNoContent)
}
})
return prometheus.InstrumentHandler(name, tracing.HTTPMiddleware(tracer, name, logger, gziphandler.GzipHandler(hf)))

return promhttp.InstrumentMetricHandler(
prometheus.WrapRegistererWith(prometheus.Labels{"path": name}, api.reg),
http.HandlerFunc(hf),
).ServeHTTP
}

r.Options("/*path", instr("options", api.options))
Expand Down Expand Up @@ -451,7 +458,7 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {

// TODO(fabxc): add back request context.

vals, err := q.LabelValues(name)
vals, _, err := q.LabelValues(name)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -642,7 +649,7 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
names, _, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,37 +246,37 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelValues()")
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}

for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
}

return resp.Values, nil
return resp.Values, nil, nil
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *querier) LabelNames() ([]string, error) {
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}

for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
}

return resp.Names, nil
return resp.Names, nil, nil
}

func (q *querier) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"
conntrack "github.com/mwitkow/go-conntrack"
"github.com/opentracing-contrib/go-stdlib/nethttp"
opentracing "github.com/opentracing/opentracing-go"
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sort"
"sync"

"github.com/improbable-eng/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"

"github.com/cespare/xxhash"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package receive
import (
"testing"

"github.com/improbable-eng/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"
)

func TestHash(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package receive

import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/store/prompb"
"github.com/prometheus/prometheus/prompb"
"github.com/pkg/errors"

"github.com/prometheus/prometheus/pkg/labels"
Expand Down
12 changes: 9 additions & 3 deletions pkg/rule/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"net/http"
"time"

"github.com/NYTimes/gziphandler"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/go-kit/kit/log"
qapi "github.com/improbable-eng/thanos/pkg/query/api"
thanosrule "github.com/improbable-eng/thanos/pkg/rule"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
Expand All @@ -22,16 +22,19 @@ type API struct {
logger log.Logger
now func() time.Time
ruleRetriever RulesRetriever
reg prometheus.Registerer
}

func NewAPI(
logger log.Logger,
reg prometheus.Registerer,
ruleRetriever RulesRetriever,
) *API {
return &API{
logger: logger,
now: time.Now,
ruleRetriever: ruleRetriever,
reg: reg,
}
}

Expand All @@ -47,7 +50,10 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
w.WriteHeader(http.StatusNoContent)
}
})
return prometheus.InstrumentHandler(name, tracing.HTTPMiddleware(tracer, name, logger, gziphandler.GzipHandler(hf)))
return promhttp.InstrumentMetricHandler(
prometheus.WrapRegistererWith(prometheus.Labels{"path": name}, api.reg),
http.HandlerFunc(hf),
).ServeHTTP
}

r.Get("/alerts", instr("alerts", api.alerts))
Expand Down
5 changes: 5 additions & 0 deletions pkg/rule/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/go-kit/kit/log"
qapi "github.com/improbable-eng/thanos/pkg/query/api"
thanosrule "github.com/improbable-eng/thanos/pkg/rule"
Expand Down Expand Up @@ -79,6 +81,7 @@ func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule {
time.Second,
labels.Labels{},
labels.Labels{},
labels.Labels{},
true,
log.NewNopLogger(),
)
Expand All @@ -88,6 +91,7 @@ func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule {
time.Second,
labels.Labels{},
labels.Labels{},
labels.Labels{},
true,
log.NewNopLogger(),
)
Expand Down Expand Up @@ -125,6 +129,7 @@ func TestEndpoints(t *testing.T) {
algr.RuleGroups()
api := NewAPI(
nil,
prometheus.DefaultRegisterer,
algr,
)
testEndpoints(t, api)
Expand Down
3 changes: 2 additions & 1 deletion pkg/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []st
errs = append(errs, errors.Errorf("no updater found for %v", s))
continue
}
if err := updater.Update(evalInterval, fs); err != nil {
// TODO(bwplotka): Investigate if we should put ext labels here or not.
if err := updater.Update(evalInterval, fs, nil); err != nil {
errs = append(errs, err)
continue
}
Expand Down

0 comments on commit 7bd205f

Please sign in to comment.