Skip to content

Commit

Permalink
no overlapping on compaction when an existing block is not within def…
Browse files Browse the repository at this point in the history
…ault boundaries. (prometheus#461)

closes prometheus#4643

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
  • Loading branch information
krasi-georgiev committed Dec 4, 2018
1 parent 83a4ef1 commit bac9cbe
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -3,3 +3,5 @@
- `LastCheckpoint` used to return just the segment name and now it returns the full relative path.
- `NewSegmentsRangeReader` can now read over multiple wal ranges by using the new `SegmentRange` struct.
- `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors.
- `Head.Init()` is changed to `Head.Init(minValidTime int64)` where `minValidTime` is taken from the maxt of the last persisted block and any samples below `minValidTime` will not be loaded from the wal in the head. The same value is used when using the `Head.Appender()` to disallow adding samples below the `minValidTime` timestamp. This change was necessary to fix a bug where a `Snapshot()` with the head included would create a block with custom time range (not bound to the default time ranges) and the next block population from the head would create an overlapping block.
- https://github.com/prometheus/tsdb/issues/446
10 changes: 5 additions & 5 deletions block_test.go
Expand Up @@ -77,8 +77,9 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {
return b
}

// createPopulatedBlock creates a block with nSeries series, and nSamples samples.
func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Block {
// createPopulatedBlock creates a block with nSeries series, filled with
// samples of the given mint,maxt time range.
func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) *Block {
head, err := NewHead(nil, nil, nil, 2*60*60*1000)
testutil.Ok(tb, err)
defer head.Close()
Expand All @@ -87,12 +88,11 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries, nSamples int) *Blo
testutil.Ok(tb, err)
refs := make([]uint64, nSeries)

for n := 0; n < nSamples; n++ {
for ts := mint; ts <= maxt; ts++ {
app := head.Appender()
ts := n * 1000
for i, lbl := range lbls {
if refs[i] != 0 {
err := app.AddFast(refs[i], int64(ts), rand.Float64())
err := app.AddFast(refs[i], ts, rand.Float64())
if err == nil {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion compact_test.go
Expand Up @@ -733,7 +733,7 @@ func TestDisableAutoCompactions(t *testing.T) {
case db.compactc <- struct{}{}:
default:
}
for x := 0; x < 10; x++ {
for x := 0; x < 20; x++ {
if len(db.Blocks()) > 0 {
break
}
Expand Down
23 changes: 16 additions & 7 deletions db.go
Expand Up @@ -271,12 +271,21 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err != nil {
return nil, err
}
if err := db.head.Init(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}

if err := db.reload(); err != nil {
return nil, err
}
// Set the min valid time for the ingested samples
// to be no lower than the maxt of the last block.
blocks := db.Blocks()
minValidTime := int64(math.MinInt64)
if len(blocks) > 0 {
minValidTime = blocks[len(blocks)-1].Meta().MaxTime
}

if err := db.head.Init(minValidTime); err != nil {
return nil, errors.Wrap(err, "read WAL")
}

go db.run()

Expand Down Expand Up @@ -395,7 +404,8 @@ func (db *DB) compact() (err error) {
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
break
}
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
mint := db.head.MinTime()
maxt := rangeForTimestamp(mint, db.opts.BlockRanges[0])

// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
Expand Down Expand Up @@ -826,9 +836,8 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) {
return sq, nil
}

func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
mint = (t / width) * width
return mint, mint + width
// rangeForTimestamp returns the upper bound of the width-sized range for t:
// t truncated (toward zero, as Go integer division does) to a multiple of
// width, plus width.
func rangeForTimestamp(t int64, width int64) (maxt int64) {
	// t - t%width equals (t/width)*width for Go's truncated division.
	maxt = t - t%width + width
	return maxt
}

// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
Expand Down
105 changes: 105 additions & 0 deletions db_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1199,6 +1200,11 @@ func TestQuerierWithBoundaryChunks(t *testing.T) {
testutil.Assert(t, count == 2, "expected 2 blocks in querier, got %d", count)
}

// TestInitializeHeadTimestamp ensures that the h.minTime is set properly.
// - no blocks no WAL: set to the time of the first appended sample
// - no blocks with WAL: set to the smallest sample from the WAL
// - with blocks no WAL: set to the last block maxT
// - with blocks with WAL: same as above
func TestInitializeHeadTimestamp(t *testing.T) {
t.Run("clean", func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_head_init")
Expand Down Expand Up @@ -1441,3 +1447,102 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

// TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlapping blocks
//   even when the last block is not within the default boundaries.
// - Lower boundary is based on the smallest sample in the head and
//   upper boundary is rounded to the configured block range.
//
// This ensures that a snapshot that includes the head and creates a block with a custom time range
// will not overlap with the first block created by the next compaction.
func TestBlockRanges(t *testing.T) {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

	dir, err := ioutil.TempDir("", "test_storage")
	if err != nil {
		t.Fatalf("Opening test dir failed: %s", err)
	}
	// Register the cleanup immediately so the directory is removed even when
	// one of the following steps fails.
	defer os.RemoveAll(dir)

	rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1

	// Test that the compactor doesn't create overlapping blocks
	// when a non standard block already exists.
	firstBlockMaxT := int64(3)
	createPopulatedBlock(t, dir, 1, 0, firstBlockMaxT)
	db, err := Open(dir, logger, nil, DefaultOptions)
	if err != nil {
		t.Fatalf("Opening test storage failed: %s", err)
	}
	app := db.Appender()
	lbl := labels.Labels{{"a", "b"}}
	_, err = app.Add(lbl, firstBlockMaxT-1, rand.Float64())
	if err == nil {
		t.Fatalf("appending a sample with a timestamp covered by a previous block shouldn't be possible")
	}
	_, err = app.Add(lbl, firstBlockMaxT+1, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, firstBlockMaxT+2, rand.Float64())
	testutil.Ok(t, err)
	secondBlockMaxt := firstBlockMaxT + rangeToTriggercompaction
	_, err = app.Add(lbl, secondBlockMaxt, rand.Float64()) // Add samples to trigger a new compaction

	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	// Poll for the block produced by the background compaction.
	for x := 1; x < 10; x++ {
		if len(db.Blocks()) == 2 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	testutil.Equals(t, 2, len(db.Blocks()), "no new block created after the set timeout")

	if db.Blocks()[0].Meta().MaxTime > db.Blocks()[1].Meta().MinTime {
		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[0].Meta(), db.Blocks()[1].Meta())
	}

	// Test that wal records are skipped when an existing block covers the same time ranges
	// and compaction doesn't create an overlapping block.
	// Open a fresh appender: the previous one has already been committed.
	app = db.Appender()
	db.DisableCompactions()
	_, err = app.Add(lbl, secondBlockMaxt+1, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxt+2, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxt+3, rand.Float64())
	testutil.Ok(t, err)
	_, err = app.Add(lbl, secondBlockMaxt+4, rand.Float64())
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	testutil.Ok(t, db.Close())

	// Create a block that covers part of the samples just written to the WAL.
	thirdBlockMaxt := secondBlockMaxt + 2
	createPopulatedBlock(t, dir, 1, secondBlockMaxt+1, thirdBlockMaxt)

	db, err = Open(dir, logger, nil, DefaultOptions)
	if err != nil {
		t.Fatalf("Opening test storage failed: %s", err)
	}
	defer db.Close()
	testutil.Equals(t, 3, len(db.Blocks()), "db doesn't include expected number of blocks")
	testutil.Equals(t, thirdBlockMaxt, db.Blocks()[2].Meta().MaxTime, "unexpected maxt of the last block")

	app = db.Appender()
	_, err = app.Add(lbl, thirdBlockMaxt+rangeToTriggercompaction, rand.Float64()) // Trigger a compaction
	testutil.Ok(t, err)
	testutil.Ok(t, app.Commit())
	// Poll for the block produced by the background compaction.
	for x := 1; x < 10; x++ {
		if len(db.Blocks()) == 4 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	testutil.Equals(t, 4, len(db.Blocks()), "no new block created after the set timeout")

	if db.Blocks()[2].Meta().MaxTime > db.Blocks()[3].Meta().MinTime {
		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
	}
}
37 changes: 23 additions & 14 deletions head.go
Expand Up @@ -59,7 +59,8 @@ type Head struct {
appendPool sync.Pool
bytesPool sync.Pool

minTime, maxTime int64
minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
lastSeriesID uint64

// All series addressable by their ID or hash.
Expand Down Expand Up @@ -300,13 +301,6 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) {
}

func (h *Head) loadWAL(r *wal.Reader) error {
minValidTime := h.MinTime()
// If the min time is still uninitialized (no persisted blocks yet),
// we accept all sample timestamps from the WAL.
if minValidTime == math.MaxInt64 {
minValidTime = math.MinInt64
}

// Track number of samples that referenced a series we don't know about
// for error reporting.
var unknownRefs uint64
Expand All @@ -327,7 +321,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
inputs[i] = make(chan []RefSample, 300)

go func(input <-chan []RefSample, output chan<- []RefSample) {
unknown := h.processWALSamples(minValidTime, input, output)
unknown := h.processWALSamples(h.minValidTime, input, output)
atomic.AddUint64(&unknownRefs, unknown)
wg.Done()
}(inputs[i], outputs[i])
Expand Down Expand Up @@ -410,7 +404,7 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}
for _, s := range tstones {
for _, itv := range s.intervals {
if itv.Maxt < minValidTime {
if itv.Maxt < h.minValidTime {
continue
}
h.tombstones.addInterval(s.ref, itv)
Expand Down Expand Up @@ -443,8 +437,12 @@ func (h *Head) loadWAL(r *wal.Reader) error {
}

// Init loads data from the write ahead log and prepares the head for writes.
func (h *Head) Init() error {
// It should be called before using an appender so that the appender
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime = minValidTime
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.

if h.wal == nil {
return nil
Expand Down Expand Up @@ -486,6 +484,7 @@ func (h *Head) Init() error {
if err := h.wal.Repair(err); err != nil {
return errors.Wrap(err, "repair corrupted WAL")
}

return nil
}

Expand All @@ -502,6 +501,7 @@ func (h *Head) Truncate(mint int64) (err error) {
return nil
}
atomic.StoreInt64(&h.minTime, mint)
h.minValidTime = mint

// Ensure that max time is at least as high as min time.
for h.MaxTime() < mint {
Expand Down Expand Up @@ -654,14 +654,23 @@ func (h *Head) Appender() Appender {

func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
minValidTime: h.MaxTime() - h.chunkRange/2,
head: h,
// Set the minimum valid time to whichever is greater: the head min valid time or the compaction window.
// This ensures that no samples will be added within the compaction window to avoid races.
minValidTime: max(h.minValidTime, h.MaxTime()-h.chunkRange/2),
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}

// max returns the larger of the two int64 values a and b.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}

func (h *Head) getAppendBuffer() []RefSample {
b := h.appendPool.Get()
if b == nil {
Expand Down Expand Up @@ -1411,7 +1420,7 @@ func (s *memSeries) cut(mint int64) *memChunk {

// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
_, s.nextAt = rangeForTimestamp(mint, s.chunkRange)
s.nextAt = rangeForTimestamp(mint, s.chunkRange)

app, err := c.chunk.Appender()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions head_test.go
Expand Up @@ -15,6 +15,7 @@ package tsdb

import (
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -123,7 +124,7 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init())
testutil.Ok(t, head.Init(math.MinInt64))
testutil.Equals(t, uint64(100), head.lastSeriesID)

s10 := head.series.getByID(10)
Expand All @@ -132,7 +133,7 @@ func TestHead_ReadWAL(t *testing.T) {
s100 := head.series.getByID(100)

testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset)
testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init().
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)

Expand All @@ -146,7 +147,6 @@ func TestHead_ReadWAL(t *testing.T) {
}

testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
testutil.Equals(t, 0, len(s11.chunks))
testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
}
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
testutil.Ok(t, err)
defer head.Close()

testutil.Ok(t, head.Init())
testutil.Ok(t, head.Init(math.MinInt64))

testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestWalRepair(t *testing.T) {

h, err := NewHead(nil, nil, w, 1)
testutil.Ok(t, err)
testutil.Ok(t, h.Init())
testutil.Ok(t, h.Init(math.MinInt64))

sr, err := wal.NewSegmentsReader(dir)
testutil.Ok(t, err)
Expand Down
2 changes: 1 addition & 1 deletion querier_test.go
Expand Up @@ -1292,7 +1292,7 @@ func BenchmarkPersistedQueries(b *testing.B) {
dir, err := ioutil.TempDir("", "bench_persisted")
testutil.Ok(b, err)
defer os.RemoveAll(dir)
block := createPopulatedBlock(b, dir, nSeries, nSamples)
block := createPopulatedBlock(b, dir, nSeries, 1, int64(nSamples))
defer block.Close()

q, err := NewBlockQuerier(block, block.Meta().MinTime, block.Meta().MaxTime)
Expand Down

0 comments on commit bac9cbe

Please sign in to comment.