Greenplum: cancel backup on Postgres process failure #1260

Merged 3 commits on Apr 26, 2022
20 changes: 20 additions & 0 deletions docs/PostgreSQL.md
@@ -126,6 +126,26 @@ Sample metadata file (000000020000000300000071.json)
```
If the parameter value is NOMETADATA or not specified, it falls back to the default setting (no WAL metadata generation)

* `WALG_ALIVE_CHECK_INTERVAL`

Controls how frequently WAL-G checks whether Postgres is alive during backup-push. If the check fails, the backup-push terminates. A minimal sketch of such a check loop is shown after the examples below.

Examples:
- `0` - disable the alive checks (default value)
- `10s` - check every 10 seconds
- `10m` - check every 10 minutes
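
  For illustration only, here is a minimal Go sketch of the kind of polling loop this setting drives; it is not WAL-G's actual implementation, and `isPostgresAlive` is a hypothetical stand-in for a real connectivity check:

  ```go
  package main

  import (
      "fmt"
      "os"
      "time"
  )

  // runAliveChecks polls Postgres liveness at the interval configured via
  // WALG_ALIVE_CHECK_INTERVAL. An empty or "0" value disables the checks.
  // isPostgresAlive is a hypothetical stand-in for a real connectivity check.
  func runAliveChecks(isPostgresAlive func() bool, backupDone <-chan struct{}) error {
      raw := os.Getenv("WALG_ALIVE_CHECK_INTERVAL")
      if raw == "" || raw == "0" {
          return nil // alive checks disabled (default)
      }

      interval, err := time.ParseDuration(raw)
      if err != nil {
          return fmt.Errorf("invalid WALG_ALIVE_CHECK_INTERVAL %q: %w", raw, err)
      }

      ticker := time.NewTicker(interval)
      defer ticker.Stop()

      for {
          select {
          case <-ticker.C:
              if !isPostgresAlive() {
                  return fmt.Errorf("Postgres alive check failed, terminating backup-push")
              }
          case <-backupDone:
              return nil
          }
      }
  }

  func main() {
      done := make(chan struct{})
      go func() {
          time.Sleep(3 * time.Second) // pretend the backup takes 3 seconds
          close(done)
      }()

      os.Setenv("WALG_ALIVE_CHECK_INTERVAL", "1s")
      fmt.Println("alive-check loop returned:", runAliveChecks(func() bool { return true }, done))
  }
  ```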


* `WALG_STOP_BACKUP_TIMEOUT`

Timeout for the pg_stop_backup() call. By default, there is no timeout.

Examples:
- `0` - disable the timeout (default value)
- `10s` - 10 seconds timeout
- `10m` - 10 minutes timeout
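
  Again for illustration only, a minimal sketch of bounding a stop-backup call with such a timeout via `context.WithTimeout`; the `stopBackup` callback is a hypothetical stand-in for the real `pg_stop_backup()` query, not WAL-G's actual code:

  ```go
  package main

  import (
      "context"
      "fmt"
      "os"
      "time"
  )

  // stopBackupWithTimeout bounds a stop-backup call by the duration configured
  // via WALG_STOP_BACKUP_TIMEOUT. An empty or "0" value means no timeout.
  // stopBackup is a hypothetical stand-in for the real pg_stop_backup() query.
  func stopBackupWithTimeout(ctx context.Context, stopBackup func(context.Context) error) error {
      raw := os.Getenv("WALG_STOP_BACKUP_TIMEOUT")
      if raw != "" && raw != "0" {
          timeout, err := time.ParseDuration(raw)
          if err != nil {
              return fmt.Errorf("invalid WALG_STOP_BACKUP_TIMEOUT %q: %w", raw, err)
          }
          var cancel context.CancelFunc
          ctx, cancel = context.WithTimeout(ctx, timeout)
          defer cancel()
      }
      return stopBackup(ctx)
  }

  func main() {
      // A stop-backup stub that takes two seconds unless the context expires first.
      slowStopBackup := func(ctx context.Context) error {
          select {
          case <-time.After(2 * time.Second):
              return nil
          case <-ctx.Done():
              return ctx.Err()
          }
      }

      os.Setenv("WALG_STOP_BACKUP_TIMEOUT", "500ms")
      fmt.Println("stop-backup result:", stopBackupWithTimeout(context.Background(), slowStopBackup))
  }
  ```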


Usage
-----

30 changes: 17 additions & 13 deletions internal/config.go
@@ -82,6 +82,8 @@ const (
StreamSplitterPartitions = "WALG_STREAM_SPLITTER_PARTITIONS"
StreamSplitterBlockSize = "WALG_STREAM_SPLITTER_BLOCK_SIZE"
StatsdAddressSetting = "WALG_STATSD_ADDRESS"
PgAliveCheckInterval = "WALG_ALIVE_CHECK_INTERVAL"
PgStopBackupTimeout = "WALG_STOP_BACKUP_TIMEOUT"

ProfileSamplingRatio = "PROFILE_SAMPLING_RATIO"
ProfileMode = "PROFILE_MODE"
@@ -335,19 +337,21 @@ var (

PGAllowedSettings = map[string]bool{
// Postgres
PgPortSetting: true,
PgUserSetting: true,
PgHostSetting: true,
PgDataSetting: true,
PgPasswordSetting: true,
PgDatabaseSetting: true,
PgSslModeSetting: true,
PgSlotName: true,
PgWalSize: true,
"PGPASSFILE": true,
PrefetchDir: true,
PgReadyRename: true,
PgBackRestStanza: true,
PgPortSetting: true,
PgUserSetting: true,
PgHostSetting: true,
PgDataSetting: true,
PgPasswordSetting: true,
PgDatabaseSetting: true,
PgSslModeSetting: true,
PgSlotName: true,
PgWalSize: true,
"PGPASSFILE": true,
PrefetchDir: true,
PgReadyRename: true,
PgBackRestStanza: true,
PgAliveCheckInterval: true,
PgStopBackupTimeout: true,
}

MongoAllowedSettings = map[string]bool{
156 changes: 116 additions & 40 deletions internal/databases/greenplum/backup_push_handler.go
@@ -78,12 +78,13 @@ type BackupWorkers struct {

// CurrBackupInfo holds all information that is harvested during the backup process
type CurrBackupInfo struct {
backupName string
segmentBackups map[string]*cluster.SegConfig
startTime time.Time
systemIdentifier *uint64
gpVersion semver.Version
segmentsMetadata map[string]PgSegmentMetaDto
backupName string
segmentBackups map[string]*cluster.SegConfig
startTime time.Time
systemIdentifier *uint64
gpVersion semver.Version
segmentsMetadata map[string]PgSegmentMetaDto
backupPidByContentID map[int]int
}

// BackupHandler is the main struct which is handling the backup process
@@ -123,8 +124,10 @@ func (bh *BackupHandler) buildBackupPushCommand(contentID int) string {
backupPushArgsLine,
// pass the config file location
fmt.Sprintf("--config=%s", internal.CfgFile),
// forward STDOUT& STDERR to log file
">>", formatSegmentLogPath(contentID), "2>&1 &",
// forward stdout and stderr to the log file
"&>>", formatSegmentLogPath(contentID),
// run in the background and get the launched process PID
"& echo $!",
}

cmdLine := strings.Join(cmd, " ")
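
For illustration, a minimal, self-contained Go sketch of the "background the command and echo its PID" idiom used in the command line above; the command here is arbitrary and unrelated to wal-g, and the snippet assumes a bash shell (for `&>>`):

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
)

// A local illustration of the "run in the background and echo the PID" idiom:
// the launched command appends its own output to a log file, so the caller's
// stdout carries only the PID printed by `echo $!`.
func main() {
	cmdLine := "sleep 5 &>> /tmp/walg_demo.log & echo $!"

	out, err := exec.Command("bash", "-c", cmdLine).Output()
	if err != nil {
		fmt.Println("failed to launch:", err)
		return
	}

	pid, err := strconv.Atoi(strings.TrimSpace(string(out)))
	if err != nil {
		fmt.Println("failed to parse PID:", err)
		return
	}

	// The PID can later be used to terminate the background process, e.g. via `kill`.
	fmt.Println("background process PID:", pid)
}
```

The PID printed this way is what the master later collects in `extractBackupPids` and uses to terminate the segment processes if the backup has to be aborted.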
@@ -140,7 +143,6 @@ func (bh *BackupHandler) HandleBackupPush() {

err := bh.checkPrerequisites()
tracelog.ErrorLogger.FatalfOnError("Backup prerequisites check failed: %v\n", err)
bh.disconnect()

tracelog.InfoLogger.Println("Running wal-g on segments")
remoteOutput := bh.globalCluster.GenerateAndExecuteCommand("Running wal-g",
@@ -152,27 +154,31 @@ func (bh *BackupHandler) HandleBackupPush() {
return "Unable to run wal-g"
}, true)

for _, command := range remoteOutput.Commands {
if command.Stderr != "" {
tracelog.ErrorLogger.Printf("stderr (segment %d):\n%s\n", command.Content, command.Stderr)
}
}

bh.currBackupInfo.backupPidByContentID, err = extractBackupPids(remoteOutput)
// this is a non-critical error since backup PIDs are only useful if backup is aborted
tracelog.ErrorLogger.PrintOnError(err)
if remoteOutput.NumErrors > 0 {
bh.abortBackup()
}

// WAL-G will reconnect later
bh.disconnect()

// wait for segments to complete their backups
waitBackupsErr := bh.waitSegmentBackups()
if waitBackupsErr != nil {
tracelog.ErrorLogger.Printf("Segment backups wait error: %v", waitBackupsErr)
}

tracelog.ErrorLogger.FatalfOnError("Failed to connect to the greenplum master: %v",
bh.connect())

if remoteOutput.NumErrors > 0 || waitBackupsErr != nil {
// handle the backup failure and exit
err := bh.handleBackupError()
if err != nil {
tracelog.WarningLogger.Printf("Non-critical error during terminating of the failed backup: %v", err)
}
tracelog.InfoLogger.Fatalf("Encountered one or more errors during the backup-push. See %s for a complete list of errors.",
gplog.GetLogFilePath())
}

for _, command := range remoteOutput.Commands {
tracelog.InfoLogger.Printf("WAL-G output (segment %d):\n%s\n", command.Content, command.Stderr)
if waitBackupsErr != nil {
bh.abortBackup()
}

restoreLSNs, err := createRestorePoint(bh.workers.Conn, bh.currBackupInfo.backupName)
@@ -238,14 +244,32 @@ func checkBackupStates(states map[int]SegBackupState) (int, error) {
}
runningBackupsCount++

case FailedBackupStatus:
return 0, fmt.Errorf("backup failed on segment %d at %s", contentID, state.TS)
case FailedBackupStatus, InterruptedBackupStatus:
return 0, fmt.Errorf("unexpected backup status: %s on segment %d at %s", state.Status, contentID, state.TS)
}
}

return runningBackupsCount, nil
}

func extractBackupPids(output *cluster.RemoteOutput) (map[int]int, error) {
backupPids := make(map[int]int)
var resErr error

for _, command := range output.Commands {
pid, err := strconv.Atoi(strings.TrimSpace(command.Stdout))
if err != nil {
resErr = fmt.Errorf("%w; failed to parse the backup PID: %v", resErr, err)
continue
}

backupPids[command.Content] = pid
}

tracelog.InfoLogger.Printf("WAL-G segment PIDs: %v", backupPids)
return backupPids, resErr
}

func (bh *BackupHandler) pollSegmentStates() (map[int]SegBackupState, error) {
segmentStates := make(map[int]SegBackupState)
remoteOutput := bh.globalCluster.GenerateAndExecuteCommand("Polling the segment backup-push statuses...",
@@ -320,21 +344,6 @@ func (bh *BackupHandler) checkPrerequisites() (err error) {
return nil
}

func (bh *BackupHandler) handleBackupError() error {
// Abort the non-finished exclusive backups on the segments.
// WAL-G in GP7+ uses the non-exclusive backups, that are terminated on connection close, so this is unnecessary.
if bh.currBackupInfo.gpVersion.Major < 7 {
tracelog.InfoLogger.Println("Terminating the running exclusive backups...")
queryRunner, err := NewGpQueryRunner(bh.workers.Conn)
if err != nil {
return err
}
return queryRunner.AbortBackup()
}

return nil
}

// nolint:gocritic
func (bh *BackupHandler) uploadSentinel(sentinelDto BackupSentinelDto) (err error) {
tracelog.InfoLogger.Println("Uploading sentinel file")
@@ -492,3 +501,70 @@ func (bh *BackupHandler) fetchSingleMetadata(backupID string, segCfg *cluster.Se

return &meta, nil
}

func (bh *BackupHandler) abortBackup() {
err := bh.terminateRunningBackups()
if err != nil {
tracelog.WarningLogger.Printf("Failed to stop running backups: %v", err)
}

err = bh.terminateWalgProcesses()
if err != nil {
tracelog.WarningLogger.Printf("Failed to shutdown the running WAL-G processes: %v", err)
}

tracelog.InfoLogger.Fatalf("Encountered one or more errors during the backup-push. See %s for a complete list of errors.",
gplog.GetLogFilePath())
}

func (bh *BackupHandler) terminateRunningBackups() error {
// Abort the unfinished exclusive backups on the segments.
// WAL-G in GP7+ uses non-exclusive backups, which are terminated on connection close, so this is unnecessary there.
if bh.currBackupInfo.gpVersion.Major >= 7 {
return nil
}

tracelog.InfoLogger.Println("Terminating the running exclusive backups...")
queryRunner, err := NewGpQueryRunner(bh.workers.Conn)
if err != nil {
return err
}
return queryRunner.AbortBackup()
}

func (bh *BackupHandler) terminateWalgProcesses() error {
knownPidsLen := len(bh.currBackupInfo.backupPidByContentID)
if knownPidsLen == 0 {
return fmt.Errorf("there are no known PIDs of WAL-G segment processess")
}

if knownPidsLen < len(bh.globalCluster.ContentIDs) {
tracelog.WarningLogger.Printf("Known PIDs count (%d) is less than the total segments number (%d)",
knownPidsLen, len(bh.globalCluster.ContentIDs))
}

remoteOutput := bh.globalCluster.GenerateAndExecuteCommand("Terminating the segment backup-push processes...",
cluster.ON_SEGMENTS|cluster.EXCLUDE_MIRRORS|cluster.INCLUDE_MASTER,
func(contentID int) string {
backupPid, ok := bh.currBackupInfo.backupPidByContentID[contentID]
if !ok {
return ""
}

return fmt.Sprintf("kill %d", backupPid)
})

bh.globalCluster.CheckClusterError(remoteOutput, "Unable to terminate backup-push processes", func(contentID int) string {
return fmt.Sprintf("Unable to terminate backup-push process on segment %d", contentID)
}, true)

for _, command := range remoteOutput.Commands {
if command.Stderr == "" {
continue
}

tracelog.WarningLogger.Printf("Unable to terminate backup-push process (segment %d):\n%s\n", command.Content, command.Stderr)
}

return nil
}
@@ -1,12 +1,13 @@
package greenplum_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/greenplum"
"github.com/wal-g/wal-g/testtools"
"testing"
"time"
)

func init() {
@@ -68,7 +69,7 @@ func TestFetch(t *testing.T) {
isEqualTimeFinish := expectedResult.FinishTime.Equal(actualResult.FinishTime)
assert.True(t, isEqualTimeFinish)

// since assert.Equal doesn't compare time properly, just assign the actual to the expected time
expectedResult.StartTime = actualResult.StartTime
expectedResult.FinishTime = actualResult.FinishTime

22 changes: 18 additions & 4 deletions internal/databases/greenplum/segment_backup_runner.go
@@ -5,7 +5,9 @@ import (
"fmt"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

"github.com/wal-g/tracelog"
@@ -62,15 +64,17 @@ func (r *SegBackupRunner) Run() {
done <- cmd.Wait()
}()

err = r.waitBackup(done)
err = r.waitBackup(cmd, done)
tracelog.ErrorLogger.FatalOnError(err)
}

func (r *SegBackupRunner) waitBackup(doneCh chan error) error {
func (r *SegBackupRunner) waitBackup(cmd *exec.Cmd, doneCh chan error) error {
ticker := time.NewTicker(r.stateUpdateInterval)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

for {
status, err := checkBackupStatus(ticker, doneCh)
status, err := checkBackupStatus(ticker, doneCh, sigCh)
saveErr := writeBackupState(SegBackupState{Status: status, TS: time.Now()}, r.contentID, r.backupName)
if saveErr != nil {
tracelog.WarningLogger.Printf("Failed to update the backup status file: %v", saveErr)
@@ -86,12 +90,18 @@ func (r *SegBackupRunner) waitBackup(doneCh chan error) error {
return nil
case FailedBackupStatus:
return fmt.Errorf("backup-push failed: %v", err)
case InterruptedBackupStatus:
// on receiving a termination signal, forward SIGTERM to the backup process as well
if termErr := cmd.Process.Signal(syscall.SIGTERM); termErr != nil {
tracelog.ErrorLogger.Printf("failed to send SIGTERM to the backup process: %v", termErr)
}
return fmt.Errorf("backup-push terminated")
}
}
}

// TODO: unit tests
func checkBackupStatus(ticker *time.Ticker, doneCh chan error) (SegBackupStatus, error) {
func checkBackupStatus(ticker *time.Ticker, doneCh chan error, sigCh chan os.Signal) (SegBackupStatus, error) {
select {
case <-ticker.C:
tracelog.DebugLogger.Printf("Tick")
@@ -103,6 +113,10 @@ func checkBackupStatus(ticker *time.Ticker, doneCh chan error) (SegBackupStatus,
}

return SuccessBackupStatus, nil

case sig := <-sigCh:
tracelog.ErrorLogger.Printf("Received signal: %s, terminating the running backup...", sig)
return InterruptedBackupStatus, nil
}
}

7 changes: 4 additions & 3 deletions internal/databases/greenplum/segment_backup_state.go
@@ -29,9 +29,10 @@ func FormatSegmentStateFolderPath(contentID int) string {
type SegBackupStatus string

const (
RunningBackupStatus SegBackupStatus = "running"
FailedBackupStatus SegBackupStatus = "failed"
SuccessBackupStatus SegBackupStatus = "success"
RunningBackupStatus SegBackupStatus = "running"
FailedBackupStatus SegBackupStatus = "failed"
SuccessBackupStatus SegBackupStatus = "success"
InterruptedBackupStatus SegBackupStatus = "interrupted"
)

type SegBackupState struct {