Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP wal-restore #977

Merged
merged 20 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/dockertests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ jobs:
'make TEST="pg_wal_perftest" pg_integration_test',
'make TEST="pg_backup_perftest" pg_integration_test',
'make TEST="pg_catchup_test" pg_integration_test',
'make TEST="pg_wal_restore_test" pg_integration_test',
'make MYSQL_TEST=mysql_base_tests mysql_integration_test',
'make MYSQL_TEST=mysql_delete_tests mysql_integration_test',
'make MYSQL_TEST=mysql_copy_tests mysql_integration_test',
Expand Down
31 changes: 31 additions & 0 deletions cmd/pg/wal_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package pg

import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/postgres"
)

// Help texts of the wal-restore command.
const (
	// WalRestoreUsage is the usage line: the command name followed by its
	// two positional arguments.
	WalRestoreUsage = "wal-restore target-pgdata source-pgdata"

	// WalRestoreShortDescription is the one-line summary of the command.
	WalRestoreShortDescription = "Restores WAL segments from storage."

	// WalRestoreLongDescription is the extended help text of the command.
	WalRestoreLongDescription = "Restores the missing WAL segments that will be needed " +
		"to perform pg_rewind from storage."
)

// walRestoreCmd is the cobra definition of the wal-restore subcommand.
//
// It requires exactly two positional arguments. They are handed, in the order
// given on the command line (target first, source second, per the usage
// string), to postgres.HandleWALRestore together with the configured folder.
var walRestoreCmd = &cobra.Command{
	Use:   WalRestoreUsage,
	Short: WalRestoreShortDescription,
	Long:  WalRestoreLongDescription,
	Args:  cobra.ExactArgs(2),
	Run: func(_ *cobra.Command, pgDataDirs []string) {
		storageFolder, configureErr := internal.ConfigureFolder()
		tracelog.ErrorLogger.FatalfOnError("Error on configure external folder %v\n", configureErr)

		targetPgData, sourcePgData := pgDataDirs[0], pgDataDirs[1]
		postgres.HandleWALRestore(targetPgData, sourcePgData, storageFolder)
	},
}

// init adds walRestoreCmd to Cmd when the package is loaded.
func init() {
Cmd.AddCommand(walRestoreCmd)
}
13 changes: 13 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ services:
&& mkdir -p /export/gpdeletebeforetimebucket
&& mkdir -p /export/createrestorepointbucket
&& mkdir -p /export/storagetoolsbucket
&& mkdir -p /export/walrestorebucket
&& /usr/bin/minio server /export'

s3-another:
Expand Down Expand Up @@ -560,6 +561,18 @@ services:
links:
- s3


pg_wal_restore_test:
build:
dockerfile: docker/pg_tests/Dockerfile_wal_restore_test
context: .
image: wal-g/wal_restore_test
container_name: wal-g_pg_wal_restore_test
depends_on:
- s3
links:
- s3

pg_pgbackrest:
build:
dockerfile: docker/pg_tests/Dockerfile_pgbackrest
Expand Down
3 changes: 3 additions & 0 deletions docker/pg_tests/Dockerfile_wal_restore_test
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Image for the wal-restore test, based on wal-g/docker_prefix.
FROM wal-g/docker_prefix:latest

# Run the wal-restore test script as the postgres user.
CMD su - postgres /tmp/tests/wal_restore_test.sh
14 changes: 14 additions & 0 deletions docker/pg_tests/scripts/configs/wal_restore_test_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
"AWS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"AWS_ENDPOINT": "http://s3:9000",
"AWS_S3_FORCE_PATH_STYLE": "true",
"WALG_COMPRESSION_METHOD": "brotli",
"WALG_DELTA_MAX_STEPS": "3",
"WALG_UPLOAD_CONCURRENCY": "10",
"WALG_DISK_RATE_LIMIT": "41943040",
"WALG_NETWORK_RATE_LIMIT": "10485760",
"PGSSLMODE": "allow",
"PGDATABASE": "postgres",
"PGHOST": "/var/run/postgresql",
"WALE_S3_PREFIX": "s3://walrestorebucket",
"WALG_USE_WAL_DELTA": "true"
103 changes: 103 additions & 0 deletions docker/pg_tests/scripts/tests/wal_restore_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/bin/sh
# Test scenario for `wal-g wal-restore`:
#   1. create cluster alpha and a streaming replica beta;
#   2. stop alpha, promote beta and write to it;
#   3. start alpha again and write to it too, so the two histories diverge;
#   4. stop both, run `wal-g wal-restore alpha beta`, then pg_rewind alpha from beta.
# -e aborts on the first failing command (that is how the test fails),
# -x echoes every command into the log.
set -e -x

# Data directories and ports of the two local clusters.
PGDATA="/var/lib/postgresql/10/main"
PGDATA_ALPHA="${PGDATA}_alpha"
PGDATA_BETA="${PGDATA}_beta"
ALPHA_PORT=5432
BETA_PORT=5433

# init config: test-specific settings + "," + common settings, concatenated
# into one file which is then passed to wrap_config_file.sh
CONFIG_FILE="/tmp/configs/wal_restore_test_config.json"
COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cp ${CONFIG_FILE} ${TMP_CONFIG}
echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

