Skip to content

Commit

Permalink
Store: add failing test to show an issue with tsdb selector (thanos-i…
Browse files Browse the repository at this point in the history
…o#7468)

The TSDB Selector is more powerful then label matchers. The issue is
that we propagate the TSDB to select with label matchers, but they
cannot convey enough information to select the right TSDB. This is an
example of a configuration that would select too many TSDBs.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Jun 20, 2024
1 parent 0272269 commit 0ff119d
Showing 1 changed file with 114 additions and 17 deletions.
131 changes: 114 additions & 17 deletions pkg/store/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,43 +999,140 @@ func TestTSDBStore_Acceptance(t *testing.T) {
}

func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
t.Skip("This is a known issue, we need to think how to fix it")

t.Cleanup(func() { custom.TolerantVerifyLeak(t) })
ctx := context.Background()

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
startNestedStore := func(tt *testing.T, appendFn func(app storage.Appender), extLsets ...labels.Labels) storepb.StoreServer {
tmpDir := tt.TempDir()
bktDir := filepath.Join(tmpDir, "bkt")
auxDir := filepath.Join(tmpDir, "aux")
metaDir := filepath.Join(tmpDir, "meta")

testutil.Ok(tt, os.MkdirAll(metaDir, os.ModePerm))
testutil.Ok(tt, os.MkdirAll(auxDir, os.ModePerm))

bkt, err := filesystem.NewBucket(bktDir)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))
tt.Cleanup(func() { testutil.Ok(tt, bkt.Close()) })

return NewTSDBStore(nil, db, component.Rule, extLset)
headOpts := tsdb.DefaultHeadOptions()
headOpts.ChunkDirRoot = tmpDir
headOpts.ChunkRange = 1000
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, h.Close()) })
logger := log.NewNopLogger()

appendFn(h.Appender(context.Background()))

if h.NumSeries() == 0 {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

for _, extLset := range extLsets {
id := createBlockFromHead(tt, auxDir, h)

Check failure on line 1036 in pkg/store/acceptance_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

undefined: createBlockFromHead

Check failure on line 1036 in pkg/store/acceptance_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

undefined: createBlockFromHead

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
}

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

insBkt := objstore.WithNoopInstr(bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
"",
NewChunksLimiterFactory(10e6),
NewSeriesLimiterFactory(10e6),
NewBytesLimiterFactory(10e6),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
1*time.Minute,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, bucketStore.Close()) })

testutil.Ok(tt, bucketStore.SyncBlocks(context.Background()))

return bucketStore
}

extLset1 := labels.NewBuilder(extLset).Set("replica", "A").Labels()
extLset2 := labels.NewBuilder(extLset).Set("replica", "B").Labels()
extLset3 := labels.NewBuilder(extLset).Set("replica", "C").Labels()
extLset1 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "B").Labels()
extLset2 := labels.NewBuilder(extLset).Set("L1", "C").Set("L2", "D").Labels()
extLset3 := labels.NewBuilder(extLset).Set("L1", "A").Set("L2", "D").Labels()

p1 := startNestedStore(tt, extLset1, appendFn)
p2 := startNestedStore(tt, extLset2, appendFn)
p3 := startNestedStore(tt, extLset3, appendFn)
p1 := startNestedStore(tt, appendFn, extLset1, extLset2, extLset3)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p3), ExtLset: []labels.Labels{extLset3}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}},
}

relabelCfgs := []*relabel.Config{{
SourceLabels: model.LabelNames([]model.LabelName{"replica"}),
Regex: relabel.MustNewRegexp("(A|C)"),
SourceLabels: model.LabelNames([]model.LabelName{"L1", "L2"}),
Separator: "-",
Regex: relabel.MustNewRegexp("(A-B|C-D)"),
Action: relabel.Keep,
}}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs)))
}

testStoreAPIsAcceptance(t, startStore)
client := startStore(t, labels.EmptyLabels(), func(app storage.Appender) {
_, err := app.Append(0, labels.FromStrings("a", "b"), 0, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
})
srv := newStoreSeriesServer(ctx)

testutil.Ok(t, client.Series(&storepb.SeriesRequest{
MinTime: minTime.Unix(),
MaxTime: maxTime.Unix(),
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv))

receivedLabels := make([]labels.Labels, 0)
for _, s := range srv.SeriesSet {
receivedLabels = append(receivedLabels, s.PromLabels())
}

// This fails currently because the method of using matchers cannot drop extLset3 even though we should only
// select extLset1 and extLset2 because of the TSDB Selector
testutil.Equals(t, receivedLabels, []labels.Labels{
labels.FromStrings("L1", "A", "L2", "B", "a", "b"),
labels.FromStrings("L1", "C", "L2", "D", "a", "b"),
})

}

func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
Expand Down

0 comments on commit 0ff119d

Please sign in to comment.