Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose data from rule component #92

Merged
merged 6 commits into from
Nov 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,40 @@ import (
"os/signal"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/oklog/run"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)

// registerRule registers a rule command.
func registerRule(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

labelStrs := cmd.Flag("label", "labels applying to all generated metrics (repeated)").
PlaceHolder("<name>=\"<value>\"").Strings()

dataDir := cmd.Flag("data-dir", "data directory").Default("data/").String()

ruleDir := cmd.Flag("rule-dir", "directory containing rule files").
Default("rules/").String()

Expand All @@ -47,8 +58,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
evalInterval := cmd.Flag("eval-interval", "the default evaluation interval to use").
Default("30s").Duration()

dataDir := cmd.Flag("data-dir", "data directory").Default("data/").String()

peers := cmd.Flag("cluster.peers", "initial peers to join the cluster. It can be either <ip:port>, or <domain:port>").Strings()

clusterBindAddr := cmd.Flag("cluster.address", "listen address for cluster").
Expand All @@ -64,13 +73,20 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
*clusterBindAddr,
*clusterAdvertiseAddr,
*peers,
cluster.PeerState{Type: cluster.PeerTypeSource},
cluster.PeerState{
Type: cluster.PeerTypeSource,
APIAddr: *grpcAddr,
},
true,
)
if err != nil {
return errors.Wrap(err, "join cluster")
}
return runRule(g, logger, reg, *httpAddr, *grpcAddr, *evalInterval, *dataDir, *ruleDir, peer)
lset, err := parseFlagLabels(*labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
}
return runRule(g, logger, reg, lset, *httpAddr, *grpcAddr, *evalInterval, *dataDir, *ruleDir, peer)
}
}