# init alpha cluster
/usr/lib/postgresql/10/bin/initdb ${PGDATA_ALPHA}

# preparation for replication: allow the repl role to connect for replication
# and append replication/archiving settings (WAL is archived with wal-g wal-push)
cd ${PGDATA_ALPHA}
echo "host replication repl 127.0.0.1/32 md5" >> pg_hba.conf
{
echo "wal_level = replica"
echo "wal_keep_segments = 3"
echo "max_wal_senders = 2"
echo "hot_standby = on"
echo "listen_addresses = 'localhost'"
echo "wal_log_hints = on"

echo "archive_mode = on"
echo "archive_command = '/usr/bin/timeout 600 /usr/bin/wal-g wal-push %p --config=${TMP_CONFIG}'"
echo "archive_timeout = 600"
} >> postgresql.conf

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -w start
PGDATA=${PGDATA_ALPHA} /tmp/scripts/wait_while_pg_not_ready.sh

# remove everything wal-g has in the storage before the scenario starts
timeout 30 wal-g --config=${TMP_CONFIG} delete everything FORCE --confirm

# role used by pg_basebackup and by beta's primary_conninfo below
psql -c "CREATE ROLE repl WITH REPLICATION PASSWORD 'password' LOGIN;"

# init beta cluster (replica of alpha)
/usr/lib/postgresql/10/bin/pg_basebackup --wal-method=stream -D ${PGDATA_BETA} -U repl -h 127.0.0.1 -p ${ALPHA_PORT}

# preparation for replication: beta gets its own port and the same archiving settings
cd ${PGDATA_BETA}
{
echo "wal_log_hints = on"
echo "max_wal_size = 32MB"
echo "min_wal_size = 32MB"

echo "port = ${BETA_PORT}"
echo "hot_standby = on"

echo "archive_mode = on"
echo "archive_command = '/usr/bin/timeout 600 /usr/bin/wal-g wal-push %p --config=${TMP_CONFIG}'"
echo "archive_timeout = 600"
} >> postgresql.conf
# recovery.conf makes beta a standby streaming from alpha.
# NOTE(review): restore_command copies from ${PGDATA_BETA}/archive, a directory
# this script never creates — confirm whether it is expected to find anything.
cat > recovery.conf << EOF
standby_mode = 'on'
primary_conninfo = 'host=127.0.0.1 port=${ALPHA_PORT} user=repl password=password'
restore_command = 'cp ${PGDATA_BETA}/archive/%f %p'
trigger_file = '/tmp/postgresql.trigger.${BETA_PORT}'
EOF

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} -w start

# fill database postgres
pgbench -i -s 10 -h 127.0.0.1 -p ${ALPHA_PORT} postgres

# db table conn_port row_count
/tmp/scripts/wait_while_replication_complete.sh postgres pgbench_accounts ${ALPHA_PORT} 1000000 # 10 * 100000, 10 is value of -s in pgbench
# script above waits only one table, so just in case sleep
sleep 3

# stop alpha, then promote beta so it accepts writes on its own
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -m fast -w stop
sleep 7

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} -w promote

# write to the promoted beta
pgbench -i -s 20 -h 127.0.0.1 -p ${BETA_PORT} postgres

# NOTE(review): this stop uses -W (upper case) where the earlier stop used -w;
# presumably the sleep below stands in for waiting on shutdown — confirm.
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} -m fast -W stop
sleep 10

# bring alpha back and write to it as well, so alpha and beta diverge
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -w start
PGDATA=${PGDATA_ALPHA} /tmp/scripts/wait_while_pg_not_ready.sh

pgbench -i -s 5 -h 127.0.0.1 -p ${ALPHA_PORT} postgres

# NOTE(review): same -W + sleep pattern as for beta above — confirm.
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -m fast -W stop
sleep 10

# for more info to log
ls "${PGDATA_BETA}/pg_wal"

# command under test: target is alpha, source is beta
timeout 30 wal-g --config=${TMP_CONFIG} wal-restore ${PGDATA_ALPHA} ${PGDATA_BETA}
sleep 10

