Skip to content

Commit

Permalink
Implement catchup-push and catchup-fetch commands, plus a simple integration perf test
Browse files Browse the repository at this point in the history
  • Loading branch information
defntvdm committed Jan 9, 2020
1 parent 78fa709 commit ff14310
Show file tree
Hide file tree
Showing 24 changed files with 358 additions and 77 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Expand Up @@ -118,6 +118,9 @@ jobs:
- script: make TEST="pg_backup_perftest" pg_integration_test
workspaces:
use: image_cache
- script: make TEST="pg_catchup_perftest" pg_integration_test
workspaces:
use: image_cache


notifications:
Expand Down
27 changes: 27 additions & 0 deletions cmd/pg/catchup_fetch.go
@@ -0,0 +1,27 @@
package pg

import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
)

// CatchupFetchShortDescription is the one-line help text shown for the
// catchup-fetch command.
const CatchupFetchShortDescription = "Fetches an incremental backup from storage"

// catchupFetchCmd implements the "catchup-fetch" subcommand, which downloads
// a catchup (incremental) backup from storage into the given PGDATA directory.
var catchupFetchCmd = &cobra.Command{
	Use:   "catchup-fetch PGDATA backup_name",
	Short: CatchupFetchShortDescription, // TODO : improve description
	Args:  cobra.ExactArgs(2),
	Run:   runCatchupFetch,
}

// runCatchupFetch resolves the configured storage folder and delegates to the
// internal handler with PGDATA (args[0]) and the backup name (args[1]).
// Any configuration error is fatal.
func runCatchupFetch(cmd *cobra.Command, args []string) {
	folder, err := internal.ConfigureFolder()
	tracelog.ErrorLogger.FatalOnError(err)
	internal.HandleCatchupFetch(folder, args[0], args[1])
}

// init registers the catchup-fetch subcommand on the parent pg command.
func init() {
	Cmd.AddCommand(catchupFetchCmd)
}
33 changes: 33 additions & 0 deletions cmd/pg/catchup_push.go
@@ -0,0 +1,33 @@
package pg

import (
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"

"github.com/spf13/cobra"
)

// catchupPushShortDescription is the one-line help text shown for the
// catchup-push command.
const catchupPushShortDescription = "Creates incremental backup from lsn"

// fromLSN holds the value of the --from-lsn flag: the LSN the incremental
// catchup backup starts from.
var fromLSN uint64

// catchupPushCmd implements the "catchup-push" subcommand, which uploads an
// incremental backup of PGDATA starting from the LSN given via --from-lsn.
var catchupPushCmd = &cobra.Command{
	Use:   "catchup-push PGDATA --from-lsn LSN",
	Short: catchupPushShortDescription,
	Args:  cobra.ExactArgs(1),
	Run: func(cmd *cobra.Command, args []string) {
		// Configure the uploader (fatal on misconfiguration), then push
		// the catchup backup of PGDATA (args[0]) starting at fromLSN.
		uploader, err := internal.ConfigureUploader()
		tracelog.ErrorLogger.FatalOnError(err)
		internal.HandleCatchupPush(uploader, args[0], fromLSN)
	},
}

// init registers the catchup-push subcommand on the parent pg command and
// declares its flags.
func init() {
	Cmd.AddCommand(catchupPushCmd)

	// --from-lsn: LSN the incremental backup starts from (defaults to 0).
	catchupPushCmd.Flags().Uint64Var(&fromLSN, "from-lsn", 0, "LSN to start incremental backup")
}
11 changes: 11 additions & 0 deletions docker-compose.yml
Expand Up @@ -324,6 +324,17 @@ services:
links:
- s3

pg_catchup_perftest:
build:
dockerfile: docker/pg_tests/Dockerfile_catchup_perftest
context: .
image: wal-g/catchup_perftest
container_name: wal-g_pg_catchup_perftest
depends_on:
- s3
links:
- s3

mysql:
build:
dockerfile: docker/mysql/Dockerfile
Expand Down
3 changes: 3 additions &amp; 0 deletions docker/pg_tests/Dockerfile_catchup_perftest
@@ -0,0 +1,3 @@
# Perf-test image for wal-g catchup-push/catchup-fetch, built on the shared
# test base image (referenced by the pg_catchup_perftest docker-compose service).
FROM wal-g/docker_prefix:latest

