tsdb/index: Refactor Reader tests
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
aknuds1 committed May 9, 2024
1 parent 5c4310a commit 78491c5
Showing 1 changed file with 59 additions and 128 deletions.
187 changes: 59 additions & 128 deletions tsdb/index/index_test.go
@@ -20,7 +20,9 @@ import (
"hash/crc32"
"os"
"path/filepath"
"slices"
"sort"
"strconv"
"testing"

"github.com/stretchr/testify/require"
@@ -160,39 +162,14 @@ func TestIndexRW_Postings(t *testing.T) {
}

func TestIndexRW_Postings(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)

series := []labels.Labels{
labels.FromStrings("a", "1", "b", "1"),
labels.FromStrings("a", "1", "b", "2"),
labels.FromStrings("a", "1", "b", "3"),
labels.FromStrings("a", "1", "b", "4"),
var input indexWriterSeriesSlice
for i := 1; i < 5; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("a", "1", "b", strconv.Itoa(i)),
})
}

require.NoError(t, iw.AddSymbol("1"))
require.NoError(t, iw.AddSymbol("2"))
require.NoError(t, iw.AddSymbol("3"))
require.NoError(t, iw.AddSymbol("4"))
require.NoError(t, iw.AddSymbol("a"))
require.NoError(t, iw.AddSymbol("b"))

// Postings lists are only written if a series with the respective
// reference was added before.
require.NoError(t, iw.AddSeries(1, series[0]))
require.NoError(t, iw.AddSeries(2, series[1]))
require.NoError(t, iw.AddSeries(3, series[2]))
require.NoError(t, iw.AddSeries(4, series[3]))

require.NoError(t, iw.Close())

ir, err := NewFileReader(fn)
require.NoError(t, err)
ir, fn, _ := createFileReader(ctx, t, input)

p, err := ir.Postings(ctx, "a", "1")
require.NoError(t, err)
@@ -205,7 +182,7 @@ func TestIndexRW_Postings(t *testing.T) {

require.NoError(t, err)
require.Empty(t, c)
testutil.RequireEqual(t, series[i], builder.Labels())
testutil.RequireEqual(t, input[i].labels, builder.Labels())
}
require.NoError(t, p.Err())

@@ -240,8 +217,6 @@ func TestIndexRW_Postings(t *testing.T) {
"b": {"1", "2", "3", "4"},
}, labelIndices)

require.NoError(t, ir.Close())

t.Run("ShardedPostings()", func(t *testing.T) {
ir, err := NewFileReader(fn)
require.NoError(t, err)
@@ -296,42 +271,16 @@ func TestIndexRW_Postings(t *testing.T) {
}

func TestPostingsMany(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
require.NoError(t, err)

// Create a label in the index which has 999 values.
symbols := map[string]struct{}{}
series := []labels.Labels{}
var input indexWriterSeriesSlice
for i := 1; i < 1000; i++ {
v := fmt.Sprintf("%03d", i)
series = append(series, labels.FromStrings("i", v, "foo", "bar"))
symbols[v] = struct{}{}
}
symbols["i"] = struct{}{}
symbols["foo"] = struct{}{}
symbols["bar"] = struct{}{}
syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
sort.Strings(syms)
for _, s := range syms {
require.NoError(t, iw.AddSymbol(s))
}

for i, s := range series {
require.NoError(t, iw.AddSeries(storage.SeriesRef(i), s))
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("i", v, "foo", "bar"),
})
}
require.NoError(t, iw.Close())

ir, err := NewFileReader(fn)
require.NoError(t, err)
defer func() { require.NoError(t, ir.Close()) }()
ir, _, symbols := createFileReader(ctx, t, input)

cases := []struct {
in []string
@@ -387,25 +336,13 @@ func TestPostingsMany(t *testing.T) {
}

func TestPersistence_index_e2e(t *testing.T) {
dir := t.TempDir()
ctx := context.Background()

lbls, err := labels.ReadLabels(filepath.Join("..", "testdata", "20kseries.json"), 20000)
require.NoError(t, err)

// Sort labels as the index writer expects series in sorted order.
sort.Sort(labels.Slice(lbls))

symbols := map[string]struct{}{}
for _, lset := range lbls {
lset.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}

var input indexWriterSeriesSlice

ref := uint64(0)
// Generate ChunkMetas for every label set.
for i, lset := range lbls {
@@ -426,17 +363,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}

iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
require.NoError(t, err)

syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
sort.Strings(syms)
for _, s := range syms {
require.NoError(t, iw.AddSymbol(s))
}
ir, _, _ := createFileReader(ctx, t, input)

// Population procedure as done by compaction.
var (
Expand All @@ -447,8 +374,6 @@ func TestPersistence_index_e2e(t *testing.T) {
mi := newMockIndex()

for i, s := range input {
err = iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...)
require.NoError(t, err)
require.NoError(t, mi.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))

s.labels.Range(func(l labels.Label) {
@@ -462,12 +387,6 @@ func TestPersistence_index_e2e(t *testing.T) {
postings.Add(storage.SeriesRef(i), s.labels)
}

err = iw.Close()
require.NoError(t, err)

ir, err := NewFileReader(filepath.Join(dir, indexFilename))
require.NoError(t, err)

for p := range mi.postings {
gotp, err := ir.Postings(ctx, p.Name, p.Value)
require.NoError(t, err)
@@ -523,8 +442,6 @@ func TestPersistence_index_e2e(t *testing.T) {
}
sort.Strings(expSymbols)
require.Equal(t, expSymbols, gotSymbols)

require.NoError(t, ir.Close())
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
@@ -624,39 +541,14 @@ func BenchmarkReader_ShardedPostings(b *testing.B) {
numShards = 16
)

dir, err := os.MkdirTemp("", "benchmark_reader_sharded_postings")
require.NoError(b, err)
defer func() {
require.NoError(b, os.RemoveAll(dir))
}()

ctx := context.Background()

// Generate an index.
fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(ctx, fn)
require.NoError(b, err)

for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSymbol(fmt.Sprintf("%10d", i)))
}
require.NoError(b, iw.AddSymbol("const"))
require.NoError(b, iw.AddSymbol("unique"))

var input indexWriterSeriesSlice
for i := 1; i <= numSeries; i++ {
require.NoError(b, iw.AddSeries(storage.SeriesRef(i),
labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i))))
input = append(input, &indexWriterSeries{
labels: labels.FromStrings("const", fmt.Sprintf("%10d", 1), "unique", fmt.Sprintf("%10d", i)),
})
}

require.NoError(b, iw.Close())

b.ResetTimer()

// Create a reader to read back all postings from the index.
ir, err := NewFileReader(fn)
require.NoError(b, err)

ir, _, _ := createFileReader(ctx, b, input)
b.ResetTimer()

for n := 0; n < b.N; n++ {
@@ -719,3 +611,42 @@ func TestChunksTimeOrdering(t *testing.T) {

require.NoError(t, idx.Close())
}

// createFileReader creates a Reader for a temporary index file to which the given input has been written.
// It returns the Reader, the index filename, and the symbol map.
func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, string, map[string]struct{}) {
tb.Helper()

fn := filepath.Join(tb.TempDir(), indexFilename)

iw, err := NewWriter(ctx, fn)
require.NoError(tb, err)

symbols := map[string]struct{}{}
for _, s := range input {
s.labels.Range(func(l labels.Label) {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
})
}

syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
slices.Sort(syms)
for _, s := range syms {
require.NoError(tb, iw.AddSymbol(s))
}
for i, s := range input {
require.NoError(tb, iw.AddSeries(storage.SeriesRef(i), s.labels, s.chunks...))
}
require.NoError(tb, iw.Close())

ir, err := NewFileReader(fn)
require.NoError(tb, err)
tb.Cleanup(func() {
require.NoError(tb, ir.Close())
})
return ir, fn, symbols
}
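
For context, a minimal usage sketch of the new createFileReader helper. This test is not part of the commit; the test name and label data are made up for illustration, and it is assumed to live in index_test.go so the existing imports (context, strconv, testing, require, labels) apply.

func TestCreateFileReaderUsage(t *testing.T) {
	ctx := context.Background()

	// Build a small, sorted set of input series (hypothetical example data).
	var input indexWriterSeriesSlice
	for i := 1; i <= 3; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("foo", "bar", "i", strconv.Itoa(i)),
		})
	}

	// The helper writes the input to a temporary index file and returns an open
	// Reader plus the filename and symbol map; the Reader is closed via tb.Cleanup.
	ir, _, symbols := createFileReader(ctx, t, input)

	// The symbol map contains every label name and value that was written.
	require.Contains(t, symbols, "foo")

	// Postings can be read back for any label pair that was indexed.
	p, err := ir.Postings(ctx, "foo", "bar")
	require.NoError(t, err)
	require.NoError(t, p.Err())
}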
