diff --git a/tsdb/block_test.go b/tsdb/block_test.go
index 46e6ecf8441..ccc69147cae 100644
--- a/tsdb/block_test.go
+++ b/tsdb/block_test.go
@@ -312,7 +312,7 @@ func TestBlockSize(t *testing.T) {
 		require.NoError(t, err)
 		require.Equal(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
 
-		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
 		require.NoError(t, err)
 		blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
 		require.NoError(t, err)
@@ -489,7 +489,7 @@ func createBlock(tb testing.TB, dir string, series []storage.Series) string {
 }
 
 func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
@@ -502,7 +502,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string {
 }
 
 func createBlockFromOOOHead(tb testing.TB, dir string, head *OOOCompactionHead) string {
-	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil)
+	compactor, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{1000000}, nil, nil, nil)
 	require.NoError(tb, err)
 
 	require.NoError(tb, os.MkdirAll(dir, 0o777))
diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go
index 0d017e095f4..d0f63926237 100644
--- a/tsdb/blockwriter.go
+++ b/tsdb/blockwriter.go
@@ -100,7 +100,7 @@ func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {
 		nil,
 		w.logger,
 		[]int64{w.blockSize},
-		chunkenc.NewPool(), nil)
+		chunkenc.NewPool(), nil, nil)
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
 	}
diff --git a/tsdb/compact.go b/tsdb/compact.go
index 32c88d2cc0a..2b9cbe9a808 100644
--- a/tsdb/compact.go
+++ b/tsdb/compact.go
@@ -82,6 +82,7 @@ type LeveledCompactor struct {
 	ctx                      context.Context
 	maxBlockChunkSegmentSize int64
 	mergeFunc                storage.VerticalChunkSeriesMergeFunc
+	postingsEncoder          index.PostingsEncoder
 }
 
 type CompactorMetrics struct {
@@ -145,11 +146,11 @@ func newCompactorMetrics(r prometheus.Registerer) *CompactorMetrics {
 }
 
 // NewLeveledCompactor returns a LeveledCompactor.
-func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
-	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc)
+func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
+	return NewLeveledCompactorWithChunkSize(ctx, r, l, ranges, pool, chunks.DefaultChunkSegmentSize, mergeFunc, pe)
 }
 
-func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
+func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc, pe index.PostingsEncoder) (*LeveledCompactor, error) {
 	if len(ranges) == 0 {
 		return nil, fmt.Errorf("at least one range must be provided")
 	}
@@ -170,6 +171,7 @@ func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Register
 		ctx:                      ctx,
 		maxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
 		mergeFunc:                mergeFunc,
+		postingsEncoder:          pe,
 	}, nil
 }
 
@@ -599,7 +601,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
 		}
 	}
 
-	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename))
+	indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename), c.postingsEncoder)
 	if err != nil {
 		return errors.Wrap(err, "open index writer")
 	}
diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go
index 94b35e3b4c2..e2bfdd78588 100644
--- a/tsdb/compact_test.go
+++ b/tsdb/compact_test.go
@@ -164,7 +164,7 @@ func TestNoPanicFor0Tombstones(t *testing.T) {
 		},
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{50}, nil, nil, nil)
 	require.NoError(t, err)
 
 	c.plan(metas)
@@ -178,7 +178,7 @@ func TestLeveledCompactor_plan(t *testing.T) {
 		180,
 		540,
 		1620,
-	}, nil, nil)
+	}, nil, nil, nil)
 	require.NoError(t, err)
 
 	cases := map[string]struct {
@@ -387,7 +387,7 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil)
+	}, nil, nil, nil)
 	require.NoError(t, err)
 
 	cases := []struct {
@@ -437,7 +437,7 @@ func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
 		240,
 		720,
 		2160,
-	}, nil, nil)
+	}, nil, nil, nil)
 	require.NoError(t, err)
 
 	tmpdir := t.TempDir()
@@ -968,7 +968,7 @@ func TestCompaction_populateBlock(t *testing.T) {
 				blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt})
 			}
 
-			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil)
+			c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil, nil, nil)
 			require.NoError(t, err)
 
 			meta := &BlockMeta{
@@ -1101,7 +1101,7 @@ func BenchmarkCompaction(b *testing.B) {
 		blockDirs = append(blockDirs, block.Dir())
 	}
 
-	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil)
+	c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil, nil, nil)
 	require.NoError(b, err)
 
 	b.ResetTimer()
@@ -1481,7 +1481,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
 			// Compaction.
 			mint := head.MinTime()
 			maxt := head.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
+			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
 			require.NoError(t, err)
 			id, err := compactor.Write(head.opts.ChunkDirRoot, head, mint, maxt, nil)
 			require.NoError(t, err)
@@ -1623,7 +1623,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 			// Sparse head compaction.
 			mint := sparseHead.MinTime()
 			maxt := sparseHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
+			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
 			require.NoError(t, err)
 			sparseULID, err = compactor.Write(sparseHead.opts.ChunkDirRoot, sparseHead, mint, maxt, nil)
 			require.NoError(t, err)
@@ -1674,7 +1674,7 @@ func TestSparseHistogramSpaceSavings(t *testing.T) {
 			// Old head compaction.
 			mint := oldHead.MinTime()
 			maxt := oldHead.MaxTime() + 1 // Block intervals are half-open: [b.MinTime, b.MaxTime).
-			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil)
+			compactor, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{DefaultBlockDuration}, chunkenc.NewPool(), nil, nil)
 			require.NoError(t, err)
 			oldULID, err = compactor.Write(oldHead.opts.ChunkDirRoot, oldHead, mint, maxt, nil)
 			require.NoError(t, err)
diff --git a/tsdb/db.go b/tsdb/db.go
index 2e3801a9e05..b1db5dc6e9d 100644
--- a/tsdb/db.go
+++ b/tsdb/db.go
@@ -451,7 +451,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) {
 		db.logger,
 		ExponentialBlockRanges(DefaultOptions().MinBlockDuration, 3, 5),
 		chunkenc.NewPool(),
-		nil,
+		nil, nil,
 	)
 	if err != nil {
 		return errors.Wrap(err, "create leveled compactor")
@@ -820,7 +820,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 
-	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil)
+	db.compactor, err = NewLeveledCompactorWithChunkSize(ctx, r, l, rngs, db.chunkPool, opts.MaxBlockChunkSegmentSize, nil, nil)
 	if err != nil {
 		cancel()
 		return nil, errors.Wrap(err, "create leveled compactor")
diff --git a/tsdb/index/index.go b/tsdb/index/index.go
index 44ee66386f2..eca9498fe9d 100644
--- a/tsdb/index/index.go
+++ b/tsdb/index/index.go
@@ -110,6 +110,10 @@ type symbolCacheEntry struct {
 	lastValue string
 }
 
+type PostingsEncoder interface {
+	EncodePostings(*encoding.Encbuf, []uint32)
+}
+
 // Writer implements the IndexWriter interface for the standard
 // serialization format.
 type Writer struct {
@@ -148,6 +152,8 @@ type Writer struct {
 	crc32 hash.Hash
 
 	Version int
+
+	postingsEncoder PostingsEncoder
 }
 
 // TOC represents index Table Of Content that states where each section of index starts.
@@ -186,7 +192,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
 }
 
 // NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
-func NewWriter(ctx context.Context, fn string) (*Writer, error) {
+func NewWriter(ctx context.Context, fn string, postingsEncoder PostingsEncoder) (*Writer, error) {
 	dir := filepath.Dir(fn)
 
 	df, err := fileutil.OpenDir(dir)
@@ -218,6 +224,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
 		return nil, fmt.Errorf("sync dir: %w", err)
 	}
 
+	if postingsEncoder == nil {
+		postingsEncoder = &RawPostingsEncoder{}
+	}
+
 	iw := &Writer{
 		ctx:   ctx,
 		f:     f,
@@ -229,9 +239,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
 		buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 		buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
 
-		symbolCache: make(map[string]symbolCacheEntry, 1<<8),
-		labelNames:  make(map[string]uint64, 1<<8),
-		crc32:       newCRC32(),
+		symbolCache:     make(map[string]symbolCacheEntry, 1<<8),
+		labelNames:      make(map[string]uint64, 1<<8),
+		crc32:           newCRC32(),
+		postingsEncoder: postingsEncoder,
 	}
 	if err := iw.writeMeta(); err != nil {
 		return nil, err
@@ -941,6 +952,18 @@ func (w *Writer) writePostingsToTmpFiles() error {
 	return nil
}
 
+// RawPostingsEncoder implements the default postings list encoding, which
+// applies no compression: <BE uint32 number of offsets><BE uint32 offset>...
+type RawPostingsEncoder struct{}
+
+func (r *RawPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
+	e.PutBE32int(len(offs))
+
+	for _, off := range offs {
+		e.PutBE32(off)
+	}
+}
+
 func (w *Writer) writePosting(name, value string, offs []uint32) error {
 	// Align beginning to 4 bytes for more efficient postings list scans.
 	if err := w.fP.AddPadding(4); err != nil {
@@ -959,14 +982,13 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
 	w.cntPO++
 
 	w.buf1.Reset()
-	w.buf1.PutBE32int(len(offs))
 
 	for _, off := range offs {
 		if off > (1<<32)-1 {
 			return fmt.Errorf("series offset %d exceeds 4 bytes", off)
 		}
-		w.buf1.PutBE32(off)
 	}
+	w.postingsEncoder.EncodePostings(&w.buf1, offs)
 
 	w.buf2.Reset()
 	l := w.buf1.Len()
diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go
index 6c5e313d434..056f8402ed4 100644
--- a/tsdb/index/index_test.go
+++ b/tsdb/index/index_test.go
@@ -141,7 +141,7 @@ func TestIndexRW_Create_Open(t *testing.T) {
 	fn := filepath.Join(dir, indexFilename)
 
 	// An empty index must still result in a readable file.
-	iw, err := NewWriter(context.Background(), fn)
+	iw, err := NewWriter(context.Background(), fn, nil)
 	require.NoError(t, err)
 	require.NoError(t, iw.Close())
 
@@ -166,7 +166,7 @@ func TestIndexRW_Postings(t *testing.T) {
 
 	fn := filepath.Join(dir, indexFilename)
 
-	iw, err := NewWriter(context.Background(), fn)
+	iw, err := NewWriter(context.Background(), fn, nil)
 	require.NoError(t, err)
 
 	series := []labels.Labels{
@@ -250,7 +250,7 @@ func TestPostingsMany(t *testing.T) {
 
 	fn := filepath.Join(dir, indexFilename)
 
-	iw, err := NewWriter(context.Background(), fn)
+	iw, err := NewWriter(context.Background(), fn, nil)
 	require.NoError(t, err)
 
 	// Create a label in the index which has 999 values.
@@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 		})
 	}
 
-	iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename))
+	iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename), nil)
 	require.NoError(t, err)
 
 	syms := []string{}
@@ -475,7 +475,7 @@ func TestPersistence_index_e2e(t *testing.T) {
 }
 
 func TestWriter_ShouldReturnErrorOnSeriesWithDuplicatedLabelNames(t *testing.T) {
-	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"))
+	w, err := NewWriter(context.Background(), filepath.Join(t.TempDir(), "index"), nil)
 	require.NoError(t, err)
 
 	require.NoError(t, w.AddSymbol("__name__"))
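
For context on how the new hook composes: `index.NewWriter` now accepts a `PostingsEncoder` and falls back to `RawPostingsEncoder` when it is nil, so every existing caller keeps the current on-disk format. A minimal sketch of a custom encoder follows — `deltaPostingsEncoder` is hypothetical and not part of this change, and an index written with it would also need a matching custom reader, since nothing in this diff touches the decode path:

```go
package main

import (
	"context"
	"log"

	"github.com/prometheus/prometheus/tsdb/encoding"
	"github.com/prometheus/prometheus/tsdb/index"
)

// deltaPostingsEncoder is a hypothetical PostingsEncoder: it writes the count
// and the first offset as fixed-width big-endian values, then uvarint deltas
// between consecutive offsets instead of one BE uint32 per offset.
type deltaPostingsEncoder struct{}

func (deltaPostingsEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
	e.PutBE32int(len(offs))
	if len(offs) == 0 {
		return
	}
	e.PutBE32(offs[0])
	for i := 1; i < len(offs); i++ {
		// Series offsets arrive sorted, so the deltas are non-negative.
		e.PutUvarint64(uint64(offs[i] - offs[i-1]))
	}
}

func main() {
	// Passing nil as the new third argument would select RawPostingsEncoder.
	w, err := index.NewWriter(context.Background(), "index", deltaPostingsEncoder{})
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}
```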
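At the TSDB level, the encoder threads through the new trailing parameter of `NewLeveledCompactor`, which hands it to `index.NewWriter` inside `write`. A sketch of the wiring, with `myEncoder` as a hypothetical stand-in (here it just mirrors the default raw encoding; a real implementation would differ):

```go
package main

import (
	"context"

	gokitlog "github.com/go-kit/log"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/encoding"
)

// myEncoder is a hypothetical placeholder; any type satisfying
// index.PostingsEncoder can be supplied.
type myEncoder struct{}

func (myEncoder) EncodePostings(e *encoding.Encbuf, offs []uint32) {
	e.PutBE32int(len(offs))
	for _, off := range offs {
		e.PutBE32(off)
	}
}

func main() {
	compactor, err := tsdb.NewLeveledCompactor(
		context.Background(),
		nil, // prometheus.Registerer; nil skips metric registration
		gokitlog.NewNopLogger(),
		[]int64{tsdb.DefaultBlockDuration},
		chunkenc.NewPool(),
		nil,         // mergeFunc: nil selects the default vertical merger
		myEncoder{}, // nil here keeps the default RawPostingsEncoder
	)
	if err != nil {
		panic(err)
	}
	_ = compactor // used with Plan/Compact/Write as before
}
```

Passing nil for the encoder, as every call site updated in this diff does, preserves today's behaviour exactly.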