Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #348 from BenoitKnecht/fix-block-boundaries
Browse files Browse the repository at this point in the history
Make sure blocks don't overlap to avoid outsider chunks
  • Loading branch information
fabxc committed Jul 5, 2018
2 parents f87d00d + 24b223c commit 77db94c
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 21 deletions.
9 changes: 8 additions & 1 deletion block.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ Outer:
}

for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}}
Expand Down Expand Up @@ -539,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
return nil
}

// OverlapsClosedInterval returns true if the block overlaps the closed
// interval [mint, maxt].
func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
	// The block itself covers the half-open interval
	// [pb.meta.MinTime, pb.meta.MaxTime). The two ranges are disjoint
	// exactly when the query ends before the block starts, or begins at
	// or after the block's (exclusive) end; overlap is the negation.
	return !(maxt < pb.meta.MinTime || mint >= pb.meta.MaxTime)
}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint
Expand Down
6 changes: 6 additions & 0 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
return nil
}

// OverlapsClosedInterval returns true if the chunk overlaps the closed
// interval [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
	// The chunk covers the closed interval [cm.MinTime, cm.MaxTime], so
	// the ranges are disjoint only when one ends strictly before the
	// other begins.
	return !(maxt < cm.MinTime || cm.MaxTime < mint)
}

var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
Expand Down
2 changes: 1 addition & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}

Expand Down
19 changes: 9 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,13 @@ func (db *DB) compact() (changes bool, err error) {
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block")
Expand Down Expand Up @@ -756,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
defer db.mtx.RUnlock()

for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
}
}
Expand Down Expand Up @@ -799,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
defer db.mtx.RUnlock()

for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
Expand Down Expand Up @@ -848,11 +852,6 @@ func (db *DB) CleanTombstones() (err error) {
return errors.Wrap(db.reload(), "reload blocks")
}

// intervalOverlap reports whether the closed intervals [amin, amax] and
// [bmin, bmax] share at least one point.
// See http://stackoverflow.com/questions/3269434/
func intervalOverlap(amin, amax, bmin, bmax int64) bool {
	// Two closed intervals are disjoint exactly when one ends before the
	// other begins; overlap is the negation of that.
	return !(amax < bmin || bmax < amin)
}

func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
Expand Down
89 changes: 89 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
Expand Down Expand Up @@ -1096,3 +1098,90 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9
}, OverlappingBlocks(nc1))
}

// Regression test for https://github.com/prometheus/tsdb/issues/347
//
// Verifies that after compaction every persisted chunk lies entirely within
// its block's time range — i.e. a sample landing exactly on a block boundary
// must not produce a chunk that spills outside the block ("outsider" chunk).
func TestChunkAtBlockBoundary(t *testing.T) {
	db, close := openTestDB(t, nil)
	defer close()
	defer db.Close()

	app := db.Appender()

	blockRange := DefaultOptions.BlockRanges[0]
	label := labels.FromStrings("foo", "bar")

	// Two samples per block range for three consecutive ranges: one sample
	// exactly on each boundary and one shortly after it.
	for i := int64(0); i < 3; i++ {
		_, err := app.Add(label, i*blockRange, 0)
		testutil.Ok(t, err)
		_, err = app.Add(label, i*blockRange+1000, 0)
		testutil.Ok(t, err)
	}

	err := app.Commit()
	testutil.Ok(t, err)

	// Compact the head so the samples above are persisted into blocks.
	_, err = db.compact()
	testutil.Ok(t, err)

	for _, block := range db.blocks {
		r, err := block.Index()
		testutil.Ok(t, err)
		// NOTE(review): defer inside a loop only runs at function exit;
		// acceptable here because the test iterates over a handful of blocks.
		defer r.Close()

		meta := block.Meta()

		p, err := r.Postings(index.AllPostingsKey())
		testutil.Ok(t, err)

		var (
			lset labels.Labels
			chks []chunks.Meta
		)

		chunkCount := 0

		for p.Next() {
			err = r.Series(p.At(), &lset, &chks)
			testutil.Ok(t, err)
			// Every chunk must be fully contained within its block's
			// time range.
			for _, c := range chks {
				testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
					"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
					meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
				chunkCount++
			}
		}
		// With the boundary fix, the two samples written per range end up
		// in a single chunk inside each block.
		testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
	}
}

// TestQuerierWithBoundaryChunks checks that a querier over a range that
// exactly covers two blocks selects those two blocks and no neighbours,
// i.e. chunks sitting on block boundaries do not leak adjacent blocks
// into the querier.
func TestQuerierWithBoundaryChunks(t *testing.T) {
	db, close := openTestDB(t, nil)
	defer close()
	defer db.Close()

	app := db.Appender()

	blockRange := DefaultOptions.BlockRanges[0]
	lbls := labels.FromStrings("foo", "bar")

	// Write one sample exactly at the start of five consecutive block ranges.
	for i := int64(0); i < 5; i++ {
		_, err := app.Add(lbls, i*blockRange, 0)
		testutil.Ok(t, err)
	}

	testutil.Ok(t, app.Commit())

	// Persist the head into blocks.
	_, err := db.compact()
	testutil.Ok(t, err)

	testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")

	q, err := db.Querier(blockRange, 2*blockRange)
	testutil.Ok(t, err)
	defer q.Close()

	// The requested interval covers exactly 2 blocks, so the querier must
	// reference exactly those 2 blocks.
	got := len(q.(*querier).blocks)
	testutil.Assert(t, got == 2, "expected 2 blocks in querier, got %d", got)
}
18 changes: 9 additions & 9 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Lock()
c := s.chunk(int(cid))

// This means that the chunk has been garbage collected.
if c == nil {
// This means that the chunk has been garbage collected or is outside
// the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, ErrNotFound
}

mint, maxt := c.minTime, c.maxTime
s.Unlock()

// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{
Chunk: c.chunk,
s: s,
Expand Down Expand Up @@ -852,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks

for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range.
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
Expand Down Expand Up @@ -1291,6 +1286,11 @@ type memChunk struct {
minTime, maxTime int64
}

// OverlapsClosedInterval returns true if the chunk overlaps the closed
// interval [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
	// The in-memory chunk covers the closed interval
	// [mc.minTime, mc.maxTime]; overlap is the negation of disjointness.
	return !(maxt < mc.minTime || mc.maxTime < mint)
}

type memSafeIterator struct {
chunkenc.Iterator

Expand Down

0 comments on commit 77db94c

Please sign in to comment.