Pages checksum verification #736

Merged (8 commits, Aug 28, 2020)
8 changes: 7 additions & 1 deletion cmd/pg/backup_push.go
@@ -1,6 +1,7 @@
 package pg

 import (
+    "github.com/spf13/viper"
     "github.com/wal-g/tracelog"
     "github.com/wal-g/wal-g/internal"

@@ -11,8 +12,10 @@ const (
     BackupPushShortDescription = "Makes backup and uploads it to storage"
     PermanentFlag = "permanent"
     FullBackupFlag = "full"
+    VerifyPagesFlag = "verify"
     PermanentShorthand = "p"
     FullBackupShorthand = "f"
+    VerifyPagesShorthand = "v"
 )

@@ -24,16 +27,19 @@ var (
         Run: func(cmd *cobra.Command, args []string) {
             uploader, err := internal.ConfigureWalUploader()
             tracelog.ErrorLogger.FatalOnError(err)
-            internal.HandleBackupPush(uploader, args[0], permanent, fullBackup)
+            verifyPageChecksums = verifyPageChecksums || viper.GetBool(internal.VerifyPageChecksumsSetting)
+            internal.HandleBackupPush(uploader, args[0], permanent, fullBackup, verifyPageChecksums)
         },
     }
     permanent = false
     fullBackup = false
+    verifyPageChecksums = false
 )

 func init() {
     Cmd.AddCommand(backupPushCmd)

     backupPushCmd.Flags().BoolVarP(&permanent, PermanentFlag, PermanentShorthand, false, "Pushes permanent backup")
     backupPushCmd.Flags().BoolVarP(&fullBackup, FullBackupFlag, FullBackupShorthand, false, "Make full backup-push")
+    backupPushCmd.Flags().BoolVarP(&verifyPageChecksums, VerifyPagesFlag, VerifyPagesShorthand, false, "Verify page checksums")
 }
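For context on the wiring above: verification can be switched on either with the new --verify / -v flag or through the WALG_VERIFY_PAGE_CHECKSUMS setting, since the flag value is OR-ed with viper's view of the configuration. The standalone sketch below reproduces that cobra/viper precedence pattern; the command name, the EXAMPLE_VERIFY key, and the overall program are illustrative assumptions and not part of wal-g.

// Minimal sketch of the flag-or-setting precedence pattern (illustrative only).
package main

import (
    "fmt"

    "github.com/spf13/cobra"
    "github.com/spf13/viper"
)

var verify bool

var exampleCmd = &cobra.Command{
    Use: "example",
    Run: func(cmd *cobra.Command, args []string) {
        // The CLI flag enables verification directly; otherwise fall back to the setting.
        verify = verify || viper.GetBool("EXAMPLE_VERIFY")
        fmt.Println("verify page checksums:", verify)
    },
}

func main() {
    viper.AutomaticEnv() // lets EXAMPLE_VERIFY=true enable verification without the flag
    exampleCmd.Flags().BoolVarP(&verify, "verify", "v", false, "Verify page checksums")
    _ = exampleCmd.Execute()
}

Running it with -v or with EXAMPLE_VERIFY=true in the environment produces the same result, which mirrors how backup-push picks up WALG_VERIFY_PAGE_CHECKSUMS.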
25 changes: 23 additions & 2 deletions internal/backup_file_description.go
@@ -1,15 +1,36 @@
 package internal

-import "time"
+import (
+    "sort"
+    "time"
+)
+
+const MaxCorruptBlocksInFileDesc int = 10

 type BackupFileDescription struct {
     IsIncremented bool // should never be both incremented and Skipped
     IsSkipped bool
     MTime time.Time
+    CorruptBlocks []uint32 `json:",omitempty"`
Review comment (Collaborator): Add the actual number of corrupted blocks. Sometimes it helps to distinguish different bugs.

Review comment (Collaborator): And maybe a flag to keep all blocks.

Reply (Member Author): Added in 50e27bd

 }

 func NewBackupFileDescription(isIncremented, isSkipped bool, modTime time.Time) *BackupFileDescription {
-    return &BackupFileDescription{isIncremented, isSkipped, modTime}
+    return &BackupFileDescription{isIncremented, isSkipped, modTime, nil}
 }
+
+func (desc *BackupFileDescription) SetCorruptBlocks(corruptBlockNumbers []uint32) {
+    sort.Slice(corruptBlockNumbers, func(i, j int) bool {
+        return corruptBlockNumbers[i] < corruptBlockNumbers[j]
+    })
+
+    // write no more than MaxCorruptBlocksInFileDesc
+    desc.CorruptBlocks = make([]uint32, 0)
+    for idx, blockNo := range corruptBlockNumbers {
+        if idx >= MaxCorruptBlocksInFileDesc {
+            break
+        }
+        desc.CorruptBlocks = append(desc.CorruptBlocks, blockNo)
+    }
+}

 type BackupFileList map[string]BackupFileDescription
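To make the new bookkeeping concrete, the snippet below (not part of the PR; the function name is just for illustration) exercises SetCorruptBlocks: block numbers are sorted ascending, at most MaxCorruptBlocksInFileDesc (10) of them are recorded, and the `json:",omitempty"` tag keeps CorruptBlocks out of the sentinel JSON for clean files.

// Illustrative snippet, placed in package internal for the sake of the example.
package internal

import (
    "encoding/json"
    "fmt"
    "time"
)

func ExampleSetCorruptBlocks() {
    desc := NewBackupFileDescription(false, false, time.Now())
    desc.SetCorruptBlocks([]uint32{12, 3, 7, 3000, 42, 1, 99, 256, 8, 64, 500, 1024})

    fmt.Println(len(desc.CorruptBlocks)) // 10: truncated to MaxCorruptBlocksInFileDesc
    fmt.Println(desc.CorruptBlocks[0])   // 1: block numbers are sorted ascending

    clean := NewBackupFileDescription(false, false, time.Now())
    out, _ := json.Marshal(clean)
    fmt.Println(string(out)) // no "CorruptBlocks" key thanks to json:",omitempty"
}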
8 changes: 5 additions & 3 deletions internal/backup_push_handler.go
@@ -75,6 +75,7 @@ func createAndPushBackup(
     previousBackupSentinelDto BackupSentinelDto,
     isPermanent, forceIncremental bool,
     incrementCount int,
+    verifyPageChecksums bool,
 ) {
     folder := uploader.UploadingFolder
     uploader.UploadingFolder = folder.GetSubFolder(backupsFolder) // TODO: AB: this subfolder switch look ugly. I think typed storage folders could be better (i.e. interface BasebackupStorageFolder, WalStorageFolder etc)
@@ -121,7 +122,7 @@ func createAndPushBackup(
     // Start a new tar bundle, walk the archiveDirectory and upload everything there.
     err = bundle.StartQueue(NewStorageTarBallMaker(backupName, uploader.Uploader))
     tracelog.ErrorLogger.FatalOnError(err)
-    err = bundle.SetupComposer()
+    err = bundle.SetupComposer(verifyPageChecksums)
     tracelog.ErrorLogger.FatalOnError(err)
     tracelog.InfoLogger.Println("Walking ...")
     err = filepath.Walk(archiveDirectory, bundle.HandleWalkedFSObject)
@@ -199,7 +200,7 @@ func createAndPushBackup(

 // TODO : unit tests
 // HandleBackupPush is invoked to perform a wal-g backup-push
-func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanent bool, isFullBackup bool) {
+func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanent, isFullBackup, verifyPageChecksums bool) {
     archiveDirectory = utility.ResolveSymlink(archiveDirectory)
     maxDeltas, fromFull := getDeltaConfig()
     checkPgVersionAndPgControl(archiveDirectory)
@@ -250,7 +251,8 @@ func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanent
         tracelog.InfoLogger.Println("Doing full backup.")
     }

-    createAndPushBackup(uploader, archiveDirectory, utility.BaseBackupPath, previousBackupName, previousBackupSentinelDto, isPermanent, false, incrementCount)
+    createAndPushBackup(uploader, archiveDirectory, utility.BaseBackupPath, previousBackupName,
+        previousBackupSentinelDto, isPermanent, false, incrementCount, verifyPageChecksums)
 }

 // TODO : unit tests
4 changes: 2 additions & 2 deletions internal/bundle.go
@@ -103,8 +103,8 @@ func (bundle *Bundle) StartQueue(tarBallMaker TarBallMaker) error {
     return bundle.TarBallQueue.StartQueue()
 }

-func (bundle *Bundle) SetupComposer() (err error) {
-    bundle.TarBallComposer, err = NewTarBallComposer(RegularComposer, bundle)
+func (bundle *Bundle) SetupComposer(verifyPageChecksums bool) (err error) {
+    bundle.TarBallComposer, err = NewTarBallComposer(RegularComposer, bundle, verifyPageChecksums)
     return err
 }
10 changes: 9 additions & 1 deletion internal/bundle_files.go
@@ -10,6 +10,7 @@ import (
 type BundleFiles interface {
     AddSkippedFile(tarHeader *tar.Header, fileInfo os.FileInfo)
     AddFile(tarHeader *tar.Header, fileInfo os.FileInfo, isIncremented bool)
+    AddFileWithCorruptBlocks(tarHeader *tar.Header, fileInfo os.FileInfo, isIncremented bool, corruptedBlocks []uint32)
     GetUnderlyingMap() *sync.Map
 }

@@ -27,6 +28,13 @@ func (files *RegularBundleFiles) AddFile(tarHeader *tar.Header, fileInfo os.FileInfo,
         BackupFileDescription{IsSkipped: false, IsIncremented: isIncremented, MTime: fileInfo.ModTime()})
 }

+func (files *RegularBundleFiles) AddFileWithCorruptBlocks(tarHeader *tar.Header, fileInfo os.FileInfo,
+    isIncremented bool, corruptedBlocks []uint32) {
+    fileDescription := BackupFileDescription{IsSkipped: false, IsIncremented: isIncremented, MTime: fileInfo.ModTime()}
+    fileDescription.SetCorruptBlocks(corruptedBlocks)
+    files.Store(tarHeader.Name, fileDescription)
+}
+
 func (files *RegularBundleFiles) GetUnderlyingMap() *sync.Map {
     return &files.Map
 }
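A hypothetical caller sketch, not taken from the PR, of how the two store paths might be chosen once page verification has produced a list of corrupt block numbers for a file; recordVerifiedFile and its parameters are assumptions, while AddFile and AddFileWithCorruptBlocks are the interface methods shown above.

// Illustrative only: routing a verified file into the bundle's file list.
package internal

import (
    "archive/tar"
    "os"
)

func recordVerifiedFile(files BundleFiles, header *tar.Header, info os.FileInfo,
    isIncremented bool, corruptBlockNumbers []uint32) {
    if len(corruptBlockNumbers) == 0 {
        // No checksum mismatches: store a regular description.
        files.AddFile(header, info, isIncremented)
        return
    }
    // Checksum mismatches found: keep (up to MaxCorruptBlocksInFileDesc of) the block numbers.
    files.AddFileWithCorruptBlocks(header, info, isIncremented, corruptBlockNumbers)
}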
1 change: 1 addition & 0 deletions internal/catchup_push_handler.go
@@ -26,5 +26,6 @@ func HandleCatchupPush(uploader *WalUploader, archiveDirectory string, fromLSN u
         archiveDirectory, utility.CatchupPath,
         "", fakePreviousBackupSentinelDto,
         false, true, 0,
+        false,
     )
 }
3 changes: 3 additions & 0 deletions internal/config.go
@@ -31,6 +31,7 @@ const (
     UseWalDeltaSetting = "WALG_USE_WAL_DELTA"
     UseReverseUnpackSetting = "WALG_USE_REVERSE_UNPACK"
     SkipRedundantTarsSetting = "WALG_SKIP_REDUNDANT_TARS"
+    VerifyPageChecksumsSetting = "WALG_VERIFY_PAGE_CHECKSUMS"
     LogLevelSetting = "WALG_LOG_LEVEL"
     TarSizeThresholdSetting = "WALG_TAR_SIZE_THRESHOLD"
     CseKmsIDSetting = "WALG_CSE_KMS_ID"
@@ -99,6 +100,7 @@ var (
         TotalBgUploadedLimit: "32",
         UseReverseUnpackSetting: "false",
         SkipRedundantTarsSetting: "false",
+        VerifyPageChecksumsSetting: "false",

         OplogArchiveTimeoutInterval: "60s",
         OplogArchiveAfterSize: "16777216", // 32 << (10 * 2)
@@ -135,6 +137,7 @@ var (
         NameStreamRestoreCmd: true,
         UseReverseUnpackSetting: true,
         SkipRedundantTarsSetting: true,
+        VerifyPageChecksumsSetting: true,

         // Postgres
         PgPortSetting: true,
21 changes: 21 additions & 0 deletions internal/ioextensions/io.go
@@ -64,3 +64,24 @@ func CreateFileWith(filePath string, content io.Reader) error {
     _, err = utility.FastCopy(file, content)
     return err
 }
+
+type MultiCloser struct {
+    closers []io.Closer
+}
+
+func NewMultiCloser(closers []io.Closer) *MultiCloser {
+    return &MultiCloser{
+        closers: closers,
+    }
+}
+
+func (m *MultiCloser) Close() error {
+    var err error
+    for _, c := range m.closers {
+        // still call Close on each, even if one returns an error
Review comment (Collaborator): Hm, it would be better to save all the error messages. You can combine them into a new error.

Reply (Member Author): Fixed in 4efeef1

+        if e := c.Close(); e != nil {
+            err = e
+        }
+    }
+    return err
+}
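One way the reviewer's suggestion above could be implemented: still close every closer, but collect every failure message and fold them into a single combined error rather than keeping only the last one. This is a sketch under that assumption; the actual change referenced as 4efeef1 may look different, and MultiCloserCombined is a placeholder name.

// Illustrative variant of Close that aggregates all errors.
package ioextensions

import (
    "fmt"
    "io"
    "strings"
)

type MultiCloserCombined struct {
    closers []io.Closer
}

func (m *MultiCloserCombined) Close() error {
    var messages []string
    for _, c := range m.closers {
        // Still call Close on every closer, even if earlier ones failed.
        if err := c.Close(); err != nil {
            messages = append(messages, err.Error())
        }
    }
    if len(messages) == 0 {
        return nil
    }
    return fmt.Errorf("failed to close: %s", strings.Join(messages, "; "))
}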