diff --git a/block.go b/block.go index 1d5ebfb1..664ead49 100644 --- a/block.go +++ b/block.go @@ -447,7 +447,7 @@ Outer: } for _, chk := range chks { - if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { + if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) stones[p.At()] = Intervals{{tmin, tmax}} @@ -539,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error { return nil } +// Returns true if the block overlaps [mint, maxt]. +func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool { + // The block itself is a half-open interval + // [pb.meta.MinTime, pb.meta.MaxTime). + return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime +} + func clampInterval(a, b, mint, maxt int64) (int64, int64) { if a < mint { a = mint diff --git a/chunks/chunks.go b/chunks/chunks.go index 795740ff..5eab2398 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error { return nil } +// Returns true if the chunk overlaps [mint, maxt]. +func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool { + // The chunk itself is a closed interval [cm.MinTime, cm.MaxTime]. + return cm.MinTime <= maxt && mint <= cm.MaxTime +} + var ( errInvalidSize = fmt.Errorf("invalid size") errInvalidFlag = fmt.Errorf("invalid flag") diff --git a/compact.go b/compact.go index 8743c8ba..aae8cc1c 100644 --- a/compact.go +++ b/compact.go @@ -592,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, if len(dranges) > 0 { // Re-encode the chunk to not have deleted values. for i, chk := range chks { - if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { + if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) { continue } diff --git a/db.go b/db.go index 28fafc09..fcfbeeeb 100644 --- a/db.go +++ b/db.go @@ -360,7 +360,13 @@ func (db *DB) compact() (changes bool, err error) { head := &rangeHead{ head: db.head, mint: mint, - maxt: maxt, + // We remove 1 millisecond from maxt because block + // intervals are half-open: [b.MinTime, b.MaxTime). But + // chunk intervals are closed: [c.MinTime, c.MaxTime]; + // so in order to make sure that overlaps are evaluated + // consistently, we explicitly remove the last value + // from the block interval here. + maxt: maxt - 1, } if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { return changes, errors.Wrap(err, "persist head block") @@ -756,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { blocks = append(blocks, b) } } @@ -799,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { g.Go(func(b *Block) func() error { return func() error { return b.Delete(mint, maxt, ms...) } }(b)) @@ -848,11 +852,6 @@ func (db *DB) CleanTombstones() (err error) { return errors.Wrap(db.reload(), "reload blocks") } -func intervalOverlap(amin, amax, bmin, bmax int64) bool { - // Checks Overlap: http://stackoverflow.com/questions/3269434/ - return amin <= bmax && bmin <= amax -} - func isBlockDir(fi os.FileInfo) bool { if !fi.IsDir() { return false diff --git a/db_test.go b/db_test.go index 72817a10..7ed21aac 100644 --- a/db_test.go +++ b/db_test.go @@ -26,6 +26,8 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) @@ -1096,3 +1098,90 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { {Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9 }, OverlappingBlocks(nc1)) } + +// Regression test for https://github.com/prometheus/tsdb/issues/347 +func TestChunkAtBlockBoundary(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + app := db.Appender() + + blockRange := DefaultOptions.BlockRanges[0] + label := labels.FromStrings("foo", "bar") + + for i := int64(0); i < 3; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + _, err = app.Add(label, i*blockRange+1000, 0) + testutil.Ok(t, err) + } + + err := app.Commit() + testutil.Ok(t, err) + + _, err = db.compact() + testutil.Ok(t, err) + + for _, block := range db.blocks { + r, err := block.Index() + testutil.Ok(t, err) + defer r.Close() + + meta := block.Meta() + + p, err := r.Postings(index.AllPostingsKey()) + testutil.Ok(t, err) + + var ( + lset labels.Labels + chks []chunks.Meta + ) + + chunkCount := 0 + + for p.Next() { + err = r.Series(p.At(), &lset, &chks) + testutil.Ok(t, err) + for _, c := range chks { + testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime, + "chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]", + meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime) + chunkCount++ + } + } + testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount) + } +} + +func TestQuerierWithBoundaryChunks(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + app := db.Appender() + + blockRange := DefaultOptions.BlockRanges[0] + label := labels.FromStrings("foo", "bar") + + for i := int64(0); i < 5; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + } + + err := app.Commit() + testutil.Ok(t, err) + + _, err = db.compact() + testutil.Ok(t, err) + + testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB") + + q, err := db.Querier(blockRange, 2*blockRange) + testutil.Ok(t, err) + defer q.Close() + + // The requested interval covers 2 blocks, so the querier should contain 2 blocks. + count := len(q.(*querier).blocks) + testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count) +} diff --git a/head.go b/head.go index e8a4582f..c694d20c 100644 --- a/head.go +++ b/head.go @@ -735,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Lock() c := s.chunk(int(cid)) - // This means that the chunk has been garbage collected. - if c == nil { + // This means that the chunk has been garbage collected or is outside + // the specified range. + if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, ErrNotFound } - - mint, maxt := c.minTime, c.maxTime s.Unlock() - // Do not expose chunks that are outside of the specified range. - if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) { - return nil, ErrNotFound - } return &safeChunk{ Chunk: c.chunk, s: s, @@ -852,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks for i, c := range s.chunks { // Do not expose chunks that are outside of the specified range. - if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } *chks = append(*chks, chunks.Meta{ @@ -1291,6 +1286,11 @@ type memChunk struct { minTime, maxTime int64 } +// Returns true if the chunk overlaps [mint, maxt]. +func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { + return mc.minTime <= maxt && mint <= mc.maxTime +} + type memSafeIterator struct { chunkenc.Iterator