Skip to content

Commit

Permalink
Store corrupted blocks count and add flag for storing all corrupted blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
usernamedt committed Aug 27, 2020
1 parent 4efeef1 commit 50e27bd
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 40 deletions.
28 changes: 17 additions & 11 deletions cmd/pg/backup_push.go
Expand Up @@ -9,13 +9,15 @@ import (
)

// Command description, flag names, and flag shorthands for the
// backup-push command.
const (
	BackupPushShortDescription = "Makes backup and uploads it to storage"

	// Long flag names.
	PermanentFlag             = "permanent"
	FullBackupFlag            = "full"
	VerifyPagesFlag           = "verify"
	StoreAllCorruptBlocksFlag = "store-all-corrupt"

	// Single-letter shorthands for the flags above.
	PermanentShorthand             = "p"
	FullBackupShorthand            = "f"
	VerifyPagesShorthand           = "v"
	StoreAllCorruptBlocksShorthand = "s"
)

var (
Expand All @@ -28,12 +30,14 @@ var (
uploader, err := internal.ConfigureWalUploader()
tracelog.ErrorLogger.FatalOnError(err)
verifyPageChecksums = verifyPageChecksums || viper.GetBool(internal.VerifyPageChecksumsSetting)
internal.HandleBackupPush(uploader, args[0], permanent, fullBackup, verifyPageChecksums)
storeAllCorruptBlocks = storeAllCorruptBlocks || viper.GetBool(internal.StoreAllCorruptBlocksSetting)
internal.HandleBackupPush(uploader, args[0], permanent, fullBackup, verifyPageChecksums, storeAllCorruptBlocks)
},
}
permanent = false
fullBackup = false
verifyPageChecksums = false
permanent = false
fullBackup = false
verifyPageChecksums = false
storeAllCorruptBlocks = false
)

