Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Make sure blocks don't overlap to avoid outsider chunks #348

Merged
merged 4 commits into from
Jul 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion block.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ Outer:
}

for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}}
Expand Down Expand Up @@ -539,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error {
return nil
}

// Returns true if the block overlaps [mint, maxt].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit confused. You have 2 method name OverlapsClosedInterval and comment Returns true if the block overlaps [mint, maxt]. but with different logic.

The inner comments and logic make sense though. I think we could leave it as methods and name it just isOverlapping (or ...halfOpen.. for block and ...closed... for chunk) and move inner comments to method comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods are good place to explain the difference in time ranges between block and chunks maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I can see why you find it confusing. The thing is that the argument of the OverlapsClosedInterval() method is always a closed interval; the difference is in the object calling the method.

In the case of block.OverlapsClosedInterval(mint, maxt), we compare the half-open [block.MinTime, block.MaxTime) with the closed [mint, maxt]; and in the case of chunk.OverlapsClosedInterval(mint, maxt), we compare the closed [chunk.MinTime, chunk.MaxTime] with the closed [mint, maxt].

Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And you're right, I should expand on the difference between block and chunk intervals in those method descriptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, but one is Closed, second is Half-Closed interval, right?

Copy link
Contributor

@bwplotka bwplotka Jul 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or even Overlaps .. basically we want to know if mint and maxt overlaps with block or chunk. And it gives us right answer (e.g: if mint is touching block.maxt - there is NO overlap)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% convinced it's clearer to just call it Overlaps. I wanted to highlight the fact the mint and maxt arguments refer to a closed interval to avoid someone using it to compare a block with another block, in which case [mint, maxt) would be a half-open interval, whereas the method expects it to be closed.

That being said, if it's the only thing keeping this PR from being merged, I'm happy to make the change.

Copy link
Contributor

@bwplotka bwplotka Jul 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We need someone from maintainers to approve this PR anyway, so maybe someone can put his view on this as well, I don't want to block this PR either (:
  2. If I do block1.Overlaps(block2.min, block2.max) is there anything confusing by the fact that it will check overlaps using [a, b) because blocks are like this? I think having something like block1.OverlapsClosedInterval(block2.min, block2.max) would be confusing for future readers of this code, because it is saying closed but actually it is half open overlap logic. But I might be wrong here!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But block1.OverlapsClosedInterval(block2.min, block2.max) would return the wrong result, because [block2.min, block2.max) isn't a closed interval. You would get block1.min <= block2.max && block2.min < block1.max instead of the correct value for two [a, b) intervals, which is block1.min < block2.max && block2.min < block1.max.

That's why I wanted the method name to explicitly state that it requires a closed interval as its argument; it doesn't work on other types of intervals.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got you, agree (:

func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool {
// The block itself is a half-open interval
// [pb.meta.MinTime, pb.meta.MaxTime).
return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime
}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint
Expand Down
6 changes: 6 additions & 0 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error {
return nil
}

// Returns true if the chunk overlaps [mint, maxt].
func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool {
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
return cm.MinTime <= maxt && mint <= cm.MaxTime
}

var (
errInvalidSize = fmt.Errorf("invalid size")
errInvalidFlag = fmt.Errorf("invalid flag")
Expand Down
2 changes: 1 addition & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for i, chk := range chks {
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}

Expand Down
19 changes: 9 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,13 @@ func (db *DB) compact() (changes bool, err error) {
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
// We remove 1 millisecond from maxt because block
// intervals are half-open: [b.MinTime, b.MaxTime). But
// chunk intervals are closed: [c.MinTime, c.MaxTime];
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block")
Expand Down Expand Up @@ -756,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
defer db.mtx.RUnlock()

for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
blocks = append(blocks, b)
}
}
Expand Down Expand Up @@ -799,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
defer db.mtx.RUnlock()

for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
if b.OverlapsClosedInterval(mint, maxt) {
g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
Expand Down Expand Up @@ -848,11 +852,6 @@ func (db *DB) CleanTombstones() (err error) {
return errors.Wrap(db.reload(), "reload blocks")
}

func intervalOverlap(amin, amax, bmin, bmax int64) bool {
// Checks Overlap: http://stackoverflow.com/questions/3269434/
return amin <= bmax && bmin <= amax
}

func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
Expand Down
89 changes: 89 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
)
Expand Down Expand Up @@ -1096,3 +1098,90 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
{Min: 8, Max: 9}: {nc1[8], nc1[9]}, // 7-10, 8-9
}, OverlappingBlocks(nc1))
}