# Run the catchup perf test as the postgres user.
CMD su postgres -c "/tmp/tests/catchup_perftest.sh"
5 changes: 5 additions & 0 deletions docker/pg_tests/scripts/configs/catchup_perftest_config.json
@@ -0,0 +1,5 @@
"WALE_S3_PREFIX": "s3://deletebeforepermanentfullbucket",
"WALG_USE_WAL_DELTA": "true",
"WALG_UPLOAD_CONCURRENCY": "1",
"WALG_UPLOAD_QUEUE": "1",
"WALG_UPLOAD_DISK_CONCURRENCY": "1"
14 changes: 14 additions & 0 deletions docker/pg_tests/scripts/scripts/wait_while_replication_complete.sh
@@ -0,0 +1,14 @@
#!/bin/bash
# wait_while_replication_complete.sh DATABASE TABLE PORT [EXPECTED_ROWS]
#
# Polls the postgres instance on PORT until TABLE in DATABASE contains
# EXPECTED_ROWS rows (default 10000000, which is what `pgbench -i -s 100`
# puts into pgbench_accounts). Used to wait for a replica to catch up.

DATABASE=$1
TABLE=$2
PORT=$3
# Optional expected row count; keeps the historic default for existing callers.
EXPECTED_ROWS=${4:-10000000}

row_count() {
    # -t -A: tuples-only, unaligned output — prints just the bare number,
    # so no grep/whitespace juggling is needed.
    # NOTE: the original loop re-queried without -d ${DATABASE}, i.e. it
    # polled the default database instead of the requested one — fixed here.
    psql --port "${PORT}" -d "${DATABASE}" -t -A -c "SELECT COUNT(*) from ${TABLE}"
}

while [[ "$(row_count)" != "${EXPECTED_ROWS}" ]]
do
    echo "Waiting a second for replication to complete"
    sleep 1
done
93 changes: 93 additions & 0 deletions docker/pg_tests/scripts/tests/catchup_perftest.sh
@@ -0,0 +1,93 @@
#!/bin/bash
# Integration perf test for wal-g catchup-push / catchup-fetch.
#
# Flow: set up a primary (alpha) and a streaming replica (beta), load data,
# stop the replica, load more data on the primary, catchup-push the primary
# from the replica's last known LSN, catchup-fetch onto the replica, and
# verify both clusters dump identically.
#
# NOTE: bash (not plain sh) is required — the script uses pushd/popd and
# brace expansion ("mv recovery.conf{,.bak}"), which are bash-only.
set -e -x

PGDATA="/var/lib/postgresql/10/main"
PGDATA_ALPHA="${PGDATA}_alpha"
PGDATA_BETA="${PGDATA}_beta"
ALPHA_DUMP="/tmp/alpha_dump"
ALPHA_PORT=5432
BETA_DUMP="/tmp/beta_dump"
BETA_PORT=5433

# init config: concatenate the test config with the common config and wrap it
CONFIG_FILE="/tmp/configs/catchup_perftest_config.json"
COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cp ${CONFIG_FILE} ${TMP_CONFIG}
echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

# init alpha cluster (the primary)
/usr/lib/postgresql/10/bin/initdb ${PGDATA_ALPHA}
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -w start
PGDATA=${PGDATA_ALPHA} /tmp/scripts/wait_while_pg_not_ready.sh

# prepare alpha for streaming replication: replication role + conf settings
pushd ${PGDATA_ALPHA}
psql -c "CREATE ROLE repl WITH REPLICATION PASSWORD 'password' LOGIN;"
echo "host replication repl 127.0.0.1/32 md5" >> pg_hba.conf
echo "wal_level = replica" >> postgresql.conf
echo "wal_keep_segments = 100" >> postgresql.conf
echo "max_wal_senders = 4" >> postgresql.conf
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -w restart
PGDATA=${PGDATA_ALPHA} /tmp/scripts/wait_while_pg_not_ready.sh
popd

# init beta cluster (streaming replica of alpha)
/usr/lib/postgresql/10/bin/pg_basebackup --wal-method=stream -D ${PGDATA_BETA} -U repl -h 127.0.0.1 -p ${ALPHA_PORT}
pushd ${PGDATA_BETA}
echo "port = ${BETA_PORT}" >> postgresql.conf
echo "hot_standby = on" >> postgresql.conf
cat > recovery.conf << EOF
standby_mode = 'on'
primary_conninfo = 'host=127.0.0.1 port=${ALPHA_PORT} user=repl password=password'
restore_command = 'cp ${PGDATA_BETA}/archive/%f %p'
trigger_file = '/tmp/postgresql.trigger.${BETA_PORT}'
EOF
/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} -w start
popd

# fill database postgres on the primary
pgbench -i -s 100 -h 127.0.0.1 -p ${ALPHA_PORT} postgres

