Skip to content

Commit

Permalink
change block populator to accept postings index function
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Jun 5, 2024
1 parent 8a08f45 commit 6358f3d
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 11 deletions.
28 changes: 18 additions & 10 deletions tsdb/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}
closers = append(closers, indexw)

if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil {
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw, AllSortedPostings); err != nil {
return fmt.Errorf("populate block: %w", err)
}

Expand Down Expand Up @@ -718,15 +718,28 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl
}

type BlockPopulator interface {
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error
}

// IndexReaderPostingsFunc is a function to get a posting iterator from a given index reader.
type IndexReaderPostingsFunc func(ctx context.Context, reader IndexReader) index.Postings

// AllSortedPostings returns a sorted all posting iterator from the input index reader.
func AllSortedPostings(ctx context.Context, reader IndexReader) index.Postings {
k, v := index.AllPostingsKey()
all, err := reader.Postings(ctx, k, v)
if err != nil {
return index.ErrPostings(err)
}
return reader.SortedPostings(all)
}

type DefaultBlockPopulator struct{}

// PopulateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}
Expand Down Expand Up @@ -784,14 +797,9 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa
}
closers = append(closers, tombsr)

k, v := index.AllPostingsKey()
all, err := indexr.Postings(ctx, k, v)
if err != nil {
return err
}
all = indexr.SortedPostings(all)
postings := postingsFunc(ctx, indexr)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, postings, meta.MinTime, meta.MaxTime-1, false))
syms := indexr.Symbols()
if i == 0 {
symbols = syms
Expand Down
62 changes: 61 additions & 1 deletion tsdb/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/prometheus/tsdb/index"
"math"
"math/rand"
"os"
Expand Down Expand Up @@ -493,6 +494,7 @@ func TestCompaction_populateBlock(t *testing.T) {
inputSeriesSamples [][]seriesSamples
compactMinTime int64
compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64.
irPostingsFunc IndexReaderPostingsFunc
expSeriesSamples []seriesSamples
expErr error
}{
Expand Down Expand Up @@ -961,6 +963,60 @@ func TestCompaction_populateBlock(t *testing.T) {
},
},
},
{
title: "Populate from single block with index reader postings function selecting different series. Expect empty block.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings {
p, err := reader.Postings(ctx, "a", "c")
if err != nil {
return index.EmptyPostings()
}
return reader.SortedPostings(p)
},
},
{
title: "Populate from single block with index reader postings function selecting one series. Expect partial block.",
inputSeriesSamples: [][]seriesSamples{
{
{
lset: map[string]string{"a": "b"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "d"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings {
p, err := reader.Postings(ctx, "a", "c", "d")
if err != nil {
return index.EmptyPostings()
}
return reader.SortedPostings(p)
},
expSeriesSamples: []seriesSamples{
{
lset: map[string]string{"a": "c"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
{
lset: map[string]string{"a": "d"},
chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}},
},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples))
Expand All @@ -982,7 +1038,11 @@ func TestCompaction_populateBlock(t *testing.T) {

iw := &mockIndexWriter{}
blockPopulator := DefaultBlockPopulator{}
err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{})
irPostingsFunc := AllSortedPostings
if tc.irPostingsFunc != nil {
irPostingsFunc = tc.irPostingsFunc
}
err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}, irPostingsFunc)
if tc.expErr != nil {
require.Error(t, err)
require.Equal(t, tc.expErr.Error(), err.Error())
Expand Down

0 comments on commit 6358f3d

Please sign in to comment.