// Regression test for https://github.com/prometheus/tsdb/issues/347
func TestChunkAtBlockBoundary(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()

app := db.Appender()

blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")

for i := int64(0); i < 3; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
testutil.Ok(t, err)
}

err := app.Commit()
testutil.Ok(t, err)

_, err = db.compact()
testutil.Ok(t, err)

for _, block := range db.blocks {
r, err := block.Index()
testutil.Ok(t, err)
defer r.Close()

meta := block.Meta()

p, err := r.Postings(index.AllPostingsKey())
testutil.Ok(t, err)

var (
lset labels.Labels
chks []chunks.Meta
)

chunkCount := 0

for p.Next() {
err = r.Series(p.At(), &lset, &chks)
testutil.Ok(t, err)
for _, c := range chks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we at least count chunks here? We should have 2 blocks (?) one chunk in each I think. That is the only thing I wanted to add in my extension. (wanted to assert on exact chunk time range, but just counting these would be nice as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can assert the number of chunks per block, yes.

Not sure about checking the number of blocks, because it depends on this condition in db.compact():

if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
        break
}

which could change and break the test for no reason. (In fact, we only have one block here because of this condition.)

Copy link
Contributor

@bwplotka bwplotka Jun 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is a priority to test if actually, next block will include the chunk we ignore with

head := &rangeHead{
 			head: db.head,	 			head: db.head,
 			mint: mint,	 			mint: mint,
			// We remove 1 millisecond from maxt because block
			// intervals are half-open: [b.MinTime, b.MaxTime). But
			// chunk intervals are closed: [c.MinTime, c.MaxTime];
			// so in order to make sure that overlaps are evaluated
			// consistently, we explicitly remove the last value
			// from the block interval here.
			maxt: maxt - 1,
 		}	 		}

Can we somehow make it test resilient on change here?

if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
        break
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair point, I'll look into it.

testutil.Assert(t, meta.MinTime <= c.MinTime && c.MaxTime <= meta.MaxTime,
"chunk spans beyond block boundaries: [block.MinTime=%d, block.MaxTime=%d]; [chunk.MinTime=%d, chunk.MaxTime=%d]",
meta.MinTime, meta.MaxTime, c.MinTime, c.MaxTime)
chunkCount++
}
}
testutil.Assert(t, chunkCount == 1, "expected 1 chunk in block %s, got %d", meta.ULID, chunkCount)
}
}

func TestQuerierWithBoundaryChunks(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()

app := db.Appender()

blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")

for i := int64(0); i < 5; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
}

err := app.Commit()
testutil.Ok(t, err)

_, err = db.compact()
testutil.Ok(t, err)

testutil.Assert(t, len(db.blocks) >= 3, "invalid test, less than three blocks in DB")

q, err := db.Querier(blockRange, 2*blockRange)
testutil.Ok(t, err)
defer q.Close()

// The requested interval covers 2 blocks, so the querier should contain 2 blocks.
count := len(q.(*querier).blocks)
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}
18 changes: 9 additions & 9 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s.Lock()
c := s.chunk(int(cid))

// This means that the chunk has been garbage collected.
if c == nil {
// This means that the chunk has been garbage collected or is outside
// the specified range.
if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, ErrNotFound
}

mint, maxt := c.minTime, c.maxTime
s.Unlock()

// Do not expose chunks that are outside of the specified range.
if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) {
return nil, ErrNotFound
}
return &safeChunk{
Chunk: c.chunk,
s: s,
Expand Down Expand Up @@ -852,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks

for i, c := range s.chunks {
// Do not expose chunks that are outside of the specified range.
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
Expand Down Expand Up @@ -1291,6 +1286,11 @@ type memChunk struct {
minTime, maxTime int64
}

// Returns true if the chunk overlaps [mint, maxt].
func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool {
return mc.minTime <= maxt && mint <= mc.maxTime
}

type memSafeIterator struct {
chunkenc.Iterator

Expand Down