Implement database composer and partial fetch for Postgres #1434

Merged · 15 commits · Mar 18, 2023
23 changes: 20 additions & 3 deletions cmd/pg/backup_fetch.go
@@ -3,12 +3,11 @@ package pg
import (
"fmt"

"github.com/wal-g/wal-g/internal/databases/postgres"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/postgres"
"github.com/wal-g/wal-g/pkg/storages/storage"
)

@@ -21,13 +20,16 @@ For information about pattern syntax view: https://golang.org/pkg/path/filepath/#Match
reverseDeltaUnpackDescription = "Unpack delta backups in reverse order (beta feature)"
skipRedundantTarsDescription = "Skip tars with no useful data (requires reverse delta unpack)"
targetUserDataDescription = "Fetch storage backup which has the specified user data"
restoreOnlyDescription = `[Experimental] Downloads only databases specified by passed db ids from default tablespace.
Sets reverse delta unpack & skip redundant tars options automatically`
)

var fileMask string
var restoreSpec string
var reverseDeltaUnpack bool
var skipRedundantTars bool
var fetchTargetUserData string
var onlyDatabases []int

var backupFetchCmd = &cobra.Command{
Use: "backup-fetch destination_directory [backup_name | --target-user-data <data>]",
@@ -46,9 +48,21 @@ var backupFetchCmd = &cobra.Command{
tracelog.ErrorLogger.FatalOnError(err)

var pgFetcher func(folder storage.Folder, backup internal.Backup)

if onlyDatabases != nil {
skipRedundantTars = true
reverseDeltaUnpack = true
}
reverseDeltaUnpack = reverseDeltaUnpack || viper.GetBool(internal.UseReverseUnpackSetting)
skipRedundantTars = skipRedundantTars || viper.GetBool(internal.SkipRedundantTarsSetting)
extractProv := postgres.ExtractProviderImpl{}

var extractProv postgres.ExtractProvider

if onlyDatabases != nil {
extractProv = postgres.NewExtractProviderDBSpec(onlyDatabases)
} else {
extractProv = postgres.ExtractProviderImpl{}
}

if reverseDeltaUnpack {
pgFetcher = postgres.GetPgFetcherNew(args[0], fileMask, restoreSpec, skipRedundantTars, extractProv)
@@ -85,5 +99,8 @@ func init() {
false, skipRedundantTarsDescription)
backupFetchCmd.Flags().StringVar(&fetchTargetUserData, "target-user-data",
"", targetUserDataDescription)
backupFetchCmd.Flags().IntSliceVar(&onlyDatabases, "restore-only",
nil, restoreOnlyDescription)

Cmd.AddCommand(backupFetchCmd)
}
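
A note on usage: the new --restore-only flag takes a comma-separated list of database ids and, per the code above, force-enables reverse delta unpack and redundant-tar skipping. A minimal sketch of an invocation — the data directory, backup name, and the ids 16384,16385 are illustrative placeholders, not values from this PR:

    # Hypothetical: fetch only two databases from the default tablespace;
    # --restore-only implies reverse-delta-unpack and skip-redundant-tars.
    wal-g backup-fetch /var/lib/postgresql/data LATEST --restore-only 16384,16385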
11 changes: 11 additions & 0 deletions cmd/pg/backup_push.go
@@ -20,6 +20,7 @@ const (
storeAllCorruptBlocksFlag = "store-all-corrupt"
useRatingComposerFlag = "rating-composer"
useCopyComposerFlag = "copy-composer"
useDatabaseComposerFlag = "database-composer"
deltaFromUserDataFlag = "delta-from-user-data"
deltaFromNameFlag = "delta-from-name"
addUserDataFlag = "add-user-data"
@@ -31,6 +32,7 @@
storeAllCorruptBlocksShorthand = "s"
useRatingComposerShorthand = "r"
useCopyComposerShorthand = "c"
useDatabaseComposerShorthand = "b"
)

var (
@@ -102,6 +104,7 @@ var (
verifyPageChecksums = false
storeAllCorruptBlocks = false
useRatingComposer = false
useDatabaseComposer = false
useCopyComposer = false
deltaFromName = ""
deltaFromUserData = ""
@@ -116,6 +119,12 @@ func chooseTarBallComposer() postgres.TarBallComposerType {
if useRatingComposer {
tarBallComposerType = postgres.RatingComposer
}

useDatabaseComposer = useDatabaseComposer || viper.GetBool(internal.UseDatabaseComposerSetting)
if useDatabaseComposer {
tarBallComposerType = postgres.DatabaseComposer
}

useCopyComposer = useCopyComposer || viper.GetBool(internal.UseCopyComposerSetting)
if useCopyComposer {
fullBackup = true
@@ -140,6 +149,8 @@ func init() {
false, "Use rating tar composer (beta)")
backupPushCmd.Flags().BoolVarP(&useCopyComposer, useCopyComposerFlag, useCopyComposerShorthand,
false, "Use copy tar composer (beta)")
backupPushCmd.Flags().BoolVarP(&useDatabaseComposer, useDatabaseComposerFlag, useDatabaseComposerShorthand,
false, "Use database tar composer (experimental)")
backupPushCmd.Flags().StringVar(&deltaFromName, deltaFromNameFlag,
"", "Select the backup specified by name as the target for the delta backup")
backupPushCmd.Flags().StringVar(&deltaFromUserData, deltaFromUserDataFlag,
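The composer choice in chooseTarBallComposer is flag-driven, with the viper setting as a fallback, so the database composer can be enabled in two ways. A sketch, with the data directory as a placeholder:

    # Via the new CLI flag (shorthand -b):
    wal-g backup-push /var/lib/postgresql/data --database-composer

    # Or via the corresponding setting read through viper:
    WALG_USE_DATABASE_COMPOSER=true wal-g backup-push /var/lib/postgresql/data
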
12 changes: 12 additions & 0 deletions docker-compose.yml
@@ -31,6 +31,7 @@ services:
command: >
-c 'mkdir -p /export/fullbucket
&& mkdir -p /export/fullratingcomposerbucket
&& mkdir -p /export/fulldatabasecomposerbucket
&& mkdir -p /export/fullcopycomposerbucket
&& mkdir -p /export/fullwithoutfilesmetadatabucket
&& mkdir -p /export/fullscandeltabucket
@@ -344,6 +345,17 @@
links:
- s3

pg_full_backup_database_composer_test:
build:
dockerfile: docker/pg_tests/Dockerfile_full_backup_database_composer_test
context: .
image: wal-g/full_backup_database_composer_test
container_name: wal-g_pg_full_backup_database_composer_test
depends_on:
- s3
links:
- s3

pg_full_backup_without_files_metadata_test:
build:
dockerfile: docker/pg_tests/Dockerfile_full_backup_without_files_metadata_test
3 changes: 3 additions & 0 deletions docker/pg_tests/Dockerfile_full_backup_database_composer_test
@@ -0,0 +1,3 @@
FROM wal-g/docker_prefix:latest

CMD su postgres -c "/tmp/tests/full_backup_database_composer_test.sh"
4 changes: 4 additions & 0 deletions docker/pg_tests/scripts/configs/full_backup_database_composer_test_config.json
@@ -0,0 +1,4 @@
"WALE_S3_PREFIX": "s3://fulldatabasecomposerbucket",
"WALG_DELTA_MAX_STEPS": "6",
"WALG_PGP_KEY_PATH": "/tmp/PGP_KEY",
"WALG_USE_DATABASE_COMPOSER": "true"
12 changes: 12 additions & 0 deletions docker/pg_tests/scripts/tests/full_backup_database_composer_test.sh
@@ -0,0 +1,12 @@
#!/bin/sh
set -e -x
CONFIG_FILE="/tmp/configs/full_backup_database_composer_test_config.json"
COMMON_CONFIG="/tmp/configs/common_config.json"
TMP_CONFIG="/tmp/configs/tmp_config.json"
cat ${CONFIG_FILE} > ${TMP_CONFIG}
echo "," >> ${TMP_CONFIG}
cat ${COMMON_CONFIG} >> ${TMP_CONFIG}
/tmp/scripts/wrap_config_file.sh ${TMP_CONFIG}

. /tmp/tests/test_functions/test_full_backup.sh
test_full_backup ${TMP_CONFIG}
1 change: 1 addition & 0 deletions internal/config.go
@@ -47,6 +47,7 @@ const (
StoreAllCorruptBlocksSetting = "WALG_STORE_ALL_CORRUPT_BLOCKS"
UseRatingComposerSetting = "WALG_USE_RATING_COMPOSER"
UseCopyComposerSetting = "WALG_USE_COPY_COMPOSER"
UseDatabaseComposerSetting = "WALG_USE_DATABASE_COMPOSER"
WithoutFilesMetadataSetting = "WALG_WITHOUT_FILES_METADATA"
DeltaFromNameSetting = "WALG_DELTA_FROM_NAME"
DeltaFromUserDataSetting = "WALG_DELTA_FROM_USER_DATA"
30 changes: 15 additions & 15 deletions internal/databases/greenplum/backup_fetch_handler_test.go
@@ -9,38 +9,38 @@ import (

func TestPrepareContentIDsToFetch(t *testing.T) {
testcases := []struct {
fetchContentId []int
segmentConfig []cluster.SegConfig
fetchContentId []int
segmentConfig []cluster.SegConfig
contentIDsToFetch map[int]bool
} {
}{
{
fetchContentId: []int{},
segmentConfig: []cluster.SegConfig{},
fetchContentId: []int{},
segmentConfig: []cluster.SegConfig{},
contentIDsToFetch: map[int]bool{},
},
{
fetchContentId: []int{},
segmentConfig: []cluster.SegConfig{{ContentID: 21}, {ContentID: 42}},
fetchContentId: []int{},
segmentConfig: []cluster.SegConfig{{ContentID: 21}, {ContentID: 42}},
contentIDsToFetch: map[int]bool{21: true, 42: true},
},
{
fetchContentId: []int{1},
segmentConfig: []cluster.SegConfig{{ContentID: 1231}, {ContentID: 6743}, {ContentID: 7643}},
fetchContentId: []int{1},
segmentConfig: []cluster.SegConfig{{ContentID: 1231}, {ContentID: 6743}, {ContentID: 7643}},
contentIDsToFetch: map[int]bool{1: true},
},
{
fetchContentId: []int{65, 42, 12, 76, 22},
segmentConfig: []cluster.SegConfig{},
fetchContentId: []int{65, 42, 12, 76, 22},
segmentConfig: []cluster.SegConfig{},
contentIDsToFetch: map[int]bool{65: true, 42: true, 12: true, 76: true, 22: true},
},
{
fetchContentId: []int{5, 4, 3, 2, 1},
segmentConfig: []cluster.SegConfig{{ContentID: 4}, {ContentID: 5}, {ContentID: 6}},
fetchContentId: []int{5, 4, 3, 2, 1},
segmentConfig: []cluster.SegConfig{{ContentID: 4}, {ContentID: 5}, {ContentID: 6}},
contentIDsToFetch: map[int]bool{1: true, 2: true, 3: true, 4: true, 5: true},
},
{
fetchContentId: []int{6, 7, 8, 9, 10},
segmentConfig: []cluster.SegConfig{{ContentID: 1}, {ContentID: 5}, {ContentID: 7}},
fetchContentId: []int{6, 7, 8, 9, 10},
segmentConfig: []cluster.SegConfig{{ContentID: 1}, {ContentID: 5}, {ContentID: 7}},
contentIDsToFetch: map[int]bool{6: true, 7: true, 8: true, 9: true, 10: true},
},
}
5 changes: 2 additions & 3 deletions internal/databases/postgres/backup_fetch_handler_new.go
@@ -2,7 +2,6 @@ package postgres

import (
"fmt"

"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/pkg/storages/storage"
@@ -25,14 +24,14 @@ func GetPgFetcherNew(dbDataDirectory, fileMask, restoreSpecPath string, skipRedu
tracelog.ErrorLogger.FatalfOnError(errMessege, err)
}

// directory must be empty before starting a deltaFetch
// directory must be empty before starting
isEmpty, err := utility.IsDirectoryEmpty(dbDataDirectory)
tracelog.ErrorLogger.FatalfOnError("Failed to fetch backup: %v\n", err)

if !isEmpty {
tracelog.ErrorLogger.FatalfOnError("Failed to fetch backup: %v\n",
NewNonEmptyDBDataDirectoryError(dbDataDirectory))
}

config := NewFetchConfig(pgBackup.Name,
utility.ResolveSymlink(dbDataDirectory), folder, spec, filesToUnwrap, skipRedundantTars, extractProv)
err = deltaFetchRecursionNew(config)
138 changes: 138 additions & 0 deletions internal/databases/postgres/dir_database_tar_ball_composer.go
@@ -0,0 +1,138 @@
package postgres

import (
"archive/tar"
"os"
"path"
"strings"

"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/crypto"
"golang.org/x/sync/errgroup"
)

type DirDatabaseTarBallComposerMaker struct {
filePackerOptions TarBallFilePackerOptions
files internal.BundleFiles
tarFileSets internal.TarFileSets
}

func NewDirDatabaseTarBallComposerMaker(files internal.BundleFiles, filePackerOptions TarBallFilePackerOptions,
tarFileSets internal.TarFileSets) *DirDatabaseTarBallComposerMaker {
return &DirDatabaseTarBallComposerMaker{
files: files,
filePackerOptions: filePackerOptions,
tarFileSets: tarFileSets,
}
}

func (m DirDatabaseTarBallComposerMaker) Make(bundle *Bundle) (internal.TarBallComposer, error) {
tarPacker := NewTarBallFilePacker(bundle.DeltaMap, bundle.IncrementFromLsn, m.files, m.filePackerOptions)
return newDirDatabaseTarBallComposer(
m.files,
bundle.TarBallQueue,
tarPacker,
m.tarFileSets,
bundle.Crypter,
), nil
}

type DirDatabaseTarBallComposer struct {
// Packing stuff
files internal.BundleFiles
tarBallQueue *internal.TarBallQueue
tarFilePacker *TarBallFilePackerImpl
tarFileSets internal.TarFileSets
crypter crypto.Crypter
//
fileDirCollection map[string][]*internal.ComposeFileInfo
}

func newDirDatabaseTarBallComposer(
files internal.BundleFiles,
tarBallQueue *internal.TarBallQueue,
tarFilePacker *TarBallFilePackerImpl,
sets internal.TarFileSets,
crypter crypto.Crypter,
) *DirDatabaseTarBallComposer {
return &DirDatabaseTarBallComposer{
files: files,
tarBallQueue: tarBallQueue,
tarFilePacker: tarFilePacker,
tarFileSets: sets,
fileDirCollection: make(map[string][]*internal.ComposeFileInfo),
crypter: crypter,
}
}

func (d DirDatabaseTarBallComposer) AddFile(info *internal.ComposeFileInfo) {
if strings.Contains(info.Path, DefaultTablespace) {
d.fileDirCollection[path.Dir(info.Path)] = append(d.fileDirCollection[path.Dir(info.Path)], info)
} else {
d.fileDirCollection[""] = append(d.fileDirCollection[""], info)
}
}

func (d DirDatabaseTarBallComposer) AddHeader(header *tar.Header, fileInfo os.FileInfo) error {
tarBall := d.tarBallQueue.Deque()
tarBall.SetUp(d.crypter)
defer d.tarBallQueue.EnqueueBack(tarBall)
d.tarFileSets.AddFile(tarBall.Name(), header.Name)
d.files.AddFile(header, fileInfo, false)
return tarBall.TarWriter().WriteHeader(header)
}

func (d DirDatabaseTarBallComposer) SkipFile(tarHeader *tar.Header, fileInfo os.FileInfo) {
d.files.AddSkippedFile(tarHeader, fileInfo)
}

func (d DirDatabaseTarBallComposer) FinishComposing() (internal.TarFileSets, error) {
// Push Headers in first part
err := d.addListToTar(make([]*internal.ComposeFileInfo, 0))
if err != nil {
return nil, err
}

eg := errgroup.Group{}
for _, fileInfos := range d.fileDirCollection {
thisInfos := fileInfos
eg.Go(func() error {
if len(thisInfos) == 0 {
return nil
}
return d.addListToTar(thisInfos)
})
}

if err := eg.Wait(); err != nil {
return nil, err
}
return d.tarFileSets, nil
}

func (d DirDatabaseTarBallComposer) GetFiles() internal.BundleFiles {
return d.files
}

func (d DirDatabaseTarBallComposer) addListToTar(files []*internal.ComposeFileInfo) error {
tarBall := d.tarBallQueue.Deque()
tarBall.SetUp(d.crypter)

for _, file := range files {
d.tarFileSets.AddFile(tarBall.Name(), file.Header.Name)
err := d.tarFilePacker.PackFileIntoTar(file, tarBall)
if err != nil {
return err
}

if tarBall.Size() > d.tarBallQueue.TarSizeThreshold {
err := d.tarBallQueue.FinishTarBall(tarBall)
if err != nil {
return err
}
tarBall = d.tarBallQueue.Deque()
tarBall.SetUp(d.crypter)
}
}
return d.tarBallQueue.FinishTarBall(tarBall)
}
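
Why this layout matters for the partial fetch: AddFile buckets every file under the default tablespace by its parent directory (one bucket per database directory, plus a catch-all bucket for everything else), and FinishComposing writes each bucket into its own tar series, so a later --restore-only fetch can skip whole tars belonging to unwanted databases. A plausible end-to-end flow, reusing the placeholder paths and database id from the sketches above:

    # Push with per-database tar grouping (experimental composer)...
    WALG_USE_DATABASE_COMPOSER=true wal-g backup-push /var/lib/postgresql/data

    # ...then restore only the database with id 16384; tars for other
    # databases are treated as redundant and skipped during unpack.
    wal-g backup-fetch /restored/data LATEST --restore-only 16384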