Skip to content

Commit

Permalink
Merge pull request #1154 from wal-g/until-binlog-last-modified-time
Browse files Browse the repository at this point in the history
Add `--until-binlog-last-modified-time` option to `wal-g-mysql binlog…
  • Loading branch information
mialinx committed Nov 22, 2021
2 parents 0841410 + 1d6b0e3 commit 700900a
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 16 deletions.
9 changes: 8 additions & 1 deletion cmd/mysql/binlog_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (

const fetchSinceFlagShortDescr = "backup name starting from which you want to fetch binlogs"
const fetchUntilFlagShortDescr = "time in RFC3339 for PITR"
const fetchUntilBinlogLastModifiedFlagShortDescr = "time in RFC3339 that is used to prevent wal-g from replaying" +
" binlogs that was created/modified after this time"

var fetchBackupName string
var fetchUntilTS string
var fetchUntilBinlogLastModifiedTS string

// binlogPushCmd represents the cron command
var binlogFetchCmd = &cobra.Command{
Expand All @@ -24,7 +27,7 @@ var binlogFetchCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
mysql.HandleBinlogFetch(folder, fetchBackupName, fetchUntilTS)
mysql.HandleBinlogFetch(folder, fetchBackupName, fetchUntilTS, fetchUntilBinlogLastModifiedTS)
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
internal.RequiredSettings[internal.MysqlBinlogDstSetting] = true
Expand All @@ -39,5 +42,9 @@ func init() {
"until",
utility.TimeNowCrossPlatformUTC().Format(time.RFC3339),
fetchUntilFlagShortDescr)
binlogFetchCmd.PersistentFlags().StringVar(&fetchUntilBinlogLastModifiedTS,
"until-binlog-last-modified-time",
"",
fetchUntilBinlogLastModifiedFlagShortDescr)
cmd.AddCommand(binlogFetchCmd)
}
7 changes: 6 additions & 1 deletion cmd/mysql/binlog_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (

const replaySinceFlagShortDescr = "backup name starting from which you want to fetch binlogs"
const replayUntilFlagShortDescr = "time in RFC3339 for PITR"
const replayUntilBinlogLastModifiedFlagShortDescr = "time in RFC3339 that is used to prevent wal-g from replaying" +
" binlogs that was created/modified after this time"

var replayBackupName string
var replayUntilTS string
var replayUntilBinlogLastModifiedTS string

var binlogReplayCmd = &cobra.Command{
Use: "binlog-replay",
Expand All @@ -23,7 +26,7 @@ var binlogReplayCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
folder, err := internal.ConfigureFolder()
tracelog.ErrorLogger.FatalOnError(err)
mysql.HandleBinlogReplay(folder, replayBackupName, replayUntilTS)
mysql.HandleBinlogReplay(folder, replayBackupName, replayUntilTS, replayUntilBinlogLastModifiedTS)
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
internal.RequiredSettings[internal.MysqlBinlogReplayCmd] = true
Expand All @@ -36,5 +39,7 @@ func init() {
binlogReplayCmd.PersistentFlags().StringVar(&replayBackupName, "since", "LATEST", replaySinceFlagShortDescr)
binlogReplayCmd.PersistentFlags().StringVar(&replayUntilTS, "until",
utility.TimeNowCrossPlatformUTC().Format(time.RFC3339), replayUntilFlagShortDescr)
binlogReplayCmd.PersistentFlags().StringVar(&replayUntilBinlogLastModifiedTS, "until-binlog-last-modified-time",
"", replayUntilBinlogLastModifiedFlagShortDescr)
cmd.AddCommand(binlogReplayCmd)
}
13 changes: 13 additions & 0 deletions docs/MySQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ or
wal-g binlog-fetch --since LATEST --until "2006-01-02T15:04:05Z07:00"
```

You can stop wal-g from fetching newly created/modified binlogs by specifying `--until-binlog-last-modified-time` option.
This may be useful to achieve exact clones of the same database in scenarios when new binlogs are uploaded concurrently whith your restore process.

```bash
wal-g binlog-replay --since LATEST --until "2006-01-02T15:04:05Z07:00" --until-binlog-last-modified-time "2006-01-02T15:04:05Z07:00"
```

### ``binlog-replay``

