tsdb/{index,compact}: allow using custom postings encoding format
We would like to experiment with a different postings encoding format in
Thanos, so this change adds another argument to `NewWriter` that lets
callers switch the encoding if needed. The leveled compactor is also wired
up so that the format can be changed there as well.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Dec 4, 2023
1 parent 5dbbadf commit 141df38
Showing 7 changed files with 54 additions and 31 deletions.
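
Before the per-file diffs, here is a minimal sketch of how the new hook could be used once this change is applied. `DeltaPostingsEncoder` and its delta+varint layout are hypothetical, invented purely for illustration (neither Prometheus nor Thanos ships such an encoder); only the `index.PostingsEncoder` interface and the extra `NewWriter` argument come from this commit.

```go
package main

import (
	"context"
	"os"
	"path/filepath"

	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/tsdb/index"
)

// DeltaPostingsEncoder is a hypothetical encoder, shown only to illustrate
// the interface added by this commit: it writes the entry count, the first
// offset, and then varint-encoded deltas between consecutive offsets.
type DeltaPostingsEncoder struct{}

func (DeltaPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
	e.PutBE32int(len(offs))
	if len(offs) == 0 {
		return
	}
	e.PutBE32(offs[0])
	for i := 1; i < len(offs); i++ {
		// Postings offsets are sorted, so deltas are non-negative.
		e.PutUvarint32(offs[i] - offs[i-1])
	}
}

func main() {
	// With this change, the encoder is NewWriter's third argument;
	// passing nil keeps the existing raw, uncompressed format.
	fn := filepath.Join(os.TempDir(), "index")
	w, err := index.NewWriter(context.Background(), fn, DeltaPostingsEncoder{})
	if err != nil {
		panic(err)
	}
	defer w.Close()
}
```

Note that the interface only covers the write path: a block written with a non-default encoder also needs matching read-side support, which this commit does not add.
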
6 changes: 3 additions & 3 deletions tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}

func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
2 changes: 1 addition & 1 deletion tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
nil,
w.logger,
[]int64{w.blockSize},
chunkenc.NewPool(), nil)
chunkenc.NewPool(), nil, nil)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}
10 changes: 6 additions & 4 deletions tsdb/compact.go
@@ -82,6 +82,7 @@ type LeveledCompactor struct {
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
}

type CompactorMetrics struct {
@@ -145,11 +146,11 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
}

// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, pe)
}

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, fmt.Errorf("at least one range must be provided")
}
@@ -170,6 +171,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
}, nil
}

@@ -599,7 +601,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
}

indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
if err != nil {
return errors.Wrap(err, "open index writer")
}
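
The compactor receives the encoder as a new trailing argument and simply forwards it to `index.NewWriter`; every call site inside Prometheus passes nil, so the default format is unchanged. A minimal sketch of constructing a compactor with the hypothetical encoder from the sketch above (the other arguments mirror what Prometheus itself passes in `tsdb/db.go`):

```go
package main

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func newCompactorWithCustomPostings(ctx context.Context) (*tsdb.LeveledCompactor, error) {
	return tsdb.NewLeveledCompactor(
		ctx,
		prometheus.NewRegistry(),
		log.NewNopLogger(),
		tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5),
		chunkenc.NewPool(),
		nil,                     // nil falls back to the default vertical chunk series merger
		&DeltaPostingsEncoder{}, // hypothetical encoder from the sketch above; nil keeps the raw format
	)
}
```
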
18 changes: 9 additions & 9 deletions tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
},
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, nil)
require.NoError(t, err)

c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
180,
540,
1620,
}, nil, nil)
}, nil, nil, nil)
require.NoError(t, err)

cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
240,
720,
2160,
}, nil, nil)
}, nil, nil, nil)
require.NoError(t, err)

cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
240,
720,
2160,
}, nil, nil)
}, nil, nil, nil)
require.NoError(t, err)

tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, nil)
require.NoError(t, err)

meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs = append(blockDirs, block.Dir())
}

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
require.NoError(b, err)

b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Sparse head compaction.
mint := sparseHead.MinTime()
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Old head compaction.
mint := oldHead.MinTime()
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
4 changes: 2 additions & 2 deletions tsdb/db.go
@@ -451,7 +451,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
db.logger,
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
chunkenc.NewPool(),
nil,
nil, nil,
)
if err != nil {
return errors.Wrap(err, "create leveled compactor")
@@ -820,7 +820,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}

ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, nil)
if err != nil {
cancel()
return nil, errors.Wrap(err, "create leveled compactor")
35 changes: 28 additions & 7 deletions tsdb/index/index.go
@@ -110,6 +110,10 @@ type symbolCacheEntry struct {
lastValue string
}

type PostingsEncoder interface {
EncodePostings(*encoding.Encbuf, []uint32)
}

// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
@@ -148,6 +152,8 @@ type Writer struct {
crc32 hash.Hash

Version int

postingsEncoder PostingsEncoder
}

// TOC represents index Table Of Content that states where each section of index starts.
@@ -186,7 +192,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}

// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
@@ -218,6 +224,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return nil, fmt.Errorf("sync dir: %w", err)
}

if postingsEncoder == nil {
postingsEncoder = &RawPostingsEncoder{}
}

iw := &Writer{
ctx: ctx,
f: f,
@@ -229,9 +239,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},

symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
postingsEncoder: postingsEncoder,
}
if err := iw.writeMeta(); err != nil {
return nil, err
@@ -941,6 +952,18 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}

// RawPostingsEncoder is the "basic" postings list encoding format with no compression:
// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
type RawPostingsEncoder struct{}

func (r *RawPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
e.PutBE32int(len(offs))

for _, off := range offs {
e.PutBE32(off)
}
}

func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.AddPadding(4); err != nil {
@@ -959,14 +982,12 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.cntPO++

w.buf1.Reset()
w.buf1.PutBE32int(len(offs))

for _, off := range offs {
if off > (1<<32)-1 {
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
}
w.buf1.PutBE32(off)
}
w.postingsEncoder.EncodePostings(&w.buf1, offs)

w.buf2.Reset()
l := w.buf1.Len()
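
For reference, the `RawPostingsEncoder` introduced above reproduces, byte for byte, the layout the index writer has always emitted. A tiny illustrative snippet (assuming this commit is applied) showing what it produces for three offsets:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	var buf encoding.Encbuf
	(&index.RawPostingsEncoder{}).EncodePostings(&buf, []uint32{4, 9, 11})

	// Prints: 00 00 00 03 00 00 00 04 00 00 00 09 00 00 00 0b
	// i.e. a big-endian uint32 count (3) followed by each offset as a raw
	// big-endian uint32, with no compression.
	fmt.Printf("% x\n", buf.Get())
}
```
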
10 changes: 5 additions & 5 deletions tsdb/index/index_test.go
@@ -141,7 +141,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
fn := filepath.Join(dir, indexFilename)

// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)
require.NoError(t, iw.Close())

@@ -166,7 +166,7 @@ func TestIndexRW_Postings(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)

series := []labels.Labels{
@@ -250,7 +250,7 @@ func TestPostingsMany(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)

// Create a label in the index which has 999 values.
@@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}

iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename), nil)
require.NoError(t, err)

syms := []string{}
@@ -475,7 +475,7 @@ func TestPersistence_index_e2e(t *testing.T) {
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), nil)
require.NoError(t, err)

require.NoError(t, w.AddSymbol("__name__"))
