tsdb/compact: use a struct for leveled compactor options
As discussed on Slack, let's use a struct for the options in the leveled compactor.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Dec 13, 2023
1 parent 141df38 commit b1831ae
Showing 6 changed files with 37 additions and 23 deletions.
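
At call sites the change is mechanical: the trailing merge-function and postings-encoder arguments are replaced by a single LeveledCompactorOptions value, and the zero value of that struct keeps the previous defaults. A minimal before/after sketch, using the same arguments as the test updates below:

// Before (old signature): merge function and postings encoder as trailing positional arguments.
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)

// After (new signature): a single options struct; LeveledCompactorOptions{} keeps all defaults.
c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
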
6 changes: 3 additions & 3 deletions tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
require.NoError(t, err)
require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

- c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
+ c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(t, err)
blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
}

func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
- compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
+ compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
}

func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
- compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
+ compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, LeveledCompactorOptions{})
require.NoError(tb, err)

require.NoError(tb, os.MkdirAll(dir, 0o777))
2 changes: 1 addition & 1 deletion tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
nil,
w.logger,
[]int64{w.blockSize},
- chunkenc.NewPool(), nil, nil)
+ chunkenc.NewPool(), LeveledCompactorOptions{})
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}
24 changes: 19 additions & 5 deletions tsdb/compact.go
@@ -145,12 +145,17 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
return m
}

- // NewLeveledCompactor returns a LeveledCompactor.
- func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
- return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, pe)
+ type LeveledCompactorOptions struct {
+ // PE specifies the postings encoder. It is called when the compactor writes out the postings for a label name/value pair during compaction.
+ // If it is nil, the default encoder is used. At the moment that is the "raw" encoder; see index.RawPostingsEncoder for more.
+ PE index.PostingsEncoder
+ // MaxBlockChunkSegmentSize is the maximum block chunk segment size. If it is 0, the default chunks.DefaultChunkSegmentSize is used.
+ MaxBlockChunkSegmentSize int64
+ // MergeFunc is used for merging series together in vertical compaction. By default, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
+ MergeFunc storage.VerticalChunkSeriesMergeFunc
}

- func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
+ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, opts LeveledCompactorOptions) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, fmt.Errorf("at least one range must be provided")
}
@@ -160,9 +165,18 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
if l == nil {
l = log.NewNopLogger()
}
- if mergeFunc == nil {
+ mergeFunc := opts.MergeFunc
+ if mergeFunc == nil {
mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)
}
+ maxBlockChunkSegmentSize := opts.MaxBlockChunkSegmentSize
+ if maxBlockChunkSegmentSize == 0 {
+ maxBlockChunkSegmentSize = chunks.DefaultChunkSegmentSize
+ }
+ pe := opts.PE
+ if pe == nil {
+ pe = &index.RawPostingsEncoder{}
+ }
return &LeveledCompactor{
ranges: ranges,
chunkPool: pool,
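
For callers outside the tsdb package, here is a hedged, self-contained sketch of constructing a compactor with non-default options. The specific block ranges, the doubled segment size, and the nil Registerer are illustrative choices, not part of this commit; only the struct fields and their defaults come from the code above.

package main

import (
	"context"

	"github.com/go-kit/log"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// Only the fields that are set are overridden; omitted fields keep the
	// defaults documented on LeveledCompactorOptions.
	opts := tsdb.LeveledCompactorOptions{
		MaxBlockChunkSegmentSize: 2 * chunks.DefaultChunkSegmentSize,
		MergeFunc:                storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge),
	}

	compactor, err := tsdb.NewLeveledCompactor(
		context.Background(),
		nil, // prometheus.Registerer; nil skips metric registration, as in the tests in this commit
		log.NewNopLogger(),
		tsdb.ExponentialBlockRanges(tsdb.DefaultOptions().MinBlockDuration, 3, 5),
		chunkenc.NewPool(),
		opts,
	)
	if err != nil {
		panic(err)
	}
	_ = compactor
}
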
18 changes: 9 additions & 9 deletions tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
},
}

- c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, nil)
+ c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
180,
540,
1620,
- }, nil, nil, nil)
+ }, nil, LeveledCompactorOptions{})
require.NoError(t, err)

cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
240,
720,
2160,
- }, nil, nil, nil)
+ }, nil, LeveledCompactorOptions{})
require.NoError(t, err)

cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
240,
720,
2160,
- }, nil, nil, nil)
+ }, nil, LeveledCompactorOptions{})
require.NoError(t, err)

tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
}

- c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, nil)
+ c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(t, err)

meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
blockDirs = append(blockDirs, block.Dir())
}

- c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
+ c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, LeveledCompactorOptions{})
require.NoError(b, err)

b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
// Compaction.
mint := head.MinTime()
maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
- compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+ compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Sparse head compaction.
mint := sparseHead.MinTime()
maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
- compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+ compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
// Old head compaction.
mint := oldHead.MinTime()
maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
- compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
+ compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), LeveledCompactorOptions{})
require.NoError(t, err)
oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
require.NoError(t, err)
2 changes: 1 addition & 1 deletion tsdb/db.go
@@ -451,7 +451,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
db.logger,
ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
chunkenc.NewPool(),
- nil, nil,
+ LeveledCompactorOptions{},
)
if err != nil {
return errors.Wrap(err, "create leveled compactor")
8 changes: 4 additions & 4 deletions tsdb/index/index.go
@@ -193,6 +193,10 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {

// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
+ if postingsEncoder == nil {
+ postingsEncoder = &RawPostingsEncoder{}
+ }

dir := filepath.Dir(fn)

df, err := fileutil.OpenDir(dir)
@@ -224,10 +228,6 @@ func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder)
return nil, fmt.Errorf("sync dir: %w", err)
}

- if postingsEncoder == nil {
- postingsEncoder = &RawPostingsEncoder{}
- }

iw := &Writer{
ctx: ctx,
f: f,
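
With the nil check moved to the top, callers can keep passing a nil PostingsEncoder and NewWriter substitutes the raw encoder before doing any directory work. A small sketch under that assumption; writeIndex and blockDir are hypothetical names, and "index" is the index file name inside a TSDB block directory:

func writeIndex(ctx context.Context, blockDir string) (*index.Writer, error) {
	// A nil encoder falls back to &index.RawPostingsEncoder{} at the top of NewWriter.
	return index.NewWriter(ctx, filepath.Join(blockDir, "index"), nil)
}
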
