Skip to content

Commit

Permalink
Make it clear, explicit, and fail early on query with just external l…
Browse files Browse the repository at this point in the history
…abels. (#1310) (#1321)

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
GiedriusS authored and bwplotka committed Jul 11, 2019
1 parent 40829e6 commit 6e1af69
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 45 deletions.
21 changes: 15 additions & 6 deletions pkg/store/prometheus.go
Expand Up @@ -125,15 +125,21 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_SeriesServer) error {
ext := p.externalLabels()
externalLabels := p.externalLabels()

match, newMatchers, err := labelsMatches(ext, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, externalLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

q := prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}

// TODO(fabxc): import common definitions from prompb once we have a stable gRPC
Expand Down Expand Up @@ -166,7 +172,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
span.SetTag("series_count", len(resp.Results[0].Timeseries))

for _, e := range resp.Results[0].Timeseries {
lset := p.translateAndExtendLabels(e.Labels, ext)
lset := p.translateAndExtendLabels(e.Labels, externalLabels)

if len(e.Samples) == 0 {
// As found in https://github.com/improbable-eng/thanos/issues/381
Expand Down Expand Up @@ -286,8 +292,10 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom
return &data, nil
}

func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []storepb.LabelMatcher, error) {
if len(lset) == 0 {
// matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them.
// It also returns false if given matchers are not matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []storepb.LabelMatcher, error) {
if len(externalLabels) == 0 {
return true, ms, nil
}

Expand All @@ -299,7 +307,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store
return false, nil, err
}

extValue := lset.Get(m.Name)
extValue := externalLabels.Get(m.Name)
if extValue == "" {
// Agnostic to external labels.
newMatcher = append(newMatcher, m)
Expand All @@ -312,6 +320,7 @@ func labelsMatches(lset labels.Labels, ms []storepb.LabelMatcher) (bool, []store
return false, nil, nil
}
}

return true, newMatcher, nil
}

Expand Down
63 changes: 40 additions & 23 deletions pkg/store/prometheus_test.go
Expand Up @@ -61,36 +61,53 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
}, nil)
testutil.Ok(t, err)

// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)
{
// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv)
testutil.Ok(t, err)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(srv.SeriesSet))

testutil.Equals(t, 1, len(srv.SeriesSet))
testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)

testutil.Equals(t, []storepb.Label{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
}, srv.SeriesSet[0].Labels)
testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))

testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))
c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)

c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)
chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)

chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)
samples := expandChunk(chk.Iterator())
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)

samples := expandChunk(chk.Iterator())
testutil.Equals(t, []sample{{baseT + 200, 2}, {baseT + 300, 3}}, samples)
}
// Querying by external labels only.
{
srv := newStoreSeriesServer(ctx)

err = proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"},
},
}, srv)
testutil.NotOk(t, err)
testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error())
}
}

type sample struct {
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/proxy.go
Expand Up @@ -181,14 +181,18 @@ func (s ctxRespSender) send(r *storepb.SeriesResponse) {
// Series returns all series for a requested time range and label matcher. Requested series are taken from other
// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, newMatchers, err := labelsMatches(s.selectorLabels, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, s.selectorLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

var (
g, gctx = errgroup.WithContext(srv.Context())

Expand Down Expand Up @@ -220,7 +224,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates
// it cannot have series matching our query.
// NOTE: all matchers are validated in labelsMatches method so we explicitly ignore error.
// NOTE: all matchers are validated in matchesExternalLabels method so we explicitly ignore error.
spanStoreMathes, gctx := tracing.StartSpan(gctx, "store_matches")
ok, _ := storeMatches(st, r.MinTime, r.MaxTime, r.Matchers...)
spanStoreMathes.Finish()
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/proxy_test.go
Expand Up @@ -644,7 +644,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
&storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{{Name: "fed", Value: "a", Type: storepb.LabelMatcher_EQ}},
Matchers: []storepb.LabelMatcher{{Name: "any", Value: ".*", Type: storepb.LabelMatcher_RE}},
}, s,
))
testutil.Equals(t, 0, len(s.SeriesSet))
Expand Down
32 changes: 19 additions & 13 deletions pkg/store/tsdb.go
Expand Up @@ -22,37 +22,37 @@ import (
// It attaches the provided external labels to all results. It only responds with raw data
// and does not support downsampling.
type TSDBStore struct {
logger log.Logger
db *tsdb.DB
component component.SourceStoreAPI
labels labels.Labels
logger log.Logger
db *tsdb.DB
component component.SourceStoreAPI
externalLabels labels.Labels
}

// NewTSDBStore creates a new TSDBStore.
func NewTSDBStore(logger log.Logger, reg prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore {
func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, component component.SourceStoreAPI, externalLabels labels.Labels) *TSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
return &TSDBStore{
logger: logger,
db: db,
component: component,
labels: externalLabels,
logger: logger,
db: db,
component: component,
externalLabels: externalLabels,
}
}

// Info returns store information about the Prometheus instance.
func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
res := &storepb.InfoResponse{
Labels: make([]storepb.Label, 0, len(s.labels)),
Labels: make([]storepb.Label, 0, len(s.externalLabels)),
StoreType: s.component.ToProto(),
MinTime: 0,
MaxTime: math.MaxInt64,
}
if blocks := s.db.Blocks(); len(blocks) > 0 {
res.MinTime = blocks[0].Meta().MinTime
}
for _, l := range s.labels {
for _, l := range s.externalLabels {
res.Labels = append(res.Labels, storepb.Label{
Name: l.Name,
Value: l.Value,
Expand All @@ -73,13 +73,19 @@ func (s *TSDBStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, newMatchers, err := labelsMatches(s.labels, r.Matchers)
match, newMatchers, err := matchesExternalLabels(r.Matchers, s.externalLabels)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}

if !match {
return nil
}

if len(newMatchers) == 0 {
return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
}

matchers, err := translateMatchers(newMatchers)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -113,7 +119,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
return status.Errorf(codes.Internal, "encode chunk: %s", err)
}

respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.labels)
respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)
respSeries.Chunks = append(respSeries.Chunks[:0], c...)

if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
Expand Down

0 comments on commit 6e1af69

Please sign in to comment.