# final check: pg_rewind of alpha from beta must succeed (set -e fails the test otherwise)
/usr/lib/postgresql/10/bin/pg_rewind -D ${PGDATA_ALPHA} --source-pgdata=${PGDATA_BETA}
9 changes: 9 additions & 0 deletions docs/PostgreSQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,15 @@ Usage:
wal-g wal-purge
```

### ``wal-restore``

Restores from storage the missing WAL segments that are needed to perform pg_rewind. The current version supports only local clusters.

Usage:
```bash
wal-g wal-restore path/to/target-pgdata path/to/source-pgdata
```

pgBackRest backups support
-----------
### ``pgbackrest backup-list``
Expand Down
74 changes: 74 additions & 0 deletions internal/databases/postgres/pg_control_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package postgres

import (
"encoding/binary"
"io"
"os"
"path"

"github.com/wal-g/tracelog"
)

// pgControlSize is the number of bytes read from a pg_control file.
const pgControlSize = 8192

// PgControlData holds the values parsed from a pg_control file.
// Further pg_control fields can be added here as they become needed.
type PgControlData struct {
	// systemIdentifier is the system ID of the PG cluster
	// (e.g. bytes [0-8] of pg_control).
	systemIdentifier uint64

	// currentTimeline is the current timeline of the PG cluster
	// (e.g. bytes [48-52] of pg_control v. 1100+).
	currentTimeline uint32
}

// ExtractPgControl reads the pg_control file of the cluster whose data
// directory is folder and returns the fields parsed from it.
//
// The file is located by joining folder with PgControlPath and is opened from
// the local filesystem. An error is returned when the file cannot be opened,
// cannot be parsed, or cannot be closed after a successful parse.
func ExtractPgControl(folder string) (*PgControlData, error) {
	pgControlFile, err := os.Open(path.Join(folder, PgControlPath))
	if err != nil {
		return nil, err
	}

	result, err := extractPgControlData(pgControlFile)
	if err != nil {
		// The parse error is what the caller gets. A close failure on this
		// path is secondary, so it is only logged, and only if it really
		// happened (previously a "<nil>" warning was printed on every
		// parse failure).
		if closeErr := pgControlFile.Close(); closeErr != nil {
			tracelog.WarningLogger.Printf("Error on closing pg_control file: %v\n", closeErr)
		}
		return nil, err
	}

	if err = pgControlFile.Close(); err != nil {
		return nil, err
	}

	return result, nil
}

// extractPgControlData reads one pg_control image from pgControlReader and
// decodes the fields kept in PgControlData.
//
// Exactly pgControlSize bytes are read; a shorter stream results in an error.
// All integers are decoded as little-endian: the system identifier from bytes
// [0:8], the pg_control version from bytes [8:12], and the current timeline
// from bytes [56:60] when the version is below 1100 or [48:52] otherwise.
func extractPgControlData(pgControlReader io.Reader) (*PgControlData, error) {
	raw := make([]byte, pgControlSize)
	if _, err := io.ReadFull(pgControlReader, raw); err != nil {
		return nil, err
	}

	// The timeline field sits 8 bytes earlier starting with version 1100.
	timelineOffset := 48
	if version := binary.LittleEndian.Uint32(raw[8:12]); version < 1100 {
		timelineOffset = 56
	}

	return &PgControlData{
		systemIdentifier: binary.LittleEndian.Uint64(raw[:8]),
		currentTimeline:  binary.LittleEndian.Uint32(raw[timelineOffset : timelineOffset+4]),
	}, nil
}

// GetSystemIdentifier returns the system identifier stored in pgControl.
func (pgControl *PgControlData) GetSystemIdentifier() uint64 {
	return pgControl.systemIdentifier
}

// GetCurrentTimeline returns the current timeline stored in pgControl.
func (pgControl *PgControlData) GetCurrentTimeline() uint32 {
	return pgControl.currentTimeline
}
55 changes: 55 additions & 0 deletions internal/databases/postgres/pg_control_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package postgres

import (
bytes2 "bytes"
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
)

// TestExtractPgControlData_IncorrectPgControlSize checks that an input one
// byte shorter than pgControlSize is rejected with an error.
func TestExtractPgControlData_IncorrectPgControlSize(t *testing.T) {
	truncated := bytes2.NewReader(make([]byte, pgControlSize-1))

	_, err := extractPgControlData(truncated)

	assert.Error(t, err)
}

// TestExtractPgControlData_OldVersion checks that for a pg_control version
// below 1100 the timeline is read from bytes [56:60].
func TestExtractPgControlData_OldVersion(t *testing.T) {
	const (
		systemID = uint64(9876)
		version  = uint32(1099)
		timeline = uint32(7)
	)

	// Zero-filled image of full size with only the three fields set.
	image := make([]byte, pgControlSize)
	binary.LittleEndian.PutUint64(image[0:8], systemID)
	binary.LittleEndian.PutUint32(image[8:12], version)
	binary.LittleEndian.PutUint32(image[56:60], timeline)

	pgControlData, err := extractPgControlData(bytes2.NewReader(image))

	assert.Nil(t, err)
	assert.Equal(t, systemID, pgControlData.GetSystemIdentifier())
	assert.Equal(t, timeline, pgControlData.GetCurrentTimeline())
}

// TestExtractPgControlData_NewVersion checks that for pg_control version 1100
// the timeline is read from bytes [48:52].
func TestExtractPgControlData_NewVersion(t *testing.T) {
	const (
		systemID = uint64(9876)
		version  = uint32(1100)
		timeline = uint32(7)
	)

	// Zero-filled image of full size with only the three fields set.
	image := make([]byte, pgControlSize)
	binary.LittleEndian.PutUint64(image[0:8], systemID)
	binary.LittleEndian.PutUint32(image[8:12], version)
	binary.LittleEndian.PutUint32(image[48:52], timeline)

	pgControlData, err := extractPgControlData(bytes2.NewReader(image))

	assert.Nil(t, err)
	assert.Equal(t, systemID, pgControlData.GetSystemIdentifier())
	assert.Equal(t, timeline, pgControlData.GetCurrentTimeline())
}