Skip to content

Commit

Permalink
Validate pg_rewind works in HA tests
Browse files Browse the repository at this point in the history
Stolon can recover from a failed pg_rewind by falling back to
pg_basebackup. This decision happens inside a black box as far as the
integration tests are concerned: after a resync, it's not possible to
detect whether a rewind or a basebackup got the job done.

This change adds validation that whenever pg_rewind is enabled, it was
successfully used to resync the databases. We expose the keeper logs to
give visibility into what the keeper decided to do, and check against
the log output for the rewind logging we expect.
  • Loading branch information
lawrencejones committed Jun 6, 2019
1 parent 6003d65 commit 54f0eb3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 7 deletions.
27 changes: 27 additions & 0 deletions tests/integration/ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {

// Start the other standby; it should be ahead of current on the previous timeline and should fully resync itself
t.Logf("Starting standby[1]: %s", standbys[1].uid)
standbys[1].ReadOutput() // flush log output until now
standbys[1].Start()
// Standby[1] will start, then it'll detect it's in another timelinehistory,
// will stop, full resync and start. We have to avoid detecting it up
Expand All @@ -1101,6 +1102,32 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
t.Fatalf("unexpected err: %v", err)
}

// If enabled, it should be possible to recover from forked timelines by using
// pg_rewind. It's possible that pg_rewind will fail and we'd still recover the cluster
// by falling back to pg_basebackup.
//
// Most people who enable pg_rewind really don't want basebackup to happen, as it may
// take hours to recover when pg_rewind is <1m. Here we verify that the keeper
// successfully recovered the forked timeline via a rewind rather than basebackup.
if usePgrewind {
	output := standbys[1].ReadOutput()

	// The keeper must have attempted a rewind at all.
	if !strings.Contains(output, "running pg_rewind") {
		t.Error("expected to run pg_rewind but could not find it in logs")
	}

	// "no rewind required" will occur whenever pg_rewind is run against a Postgres that
	// has not checkpointed since its timeline previously forked. pg_rewind first grabs
	// the pg_control file to check if timelines are diverged, and this file won't have
	// been updated until a checkpoint takes place. Seeing that message means the rewind
	// was a no-op, so its presence (not its absence) is the failure.
	if strings.Contains(output, "no rewind required") {
		t.Error("keeper tried rewinding but rewind thought it was not required")
	}

	// A basebackup in the logs means the keeper fell back from the rewind.
	if strings.Contains(output, "running pg_basebackup") {
		t.Error("pg_rewind is enabled but we performed a pg_basebackup anyway")
	}
}

// Check that standby 1 is receiving wals
if err := write(t, standbys[0], 5, 5); err != nil {
t.Fatalf("unexpected err: %v", err)
Expand Down
30 changes: 23 additions & 7 deletions tests/integration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package integration

import (
"bufio"
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
Expand Down Expand Up @@ -229,12 +231,13 @@ func waitNotStolonReplicationSlots(q Querier, replSlots []string, timeout time.D
}

// Process holds the state the integration tests keep for a spawned subprocess.
// NOTE(review): this listing is a rendered diff hunk, so the field list appears
// twice; presumably the first six fields are the pre-change lines and the last
// seven are the post-change lines. Confirm against the real file.
type Process struct {
t *testing.T
uid string
name string
args []string
cmd *gexpect.ExpectSubprocess
bin string
t *testing.T
uid string
name string
args []string
cmd *gexpect.ExpectSubprocess
bin string
// output collects each scanned line of process output, newline-terminated, as
// appended by the scanner goroutine in start.
output bytes.Buffer
}

func (p *Process) start() error {
Expand All @@ -253,7 +256,9 @@ func (p *Process) start() error {
go func() {
scanner := bufio.NewScanner(pr)
for scanner.Scan() {
p.t.Logf("[%s %s]: %s", p.name, p.uid, scanner.Text())
line := scanner.Text()
p.t.Logf("[%s %s]: %s", p.name, p.uid, line)
p.output.Write([]byte(line + "\n"))
}
}()

Expand Down Expand Up @@ -422,6 +427,17 @@ func NewTestKeeper(t *testing.T, dir, clusterName, pgSUUsername, pgSUPassword, p
return NewTestKeeperWithID(t, dir, uid, clusterName, pgSUUsername, pgSUPassword, pgReplUsername, pgReplPassword, storeBackend, storeEndpoints, a...)
}

// ReadOutput returns everything the keeper process has logged since the previous
// call (or since the process started) and consumes it, so the next call only
// returns output produced afterwards. It returns an empty string when nothing new
// has been captured.
//
// The whole buffer is drained rather than a single line: callers use this both to
// discard earlier output and to search the complete log of a resync for specific
// messages, and neither works if only the first buffered line is returned.
//
// NOTE(review): the buffer is appended to by the scanner goroutine in
// Process.start and no locking is visible here; confirm whether access needs to
// be guarded by a mutex.
func (tk *TestKeeper) ReadOutput() string {
	var drained bytes.Buffer

	// io.Copy empties the source buffer completely; reaching its end is not
	// reported as an error.
	if _, err := io.Copy(&drained, &tk.Process.output); err != nil {
		tk.t.Fatalf("failed to read output from test keeper buffer: %v", err)
	}

	return drained.String()
}

func (tk *TestKeeper) PGDataVersion() (int, int, error) {
fh, err := os.Open(filepath.Join(tk.dataDir, "postgres", "PG_VERSION"))
if err != nil {
Expand Down

0 comments on commit 54f0eb3

Please sign in to comment.