diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index ccc69147cae..1c7a5d56862 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
 
-		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
 		require.NoError(t, err)
 		blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
 		require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 }
 
 func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
 }
 
 func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index d0f63926237..6a67b34ddb7 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), nil, nil)
+		chunkenc.NewPool(), LeveledCompactorOptions{})
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
 	}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 2b9cbe9a808..5c73f70a146 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -145,12 +145,17 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
 	return m
 }
 
-// NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, pe)
+type LeveledCompactorOptions struct {
+	// PE specifies the postings encoder. It is called when the compactor writes out the postings for a label name/value pair during compaction.
+	// If it is nil, the default encoder is used. At the moment that is the "raw" encoder; see index.RawPostingsEncoder for more.
+	PE index.PostingsEncoder
+	// MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0, the default chunks.DefaultChunkSegmentSize is used.
+	MaxBlockChunkSegmentSize int64
+	// MergeFunc is used to merge series together during vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
+	MergeFunc storage.VerticalChunkSeriesMergeFunc
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, fmt.Errorf("at least one range must be provided")
 	}
@@ -160,9 +165,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 	if l == nil {
 		l = log.NewNopLogger()
 	}
-	if mergeFunc == nil {
+	mergeFunc := opts.MergeFunc
+	if mergeFunc == nil {
 		mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
 	}
+	maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
+	if maxBlockChunkSegmentSize == 0 {
+		maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
+	}
+	pe := opts.PE
+	if pe == nil {
+		pe = &index.RawPostingsEncoder{}
+	}
 	return &LeveledCompactor{
 		ranges:    ranges,
 		chunkPool: pool,
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index e2bfdd78588..55b23bf99db 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{})
 	require.NoError(t, err)
 
 	c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 		180,
 		540,
 		1620,
-	}, nil, nil, nil)
+	}, nil, LeveledCompactorOptions{})
 	require.NoError(t, err)
 
 	cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil, nil)
+	}, nil, LeveledCompactorOptions{})
 	require.NoError(t, err)
 
 	cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil, nil)
+	}, nil, LeveledCompactorOptions{})
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, nil)
+			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{})
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
 				blockDirs = append(blockDirs, block.Dir())
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
+			c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
 			require.NoError(b, err)
 
 			b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
 			// Compaction.
 			mint := head.MinTime()
 			maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
 			require.NoError(t, err)
 			id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
 			require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 		// Sparse head compaction.
 		mint := sparseHead.MinTime()
 		maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-		compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+		compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
 		require.NoError(t, err)
 		sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
 		require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 		// Old head compaction.
 		mint := oldHead.MinTime()
 		maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-		compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+		compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
 		require.NoError(t, err)
 		oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
 		require.NoError(t, err)
diff --git a/tsdb/db.go b/tsdb/db.go
index b1db5dc6e9d..aa7be73a823 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -451,7 +451,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 		db.logger,
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
 		chunkenc.NewPool(),
-		nil, nil,
+		LeveledCompactorOptions{},
 	)
 	if err != nil {
 		return errors.Wrap(err, "create leveled compactor")
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 07e22c21d06..eab291bcab2 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -193,6 +193,10 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
 func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
+	if postingsEncoder == nil {
+		postingsEncoder = &RawPostingsEncoder{}
+	}
+
 	dir := filepath.Dir(fn)
 
 	df, err := fileutil.OpenDir(dir)
@@ -224,10 +228,6 @@ func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder)
 		return nil, fmt.Errorf("sync dir: %w", err)
 	}
 
-	if postingsEncoder == nil {
-		postingsEncoder = &RawPostingsEncoder{}
-	}
-
 	iw := &Writer{
 		ctx: ctx,
 		f:   f,
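
For reviewers, a minimal caller-side sketch of the new constructor (a usage sketch only, not part of the patch). The only identifiers it assumes beyond this diff are existing exported tsdb API (ExponentialBlockRanges, DefaultBlockDuration, chunkenc.NewPool) and go-kit's log package; the zero-value options struct is expected to reproduce the old `nil, nil` defaults, which is what the constructor above implements.

package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 3, 5)

	// Zero-value options reproduce the behaviour of the old
	// NewLeveledCompactor(ctx, r, l, ranges, pool, nil, nil) call:
	// raw postings encoding, chunks.DefaultChunkSegmentSize, and
	// storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge).
	c, err := tsdb.NewLeveledCompactor(context.Background(), nil,
		log.NewNopLogger(), ranges, chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("compactor with defaults: %T\n", c)

	// Callers set only the fields they care about; everything else keeps
	// its default. PE is pinned to the raw encoder explicitly here; any
	// other index.PostingsEncoder implementation could be injected instead.
	c, err = tsdb.NewLeveledCompactor(context.Background(), nil,
		log.NewNopLogger(), ranges, chunkenc.NewPool(),
		tsdb.LeveledCompactorOptions{PE: &index.RawPostingsEncoder{}})
	if err != nil {
		panic(err)
	}
	fmt.Printf("compactor with explicit encoder: %T\n", c)
}

Folding the optional parameters into a struct means the next knob can be added without rewriting every call site again, which is exactly the churn visible across the test files in this diff.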