# remember the primary's current WAL position (as a plain number) —
# this is the LSN catchup-push will start from
LSN=`psql -c "SELECT pg_current_wal_lsn() - '0/0'::pg_lsn;" | grep -E '[0-9]+' | head -1`

/tmp/scripts/wait_while_replication_complete.sh postgres pgbench_accounts ${BETA_PORT}
# the script above only waits for one table, so sleep a little extra just in case
sleep 5

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} --mode smart -w stop
sleep 5

# change database postgres on the primary, then dump it for later comparison
pgbench -i -s 200 -h 127.0.0.1 -p ${ALPHA_PORT} postgres
/usr/lib/postgresql/10/bin/pg_dump -h 127.0.0.1 -p ${ALPHA_PORT} -f ${ALPHA_DUMP} postgres

wal-g --config=${TMP_CONFIG} catchup-push ${PGDATA_ALPHA} --from-lsn ${LSN} 2>/tmp/stderr 1>/tmp/stdout
cat /tmp/stderr /tmp/stdout

# wal-g logs the created backup name (base_...) to stderr
BACKUP_NAME=`grep -oE 'base_.*' /tmp/stderr`

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_ALPHA} -w stop
sleep 5

wal-g --config=${TMP_CONFIG} catchup-fetch ${PGDATA_BETA} $BACKUP_NAME

# Move recovery.conf aside so beta starts as a standalone cluster (no WAL
# replay from alpha) and remove backup_label; it can later be started as a
# replica again by renaming recovery.conf back and restarting postgres.
pushd ${PGDATA_BETA}
mv recovery.conf{,.bak}
rm backup_label
/usr/lib/postgresql/10/bin/pg_resetwal -f .
popd

/usr/lib/postgresql/10/bin/pg_ctl -D ${PGDATA_BETA} -w start
sleep 5

/usr/lib/postgresql/10/bin/pg_dump -h 127.0.0.1 -p ${BETA_PORT} -f ${BETA_DUMP} postgres

# the fetched-catchup beta must now dump identically to alpha
diff ${ALPHA_DUMP} ${BETA_DUMP}

echo "Catchup perftest success"
17 changes: 13 additions & 4 deletions internal/backup.go
Expand Up @@ -188,15 +188,24 @@ func setTablespacePaths(spec TablespaceSpec) error {
return nil
}

