Skip to content

Commit

Permalink
store: add test for ExpandPostings() race
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Aug 2, 2023
1 parent 3292dc3 commit 3bba706
Showing 1 changed file with 114 additions and 0 deletions.
114 changes: 114 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3050,3 +3050,117 @@ func TestPostingGroupMerge(t *testing.T) {
})
}
}

// TestExpandedPostings is a test whether there is a race between multiple ExpandPostings() calls.
func TestExpandedPostingsRace(t *testing.T) {
const blockCount = 10

tmpDir := t.TempDir()
t.Cleanup(func() {
testutil.Ok(t, os.RemoveAll(tmpDir))
})

bkt := objstore.NewInMemBucket()
t.Cleanup(func() {
testutil.Ok(t, bkt.Close())
})

// Create a block.
head, _ := storetestutil.CreateHeadWithSeries(t, 0, storetestutil.HeadGenOptions{
TSDBDir: filepath.Join(tmpDir, "head"),
SamplesPerSeries: 10,
ScrapeInterval: 15 * time.Second,
Series: 1000,
PrependLabels: nil,
Random: rand.New(rand.NewSource(120)),
SkipChunks: true,
})
blockID := createBlockFromHead(t, tmpDir, head)

bucketBlocks := make([]*bucketBlock, 0, blockCount)

for i := 0; i < blockCount; i++ {
ul := ulid.MustNew(uint64(i), rand.New(rand.NewSource(444)))

// Upload the block to the bucket.
thanosMeta := metadata.Thanos{
Labels: labels.Labels{{Name: "ext1", Value: fmt.Sprintf("%d", i)}}.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
}
m, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
testutil.Ok(t, err)

m.Thanos = thanosMeta
m.BlockMeta.ULID = ul

e2eutil.Copy(t, filepath.Join(tmpDir, blockID.String()), filepath.Join(tmpDir, ul.String()))
testutil.Ok(t, m.WriteToDir(log.NewLogfmtLogger(os.Stderr), filepath.Join(tmpDir, ul.String())))
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(context.Background(), log.NewLogfmtLogger(os.Stderr), bkt, filepath.Join(tmpDir, ul.String()), metadata.NoneFunc))

r, err := indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, ul, DefaultPostingOffsetInMemorySampling)
testutil.Ok(t, err)

blk, err := newBucketBlock(
context.Background(),
log.NewLogfmtLogger(os.Stderr),
newBucketStoreMetrics(nil),
m,
bkt,
filepath.Join(tmpDir, ul.String()),
noopCache{},
nil,
r,
NewGapBasedPartitioner(PartitionerMaxGapSize),
nil,
nil,
)
testutil.Ok(t, err)

bucketBlocks = append(bucketBlocks, blk)
}

tm, cancel := context.WithTimeout(context.Background(), 40*time.Second)
t.Cleanup(cancel)

l := sync.Mutex{}
previousRefs := make(map[int][]storage.SeriesRef)

for {
if tm.Err() != nil {
break
}

m := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"),
labels.MustNewMatcher(labels.MatchRegexp, "i", ".+"),
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
}

wg := &sync.WaitGroup{}
for i, bb := range bucketBlocks {
wg.Add(1)
i := i
bb := bb
go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil))
testutil.Ok(t, err)
defer wg.Done()

l.Lock()
defer l.Unlock()
if previousRefs[i] != nil {
testutil.Equals(t, previousRefs[i], refs)
} else {
previousRefs[i] = refs
}
}(i, bb)
}
wg.Wait()
}
}

0 comments on commit 3bba706

Please sign in to comment.