Fetches binlogs from storage and passes them to `WALG_MYSQL_BINLOG_REPLAY_COMMAND` to replay on running MySQL server.
Expand All @@ -161,6 +168,12 @@ or
wal-g binlog-replay --since LATEST --until "2006-01-02T15:04:05Z07:00"
```

You can stop wal-g from applying newly created/modified binlogs by specifying `--until-binlog-last-modified-time` option.
This may be useful to achieve exact clones of the same database in scenarios when new binlogs are uploaded concurrently whith your restore process.

```bash
wal-g binlog-replay --since LATEST --until "2006-01-02T15:04:05Z07:00" --until-binlog-last-modified-time "2006-01-02T15:04:05Z07:00"
```

Typical configurations
-----
Expand Down
6 changes: 3 additions & 3 deletions internal/databases/mysql/binlog_fetch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ func (ih *indexHandler) createIndexFile() error {
return nil
}

func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string) {
func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string, untilBinlogLastModifiedTS string) {
dstDir, err := internal.GetLogsDstSettings(internal.MysqlBinlogDstSetting)
tracelog.ErrorLogger.FatalOnError(err)

startTS, endTS, err := getTimestamps(folder, backupName, untilTS)
startTS, endTS, endBinlogTS, err := getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
tracelog.ErrorLogger.FatalOnError(err)

handler := newIndexHandler(dstDir)

tracelog.InfoLogger.Printf("Fetching binlogs since %s until %s", startTS, endTS)
err = fetchLogs(folder, dstDir, startTS, endTS, handler)
err = fetchLogs(folder, dstDir, startTS, endTS, endBinlogTS, handler)
tracelog.ErrorLogger.FatalfOnError("Failed to fetch binlogs: %v", err)

err = handler.createIndexFile()
Expand Down
21 changes: 13 additions & 8 deletions internal/databases/mysql/binlog_replay_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,42 @@ func (rh *replayHandler) handleBinlog(binlogPath string) error {
}
}

func HandleBinlogReplay(folder storage.Folder, backupName string, untilTS string) {
func HandleBinlogReplay(folder storage.Folder, backupName string, untilTS string, untilBinlogLastModifiedTS string) {
dstDir, err := internal.GetLogsDstSettings(internal.MysqlBinlogDstSetting)
tracelog.ErrorLogger.FatalOnError(err)

startTS, endTS, err := getTimestamps(folder, backupName, untilTS)
startTS, endTS, endBinlogTS, err := getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
tracelog.ErrorLogger.FatalOnError(err)

handler := newReplayHandler(endTS)

tracelog.InfoLogger.Printf("Fetching binlogs since %s until %s", startTS, endTS)
err = fetchLogs(folder, dstDir, startTS, endTS, handler)
err = fetchLogs(folder, dstDir, startTS, endTS, endBinlogTS, handler)
tracelog.ErrorLogger.FatalfOnError("Failed to fetch binlogs: %v", err)

err = handler.wait()
tracelog.ErrorLogger.FatalfOnError("Failed to apply binlogs: %v", err)
}

func getTimestamps(folder storage.Folder, backupName, untilTS string) (time.Time, time.Time, error) {
func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
backup, err := internal.GetBackupByName(backupName, utility.BaseBackupPath, folder)
if err != nil {
return time.Time{}, time.Time{}, errors.Wrap(err, "Unable to get backup")
return time.Time{}, time.Time{}, time.Time{}, errors.Wrap(err, "Unable to get backup")
}

startTS, err := getBinlogSinceTS(folder, backup)
if err != nil {
return time.Time{}, time.Time{}, err
return time.Time{}, time.Time{}, time.Time{}, err
}

endTS, err := utility.ParseUntilTS(untilTS)
if err != nil {
return time.Time{}, time.Time{}, err
return time.Time{}, time.Time{}, time.Time{}, err
}
return startTS, endTS, nil

endBinlogTS, err := utility.ParseUntilTS(untilBinlogLastModifiedTS)
if err != nil {
return time.Time{}, time.Time{}, time.Time{}, err
}
return startTS, endTS, endBinlogTS, nil
}
9 changes: 6 additions & 3 deletions internal/databases/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,12 @@ type binlogHandler interface {
handleBinlog(binlogPath string) error
}

func fetchLogs(folder storage.Folder, dstDir string, startTS time.Time, endTS time.Time, handler binlogHandler) error {
func fetchLogs(folder storage.Folder, dstDir string, startTS, endTS, endBinlogTS time.Time, handler binlogHandler) error {
logFolder := folder.GetSubFolder(BinlogPath)
includeStart := true
outer:
for {
logsToFetch, err := getLogsCoveringInterval(logFolder, startTS, includeStart)
logsToFetch, err := getLogsCoveringInterval(logFolder, startTS, includeStart, endBinlogTS)
includeStart = false
if err != nil {
return err
Expand Down Expand Up @@ -261,7 +261,7 @@ func getBinlogSinceTS(folder storage.Folder, backup internal.Backup) (time.Time,
}

// getLogsCoveringInterval lists the operation logs that cover the interval
func getLogsCoveringInterval(folder storage.Folder, start time.Time, includeStart bool) ([]storage.Object, error) {
func getLogsCoveringInterval(folder storage.Folder, start time.Time, includeStart bool, endBinlogTS time.Time) ([]storage.Object, error) {
logFiles, _, err := folder.ListFolder()
if err != nil {
return nil, err
Expand All @@ -271,6 +271,9 @@ func getLogsCoveringInterval(folder storage.Folder, start time.Time, includeStar
})
var logsToFetch []storage.Object
for _, logFile := range logFiles {
if logFile.GetLastModified().After(endBinlogTS) {
continue // don't fetch binlogs from future
}
if start.Before(logFile.GetLastModified()) || includeStart && start.Equal(logFile.GetLastModified()) {
logsToFetch = append(logsToFetch, logFile)
}
Expand Down

0 comments on commit 700900a

Please sign in to comment.