func init() {
Expand All @@ -42,4 +46,6 @@ func init() {
backupPushCmd.Flags().BoolVarP(&permanent, PermanentFlag, PermanentShorthand, false, "Pushes permanent backup")
backupPushCmd.Flags().BoolVarP(&fullBackup, FullBackupFlag, FullBackupShorthand, false, "Make full backup-push")
backupPushCmd.Flags().BoolVarP(&verifyPageChecksums, VerifyPagesFlag, VerifyPagesShorthand, false, "Verify page checksums")
backupPushCmd.Flags().BoolVarP(&storeAllCorruptBlocks, StoreAllCorruptBlocksFlag, StoreAllCorruptBlocksShorthand,
false, "Store all corrupt blocks found during page checksum verification")
}
23 changes: 18 additions & 5 deletions internal/backup_file_description.go
Expand Up @@ -11,25 +11,38 @@ type BackupFileDescription struct {
IsIncremented bool // should never be both incremented and Skipped
IsSkipped bool
MTime time.Time
CorruptBlocks []uint32 `json:",omitempty"`
CorruptBlocks *CorruptBlocksInfo `json:",omitempty"`
}

// NewBackupFileDescription creates a description for a backed-up file with
// the given increment/skip status and modification time. No corrupt-block
// info is attached; call SetCorruptBlocks to add it later.
func NewBackupFileDescription(isIncremented, isSkipped bool, modTime time.Time) *BackupFileDescription {
	// Named fields instead of a positional literal: the positional form
	// silently breaks if fields are added or reordered on the struct.
	return &BackupFileDescription{
		IsIncremented: isIncremented,
		IsSkipped:     isSkipped,
		MTime:         modTime,
	}
}

func (desc *BackupFileDescription) SetCorruptBlocks(corruptBlockNumbers []uint32) {
// CorruptBlocksInfo describes the corrupt blocks detected in a single
// backup file during page checksum verification.
type CorruptBlocksInfo struct {
	// CorruptBlocksCount is the total number of corrupt blocks found.
	CorruptBlocksCount int
	// SomeCorruptBlocks holds block numbers of the corrupt blocks; it may be
	// a truncated subset of them, so its length can be smaller than
	// CorruptBlocksCount.
	SomeCorruptBlocks []uint32
}

func (desc *BackupFileDescription) SetCorruptBlocks(corruptBlockNumbers []uint32, storeAllBlocks bool) {
if len(corruptBlockNumbers) == 0 {
return
}
sort.Slice(corruptBlockNumbers, func(i, j int) bool {
return corruptBlockNumbers[i] < corruptBlockNumbers[j]
})

corruptBlocksCount := len(corruptBlockNumbers)
// write no more than MaxCorruptBlocksInFileDesc
desc.CorruptBlocks = make([]uint32, 0)
someCorruptBlocks := make([]uint32, 0)
for idx, blockNo := range corruptBlockNumbers {
if idx >= MaxCorruptBlocksInFileDesc {
if !storeAllBlocks && idx >= MaxCorruptBlocksInFileDesc {
break
}
desc.CorruptBlocks = append(desc.CorruptBlocks, blockNo)
someCorruptBlocks = append(someCorruptBlocks, blockNo)
}
desc.CorruptBlocks = &CorruptBlocksInfo{
CorruptBlocksCount: corruptBlocksCount,
SomeCorruptBlocks: someCorruptBlocks,
}
}

Expand Down
9 changes: 5 additions & 4 deletions internal/backup_push_handler.go
Expand Up @@ -75,7 +75,7 @@ func createAndPushBackup(
previousBackupSentinelDto BackupSentinelDto,
isPermanent, forceIncremental bool,
incrementCount int,
verifyPageChecksums bool,
verifyPageChecksums, storeAllCorruptBlocks bool,
) {
folder := uploader.UploadingFolder
uploader.UploadingFolder = folder.GetSubFolder(backupsFolder) // TODO: AB: this subfolder switch look ugly. I think typed storage folders could be better (i.e. interface BasebackupStorageFolder, WalStorageFolder etc)
Expand Down Expand Up @@ -122,7 +122,7 @@ func createAndPushBackup(
// Start a new tar bundle, walk the archiveDirectory and upload everything there.
err = bundle.StartQueue(NewStorageTarBallMaker(backupName, uploader.Uploader))
tracelog.ErrorLogger.FatalOnError(err)
err = bundle.SetupComposer(verifyPageChecksums)
err = bundle.SetupComposer(NewTarBallFilePackerOptions(verifyPageChecksums, storeAllCorruptBlocks))
tracelog.ErrorLogger.FatalOnError(err)
tracelog.InfoLogger.Println("Walking ...")
err = filepath.Walk(archiveDirectory, bundle.HandleWalkedFSObject)
Expand Down Expand Up @@ -200,7 +200,8 @@ func createAndPushBackup(

// TODO : unit tests
// HandleBackupPush is invoked to perform a wal-g backup-push
func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanent, isFullBackup, verifyPageChecksums bool) {
func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanent, isFullBackup,
verifyPageChecksums, storeAllCorruptBlocks bool) {
archiveDirectory = utility.ResolveSymlink(archiveDirectory)
maxDeltas, fromFull := getDeltaConfig()
checkPgVersionAndPgControl(archiveDirectory)
Expand Down Expand Up @@ -252,7 +253,7 @@ func HandleBackupPush(uploader *WalUploader, archiveDirectory string, isPermanen
}

createAndPushBackup(uploader, archiveDirectory, utility.BaseBackupPath, previousBackupName,
previousBackupSentinelDto, isPermanent, false, incrementCount, verifyPageChecksums)
previousBackupSentinelDto, isPermanent, false, incrementCount, verifyPageChecksums, storeAllCorruptBlocks)
}

// TODO : unit tests
Expand Down
4 changes: 2 additions & 2 deletions internal/bundle.go
Expand Up @@ -103,8 +103,8 @@ func (bundle *Bundle) StartQueue(tarBallMaker TarBallMaker) error {
return bundle.TarBallQueue.StartQueue()
}

func (bundle *Bundle) SetupComposer(verifyPageChecksums bool) (err error) {
bundle.TarBallComposer, err = NewTarBallComposer(RegularComposer, bundle, verifyPageChecksums)
func (bundle *Bundle) SetupComposer(filePackOptions TarBallFilePackerOptions) (err error) {
bundle.TarBallComposer, err = NewTarBallComposer(RegularComposer, bundle, filePackOptions)
return err
}

Expand Down
7 changes: 4 additions & 3 deletions internal/bundle_files.go
Expand Up @@ -10,7 +10,8 @@ import (
// BundleFiles tracks the files that belong to a backup bundle.
// The hunk showed both the old and new AddFileWithCorruptBlocks signatures;
// only the post-change one (with storeAllBlocks) is kept here.
type BundleFiles interface {
	// AddSkippedFile records a file that was skipped during the backup.
	AddSkippedFile(tarHeader *tar.Header, fileInfo os.FileInfo)
	// AddFile records a regular or incremented file.
	AddFile(tarHeader *tar.Header, fileInfo os.FileInfo, isIncremented bool)
	// AddFileWithCorruptBlocks records a file together with the corrupt block
	// numbers found in it; storeAllBlocks controls whether every block number
	// is kept in the file description.
	AddFileWithCorruptBlocks(tarHeader *tar.Header, fileInfo os.FileInfo, isIncremented bool,
		corruptedBlocks []uint32, storeAllBlocks bool)
	// GetUnderlyingMap exposes the map of recorded files.
	GetUnderlyingMap() *sync.Map
}

Expand All @@ -29,9 +30,9 @@ func (files *RegularBundleFiles) AddFile(tarHeader *tar.Header, fileInfo os.File
}

func (files *RegularBundleFiles) AddFileWithCorruptBlocks(tarHeader *tar.Header, fileInfo os.FileInfo,
isIncremented bool, corruptedBlocks []uint32) {
isIncremented bool, corruptedBlocks []uint32, storeAllBlocks bool) {
fileDescription := BackupFileDescription{IsSkipped: false, IsIncremented: isIncremented, MTime: fileInfo.ModTime()}
fileDescription.SetCorruptBlocks(corruptedBlocks)
fileDescription.SetCorruptBlocks(corruptedBlocks, storeAllBlocks)
files.Store(tarHeader.Name, fileDescription)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/catchup_push_handler.go
Expand Up @@ -26,6 +26,6 @@ func HandleCatchupPush(uploader *WalUploader, archiveDirectory string, fromLSN u
archiveDirectory, utility.CatchupPath,
"", fakePreviousBackupSentinelDto,
false, true, 0,
false,
false, false,
)
}
3 changes: 3 additions & 0 deletions internal/config.go
Expand Up @@ -32,6 +32,7 @@ const (
UseReverseUnpackSetting = "WALG_USE_REVERSE_UNPACK"
SkipRedundantTarsSetting = "WALG_SKIP_REDUNDANT_TARS"
VerifyPageChecksumsSetting = "WALG_VERIFY_PAGE_CHECKSUMS"
StoreAllCorruptBlocksSetting = "WALG_STORE_ALL_CORRUPT_BLOCKS"
LogLevelSetting = "WALG_LOG_LEVEL"
TarSizeThresholdSetting = "WALG_TAR_SIZE_THRESHOLD"
CseKmsIDSetting = "WALG_CSE_KMS_ID"
Expand Down Expand Up @@ -101,6 +102,7 @@ var (
UseReverseUnpackSetting: "false",
SkipRedundantTarsSetting: "false",
VerifyPageChecksumsSetting: "false",
StoreAllCorruptBlocksSetting: "false",

OplogArchiveTimeoutInterval: "60s",
OplogArchiveAfterSize: "16777216", // 32 << (10 * 2)
Expand Down Expand Up @@ -138,6 +140,7 @@ var (
UseReverseUnpackSetting: true,
SkipRedundantTarsSetting: true,
VerifyPageChecksumsSetting: true,
StoreAllCorruptBlocksSetting: true,

// Postgres
PgPortSetting: true,
Expand Down
5 changes: 3 additions & 2 deletions internal/tar_ball_composer.go
Expand Up @@ -38,11 +38,12 @@ const (
RegularComposer TarBallComposerType = iota + 1
)

func NewTarBallComposer(composerType TarBallComposerType, bundle *Bundle, verifyPageChecksums bool) (TarBallComposer, error) {
func NewTarBallComposer(composerType TarBallComposerType, bundle *Bundle,
filePackOptions TarBallFilePackerOptions) (TarBallComposer, error) {
switch composerType {
case RegularComposer:
bundleFiles := &RegularBundleFiles{}
tarBallFilePacker := newTarBallFilePacker(bundle.DeltaMap, bundle.IncrementFromLsn, bundleFiles, verifyPageChecksums)
tarBallFilePacker := newTarBallFilePacker(bundle.DeltaMap, bundle.IncrementFromLsn, bundleFiles, filePackOptions)
return NewRegularTarBallComposer(bundle.TarBallQueue, tarBallFilePacker, bundleFiles, bundle.Crypter), nil
default:
return nil, errors.New("NewTarBallComposer: Unknown TarBallComposerType")
Expand Down
35 changes: 24 additions & 11 deletions internal/tar_ball_file_packer.go
Expand Up @@ -39,21 +39,33 @@ func (err IgnoredFileError) Error() string {
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
}

// TarBallFilePackerOptions carries the switches that control how a
// TarBallFilePacker treats page checksums and corrupt-block reporting.
type TarBallFilePackerOptions struct {
	verifyPageChecksums   bool
	storeAllCorruptBlocks bool
}

// NewTarBallFilePackerOptions builds an options value from the two flags.
func NewTarBallFilePackerOptions(verifyPageChecksums, storeAllCorruptBlocks bool) TarBallFilePackerOptions {
	var opts TarBallFilePackerOptions
	opts.verifyPageChecksums = verifyPageChecksums
	opts.storeAllCorruptBlocks = storeAllCorruptBlocks
	return opts
}

// TarBallFilePacker is used to pack bundle file into tarball.
// The hunk showed both the old field set (with a bare verifyPageChecksums
// flag) and the new one (with an options struct); only the post-change
// fields are kept here.
type TarBallFilePacker struct {
	deltaMap         PagedFileDeltaMap
	incrementFromLsn *uint64
	files            BundleFiles
	options          TarBallFilePackerOptions
}

func newTarBallFilePacker(deltaMap PagedFileDeltaMap, incrementFromLsn *uint64, files BundleFiles,
verifyPageChecksums bool) *TarBallFilePacker {
options TarBallFilePackerOptions) *TarBallFilePacker {
return &TarBallFilePacker{
deltaMap: deltaMap,
incrementFromLsn: incrementFromLsn,
files: files,
verifyPageChecksums: verifyPageChecksums,
deltaMap: deltaMap,
incrementFromLsn: incrementFromLsn,
files: files,
options: options,
}
}

Expand Down Expand Up @@ -84,7 +96,7 @@ func (p *TarBallFilePacker) PackFileIntoTar(cfi *ComposeFileInfo, tarBall TarBal
}
errorGroup, _ := errgroup.WithContext(context.Background())

if p.verifyPageChecksums {
if p.options.verifyPageChecksums {
var secondReadCloser io.ReadCloser
// newTeeReadCloser is used to provide the fileReadCloser to two consumers:
// fileReadCloser is needed for PackFileTo, secondReadCloser is for the page verification
Expand All @@ -94,7 +106,8 @@ func (p *TarBallFilePacker) PackFileIntoTar(cfi *ComposeFileInfo, tarBall TarBal
if err != nil {
return err
}
p.files.AddFileWithCorruptBlocks(cfi.header, cfi.fileInfo, cfi.isIncremented, corruptBlocks)
p.files.AddFileWithCorruptBlocks(cfi.header, cfi.fileInfo, cfi.isIncremented,
corruptBlocks, p.options.storeAllCorruptBlocks)
return nil
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion internal/walk_test.go
Expand Up @@ -349,7 +349,7 @@ func TestWalk(t *testing.T) {
t.Log(err)
}

err = bundle.SetupComposer(false)
err = bundle.SetupComposer(internal.TarBallFilePackerOptions{})
if err != nil {
t.Log(err)
}
Expand Down

0 comments on commit 50e27bd

Please sign in to comment.