Skip to content

Commit

Permalink
Fix Example() function in TSDB (#10153)
Browse files Browse the repository at this point in the history
* Fix Example() function in TSDB

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome committed Jan 11, 2022
1 parent 714bc3f commit 129ed4e
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 60 deletions.
19 changes: 19 additions & 0 deletions tsdb/chunks/chunk_write_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,22 @@ func (c *chunkWriteQueue) stop() {

c.workerWg.Wait()
}

func (c *chunkWriteQueue) queueIsEmpty() bool {
return c.queueSize() == 0
}

func (c *chunkWriteQueue) queueIsFull() bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
return c.queueSize() == cap(c.jobs)+1
}

func (c *chunkWriteQueue) queueSize() int {
c.chunkRefMapMtx.Lock()
defer c.chunkRefMapMtx.Unlock()

// Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has
// been fully processed, it remains in the chunkRefMap until the processing is complete.
return len(c.chunkRefMap)
}
27 changes: 4 additions & 23 deletions tsdb/chunks/chunk_write_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
}

// The queue should be full.
require.True(t, queueIsFull(q))
require.True(t, q.queueIsFull())

// Adding another job should block as long as no job from the queue gets consumed.
addedJob := atomic.NewBool(false)
Expand All @@ -166,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) {
require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10)

// The queue should be full again.
require.True(t, queueIsFull(q))
require.True(t, q.queueIsFull())

// Consume <sizeLimit>+1 jobs from the queue.
// To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job
// is already in the state of being processed.
for job := 0; job < sizeLimit+1; job++ {
require.False(t, queueIsEmpty(q))
require.False(t, q.queueIsEmpty())
unblockChunkWriter()
}

// Wait until all jobs have been processed.
callbackWg.Wait()
require.True(t, queueIsEmpty(q))
require.True(t, q.queueIsEmpty())
}

func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
Expand Down Expand Up @@ -266,22 +266,3 @@ func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
})
}
}

func queueIsEmpty(q *chunkWriteQueue) bool {
return queueSize(q) == 0
}

func queueIsFull(q *chunkWriteQueue) bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
return queueSize(q) == cap(q.jobs)+1
}

func queueSize(q *chunkWriteQueue) int {
q.chunkRefMapMtx.Lock()
defer q.chunkRefMapMtx.Unlock()

// Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has
// been fully processed, it remains in the chunkRefMap until the processing is complete.
return len(q.chunkRefMap)
}
4 changes: 4 additions & 0 deletions tsdb/chunks/head_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ func (cdm *ChunkDiskMapper) CutNewFile() {
cdm.evtlPos.cutFileOnNextChunk()
}

func (cdm *ChunkDiskMapper) IsQueueEmpty() bool {
return cdm.writeQueue.queueIsEmpty()
}

// cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference, if not then it errors.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,10 @@ func TestSizeRetention(t *testing.T) {
}
require.NoError(t, headApp.Commit())

require.Eventually(t, func() bool {
return db.Head().chunkDiskMapper.IsQueueEmpty()
}, 2*time.Second, 100*time.Millisecond)

// Test that registered size matches the actual disk size.
require.NoError(t, db.reloadBlocks()) // Reload the db to register the new db size.
require.Equal(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
Expand Down
66 changes: 29 additions & 37 deletions tsdb/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,97 +16,89 @@ package tsdb
import (
"context"
"fmt"
"io/ioutil"
"math"
"testing"
"os"
"time"

"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/labels"
)

func TestExample(t *testing.T) {
func Example() {
// Create a random dir to work in. Open() doesn't require a pre-existing dir, but
// we want to make sure not to make a mess where we shouldn't.
dir := t.TempDir()
dir, err := ioutil.TempDir("", "tsdb-test")
noErr(err)

// Open a TSDB for reading and/or writing.
db, err := Open(dir, nil, nil, DefaultOptions(), nil)
require.NoError(t, err)
noErr(err)

// Open an appender for writing.
app := db.Appender(context.Background())

lbls := labels.FromStrings("foo", "bar")
var appendedSamples []sample
series := labels.FromStrings("foo", "bar")

// Ref is 0 for the first append since we don't know the reference for the series.
ts, v := time.Now().Unix(), 123.0
ref, err := app.Append(0, lbls, ts, v)
require.NoError(t, err)
appendedSamples = append(appendedSamples, sample{ts, v})
ref, err := app.Append(0, series, time.Now().Unix(), 123)
noErr(err)

// Another append for a second later.
// Re-using the ref from above since it's the same series, makes append faster.
time.Sleep(time.Second)
ts, v = time.Now().Unix(), 124
_, err = app.Append(ref, lbls, ts, v)
require.NoError(t, err)
appendedSamples = append(appendedSamples, sample{ts, v})
_, err = app.Append(ref, series, time.Now().Unix(), 124)
noErr(err)

// Commit to storage.
err = app.Commit()
require.NoError(t, err)
noErr(err)

// In case you want to do more appends after app.Commit(),
// you need a new appender.
// app = db.Appender(context.Background())
//
app = db.Appender(context.Background())
// ... adding more samples.
//
// Commit to storage.
// err = app.Commit()
// require.NoError(t, err)

// Open a querier for reading.
querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)

noErr(err)
ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
var queriedSamples []sample

for ss.Next() {
series := ss.At()
fmt.Println("series:", series.Labels().String())

it := series.Iterator()
for it.Next() {
ts, v := it.At()
fmt.Println("sample", ts, v)
queriedSamples = append(queriedSamples, sample{ts, v})
_, v := it.At() // We ignore the timestamp here, only to have a predictable output we can test against (below)
fmt.Println("sample", v)
}

require.NoError(t, it.Err())
fmt.Println("it.Err():", it.Err())
}
require.NoError(t, ss.Err())
fmt.Println("ss.Err():", ss.Err())
ws := ss.Warnings()
if len(ws) > 0 {
fmt.Println("warnings:", ws)
}
err = querier.Close()
require.NoError(t, err)
noErr(err)

// Clean up any last resources when done.
err = db.Close()
require.NoError(t, err)

require.Equal(t, appendedSamples, queriedSamples)
noErr(err)
err = os.RemoveAll(dir)
noErr(err)

// Output:
// series: {foo="bar"}
// sample <ts1> 123
// sample <ts2> 124
// sample 123
// sample 124
// it.Err(): <nil>
// ss.Err(): <nil>
}

func noErr(err error) {
if err != nil {
panic(err)
}
}

0 comments on commit 129ed4e

Please sign in to comment.