tsdb/{index,compact}: allow using custom postings encoding format #13242

Merged
Changes from 2 commits
6 changes: 3 additions & 3 deletions tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}

func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
2 changes: 1 addition & 1 deletion tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
nil,
w.logger,
[]int64{w.blockSize},
chunkenc.NewPool(), nil)
chunkenc.NewPool(), LeveledCompactorOptions{})
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}
28 changes: 22 additions & 6 deletions tsdb/compact.go
@@ -82,6 +82,7 @@ type LeveledCompactor struct {
ctx context.Context
maxBlockChunkSegmentSize int64
mergeFunc storage.VerticalChunkSeriesMergeFunc
postingsEncoder index.PostingsEncoder
}

type CompactorMetrics struct {
@@ -144,12 +145,17 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
return m
}

// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
type LeveledCompactorOptions struct {
// PE specifies the postings encoder. It is called when the compactor writes out the postings for a label name/value pair during compaction.
// If it is nil, the default encoder is used; at the moment that is the "raw" encoder. See index.RawPostingsEncoder for more.
PE index.PostingsEncoder
// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0, chunks.DefaultChunkSegmentSize is used.
MaxBlockChunkSegmentSize int64
// MergeFunc is used for merging series together in vertical compaction. If it is nil, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
MergeFunc storage.VerticalChunkSeriesMergeFunc
}

func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, fmt.Errorf("at least one range must be provided")
}
@@ -159,9 +165,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
if l == nil {
l = log.NewNopLogger()
}
if mergeFunc == nil {
mergeFunc := opts.MergeFunc
if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
}
maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
if maxBlockChunkSegmentSize == 0 {
maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
}
pe := opts.PE
if pe == nil {
pe = &index.RawPostingsEncoder{}
}
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
@@ -170,6 +185,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
ctx: ctx,
maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
mergeFunc: mergeFunc,
postingsEncoder: pe,
}, nil
}

@@ -599,7 +615,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
}

indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
if err != nil {
return errors.Wrap(err, "open index writer")
}
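For illustration, here is a minimal sketch (not part of the diff) of how a caller could route a custom postings encoder through the new LeveledCompactorOptions. The groupVarintEncoder type and newCompactor helper are hypothetical; the other arguments mirror the call sites updated above, and any non-raw format would also need a matching reader, which this change does not add.

package main

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/tsdb/index"
)

// groupVarintEncoder is a hypothetical PostingsEncoder. It delegates to the
// raw encoder here; a real implementation would emit a compressed layout.
type groupVarintEncoder struct{}

func (groupVarintEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
	(&index.RawPostingsEncoder{}).EncodePostings(e, offs)
}

// newCompactor builds a LeveledCompactor whose index writer uses the custom encoder.
func newCompactor(ctx context.Context) (*tsdb.LeveledCompactor, error) {
	return tsdb.NewLeveledCompactor(ctx, nil, log.NewNopLogger(),
		tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5),
		chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{PE: groupVarintEncoder{}},
	)
}

func main() {
	c, err := newCompactor(context.Background())
	if err != nil {
		panic(err)
	}
	_ = c
}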
18 changes: 9 additions & 9 deletions tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
},
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
180,
540,
1620,
}, nil, nil)
}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
240,
720,
2160,
}, nil, nil)
}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
240,
720,
2160,
}, nil, nil)
}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}

c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs = append(blockDirs, block.Dir())
}

c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(b, err)

b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Sparse head compaction.
mint := sparseHead.MinTime()
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Old head compaction.
mint := oldHead.MinTime()
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
6 changes: 4 additions & 2 deletions tsdb/db.go
@@ -451,7 +451,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
db.logger,
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
chunkenc.NewPool(),
nil,
LeveledCompactorOptions{},
)
if err != nil {
return errors.Wrap(err, "create leveled compactor")
@@ -820,7 +820,9 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}

ctx, cancel := context.WithCancel(context.Background())
db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
db.compactor, err = NewLeveledCompactor(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
MaxBlockChunkSegmentSize: opts.MaxBlockChunkSegmentSize,
})
if err != nil {
cancel()
return nil, errors.Wrap(err, "create leveled compactor")
35 changes: 28 additions & 7 deletions tsdb/index/index.go
@@ -110,6 +110,10 @@ type symbolCacheEntry struct {
lastValue string
}

type PostingsEncoder interface {
EncodePostings(*encoding.Encbuf, []uint32)
}

// Writer implements the IndexWriter interface for the standard
// serialization format.
type Writer struct {
@@ -148,6 +152,8 @@ type Writer struct {
crc32 hash.Hash

Version int

postingsEncoder PostingsEncoder
}

// TOC represents index Table Of Content that states where each section of index starts.
@@ -186,7 +192,11 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}

// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
if postingsEncoder == nil {
postingsEncoder = &RawPostingsEncoder{}
}

dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
@@ -229,9 +239,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},

symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
symbolCache: make(map[string]symbolCacheEntry, 1<<8),
labelNames: make(map[string]uint64, 1<<8),
crc32: newCRC32(),
postingsEncoder: postingsEncoder,
}
if err := iw.writeMeta(); err != nil {
return nil, err
@@ -941,6 +952,18 @@ func (w *Writer) writePostingsToTmpFiles() error {
return nil
}

// RawPostingsEncoder is the "basic" postings list encoding format with no compression:
// <BE uint32 len X><BE uint32 0><BE uint32 1>...<BE uint32 X-1>.
type RawPostingsEncoder struct{}

func (r *RawPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
e.PutBE32int(len(offs))

for _, off := range offs {
e.PutBE32(off)
}
}

func (w *Writer) writePosting(name, value string, offs []uint32) error {
// Align beginning to 4 bytes for more efficient postings list scans.
if err := w.fP.AddPadding(4); err != nil {
@@ -959,14 +982,12 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
w.cntPO++

w.buf1.Reset()
w.buf1.PutBE32int(len(offs))

for _, off := range offs {
if off > (1<<32)-1 {
return fmt.Errorf("series offset %d exceeds 4 bytes", off)
}
w.buf1.PutBE32(off)
}
w.postingsEncoder.EncodePostings(&w.buf1, offs)

w.buf2.Reset()
l := w.buf1.Len()
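As a usage note (not part of the diff), a caller constructing the index writer directly can pass either an explicit encoder or nil, which falls back to RawPostingsEncoder per the nil check added to NewWriter above. The writeIndex helper and the "index" path below are illustrative only; only the postings byte layout depends on the encoder choice.

package main

import (
	"context"
	"path/filepath"

	"github.com/prometheus/prometheus/tsdb/index"
)

// writeIndex opens an index.Writer with an explicit raw encoder, which is
// equivalent to passing nil for the encoder argument.
func writeIndex(dir string) error {
	iw, err := index.NewWriter(context.Background(), filepath.Join(dir, "index"), &index.RawPostingsEncoder{})
	if err != nil {
		return err
	}
	// Symbols, series and postings would be added here exactly as before;
	// an empty index must still result in a readable file.
	return iw.Close()
}

func main() {
	if err := writeIndex("."); err != nil {
		panic(err)
	}
}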
10 changes: 5 additions & 5 deletions tsdb/index/index_test.go
@@ -141,7 +141,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
fn := filepath.Join(dir, indexFilename)

// An empty index must still result in a readable file.
iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)
require.NoError(t, iw.Close())

@@ -166,7 +166,7 @@ func TestIndexRW_Postings(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)

series := []labels.Labels{
@@ -250,7 +250,7 @@ func TestPostingsMany(t *testing.T) {

fn := filepath.Join(dir, indexFilename)

iw, err := NewWriter(context.Background(), fn)
iw, err := NewWriter(context.Background(), fn, nil)
require.NoError(t, err)

// Create a label in the index which has 999 values.
@@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
})
}

iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename), nil)
require.NoError(t, err)

syms := []string{}
@@ -475,7 +475,7 @@ func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
}

func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), nil)
require.NoError(t, err)

require.NoError(t, w.AddSymbol("__name__"))