Skip to content

Commit

Permalink
tsdb: turn off transaction isolation for head compaction (prometheus#…
Browse files Browse the repository at this point in the history
…11317)

* tsdb: add a basic test for read/write isolation

* tsdb: store the min time with isolationAppender
So that we can see when appending has moved past a certain point in time.

* tsdb: allow RangeHead to have isolation disabled
This will be used for head compaction.

* tsdb: do head compaction with isolation disabled
This saves a lot of work tracking appends done while compaction is ongoing.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
  • Loading branch information
bboreham authored and roidelapluie committed Nov 23, 2022
1 parent dd4b6bf commit 5e7a5da
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 8 deletions.
10 changes: 9 additions & 1 deletion tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,15 @@ func (db *DB) Compact() (returnErr error) {
// so in order to make sure that overlaps are evaluated
// consistently, we explicitly remove the last value
// from the block interval here.
if err := db.compactHead(NewRangeHead(db.head, mint, maxt-1)); err != nil {
rh := NewRangeHeadWithIsolationDisabled(db.head, mint, maxt-1)

// Compaction runs with isolation disabled, because head.compactable()
// ensures that maxt is more than chunkRange/2 back from now, and
// head.appendableMinValidTime() ensures that no new appends can start within the compaction range.
// We do need to wait for any overlapping appenders that started previously to finish.
db.head.WaitForAppendersOverlapping(rh.MaxTime())

if err := db.compactHead(rh); err != nil {
return errors.Wrap(err, "compact head")
}
// Consider only successful compactions for WAL truncation.
Expand Down
22 changes: 21 additions & 1 deletion tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,13 @@ func (h *Head) WaitForPendingReadersInTimeRange(mint, maxt int64) {
}
}

// WaitForAppendersOverlapping blocks until every open appender whose
// minimum valid time is at or before maxt has finished. It polls
// lowestAppendTime at a fixed 500ms interval; the caller uses this to
// ensure no in-flight append can still write into a range that is
// about to be compacted.
func (h *Head) WaitForAppendersOverlapping(maxt int64) {
	for {
		if h.iso.lowestAppendTime() > maxt {
			return
		}
		time.Sleep(500 * time.Millisecond)
	}
}

// IsQuerierCollidingWithTruncation returns if the current querier needs to be closed and if a new querier
// has to be created. In the latter case, the method also returns the new mint to be used for creating the
// new range head and the new querier. This methods helps preventing races with the truncation of in-memory data.
Expand Down Expand Up @@ -1036,6 +1043,8 @@ func (h *Head) Stats(statsByLabelName string) *Stats {
// RangeHead wraps a Head to restrict reads to a fixed time range.
type RangeHead struct {
	head       *Head
	mint, maxt int64

	// isolationOff disables creation of an isolationState in Chunks();
	// set via NewRangeHeadWithIsolationDisabled (used when compacting
	// the head, see DB.Compact).
	isolationOff bool
}

// NewRangeHead returns a *RangeHead.
Expand All @@ -1048,12 +1057,23 @@ func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
}
}

// NewRangeHeadWithIsolationDisabled returns a *RangeHead whose chunk
// reads do not create an isolationState, so they observe all appended
// data regardless of append transaction status.
func NewRangeHeadWithIsolationDisabled(head *Head, mint, maxt int64) *RangeHead {
	r := NewRangeHead(head, mint, maxt)
	r.isolationOff = true
	return r
}

// Index returns an IndexReader limited to the RangeHead's time range.
func (h *RangeHead) Index() (IndexReader, error) {
	return h.head.indexRange(h.mint, h.maxt), nil
}

// Chunks returns a ChunkReader limited to the RangeHead's time range.
// When isolation is disabled (NewRangeHeadWithIsolationDisabled) no
// isolationState is created and a nil state is passed through, so the
// reader sees all appended data.
func (h *RangeHead) Chunks() (ChunkReader, error) {
	if h.isolationOff {
		return h.head.chunksRange(h.mint, h.maxt, (*isolationState)(nil))
	}
	return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State(h.mint, h.maxt))
}