// TODO : unit tests
// Do the job of unpacking Backup object
func (backup *Backup) unwrap(dbDataDirectory string, sentinelDto BackupSentinelDto, filesToUnwrap map[string]bool) error {
// check that directory is empty before unwrap
func (backup *Backup) unwrapToEmptyDirectory(
dbDataDirectory string, sentinelDto BackupSentinelDto, filesToUnwrap map[string]bool, createIncrementalFiles bool,
) error {
err := checkDbDirectoryForUnwrap(dbDataDirectory, sentinelDto)
if err != nil {
return err
}

tarInterpreter := NewFileTarInterpreter(dbDataDirectory, sentinelDto, filesToUnwrap)
return backup.unwrap(dbDataDirectory, sentinelDto, filesToUnwrap, createIncrementalFiles)
}

// TODO : unit tests
// Do the job of unpacking Backup object
func (backup *Backup) unwrap(
dbDataDirectory string, sentinelDto BackupSentinelDto, filesToUnwrap map[string]bool, createIncrementalFiles bool,
) error {
tarInterpreter := NewFileTarInterpreter(dbDataDirectory, sentinelDto, filesToUnwrap, createIncrementalFiles)
tarsToExtract, pgControlKey, err := backup.getTarsToExtract(sentinelDto, filesToUnwrap)
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions internal/backup_fetch_handler.go
Expand Up @@ -91,14 +91,14 @@ func GetStreamFetcher(writeCloser io.WriteCloser) func(folder storage.Folder, ba
// HandleBackupFetch is invoked to perform wal-g backup-fetch: it resolves the
// backup by name under the base-backup path and hands it to the given fetcher.
// Failure to resolve the backup is fatal.
func HandleBackupFetch(folder storage.Folder, backupName string, fetcher func(folder storage.Folder, backup Backup)) {
	tracelog.DebugLogger.Printf("HandleBackupFetch(%s, folder,)\n", backupName)
	backup, err := GetBackupByName(backupName, utility.BaseBackupPath, folder)
	tracelog.ErrorLogger.FatalfOnError("Failed to fetch backup: %v\n", err)

	fetcher(folder, *backup)
}

func GetBackupByName(backupName string, folder storage.Folder) (*Backup, error) {
baseBackupFolder := folder.GetSubFolder(utility.BaseBackupPath)
func GetBackupByName(backupName, subfolder string, folder storage.Folder) (*Backup, error) {
baseBackupFolder := folder.GetSubFolder(subfolder)

var backup *Backup
if backupName == LatestString {
Expand Down Expand Up @@ -140,7 +140,7 @@ func chooseTablespaceSpecification(sentinelDto BackupSentinelDto, spec *Tablespa
// deltaFetchRecursion function composes Backup object and recursively searches for necessary base backup
func deltaFetchRecursion(backupName string, folder storage.Folder, dbDataDirectory string,
tablespaceSpec *TablespaceSpec, filesToUnwrap map[string]bool) error {
backup, err := GetBackupByName(backupName, folder)
backup, err := GetBackupByName(backupName, utility.BaseBackupPath, folder)
if err != nil {
return err
}
Expand All @@ -163,7 +163,7 @@ func deltaFetchRecursion(backupName string, folder storage.Folder, dbDataDirecto
tracelog.InfoLogger.Printf("%v fetched. Upgrading from LSN %x to LSN %x \n", *(sentinelDto.IncrementFrom), *(sentinelDto.IncrementFromLSN), *(sentinelDto.BackupStartLSN))
}

return backup.unwrap(dbDataDirectory, sentinelDto, filesToUnwrap)
return backup.unwrapToEmptyDirectory(dbDataDirectory, sentinelDto, filesToUnwrap, false)
}

func GetBaseFilesToUnwrap(backupFileStates BackupFileList, currentFilesToUnwrap map[string]bool) (map[string]bool, error) {
Expand Down
8 changes: 4 additions & 4 deletions internal/backup_fetch_test.go
Expand Up @@ -12,7 +12,7 @@ import (

func TestGetBackupByName_Latest(t *testing.T) {
folder := testtools.CreateMockStorageFolder()
backup, err := internal.GetBackupByName(internal.LatestString, folder)
backup, err := internal.GetBackupByName(internal.LatestString, utility.BaseBackupPath, folder)
assert.NoError(t, err)
assert.Equal(t, folder.GetSubFolder(utility.BaseBackupPath), backup.BaseBackupFolder)
assert.Equal(t, "base_000", backup.Name)
Expand All @@ -21,22 +21,22 @@ func TestGetBackupByName_Latest(t *testing.T) {
// TestGetBackupByName_LatestNoBackups checks that resolving LATEST in a folder
// without any backups yields a NoBackupsFoundError.
func TestGetBackupByName_LatestNoBackups(t *testing.T) {
	folder := testtools.MakeDefaultInMemoryStorageFolder()
	folder.PutObject("folder123/nop", &bytes.Buffer{})
	_, err := internal.GetBackupByName(internal.LatestString, utility.BaseBackupPath, folder)
	assert.Error(t, err)
	assert.IsType(t, internal.NewNoBackupsFoundError(), err)
}

// TestGetBackupByName_Exists checks that an existing backup name resolves to a
// Backup rooted at the base-backup subfolder.
func TestGetBackupByName_Exists(t *testing.T) {
	folder := testtools.CreateMockStorageFolder()
	backup, err := internal.GetBackupByName("base_123", utility.BaseBackupPath, folder)
	assert.NoError(t, err)
	assert.Equal(t, folder.GetSubFolder(utility.BaseBackupPath), backup.BaseBackupFolder)
	assert.Equal(t, "base_123", backup.Name)
}

// TestGetBackupByName_NotExists checks that a missing backup name yields a
// BackupNonExistenceError.
func TestGetBackupByName_NotExists(t *testing.T) {
	folder := testtools.CreateMockStorageFolder()
	_, err := internal.GetBackupByName("base_321", utility.BaseBackupPath, folder)
	assert.Error(t, err)
	assert.IsType(t, internal.NewBackupNonExistenceError(""), err)
}
3 changes: 2 additions & 1 deletion internal/backup_list_handler.go
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/jedib0t/go-pretty/table"
"github.com/wal-g/storages/storage"
"github.com/wal-g/wal-g/utility"
"github.com/wal-g/tracelog"
)

Expand Down Expand Up @@ -91,7 +92,7 @@ func HandleBackupListWithFlags(folder storage.Folder, pretty bool, json bool, de
func getBackupDetails(folder storage.Folder, backups []BackupTime) ([]BackupDetail, error) {
backupDetails := make([]BackupDetail, len(backups))
for i := len(backups) - 1; i >= 0; i-- {
backup, err := GetBackupByName(backups[i].BackupName, folder)
backup, err := GetBackupByName(backups[i].BackupName, utility.BaseBackupPath, folder)
if err != nil {
return nil, err
} else {
Expand Down

0 comments on commit ff14310

Please sign in to comment.