From 4875874cce8925cd10cf3d20b1730f8c42fbcbef Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Fri, 26 Apr 2024 20:28:47 +0200 Subject: [PATCH] Proxy: acceptance tests for relabel filter Signed-off-by: Michael Hoffmann --- pkg/store/acceptance_test.go | 19 +++++++++++++----- pkg/store/proxy.go | 39 +++++++++++++++++++++++++++--------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index c8541ba865..6355ebb1c6 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -18,7 +18,9 @@ import ( "github.com/pkg/errors" "golang.org/x/exp/slices" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -991,7 +993,7 @@ func TestTSDBStore_Acceptance(t *testing.T) { testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore) } -func TestProxyStore_Acceptance(t *testing.T) { +func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) { t.Cleanup(func() { custom.TolerantVerifyLeak(t) }) startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer { @@ -1002,17 +1004,24 @@ func TestProxyStore_Acceptance(t *testing.T) { appendFn(db.Appender(context.Background())) return NewTSDBStore(nil, db, component.Rule, extLset) + } p1 := startNestedStore(tt, extLset, appendFn) - p2 := startNestedStore(tt, extLset, appendFn) + p2 := startNestedStore(tt, labels.FromStrings("some", "label"), appendFn) clients := []Client{ - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1)}, - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2)}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset}}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{labels.FromStrings("some", "label")}}, } - return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval)) + relabelCfgs := []*relabel.Config{{ + SourceLabels: model.LabelNames([]model.LabelName{"some"}), + Regex: relabel.MustNewRegexp("label"), + Action: relabel.Drop, + }} + + return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs))) } testStoreAPIsAcceptance(t, startStore) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 50a508d783..71ec5736ec 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -274,8 +274,12 @@ func (s *ProxyStore) TimeRange() (int64, int64) { func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo { infos := make([]infopb.TSDBInfo, 0) - for _, store := range s.stores() { - infos = append(infos, store.TSDBInfos()...) + for _, st := range s.stores() { + matches, _ := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) + if !matches { + continue + } + infos = append(infos, st.TSDBInfos()...) } return infos } @@ -336,13 +340,15 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { if s.debugLogging { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason)) } continue } - matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) if !matches { + if s.debugLogging { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector")) + } continue } storeLabelSets = append(storeLabelSets, extraMatchers...) @@ -360,7 +366,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. for _, st := range stores { st := st if s.debugLogging { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) } respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses) @@ -503,10 +509,18 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok { if s.debugLogging { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason)) } continue } + matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) + if !matches { + if s.debugLogging { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector")) + } + continue + } + if s.debugLogging { storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) } @@ -516,7 +530,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques PartialResponseDisabled: r.PartialResponseDisabled, Start: r.Start, End: r.End, - Matchers: r.Matchers, + Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...), }) if err != nil { err = errors.Wrapf(err, "fetch label names from store %s", st) @@ -590,7 +604,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok { if s.debugLogging { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason)) + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason)) + } + continue + } + matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) + if !matches { + if s.debugLogging { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector")) } continue } @@ -611,7 +632,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ PartialResponseDisabled: r.PartialResponseDisabled, Start: r.Start, End: r.End, - Matchers: r.Matchers, + Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...), }) if err != nil { msg := "fetch label values from store %s"