func (h *RangeHead) Tombstones() (tombstones.Reader, error) {
Expand Down
5 changes: 3 additions & 2 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (h *Head) Appender(_ context.Context) storage.Appender {
}

func (h *Head) appender() *headAppender {
appendID, cleanupAppendIDsBelow := h.iso.newAppendID() // Every appender gets an ID that is cleared upon commit/rollback.
minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.

// Allocate the exemplars buffer only if exemplars are enabled.
var exemplarsBuf []exemplarWithSeriesRef
Expand All @@ -124,7 +125,7 @@ func (h *Head) appender() *headAppender {

return &headAppender{
head: h,
minValidTime: h.appendableMinValidTime(),
minValidTime: minValidTime,
mint: math.MaxInt64,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
Expand Down
4 changes: 3 additions & 1 deletion tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ type headChunkReader struct {
}

// Close releases the reader's isolation state, if it holds one.
// Readers created with isolation disabled carry a nil isoState.
func (h *headChunkReader) Close() error {
	if s := h.isoState; s != nil {
		s.Close()
	}
	return nil
}

Expand Down
20 changes: 19 additions & 1 deletion tsdb/isolation.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tsdb

import (
"math"
"sync"
)

Expand Down Expand Up @@ -45,6 +46,7 @@ func (i *isolationState) IsolationDisabled() bool {

// isolationAppender is a node in isolation's doubly-linked list of
// currently open appenders.
type isolationAppender struct {
	appendID uint64
	// minTime is the appender's minimum valid timestamp; scanned by
	// lowestAppendTime to detect appends overlapping a given range.
	minTime int64
	prev    *isolationAppender
	next    *isolationAppender
}
Expand Down Expand Up @@ -116,6 +118,21 @@ func (i *isolation) lowWatermarkLocked() uint64 {
return i.appendsOpenList.next.appendID
}

// lowestAppendTime returns the smallest minTime among all currently
// open appenders, or math.MaxInt64 when none are open.
func (i *isolation) lowestAppendTime() int64 {
	i.appendMtx.RLock()
	defer i.appendMtx.RUnlock()

	// appendsOpenList is a circular list; iterate until we are back at
	// the sentinel head node.
	lowest := int64(math.MaxInt64)
	for app := i.appendsOpenList.next; app != i.appendsOpenList; app = app.next {
		if app.minTime < lowest {
			lowest = app.minTime
		}
	}
	return lowest
}

// State returns an object used to control isolation
// between a query and appends. Must be closed when complete.
func (i *isolation) State(mint, maxt int64) *isolationState {
Expand Down Expand Up @@ -163,7 +180,7 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
// newAppendID increments the transaction counter and returns a new transaction
// ID. The first ID returned is 1.
// Also returns the low watermark, to keep lock/unlock operations down.
func (i *isolation) newAppendID() (uint64, uint64) {
func (i *isolation) newAppendID(minTime int64) (uint64, uint64) {
if i.disabled {
return 0, 0
}
Expand All @@ -176,6 +193,7 @@ func (i *isolation) newAppendID() (uint64, uint64) {

app := i.appendersPool.Get().(*isolationAppender)
app.appendID = i.appendsOpenList.appendID
app.minTime = minTime
app.prev = i.appendsOpenList.prev
app.next = i.appendsOpenList

Expand Down
63 changes: 61 additions & 2 deletions tsdb/isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,67 @@ import (
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

// TestIsolation exercises the isolation bookkeeping directly:
// appendID assignment, the low watermark, open-read counting, and
// lowestAppendTime tracking across appender/reader lifecycles.
func TestIsolation(t *testing.T) {
	// result captures the (id, lowWatermark) pair returned by newAppendID.
	type result struct {
		id           uint64
		lowWatermark uint64
	}
	var appendA, appendB result
	iso := newIsolation(false)

	// No appenders or readers yet: watermark is 0 and lowestAppendTime
	// reports the MaxInt64 sentinel.
	// (Original comment said "starts at 1", but the assertion checks 0.)
	require.Equal(t, uint64(0), iso.lowWatermark())
	require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())

	// Pretend we are starting to append, with min valid time 10.
	appendA.id, appendA.lowWatermark = iso.newAppendID(10)
	require.Equal(t, result{1, 1}, appendA)
	require.Equal(t, uint64(1), iso.lowWatermark())

	require.Equal(t, 0, countOpenReads(iso))
	require.Equal(t, int64(10), iso.lowestAppendTime())

	// Now we start a read.
	stateA := iso.State(10, 20)
	require.Equal(t, 1, countOpenReads(iso))

	// Second appender, with a later min valid time; lowestAppendTime
	// still reports appender A's.
	appendB.id, appendB.lowWatermark = iso.newAppendID(20)
	require.Equal(t, result{2, 1}, appendB)
	require.Equal(t, uint64(1), iso.lowWatermark())
	require.Equal(t, int64(10), iso.lowestAppendTime())

	iso.closeAppend(appendA.id)
	// Low watermark remains at 1 because stateA is still open.
	require.Equal(t, uint64(1), iso.lowWatermark())

	require.Equal(t, 1, countOpenReads(iso))
	// With appender A closed, the lowest append time moves up to B's.
	require.Equal(t, int64(20), iso.lowestAppendTime())

	// Finish the read and low watermark should rise.
	stateA.Close()
	require.Equal(t, uint64(2), iso.lowWatermark())

	require.Equal(t, 0, countOpenReads(iso))

	iso.closeAppend(appendB.id)
	require.Equal(t, uint64(2), iso.lowWatermark())
	// No open appenders again: back to the MaxInt64 sentinel.
	require.Equal(t, int64(math.MaxInt64), iso.lowestAppendTime())
}

// countOpenReads reports how many read isolation states are currently
// open on iso.
func countOpenReads(iso *isolation) int {
	var open int
	iso.TraverseOpenReads(func(*isolationState) bool {
		open++
		return true
	})
	return open
}

func BenchmarkIsolation(b *testing.B) {
for _, goroutines := range []int{10, 100, 1000, 10000} {
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
Expand All @@ -36,7 +95,7 @@ func BenchmarkIsolation(b *testing.B) {
<-start

for i := 0; i < b.N; i++ {
appendID, _ := iso.newAppendID()
appendID, _ := iso.newAppendID(0)

iso.closeAppend(appendID)
}
Expand Down Expand Up @@ -66,7 +125,7 @@ func BenchmarkIsolationWithState(b *testing.B) {
<-start

for i := 0; i < b.N; i++ {
appendID, _ := iso.newAppendID()
appendID, _ := iso.newAppendID(0)

iso.closeAppend(appendID)
}
Expand Down

0 comments on commit 5e7a5da

Please sign in to comment.