diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f458472..705dc6b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ ## master / unreleased + - [ENHANCEMENT] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) + - Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks. + - Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas. + - Added `MinTime` and `MaxTime` method for `BlockReader`. - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` @@ -7,9 +11,9 @@ - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: - - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` - - new public interface `SizeReader: Size() int64` - - `OpenBlock` signature changed to take a logger. + - Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - New public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. @@ -24,4 +28,4 @@ - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. - -[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. + - [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. diff --git a/block.go b/block.go index 002ed1be..d9570bdd 100644 --- a/block.go +++ b/block.go @@ -135,6 +135,12 @@ type BlockReader interface { // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) + + // MinTime returns the min time of the block. + MinTime() int64 + + // MaxTime returns the max time of the block. + MaxTime() int64 } // Appendable defines an entity to which data can be appended. @@ -363,6 +369,12 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// MinTime returns the min time of the meta. +func (pb *Block) MinTime() int64 { return pb.meta.MinTime } + +// MaxTime returns the max time of the meta. +func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime } + // Size returns the number of bytes that the block takes up. func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } diff --git a/block_test.go b/block_test.go index 8a4081a3..dd0972d6 100644 --- a/block_test.go +++ b/block_test.go @@ -101,8 +101,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { if totalSeries == 0 || labelCount == 0 { return nil } - series := make([]Series, totalSeries) + series := make([]Series, totalSeries) for i := 0; i < totalSeries; i++ { lbls := make(map[string]string, labelCount) for len(lbls) < labelCount { @@ -114,7 +114,26 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series { } series[i] = newSeries(lbls, samples) } + return series +} + +// populateSeries generates series from given labels, mint and maxt. +func populateSeries(lbls []map[string]string, mint, maxt int64) []Series { + if len(lbls) == 0 { + return nil + } + series := make([]Series, 0, len(lbls)) + for _, lbl := range lbls { + if len(lbl) == 0 { + continue + } + samples := make([]tsdbutil.Sample, 0, maxt-mint+1) + for t := mint; t <= maxt; t++ { + samples = append(samples, sample{t: t, v: rand.Float64()}) + } + series = append(series, newSeries(lbl, samples)) + } return series } diff --git a/chunks/chunks.go b/chunks/chunks.go index f0f5ac77..3f643bc7 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -196,6 +196,84 @@ func (w *Writer) write(b []byte) error { return err } +// MergeOverlappingChunks removes the samples whose timestamp is overlapping. +// The last appearing sample is retained in case there is overlapping. +// This assumes that `chks []Meta` is sorted w.r.t. MinTime. +func MergeOverlappingChunks(chks []Meta) ([]Meta, error) { + if len(chks) < 2 { + return chks, nil + } + newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks. + newChks = append(newChks, chks[0]) + last := 0 + for _, c := range chks[1:] { + // We need to check only the last chunk in newChks. + // Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping) + // (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime. + // So never overlaps with newChks[last-1] or anything before that. + if c.MinTime > newChks[last].MaxTime { + newChks = append(newChks, c) + continue + } + nc := &newChks[last] + if c.MaxTime > nc.MaxTime { + nc.MaxTime = c.MaxTime + } + chk, err := MergeChunks(nc.Chunk, c.Chunk) + if err != nil { + return nil, err + } + nc.Chunk = chk + } + + return newChks, nil +} + +// MergeChunks vertically merges a and b, i.e., if there is any sample +// with same timestamp in both a and b, the sample in a is discarded. +func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) { + newChunk := chunkenc.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + return nil, err + } + ait := a.Iterator() + bit := b.Iterator() + aok, bok := ait.Next(), bit.Next() + for aok && bok { + at, av := ait.At() + bt, bv := bit.At() + if at < bt { + app.Append(at, av) + aok = ait.Next() + } else if bt < at { + app.Append(bt, bv) + bok = bit.Next() + } else { + app.Append(bt, bv) + aok = ait.Next() + bok = bit.Next() + } + } + for aok { + at, av := ait.At() + app.Append(at, av) + aok = ait.Next() + } + for bok { + bt, bv := bit.At() + app.Append(bt, bv) + bok = bit.Next() + } + if ait.Err() != nil { + return nil, ait.Err() + } + if bit.Err() != nil { + return nil, bit.Err() + } + return newChunk, nil +} + func (w *Writer) WriteChunks(chks ...Meta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. diff --git a/compact.go b/compact.go index 5e2cb1a4..065b43e7 100644 --- a/compact.go +++ b/compact.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "os" "path/filepath" @@ -80,13 +81,14 @@ type LeveledCompactor struct { } type compactorMetrics struct { - ran prometheus.Counter - populatingBlocks prometheus.Gauge - failed prometheus.Counter - duration prometheus.Histogram - chunkSize prometheus.Histogram - chunkSamples prometheus.Histogram - chunkRange prometheus.Histogram + ran prometheus.Counter + populatingBlocks prometheus.Gauge + failed prometheus.Counter + overlappingBlocks prometheus.Counter + duration prometheus.Histogram + chunkSize prometheus.Histogram + chunkSamples prometheus.Histogram + chunkRange prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { @@ -104,6 +106,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Name: "prometheus_tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", }) + m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_vertical_compactions_total", + Help: "Total number of compactions done on overlapping blocks.", + }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_duration_seconds", Help: "Duration of compaction runs", @@ -130,6 +136,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { m.ran, m.populatingBlocks, m.failed, + m.overlappingBlocks, m.duration, m.chunkRange, m.chunkSamples, @@ -147,6 +154,9 @@ func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Log if pool == nil { pool = chunkenc.NewPool() } + if l == nil { + l = log.NewNopLogger() + } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, @@ -187,11 +197,15 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return dms[i].meta.MinTime < dms[j].meta.MinTime }) + res := c.selectOverlappingDirs(dms) + if len(res) > 0 { + return res, nil + } + // No overlapping blocks, do compaction the usual way. // We do not include a recently created block with max(minTime), so the block which was just created from WAL. // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. dms = dms[:len(dms)-1] - var res []string for _, dm := range c.selectDirs(dms) { res = append(res, dm.dir) } @@ -252,6 +266,28 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { return nil } +// selectOverlappingDirs returns all dirs with overlaping time ranges. +// It expects sorted input by mint. +func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { + if len(ds) < 2 { + return nil + } + var overlappingDirs []string + globalMaxt := ds[0].meta.MaxTime + for i, d := range ds[1:] { + if d.meta.MinTime < globalMaxt { + if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well. + overlappingDirs = append(overlappingDirs, ds[i].dir) + } + overlappingDirs = append(overlappingDirs, d.dir) + } + if d.meta.MaxTime > globalMaxt { + globalMaxt = d.meta.MaxTime + } + } + return overlappingDirs +} + // splitByRange splits the directories by the time range. The range sequence starts at 0. // // For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 @@ -299,12 +335,17 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { res := &BlockMeta{ ULID: uid, MinTime: blocks[0].MinTime, - MaxTime: blocks[len(blocks)-1].MaxTime, } sources := map[ulid.ULID]struct{}{} + // For overlapping blocks, the Maxt can be + // in any block so we track it globally. + maxt := int64(math.MinInt64) for _, b := range blocks { + if b.MaxTime > maxt { + maxt = b.MaxTime + } if b.Compaction.Level > res.Compaction.Level { res.Compaction.Level = b.Compaction.Level } @@ -326,6 +367,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0 }) + res.MaxTime = maxt return res } @@ -603,15 +645,17 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. +// It expects sorted blocks input by mint. func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } var ( - set ChunkSeriesSet - allSymbols = make(map[string]struct{}, 1<<16) - closers = []io.Closer{} + set ChunkSeriesSet + allSymbols = make(map[string]struct{}, 1<<16) + closers = []io.Closer{} + overlapping bool ) defer func() { var merr MultiError @@ -622,6 +666,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Set(1) + globalMaxt := blocks[0].MaxTime() for i, b := range blocks { select { case <-c.ctx.Done(): @@ -629,6 +674,17 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, default: } + if !overlapping { + if i > 0 && b.MinTime() < globalMaxt { + c.metrics.overlappingBlocks.Inc() + overlapping = true + level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID) + } + if b.MaxTime() > globalMaxt { + globalMaxt = b.MaxTime() + } + } + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -692,6 +748,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } lset, chks, dranges := set.At() // The chunks here are not fully deleted. + if overlapping { + // If blocks are overlapping, it is possible to have unsorted chunks. + sort.Slice(chks, func(i, j int) bool { + return chks[i].MinTime < chks[j].MinTime + }) + } // Skip the series with all deleted chunks. if len(chks) == 0 { @@ -725,21 +787,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - if err := chunkw.WriteChunks(chks...); err != nil { + mergedChks := chks + if overlapping { + mergedChks, err = chunks.MergeOverlappingChunks(chks) + if err != nil { + return errors.Wrap(err, "merge overlapping chunks") + } + } + if err := chunkw.WriteChunks(mergedChks...); err != nil { return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(i, lset, chks...); err != nil { + if err := indexw.AddSeries(i, lset, mergedChks...); err != nil { return errors.Wrap(err, "add series") } - meta.Stats.NumChunks += uint64(len(chks)) + meta.Stats.NumChunks += uint64(len(mergedChks)) meta.Stats.NumSeries++ - for _, chk := range chks { + for _, chk := range mergedChks { meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } - for _, chk := range chks { + for _, chk := range mergedChks { if err := c.chunkPool.Put(chk.Chunk); err != nil { return errors.Wrap(err, "put chunk") } diff --git a/compact_test.go b/compact_test.go index a755d69f..9dc9396a 100644 --- a/compact_test.go +++ b/compact_test.go @@ -15,6 +15,7 @@ package tsdb import ( "context" + "fmt" "io/ioutil" "math" "os" @@ -311,13 +312,46 @@ func TestLeveledCompactor_plan(t *testing.T) { }, expected: []string{"7", "8"}, }, + // For overlapping blocks. + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 19, 40, nil), + metaRange("3", 40, 60, nil), + }, + expected: []string{"1", "2"}, + }, + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 20, 40, nil), + metaRange("3", 30, 50, nil), + }, + expected: []string{"2", "3"}, + }, + { + metas: []dirMeta{ + metaRange("1", 0, 20, nil), + metaRange("2", 10, 40, nil), + metaRange("3", 30, 50, nil), + }, + expected: []string{"1", "2", "3"}, + }, + { + metas: []dirMeta{ + metaRange("5", 0, 360, nil), + metaRange("6", 340, 560, nil), + metaRange("7", 360, 420, nil), + metaRange("8", 420, 540, nil), + }, + expected: []string{"5", "6", "7", "8"}, + }, } for _, c := range cases { if !t.Run("", func(t *testing.T) { res, err := compactor.plan(c.metas) testutil.Ok(t, err) - testutil.Equals(t, c.expected, res) }) { return @@ -410,6 +444,8 @@ type erringBReader struct{} func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } +func (erringBReader) MinTime() int64 { return 0 } +func (erringBReader) MaxTime() int64 { return 0 } type nopChunkWriter struct{} @@ -422,9 +458,8 @@ func TestCompaction_populateBlock(t *testing.T) { inputSeriesSamples [][]seriesSamples compactMinTime int64 compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64. - - expSeriesSamples []seriesSamples - expErr error + expSeriesSamples []seriesSamples + expErr error }{ { title: "Populate block from empty input should return error.", @@ -503,37 +538,37 @@ func TestCompaction_populateBlock(t *testing.T) { { { lset: map[string]string{"a": "b"}, - chunks: [][]sample{{{t: 21}, {t: 30}}}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, }, { lset: map[string]string{"a": "c"}, - chunks: [][]sample{{{t: 40}, {t: 45}}}, + chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, }, }, { { lset: map[string]string{"a": "b"}, - chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + chunks: [][]sample{{{t: 21}, {t: 30}}}, }, { lset: map[string]string{"a": "c"}, - chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, + chunks: [][]sample{{{t: 40}, {t: 45}}}, }, }, }, expSeriesSamples: []seriesSamples{ { lset: map[string]string{"a": "b"}, - chunks: [][]sample{{{t: 21}, {t: 30}}, {{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}, {{t: 21}, {t: 30}}}, }, { lset: map[string]string{"a": "c"}, - chunks: [][]sample{{{t: 40}, {t: 45}}, {{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}}, + chunks: [][]sample{{{t: 1}, {t: 9}}, {{t: 10}, {t: 19}}, {{t: 40}, {t: 45}}}, }, }, }, { - title: "Populate from two blocks showing that order or series is sorted.", + title: "Populate from two blocks showing that order of series is sorted.", inputSeriesSamples: [][]seriesSamples{ { { @@ -647,8 +682,8 @@ func TestCompaction_populateBlock(t *testing.T) { if ok := t.Run(tc.title, func(t *testing.T) { blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples)) for _, b := range tc.inputSeriesSamples { - ir, cr := createIdxChkReaders(b) - blocks = append(blocks, &mockBReader{ir: ir, cr: cr}) + ir, cr, mint, maxt := createIdxChkReaders(b) + blocks = append(blocks, &mockBReader{ir: ir, cr: cr, mint: mint, maxt: maxt}) } c, err := NewLeveledCompactor(context.Background(), nil, nil, []int64{0}, nil) @@ -690,6 +725,78 @@ func TestCompaction_populateBlock(t *testing.T) { } } +func BenchmarkCompaction(b *testing.B) { + cases := []struct { + ranges [][2]int64 + compactionType string + }{ + { + ranges: [][2]int64{{0, 100}, {200, 300}, {400, 500}, {600, 700}}, + compactionType: "normal", + }, + { + ranges: [][2]int64{{0, 1000}, {2000, 3000}, {4000, 5000}, {6000, 7000}}, + compactionType: "normal", + }, + { + ranges: [][2]int64{{0, 10000}, {20000, 30000}, {40000, 50000}, {60000, 70000}}, + compactionType: "normal", + }, + { + ranges: [][2]int64{{0, 100000}, {200000, 300000}, {400000, 500000}, {600000, 700000}}, + compactionType: "normal", + }, + // 40% overlaps. + { + ranges: [][2]int64{{0, 100}, {60, 160}, {120, 220}, {180, 280}}, + compactionType: "vertical", + }, + { + ranges: [][2]int64{{0, 1000}, {600, 1600}, {1200, 2200}, {1800, 2800}}, + compactionType: "vertical", + }, + { + ranges: [][2]int64{{0, 10000}, {6000, 16000}, {12000, 22000}, {18000, 28000}}, + compactionType: "vertical", + }, + { + ranges: [][2]int64{{0, 100000}, {60000, 160000}, {120000, 220000}, {180000, 280000}}, + compactionType: "vertical", + }, + } + + nSeries := 10000 + for _, c := range cases { + nBlocks := len(c.ranges) + b.Run(fmt.Sprintf("type=%s,blocks=%d,series=%d,samplesPerSeriesPerBlock=%d", c.compactionType, nBlocks, nSeries, c.ranges[0][1]-c.ranges[0][0]+1), func(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_compaction") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + blockDirs := make([]string, 0, len(c.ranges)) + var blocks []*Block + for _, r := range c.ranges { + block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil) + testutil.Ok(b, err) + blocks = append(blocks, block) + defer func() { + testutil.Ok(b, block.Close()) + }() + blockDirs = append(blockDirs, block.Dir()) + } + + c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil) + testutil.Ok(b, err) + + b.ResetTimer() + b.ReportAllocs() + _, err = c.Compact(dir, blockDirs, blocks) + testutil.Ok(b, err) + }) + } +} + // TestDisableAutoCompactions checks that we can // disable and enable the auto compaction. // This is needed for unit tests that rely on diff --git a/db.go b/db.go index 8307c082..fd457ca4 100644 --- a/db.go +++ b/db.go @@ -546,11 +546,8 @@ func (db *DB) reload() (err error) { db.metrics.blocksBytes.Set(float64(blocksSize)) sort.Slice(loadable, func(i, j int) bool { - return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime + return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime }) - if err := validateBlockSequence(loadable); err != nil { - return errors.Wrap(err, "invalid block sequence") - } // Swap new blocks first for subsequently created readers to be seen. db.mtx.Lock() @@ -558,6 +555,14 @@ func (db *DB) reload() (err error) { db.blocks = loadable db.mtx.Unlock() + blockMetas := make([]BlockMeta, 0, len(loadable)) + for _, b := range loadable { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(db.logger).Log("msg", "overlapping blocks found during reload", "detail", overlaps.String()) + } + for _, b := range oldBlocks { if _, ok := deletable[b.Meta().ULID]; ok { deletable[b.Meta().ULID] = b @@ -694,25 +699,6 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { return nil } -// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. -func validateBlockSequence(bs []*Block) error { - if len(bs) <= 1 { - return nil - } - - var metas []BlockMeta - for _, b := range bs { - metas = append(metas, b.meta) - } - - overlaps := OverlappingBlocks(metas) - if len(overlaps) > 0 { - return errors.Errorf("block time ranges overlap: %s", overlaps) - } - - return nil -} - // TimeRange specifies minTime and maxTime range. type TimeRange struct { Min, Max int64 @@ -909,6 +895,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { // A goroutine must not handle more than one open Querier. func (db *DB) Querier(mint, maxt int64) (Querier, error) { var blocks []BlockReader + var blockMetas []BlockMeta db.mtx.RLock() defer db.mtx.RUnlock() @@ -916,6 +903,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { for _, b := range db.blocks { if b.OverlapsClosedInterval(mint, maxt) { blocks = append(blocks, b) + blockMetas = append(blockMetas, b.Meta()) } } if maxt >= db.head.MinTime() { @@ -926,22 +914,31 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { }) } - sq := &querier{ - blocks: make([]Querier, 0, len(blocks)), - } + blockQueriers := make([]Querier, 0, len(blocks)) for _, b := range blocks { q, err := NewBlockQuerier(b, mint, maxt) if err == nil { - sq.blocks = append(sq.blocks, q) + blockQueriers = append(blockQueriers, q) continue } // If we fail, all previously opened queriers must be closed. - for _, q := range sq.blocks { + for _, q := range blockQueriers { q.Close() } return nil, errors.Wrapf(err, "open querier for block %s", b) } - return sq, nil + + if len(OverlappingBlocks(blockMetas)) > 0 { + return &verticalQuerier{ + querier: querier{ + blocks: blockQueriers, + }, + }, nil + } + + return &querier{ + blocks: blockQueriers, + }, nil } func rangeForTimestamp(t int64, width int64) (maxt int64) { diff --git a/db_test.go b/db_test.go index 1b5cb350..857273ad 100644 --- a/db_test.go +++ b/db_test.go @@ -52,16 +52,19 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { } // query runs a matcher query against the querier and fully expands its data. -func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample { +func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]tsdbutil.Sample { ss, err := q.Select(matchers...) + defer func() { + testutil.Ok(t, q.Close()) + }() testutil.Ok(t, err) - result := map[string][]sample{} + result := map[string][]tsdbutil.Sample{} for ss.Next() { series := ss.At() - samples := []sample{} + samples := []tsdbutil.Sample{} it := series.Iterator() for it.Next() { t, v := it.At() @@ -124,9 +127,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { querier, err := db.Querier(0, 1) testutil.Ok(t, err) seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - - testutil.Equals(t, map[string][]sample{}, seriesSet) - testutil.Ok(t, querier.Close()) + testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet) err = app.Commit() testutil.Ok(t, err) @@ -137,7 +138,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, map[string][]sample{`{foo="bar"}`: {{t: 0, v: 0}}}, seriesSet) + testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {sample{t: 0, v: 0}}}, seriesSet) } func TestDataNotAvailableAfterRollback(t *testing.T) { @@ -160,7 +161,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar")) - testutil.Equals(t, map[string][]sample{}, seriesSet) + testutil.Equals(t, map[string][]tsdbutil.Sample{}, seriesSet) } func TestDBAppenderAddRef(t *testing.T) { @@ -207,17 +208,15 @@ func TestDBAppenderAddRef(t *testing.T) { res := query(t, q, labels.NewEqualMatcher("a", "b")) - testutil.Equals(t, map[string][]sample{ + testutil.Equals(t, map[string][]tsdbutil.Sample{ labels.FromStrings("a", "b").String(): { - {t: 123, v: 0}, - {t: 124, v: 1}, - {t: 125, v: 0}, - {t: 133, v: 1}, - {t: 143, v: 2}, + sample{t: 123, v: 0}, + sample{t: 124, v: 1}, + sample{t: 125, v: 0}, + sample{t: 133, v: 1}, + sample{t: 143, v: 2}, }, }, res) - - testutil.Ok(t, q.Close()) } func TestDeleteSimple(t *testing.T) { @@ -398,12 +397,10 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { ssMap := query(t, q, labels.NewEqualMatcher("a", "b")) - testutil.Equals(t, map[string][]sample{ - labels.New(labels.Label{"a", "b"}).String(): {{0, 1}}, + testutil.Equals(t, map[string][]tsdbutil.Sample{ + labels.New(labels.Label{"a", "b"}).String(): {sample{0, 1}}, }, ssMap) - testutil.Ok(t, q.Close()) - // Append Out of Order Value. app = db.Appender() _, err = app.Add(labels.Labels{{"a", "b"}}, 10, 3) @@ -417,10 +414,9 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { ssMap = query(t, q, labels.NewEqualMatcher("a", "b")) - testutil.Equals(t, map[string][]sample{ - labels.New(labels.Label{"a", "b"}).String(): {{0, 1}, {10, 3}}, + testutil.Equals(t, map[string][]tsdbutil.Sample{ + labels.New(labels.Label{"a", "b"}).String(): {sample{0, 1}, sample{10, 3}}, }, ssMap) - testutil.Ok(t, q.Close()) } func TestDB_Snapshot(t *testing.T) { @@ -610,9 +606,9 @@ func TestDB_e2e(t *testing.T) { }, } - seriesMap := map[string][]sample{} + seriesMap := map[string][]tsdbutil.Sample{} for _, l := range lbls { - seriesMap[labels.New(l...).String()] = []sample{} + seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} } db, delete := openTestDB(t, nil) @@ -625,7 +621,7 @@ func TestDB_e2e(t *testing.T) { for _, l := range lbls { lset := labels.New(l...) - series := []sample{} + series := []tsdbutil.Sample{} ts := rand.Int63n(300) for i := 0; i < numDatapoints; i++ { @@ -682,7 +678,7 @@ func TestDB_e2e(t *testing.T) { mint := rand.Int63n(300) maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints)) - expected := map[string][]sample{} + expected := map[string][]tsdbutil.Sample{} // Build the mockSeriesSet. for _, m := range matched { @@ -698,7 +694,7 @@ func TestDB_e2e(t *testing.T) { ss, err := q.Select(qry.ms...) testutil.Ok(t, err) - result := map[string][]sample{} + result := map[string][]tsdbutil.Sample{} for ss.Next() { x := ss.At() @@ -1666,6 +1662,310 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } +func TestVerticalCompaction(t *testing.T) { + cases := []struct { + blockSeries [][]Series + expSeries map[string][]tsdbutil.Sample + }{ + // Case 0 + // |--------------| + // |----------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99}, + sample{12, 99}, sample{13, 99}, sample{14, 99}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 99}, + sample{12, 99}, sample{13, 99}, sample{14, 99}, + }}, + }, + // Case 1 + // |-------------------------------| + // |----------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0}, + sample{11, 0}, sample{13, 0}, sample{17, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, sample{10, 99}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, sample{10, 99}, sample{11, 0}, + sample{13, 0}, sample{17, 0}, + }}, + }, + // Case 2 + // |-------------------------------| + // |------------| + // |--------------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{7, 0}, sample{8, 0}, sample{9, 0}, + sample{11, 0}, sample{13, 0}, sample{17, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59}, + sample{21, 59}, sample{22, 59}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 99}, + sample{8, 99}, sample{9, 99}, sample{11, 0}, sample{13, 0}, + sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59}, + sample{21, 59}, sample{22, 59}, + }}, + }, + // Case 3 + // |-------------------| + // |--------------------| + // |----------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{14, 59}, sample{15, 59}, sample{17, 59}, sample{20, 59}, + sample{21, 59}, sample{22, 59}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99}, + sample{16, 99}, sample{17, 99}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 99}, sample{6, 99}, sample{7, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{14, 59}, + sample{15, 59}, sample{16, 99}, sample{17, 59}, sample{20, 59}, + sample{21, 59}, sample{22, 59}, + }}, + }, + // Case 4 + // |-------------------------------------| + // |------------| + // |-------------------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0}, + sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0}, + sample{20, 0}, sample{22, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99}, + sample{16, 99}, sample{17, 99}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{`{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59}, + sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59}, + sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99}, + sample{20, 0}, sample{22, 0}, + }}, + }, + // Case 5: series are merged properly when there are multiple series. + // |-------------------------------------| + // |------------| + // |-------------------------| + { + blockSeries: [][]Series{ + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0}, + sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0}, + sample{20, 0}, sample{22, 0}, + }), + newSeries(map[string]string{"b": "c"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0}, + sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0}, + sample{20, 0}, sample{22, 0}, + }), + newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{ + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0}, + sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0}, + sample{20, 0}, sample{22, 0}, + }), + }, + []Series{ + newSeries(map[string]string{"__name__": "a"}, []tsdbutil.Sample{ + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }), + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }), + newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{ + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }), + newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{ + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }), + }, + []Series{ + newSeries(map[string]string{"a": "b"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99}, + sample{16, 99}, sample{17, 99}, + }), + newSeries(map[string]string{"aa": "bb"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99}, + sample{16, 99}, sample{17, 99}, + }), + newSeries(map[string]string{"c": "d"}, []tsdbutil.Sample{ + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{8, 99}, + sample{9, 99}, sample{10, 99}, sample{13, 99}, sample{15, 99}, + sample{16, 99}, sample{17, 99}, + }), + }, + }, + expSeries: map[string][]tsdbutil.Sample{ + `{__name__="a"}`: { + sample{7, 59}, sample{8, 59}, sample{9, 59}, sample{10, 59}, + sample{11, 59}, + }, + `{a="b"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59}, + sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59}, + sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99}, + sample{20, 0}, sample{22, 0}, + }, + `{aa="bb"}`: { + sample{3, 99}, sample{5, 99}, sample{6, 99}, sample{7, 59}, + sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59}, + sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99}, + }, + `{b="c"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{4, 0}, + sample{5, 0}, sample{8, 0}, sample{9, 0}, sample{10, 0}, + sample{13, 0}, sample{15, 0}, sample{16, 0}, sample{17, 0}, + sample{20, 0}, sample{22, 0}, + }, + `{c="d"}`: { + sample{0, 0}, sample{1, 0}, sample{2, 0}, sample{3, 99}, + sample{4, 0}, sample{5, 99}, sample{6, 99}, sample{7, 59}, + sample{8, 59}, sample{9, 59}, sample{10, 59}, sample{11, 59}, + sample{13, 99}, sample{15, 99}, sample{16, 99}, sample{17, 99}, + sample{20, 0}, sample{22, 0}, + }, + }, + }, + } + + defaultMatcher := labels.NewMustRegexpMatcher("__name__", ".*") + for _, c := range cases { + if ok := t.Run("", func(t *testing.T) { + + tmpdir, err := ioutil.TempDir("", "data") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + }() + + for _, series := range c.blockSeries { + createBlock(t, tmpdir, series) + } + db, err := Open(tmpdir, nil, nil, nil) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + db.DisableCompactions() + testutil.Assert(t, len(db.blocks) == len(c.blockSeries), "Wrong number of blocks [before compact].") + + // Vertical Query Merging test. + querier, err := db.Querier(0, 100) + testutil.Ok(t, err) + actSeries := query(t, querier, defaultMatcher) + testutil.Equals(t, c.expSeries, actSeries) + + // Vertical compaction. + lc := db.compactor.(*LeveledCompactor) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count should be still 0 here") + err = db.compact() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(db.Blocks()), "Wrong number of blocks [after compact]") + + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(lc.metrics.overlappingBlocks)), "overlapping blocks count mismatch") + + // Query test after merging the overlapping blocks. + querier, err = db.Querier(0, 100) + testutil.Ok(t, err) + actSeries = query(t, querier, defaultMatcher) + testutil.Equals(t, c.expSeries, actSeries) + }); !ok { + return + } + } +} + // TestBlockRanges checks the following use cases: // - No samples can be added with timestamps lower than the last block maxt. // - The compactor doesn't create overlapping blocks diff --git a/head.go b/head.go index c63c3a86..c5ac06c9 100644 --- a/head.go +++ b/head.go @@ -616,6 +616,14 @@ func (h *rangeHead) Tombstones() (TombstoneReader, error) { return emptyTombstoneReader, nil } +func (h *rangeHead) MinTime() int64 { + return h.mint +} + +func (h *rangeHead) MaxTime() int64 { + return h.maxt +} + // initAppender is a helper to initialize the time bounds of the head // upon the first sample it receives. type initAppender struct { diff --git a/head_test.go b/head_test.go index 99b3895e..679e8c83 100644 --- a/head_test.go +++ b/head_test.go @@ -489,7 +489,7 @@ func TestDeleteUntilCurMax(t *testing.T) { it := exps.Iterator() ressmpls, err := expandSeriesIterator(it) testutil.Ok(t, err) - testutil.Equals(t, []sample{{11, 1}}, ressmpls) + testutil.Equals(t, []tsdbutil.Sample{sample{11, 1}}, ressmpls) } func TestDelete_e2e(t *testing.T) { numDatapoints := 1000 @@ -650,16 +650,16 @@ func TestDelete_e2e(t *testing.T) { } } -func boundedSamples(full []sample, mint, maxt int64) []sample { +func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample { for len(full) > 0 { - if full[0].t >= mint { + if full[0].T() >= mint { break } full = full[1:] } for i, s := range full { // labels.Labelinate on the first sample larger than maxt. - if s.t > maxt { + if s.T() > maxt { return full[:i] } } diff --git a/mocks_test.go b/mocks_test.go index ee0cb973..243d5cf1 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -64,10 +64,14 @@ func (mockIndexWriter) WritePostings(name, value string, it index.Postings) erro func (mockIndexWriter) Close() error { return nil } type mockBReader struct { - ir IndexReader - cr ChunkReader + ir IndexReader + cr ChunkReader + mint int64 + maxt int64 } func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil } +func (r *mockBReader) MinTime() int64 { return r.mint } +func (r *mockBReader) MaxTime() int64 { return r.maxt } diff --git a/querier.go b/querier.go index 4a5a4063..61503d67 100644 --- a/querier.go +++ b/querier.go @@ -111,7 +111,6 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) { return q.sel(q.blocks, ms) - } func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { @@ -143,6 +142,36 @@ func (q *querier) Close() error { return merr.Err() } +// verticalQuerier aggregates querying results from time blocks within +// a single partition. The block time ranges can be overlapping. +type verticalQuerier struct { + querier +} + +func (q *verticalQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) { + return q.sel(q.blocks, ms) +} + +func (q *verticalQuerier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { + if len(qs) == 0 { + return EmptySeriesSet(), nil + } + if len(qs) == 1 { + return qs[0].Select(ms...) + } + l := len(qs) / 2 + + a, err := q.sel(qs[:l], ms) + if err != nil { + return nil, err + } + b, err := q.sel(qs[l:], ms) + if err != nil { + return nil, err + } + return newMergedVerticalSeriesSet(a, b), nil +} + // NewBlockQuerier returns a querier against the reader. func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) { indexr, err := b.Index() @@ -444,6 +473,72 @@ func (s *mergedSeriesSet) Next() bool { return true } +type mergedVerticalSeriesSet struct { + a, b SeriesSet + cur Series + adone, bdone bool +} + +// NewMergedVerticalSeriesSet takes two series sets as a single series set. +// The input series sets must be sorted and +// the time ranges of the series can be overlapping. +func NewMergedVerticalSeriesSet(a, b SeriesSet) SeriesSet { + return newMergedVerticalSeriesSet(a, b) +} + +func newMergedVerticalSeriesSet(a, b SeriesSet) *mergedVerticalSeriesSet { + s := &mergedVerticalSeriesSet{a: a, b: b} + // Initialize first elements of both sets as Next() needs + // one element look-ahead. + s.adone = !s.a.Next() + s.bdone = !s.b.Next() + + return s +} + +func (s *mergedVerticalSeriesSet) At() Series { + return s.cur +} + +func (s *mergedVerticalSeriesSet) Err() error { + if s.a.Err() != nil { + return s.a.Err() + } + return s.b.Err() +} + +func (s *mergedVerticalSeriesSet) compare() int { + if s.adone { + return 1 + } + if s.bdone { + return -1 + } + return labels.Compare(s.a.At().Labels(), s.b.At().Labels()) +} + +func (s *mergedVerticalSeriesSet) Next() bool { + if s.adone && s.bdone || s.Err() != nil { + return false + } + + d := s.compare() + + // Both sets contain the current series. Chain them into a single one. + if d > 0 { + s.cur = s.b.At() + s.bdone = !s.b.Next() + } else if d < 0 { + s.cur = s.a.At() + s.adone = !s.a.Next() + } else { + s.cur = &verticalChainedSeries{series: []Series{s.a.At(), s.b.At()}} + s.adone = !s.a.Next() + s.bdone = !s.b.Next() + } + return true +} + // ChunkSeriesSet exposes the chunks and intervals of a series instead of the // actual series itself. type ChunkSeriesSet interface { @@ -739,6 +834,100 @@ func (it *chainedSeriesIterator) Err() error { return it.cur.Err() } +// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series. +// They all must have the same labels. +type verticalChainedSeries struct { + series []Series +} + +func (s *verticalChainedSeries) Labels() labels.Labels { + return s.series[0].Labels() +} + +func (s *verticalChainedSeries) Iterator() SeriesIterator { + return newVerticalMergeSeriesIterator(s.series...) +} + +// verticalMergeSeriesIterator implements a series iterater over a list +// of time-sorted, time-overlapping iterators. +type verticalMergeSeriesIterator struct { + a, b SeriesIterator + aok, bok, initialized bool + + curT int64 + curV float64 +} + +func newVerticalMergeSeriesIterator(s ...Series) SeriesIterator { + if len(s) == 1 { + return s[0].Iterator() + } else if len(s) == 2 { + return &verticalMergeSeriesIterator{ + a: s[0].Iterator(), + b: s[1].Iterator(), + } + } + return &verticalMergeSeriesIterator{ + a: s[0].Iterator(), + b: newVerticalMergeSeriesIterator(s[1:]...), + } +} + +func (it *verticalMergeSeriesIterator) Seek(t int64) bool { + it.aok, it.bok = it.a.Seek(t), it.b.Seek(t) + it.initialized = true + return it.Next() +} + +func (it *verticalMergeSeriesIterator) Next() bool { + if !it.initialized { + it.aok = it.a.Next() + it.bok = it.b.Next() + it.initialized = true + } + + if !it.aok && !it.bok { + return false + } + + if !it.aok { + it.curT, it.curV = it.b.At() + it.bok = it.b.Next() + return true + } + if !it.bok { + it.curT, it.curV = it.a.At() + it.aok = it.a.Next() + return true + } + + acurT, acurV := it.a.At() + bcurT, bcurV := it.b.At() + if acurT < bcurT { + it.curT, it.curV = acurT, acurV + it.aok = it.a.Next() + } else if acurT > bcurT { + it.curT, it.curV = bcurT, bcurV + it.bok = it.b.Next() + } else { + it.curT, it.curV = bcurT, bcurV + it.aok = it.a.Next() + it.bok = it.b.Next() + } + return true +} + +func (it *verticalMergeSeriesIterator) At() (t int64, v float64) { + return it.curT, it.curV +} + +func (it *verticalMergeSeriesIterator) Err() error { + if it.a.Err() != nil { + return it.a.Err() + } + return it.b.Err() +} + // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { diff --git a/querier_test.go b/querier_test.go index c75d8d52..ee426d04 100644 --- a/querier_test.go +++ b/querier_test.go @@ -178,7 +178,7 @@ Outer: } } -func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { +func expandSeriesIterator(it SeriesIterator) (r []tsdbutil.Sample, err error) { for it.Next() { t, v := it.At() r = append(r, sample{t: t, v: v}) @@ -194,7 +194,7 @@ type seriesSamples struct { // Index: labels -> postings -> chunkMetas -> chunkRef // ChunkReader: ref -> vals -func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { +func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader, int64, int64) { sort.Slice(tc, func(i, j int) bool { return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 }) @@ -203,6 +203,8 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk)) lblIdx := make(map[string]stringset) mi := newMockIndex() + blockMint := int64(math.MaxInt64) + blockMaxt := int64(math.MinInt64) for i, s := range tc { i = i + 1 // 0 is not a valid posting. @@ -211,6 +213,13 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { // Collisions can be there, but for tests, its fine. ref := rand.Uint64() + if chk[0].t < blockMint { + blockMint = chk[0].t + } + if chk[len(chk)-1].t > blockMaxt { + blockMaxt = chk[len(chk)-1].t + } + metas = append(metas, chunks.Meta{ MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t, @@ -248,7 +257,7 @@ func createIdxChkReaders(tc []seriesSamples) (IndexReader, ChunkReader) { return mi.WritePostings(l.Name, l.Value, p) }) - return mi, chkReader + return mi, chkReader, blockMint, blockMaxt } func TestBlockQuerier(t *testing.T) { @@ -355,7 +364,7 @@ func TestBlockQuerier(t *testing.T) { Outer: for _, c := range cases.queries { - ir, cr := createIdxChkReaders(cases.data) + ir, cr, _, _ := createIdxChkReaders(cases.data) querier := &blockQuerier{ index: ir, chunks: cr, @@ -517,7 +526,7 @@ func TestBlockQuerierDelete(t *testing.T) { Outer: for _, c := range cases.queries { - ir, cr := createIdxChkReaders(cases.data) + ir, cr, _, _ := createIdxChkReaders(cases.data) querier := &blockQuerier{ index: ir, chunks: cr, @@ -924,6 +933,46 @@ func TestSeriesIterator(t *testing.T) { }) t.Run("Chain", func(t *testing.T) { + // Extra cases for overlapping series. + itcasesExtra := []struct { + a, b, c []tsdbutil.Sample + exp []tsdbutil.Sample + mint, maxt int64 + }{ + { + a: []tsdbutil.Sample{ + sample{1, 2}, sample{2, 3}, sample{3, 5}, sample{6, 1}, + }, + b: []tsdbutil.Sample{ + sample{5, 49}, sample{7, 89}, sample{9, 8}, + }, + c: []tsdbutil.Sample{ + sample{2, 33}, sample{4, 44}, sample{10, 3}, + }, + + exp: []tsdbutil.Sample{ + sample{1, 2}, sample{2, 33}, sample{3, 5}, sample{4, 44}, sample{5, 49}, sample{6, 1}, sample{7, 89}, sample{9, 8}, sample{10, 3}, + }, + mint: math.MinInt64, + maxt: math.MaxInt64, + }, + { + a: []tsdbutil.Sample{ + sample{1, 2}, sample{2, 3}, sample{9, 5}, sample{13, 1}, + }, + b: []tsdbutil.Sample{}, + c: []tsdbutil.Sample{ + sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11}, + }, + + exp: []tsdbutil.Sample{ + sample{1, 23}, sample{2, 342}, sample{3, 25}, sample{6, 11}, sample{9, 5}, sample{13, 1}, + }, + mint: math.MinInt64, + maxt: math.MaxInt64, + }, + } + for _, tc := range itcases { a, b, c := itSeries{newListSeriesIterator(tc.a)}, itSeries{newListSeriesIterator(tc.b)}, @@ -939,30 +988,55 @@ func TestSeriesIterator(t *testing.T) { testutil.Equals(t, smplExp, smplRes) } + for _, tc := range append(itcases, itcasesExtra...) { + a, b, c := itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)} + + res := newVerticalMergeSeriesIterator(a, b, c) + exp := newListSeriesIterator([]tsdbutil.Sample(tc.exp)) + + smplExp, errExp := expandSeriesIterator(exp) + smplRes, errRes := expandSeriesIterator(res) + + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } + t.Run("Seek", func(t *testing.T) { for _, tc := range seekcases { - a, b, c := itSeries{newListSeriesIterator(tc.a)}, - itSeries{newListSeriesIterator(tc.b)}, - itSeries{newListSeriesIterator(tc.c)} + ress := []SeriesIterator{ + newChainedSeriesIterator( + itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)}, + ), + newVerticalMergeSeriesIterator( + itSeries{newListSeriesIterator(tc.a)}, + itSeries{newListSeriesIterator(tc.b)}, + itSeries{newListSeriesIterator(tc.c)}, + ), + } - res := newChainedSeriesIterator(a, b, c) - exp := newListSeriesIterator(tc.exp) + for _, res := range ress { + exp := newListSeriesIterator(tc.exp) - testutil.Equals(t, tc.success, res.Seek(tc.seek)) + testutil.Equals(t, tc.success, res.Seek(tc.seek)) - if tc.success { - // Init the list and then proceed to check. - remaining := exp.Next() - testutil.Assert(t, remaining == true, "") + if tc.success { + // Init the list and then proceed to check. + remaining := exp.Next() + testutil.Assert(t, remaining == true, "") - for remaining { - sExp, eExp := exp.At() - sRes, eRes := res.At() - testutil.Equals(t, eExp, eRes) - testutil.Equals(t, sExp, sRes) + for remaining { + sExp, eExp := exp.At() + sRes, eRes := res.At() + testutil.Equals(t, eExp, eRes) + testutil.Equals(t, sExp, sRes) - remaining = exp.Next() - testutil.Equals(t, remaining, res.Next()) + remaining = exp.Next() + testutil.Equals(t, remaining, res.Next()) + } } } } @@ -1159,6 +1233,7 @@ func BenchmarkPersistedQueries(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) + block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, 1, int64(nSamples))), nil) testutil.Ok(b, err) defer block.Close() @@ -1439,3 +1514,178 @@ func (it *listSeriesIterator) Seek(t int64) bool { func (it *listSeriesIterator) Err() error { return nil } + +func BenchmarkQueryIterator(b *testing.B) { + cases := []struct { + numBlocks int + numSeries int + numSamplesPerSeriesPerBlock int + overlapPercentages []int // >=0, <=100, this is w.r.t. the previous block. + }{ + { + numBlocks: 20, + numSeries: 1000, + numSamplesPerSeriesPerBlock: 20000, + overlapPercentages: []int{0, 10, 30}, + }, + } + + for _, c := range cases { + for _, overlapPercentage := range c.overlapPercentages { + benchMsg := fmt.Sprintf("nBlocks=%d,nSeries=%d,numSamplesPerSeriesPerBlock=%d,overlap=%d%%", + c.numBlocks, c.numSeries, c.numSamplesPerSeriesPerBlock, overlapPercentage) + + b.Run(benchMsg, func(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_query_iterator") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + + var ( + blocks []*Block + overlapDelta = int64(overlapPercentage * c.numSamplesPerSeriesPerBlock / 100) + prefilledLabels []map[string]string + generatedSeries []Series + ) + for i := int64(0); i < int64(c.numBlocks); i++ { + offset := i * overlapDelta + mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset + maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 + if len(prefilledLabels) == 0 { + generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + for _, s := range generatedSeries { + prefilledLabels = append(prefilledLabels, s.Labels().Map()) + } + } else { + generatedSeries = populateSeries(prefilledLabels, mint, maxt) + } + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + testutil.Ok(b, err) + blocks = append(blocks, block) + defer block.Close() + } + + que := &querier{ + blocks: make([]Querier, 0, len(blocks)), + } + for _, blk := range blocks { + q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64) + testutil.Ok(b, err) + que.blocks = append(que.blocks, q) + } + + var sq Querier = que + if overlapPercentage > 0 { + sq = &verticalQuerier{ + querier: *que, + } + } + defer sq.Close() + + b.ResetTimer() + b.ReportAllocs() + + ss, err := sq.Select(labels.NewMustRegexpMatcher("__name__", ".*")) + testutil.Ok(b, err) + for ss.Next() { + it := ss.At().Iterator() + for it.Next() { + } + testutil.Ok(b, it.Err()) + } + testutil.Ok(b, ss.Err()) + testutil.Ok(b, err) + }) + } + } +} + +func BenchmarkQuerySeek(b *testing.B) { + cases := []struct { + numBlocks int + numSeries int + numSamplesPerSeriesPerBlock int + overlapPercentages []int // >=0, <=100, this is w.r.t. the previous block. + }{ + { + numBlocks: 20, + numSeries: 100, + numSamplesPerSeriesPerBlock: 2000, + overlapPercentages: []int{0, 10, 30, 50}, + }, + } + + for _, c := range cases { + for _, overlapPercentage := range c.overlapPercentages { + benchMsg := fmt.Sprintf("nBlocks=%d,nSeries=%d,numSamplesPerSeriesPerBlock=%d,overlap=%d%%", + c.numBlocks, c.numSeries, c.numSamplesPerSeriesPerBlock, overlapPercentage) + + b.Run(benchMsg, func(b *testing.B) { + dir, err := ioutil.TempDir("", "bench_query_iterator") + testutil.Ok(b, err) + defer func() { + testutil.Ok(b, os.RemoveAll(dir)) + }() + + var ( + blocks []*Block + overlapDelta = int64(overlapPercentage * c.numSamplesPerSeriesPerBlock / 100) + prefilledLabels []map[string]string + generatedSeries []Series + ) + for i := int64(0); i < int64(c.numBlocks); i++ { + offset := i * overlapDelta + mint := i*int64(c.numSamplesPerSeriesPerBlock) - offset + maxt := mint + int64(c.numSamplesPerSeriesPerBlock) - 1 + if len(prefilledLabels) == 0 { + generatedSeries = genSeries(c.numSeries, 10, mint, maxt) + for _, s := range generatedSeries { + prefilledLabels = append(prefilledLabels, s.Labels().Map()) + } + } else { + generatedSeries = populateSeries(prefilledLabels, mint, maxt) + } + block, err := OpenBlock(nil, createBlock(b, dir, generatedSeries), nil) + testutil.Ok(b, err) + blocks = append(blocks, block) + defer block.Close() + } + + que := &querier{ + blocks: make([]Querier, 0, len(blocks)), + } + for _, blk := range blocks { + q, err := NewBlockQuerier(blk, math.MinInt64, math.MaxInt64) + testutil.Ok(b, err) + que.blocks = append(que.blocks, q) + } + + var sq Querier = que + if overlapPercentage > 0 { + sq = &verticalQuerier{ + querier: *que, + } + } + defer sq.Close() + + mint := blocks[0].meta.MinTime + maxt := blocks[len(blocks)-1].meta.MaxTime + + b.ResetTimer() + b.ReportAllocs() + + ss, err := sq.Select(labels.NewMustRegexpMatcher("__name__", ".*")) + for ss.Next() { + it := ss.At().Iterator() + for t := mint; t <= maxt; t++ { + it.Seek(t) + } + testutil.Ok(b, it.Err()) + } + testutil.Ok(b, ss.Err()) + testutil.Ok(b, err) + }) + } + } +}