Skip to content

Commit

Permalink
Fix queries after a failed snapshot replay (#9980) (#9987)
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome committed Dec 9, 2021
1 parent f29cacc commit 902817e
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 2 deletions.
4 changes: 3 additions & 1 deletion tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,9 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
// limits the ingested samples to the head min valid time.
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer func() {
h.postings.EnsureOrder()
}()
defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() {
// Loading of m-mapped chunks and snapshot can make the mint of the Head
Expand Down
104 changes: 104 additions & 0 deletions tsdb/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2827,3 +2827,107 @@ func TestSnapshotError(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 0, len(tm))
}

// Tests https://github.com/prometheus/prometheus/issues/9725.
func TestChunkSnapshotReplayBug(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err)

// Write few series records and samples such that the series references are not in order in the WAL
// for status_code="200".
var buf []byte
for i := 1; i <= 1000; i++ {
var ref uint64
if i <= 500 {
ref = uint64(i * 100)
} else {
ref = uint64((i - 500) * 50)
}
seriesRec := record.RefSeries{
Ref: ref,
Labels: labels.Labels{
{Name: "__name__", Value: "request_duration"},
{Name: "status_code", Value: "200"},
{Name: "foo", Value: fmt.Sprintf("baz%d", rand.Int())},
},
}
// Add a sample so that the series is not garbage collected.
samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000}
var enc record.Encoder

rec := enc.Series([]record.RefSeries{seriesRec}, buf)
buf = rec[:0]
require.NoError(t, wlog.Log(rec))
rec = enc.Samples([]record.RefSample{samplesRec}, buf)
buf = rec[:0]
require.NoError(t, wlog.Log(rec))
}

// Write a corrupt snapshot to fail the replay on startup.
snapshotName := chunkSnapshotDir(0, 100)
cpdir := filepath.Join(dir, snapshotName)
require.NoError(t, os.MkdirAll(cpdir, 0o777))

err = ioutil.WriteFile(filepath.Join(cpdir, "00000000"), []byte{1, 5, 3, 5, 6, 7, 4, 2, 2}, 0o777)
require.NoError(t, err)

opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))
defer func() {
require.NoError(t, head.Close())
}()

// Snapshot replay should error out.
require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))

// Querying `request_duration{status_code!="200"}` should return no series since all of
// them have status_code="200".
q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q,
labels.MustNewMatcher(labels.MatchEqual, "__name__", "request_duration"),
labels.MustNewMatcher(labels.MatchNotEqual, "status_code", "200"),
)
require.Len(t, series, 0, "there should be no series found")
}

func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {
dir := t.TempDir()
wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, true)
require.NoError(t, err)

// Write a snapshot with .tmp suffix. This used to fail taking any further snapshots or replay of snapshots.
snapshotName := chunkSnapshotDir(0, 100) + ".tmp"
cpdir := filepath.Join(dir, snapshotName)
require.NoError(t, os.MkdirAll(cpdir, 0o777))

opts := DefaultHeadOptions()
opts.ChunkDirRoot = dir
opts.EnableMemorySnapshotOnShutdown = true
head, err := NewHead(nil, nil, wlog, opts, nil)
require.NoError(t, err)
require.NoError(t, head.Init(math.MinInt64))

require.Equal(t, 0.0, prom_testutil.ToFloat64(head.metrics.snapshotReplayErrorTotal))

// Add some samples for the snapshot.
app := head.Appender(context.Background())
_, err = app.Append(0, labels.Labels{{Name: "foo", Value: "bar"}}, 10, 10)
require.NoError(t, err)
require.NoError(t, app.Commit())

// Should not return any error for a successful snapshot.
require.NoError(t, head.Close())

// Verify the snapshot.
name, idx, offset, err := LastChunkSnapshot(dir)
require.NoError(t, err)
require.True(t, name != "")
require.Equal(t, 0, idx)
require.Greater(t, offset, 0)
}
3 changes: 2 additions & 1 deletion tsdb/head_wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,8 @@ func LastChunkSnapshot(dir string) (string, int, int, error) {

splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
if len(splits) != 2 {
return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", fi.Name())
// Chunk snapshots is not in the right format, we do not care about it.
continue
}

idx, err := strconv.Atoi(splits[0])
Expand Down

0 comments on commit 902817e

Please sign in to comment.