diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 1570caec4a..fa29c69bc5 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -3050,3 +3050,117 @@ func TestPostingGroupMerge(t *testing.T) { }) } } + +// TestExpandedPostings is a test whether there is a race between multiple ExpandPostings() calls. +func TestExpandedPostingsRace(t *testing.T) { + const blockCount = 10 + + tmpDir := t.TempDir() + t.Cleanup(func() { + testutil.Ok(t, os.RemoveAll(tmpDir)) + }) + + bkt := objstore.NewInMemBucket() + t.Cleanup(func() { + testutil.Ok(t, bkt.Close()) + }) + + // Create a block. + head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{ + TSDBDir: filepath.Join(tmpDir, "head"), + SamplesPerSeries: 10, + ScrapeInterval: 15 * time.Second, + Series: 1000, + PrependLabels: nil, + Random: rand.New(rand.NewSource(120)), + SkipChunks: true, + }) + blockID := createBlockFromHead(t, tmpDir, head) + + bucketBlocks := make([]*bucketBlock, 0, blockCount) + + for i := 0; i < blockCount; i++ { + ul := ulid.MustNew(uint64(i), rand.New(rand.NewSource(444))) + + // Upload the block to the bucket. + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: fmt.Sprintf("%d", i)}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + m, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String())) + testutil.Ok(t, err) + + m.Thanos = thanosMeta + m.BlockMeta.ULID = ul + + e2eutil.Copy(t, filepath.Join(tmpDir, blockID.String()), filepath.Join(tmpDir, ul.String())) + testutil.Ok(t, m.WriteToDir(log.NewLogfmtLogger(os.Stderr), filepath.Join(tmpDir, ul.String()))) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(context.Background(), log.NewLogfmtLogger(os.Stderr), bkt, filepath.Join(tmpDir, ul.String()), metadata.NoneFunc)) + + r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling) + testutil.Ok(t, err) + + blk, err := newBucketBlock( + context.Background(), + log.NewLogfmtLogger(os.Stderr), + newBucketStoreMetrics(nil), + m, + bkt, + filepath.Join(tmpDir, ul.String()), + noopCache{}, + nil, + r, + NewGapBasedPartitioner(PartitionerMaxGapSize), + nil, + nil, + ) + testutil.Ok(t, err) + + bucketBlocks = append(bucketBlocks, blk) + } + + tm, cancel := context.WithTimeout(context.Background(), 40*time.Second) + t.Cleanup(cancel) + + l := sync.Mutex{} + previousRefs := make(map[int][]storage.SeriesRef) + + for { + if tm.Err() != nil { + break + } + + m := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"), + labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"), + labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"), + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + } + + wg := &sync.WaitGroup{} + for i, bb := range bucketBlocks { + wg.Add(1) + i := i + bb := bb + go func(i int, bb *bucketBlock) { + refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil)) + testutil.Ok(t, err) + defer wg.Done() + + l.Lock() + defer l.Unlock() + if previousRefs[i] != nil { + testutil.Equals(t, previousRefs[i], refs) + } else { + previousRefs[i] = refs + } + }(i, bb) + } + wg.Wait() + } +}