Expand All @@ -80,6 +96,7 @@ func runRule(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
lset labels.Labels,
httpAddr string,
grpcAddr string,
evalInterval time.Duration,
Expand Down Expand Up @@ -200,6 +217,35 @@ func runRule(
close(cancel)
})
}
{
l, err := net.Listen("tcp", grpcAddr)
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")

store := store.NewTSDBStore(logger, reg, db, lset)

met := grpc_prometheus.NewServerMetrics()
met.EnableHandlingTimeHistogram(
grpc_prometheus.WithHistogramBuckets([]float64{
0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we ensure that all of these are the same across all thanos Grpc clients/servers i.e fetch it from common var

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want that though? Right now they are the same but they are also just guesses at this point.
We might find out different components have different performance characteristics and different buckets could make sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm

}),
)
s := grpc.NewServer(
grpc.UnaryInterceptor(met.UnaryServerInterceptor()),
grpc.StreamInterceptor(met.StreamServerInterceptor()),
)
storepb.RegisterStoreServer(s, store)
reg.MustRegister(met)

g.Add(func() error {
return errors.Wrap(s.Serve(l), "serve gRPC")
}, func(error) {
s.Stop()
l.Close()
})
}
// Start the HTTP server for debugging and metrics.
{
router := route.New()
Expand Down Expand Up @@ -260,10 +306,10 @@ func queryPrometheusInstant(ctx context.Context, addr, query string, t time.Time
vec := make(promql.Vector, 0, len(m.Data.Result))

for _, e := range m.Data.Result {
lset := make(labels.Labels, 0, len(e.Metric))
lset := make(promlabels.Labels, 0, len(e.Metric))

for k, v := range e.Metric {
lset = append(lset, labels.Label{
lset = append(lset, promlabels.Label{
Name: string(k),
Value: string(v),
})
Expand All @@ -285,3 +331,19 @@ func printAlertNotifications(ctx context.Context, expr string, alerts ...*rules.
}
return nil
}

func parseFlagLabels(s []string) (labels.Labels, error) {
var lset labels.Labels
for _, l := range s {
parts := strings.SplitN(l, "=", 2)
if len(parts) != 2 {
return nil, errors.Errorf("unrecognized label %q", l)
}
val, err := strconv.Unquote(parts[1])
if err != nil {
return nil, errors.Wrap(err, "unquote label value")
}
lset = append(lset, labels.Label{Name: parts[0], Value: val})
}
return lset, nil
}
8 changes: 4 additions & 4 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func runSidecar(
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "proxy")
logger := log.With(logger, "component", "store")

var client http.Client

proxy, err := store.NewPrometheusProxy(
promStore, err := store.NewPrometheusStore(
logger, prometheus.DefaultRegisterer, &client, promURL, externalLabels.Get)
if err != nil {
return errors.Wrap(err, "create Prometheus proxy")
return errors.Wrap(err, "create Prometheus store")
}

met := grpc_prometheus.NewServerMetrics()
Expand All @@ -155,7 +155,7 @@ func runSidecar(
grpc.UnaryInterceptor(met.UnaryServerInterceptor()),
grpc.StreamInterceptor(met.StreamServerInterceptor()),
)
storepb.RegisterStoreServer(s, proxy)
storepb.RegisterStoreServer(s, promStore)
reg.MustRegister(met)

g.Add(func() error {
Expand Down
41 changes: 17 additions & 24 deletions pkg/store/promproxy.go → pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,32 @@ import (
"github.com/prometheus/tsdb/chunks"
)

// PrometheusProxy implements the store node API on top of the Prometheus
// HTTP v1 API.
type PrometheusProxy struct {
// PrometheusStore implements the store node API on top of the Prometheus remote read API.
type PrometheusStore struct {
logger log.Logger
base *url.URL
client *http.Client
buffers sync.Pool
externalLabels func() labels.Labels
}

var _ storepb.StoreServer = (*PrometheusProxy)(nil)

// NewPrometheusProxy returns a new PrometheusProxy that uses the given HTTP client
// NewPrometheusStore returns a new PrometheusStore that uses the given HTTP client
// to talk to Prometheus.
// It attaches the provided external labels to all results.
func NewPrometheusProxy(
func NewPrometheusStore(
logger log.Logger,
reg prometheus.Registerer,
client *http.Client,
baseURL *url.URL,
externalLabels func() labels.Labels,
) (*PrometheusProxy, error) {
) (*PrometheusStore, error) {
if client == nil {
client = http.DefaultClient
}
if logger == nil {
logger = log.NewNopLogger()
}
p := &PrometheusProxy{
p := &PrometheusStore{
logger: logger,
base: baseURL,
client: client,
Expand All @@ -63,7 +60,7 @@ func NewPrometheusProxy(
}

// Info returns store information about the Prometheus instance.
func (p *PrometheusProxy) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
lset := p.externalLabels()

res := &storepb.InfoResponse{
Expand All @@ -78,24 +75,20 @@ func (p *PrometheusProxy) Info(ctx context.Context, r *storepb.InfoRequest) (*st
return res, nil
}

func (p *PrometheusProxy) getBuffer() []byte {
func (p *PrometheusStore) getBuffer() []byte {
b := p.buffers.Get()
if b == nil {
return make([]byte, 0, 32*1024) // 32KB seems like a good minimum starting size.
}
return b.([]byte)
}

func (p *PrometheusProxy) putBuffer(b []byte) {
func (p *PrometheusStore) putBuffer(b []byte) {
p.buffers.Put(b[:0])
}

// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
//
// Prometheus's range query API is not suitable to give us all datapoints. We use the
// instant API and do a range selection in PromQL to cover the queried time range.
func (p *PrometheusProxy) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error {
// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error {
ext := p.externalLabels()

match, newMatchers, err := extLabelsMatches(ext, r.Matchers)
Expand Down Expand Up @@ -179,13 +172,13 @@ func (p *PrometheusProxy) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
var res storepb.SeriesResponse

for _, e := range data.Results[0].Timeseries {
lset := translateAndExtendLabels(e.Labels, ext)
lset := p.translateAndExtendLabels(e.Labels, ext)
// We generally expect all samples of the requested range to be traversed
// so we just encode all samples into one big chunk regardless of size.
//
// Drop all data before r.MinTime since we might have fetched more than
// the requested range (see above).
enc, b, err := encodeChunk(e.Samples, r.MinTime)
enc, b, err := p.encodeChunk(e.Samples, r.MinTime)
if err != nil {
return status.Error(codes.Unknown, err.Error())
}
Expand Down Expand Up @@ -232,7 +225,7 @@ func extLabelsMatches(extLabels labels.Labels, ms []storepb.LabelMatcher) (bool,

// encodeChunk translates the sample pairs into a chunk. It takes a minimum timestamp
// and drops all samples before that one.
func encodeChunk(ss []prompb.Sample, mint int64) (storepb.Chunk_Encoding, []byte, error) {
func (p *PrometheusStore) encodeChunk(ss []prompb.Sample, mint int64) (storepb.Chunk_Encoding, []byte, error) {
c := chunks.NewXORChunk()
a, err := c.Appender()
if err != nil {
Expand All @@ -249,7 +242,7 @@ func encodeChunk(ss []prompb.Sample, mint int64) (storepb.Chunk_Encoding, []byte

// translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally
// attaches the given labels to it, overwriting existing ones on colllision.
func translateAndExtendLabels(m []prompb.Label, extend labels.Labels) []storepb.Label {
func (p *PrometheusStore) translateAndExtendLabels(m []prompb.Label, extend labels.Labels) []storepb.Label {
lset := make([]storepb.Label, 0, len(m)+len(extend))

for _, l := range m {
Expand All @@ -274,14 +267,14 @@ func translateAndExtendLabels(m []prompb.Label, extend labels.Labels) []storepb.
}

// LabelNames returns all known label names.
func (p *PrometheusProxy) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
}

// LabelValues returns all known label values for a given label name.
func (p *PrometheusProxy) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
u := *p.base
Expand Down
12 changes: 6 additions & 6 deletions pkg/store/promproxy_test.go → pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/prometheus/tsdb/labels"
)

func TestPrometheusProxy_Series(t *testing.T) {
func TestPrometheusStore_Series(t *testing.T) {
p, err := testutil.NewPrometheus()
testutil.Ok(t, err)

Expand All @@ -36,7 +36,7 @@ func TestPrometheusProxy_Series(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusProxy(nil, nil, nil, u,
proxy, err := NewPrometheusStore(nil, nil, nil, u,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
})
Expand Down Expand Up @@ -87,7 +87,7 @@ func expandChunk(cit chunks.Iterator) (res []sample) {
return res
}

func TestPrometheusProxy_LabelValues(t *testing.T) {
func TestPrometheusStore_LabelValues(t *testing.T) {
p, err := testutil.NewPrometheus()
testutil.Ok(t, err)

Expand All @@ -106,7 +106,7 @@ func TestPrometheusProxy_LabelValues(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusProxy(nil, nil, nil, u, nil)
proxy, err := NewPrometheusStore(nil, nil, nil, u, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
Expand All @@ -117,7 +117,7 @@ func TestPrometheusProxy_LabelValues(t *testing.T) {
testutil.Equals(t, []string{"a", "b", "c"}, resp.Values)
}

func TestPrometheusProxy_Series_MatchExternalLabel(t *testing.T) {
func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) {
p, err := testutil.NewPrometheus()
testutil.Ok(t, err)

Expand All @@ -138,7 +138,7 @@ func TestPrometheusProxy_Series_MatchExternalLabel(t *testing.T) {
u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusProxy(nil, nil, nil, u,
proxy, err := NewPrometheusStore(nil, nil, nil, u,
func() labels.Labels {
return labels.FromStrings("region", "eu-west")
})
Expand Down
Loading