Improve MySQL restore performance #1131

Merged 20 commits on Nov 23, 2021

2 changes: 1 addition & 1 deletion cmd/fdb/backup_push.go
@@ -25,7 +25,7 @@ var backupPushCmd = &cobra.Command{

uploader, err := internal.ConfigureUploader()
tracelog.ErrorLogger.FatalOnError(err)
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
uploader.ChangeDirectory(utility.BaseBackupPath)

backupCmd, err := internal.GetCommandSetting(internal.NameStreamCreateCmd)
tracelog.ErrorLogger.FatalOnError(err)
5 changes: 3 additions & 2 deletions cmd/mongo/backup_fetch.go
@@ -8,7 +8,6 @@ import (
"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/mongo"
"github.com/wal-g/wal-g/utility"
)

@@ -32,8 +31,10 @@ var backupFetchCmd = &cobra.Command{
restoreCmd.Stdout = os.Stdout
restoreCmd.Stderr = os.Stderr

err = mongo.HandleBackupFetch(ctx, folder, args[0], restoreCmd)
backupSelector, err := internal.NewBackupNameSelector(args[0], true)
tracelog.ErrorLogger.FatalOnError(err)

internal.HandleBackupFetch(folder, backupSelector, internal.GetBackupToCommandFetcher(restoreCmd))
},
}

6 changes: 3 additions & 3 deletions cmd/mongo/backup_push.go
@@ -41,15 +41,15 @@ var backupPushCmd = &cobra.Command{
mongoClient, err := client.NewMongoClient(ctx, mongodbURL)
tracelog.ErrorLogger.FatalOnError(err)

uplProvider, err := internal.ConfigureUploader()
uplProvider, err := internal.ConfigureSplitUploader()
tracelog.ErrorLogger.FatalOnError(err)
uplProvider.UploadingFolder = uplProvider.UploadingFolder.GetSubFolder(utility.BaseBackupPath)
uplProvider.ChangeDirectory(utility.BaseBackupPath)

backupCmd, err := internal.GetCommandSettingContext(ctx, internal.NameStreamCreateCmd)
tracelog.ErrorLogger.FatalOnError(err)
backupCmd.Stderr = os.Stderr
uploader := archive.NewStorageUploader(uplProvider)
metaConstructor := archive.NewBackupMongoMetaConstructor(ctx, mongoClient, uplProvider.UploadingFolder, permanent)
metaConstructor := archive.NewBackupMongoMetaConstructor(ctx, mongoClient, uplProvider.Folder(), permanent)

err = mongo.HandleBackupPush(uploader, metaConstructor, backupCmd)
tracelog.ErrorLogger.FatalfOnError("Backup creation failed: %v", err)
2 changes: 1 addition & 1 deletion cmd/mongo/oplog_push.go
@@ -58,7 +58,7 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
if err != nil {
return err
}
uplProvider.UploadingFolder = uplProvider.UploadingFolder.GetSubFolder(models.OplogArchBasePath)
uplProvider.ChangeDirectory(models.OplogArchBasePath)
uploader := archive.NewStorageUploader(uplProvider)

// set up mongodb client and oplog fetcher
4 changes: 3 additions & 1 deletion cmd/mysql/backup_push.go
@@ -6,6 +6,7 @@ import (
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/internal/databases/mysql"
"github.com/wal-g/wal-g/utility"
)

const (
@@ -27,7 +28,8 @@ var (
tracelog.ErrorLogger.FatalOnError(err)
},
Run: func(cmd *cobra.Command, args []string) {
uploader, err := internal.ConfigureUploader()
uploader, err := internal.ConfigureSplitUploader()
uploader.ChangeDirectory(utility.BaseBackupPath)
tracelog.ErrorLogger.FatalOnError(err)
backupCmd, err := internal.GetCommandSetting(internal.NameStreamCreateCmd)
tracelog.ErrorLogger.FatalOnError(err)
17 changes: 13 additions & 4 deletions internal/backup_fetch_handler.go
@@ -3,6 +3,7 @@ package internal
import (
"bytes"
"fmt"
"io"
"os/exec"

"github.com/wal-g/tracelog"
@@ -16,6 +17,8 @@ type BackupNonExistenceError struct {
error
}

type StreamFetcher = func(backup Backup, writeCloser io.WriteCloser) error

func NewBackupNonExistenceError(backupName string) BackupNonExistenceError {
return BackupNonExistenceError{errors.Errorf("Backup '%s' does not exist.", backupName)}
}
@@ -24,15 +27,21 @@ func (err BackupNonExistenceError) Error() string {
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
}

func GetCommandStreamFetcher(cmd *exec.Cmd) func(folder storage.Folder, backup Backup) {
// GetBackupToCommandFetcher returns function that copies all bytes from backup to cmd's stdin
func GetBackupToCommandFetcher(cmd *exec.Cmd) func(folder storage.Folder, backup Backup) {
return func(folder storage.Folder, backup Backup) {
stdin, err := cmd.StdinPipe()
tracelog.ErrorLogger.FatalfOnError("Failed to fetch backup: %v\n", err)
stderr := &bytes.Buffer{}
cmd.Stderr = stderr
err = cmd.Start()
tracelog.ErrorLogger.FatalfOnError("Failed to start restore command: %v\n", err)
err = downloadAndDecompressStream(backup, stdin)

fetcher, err := GetBackupStreamFetcher(backup)
tracelog.ErrorLogger.FatalfOnError("Failed to detect backup format: %v\n", err)

err = fetcher(backup, stdin)

cmdErr := cmd.Wait()
if err != nil || cmdErr != nil {
tracelog.ErrorLogger.Printf("Restore command output:\n%s", stderr.String())
@@ -58,7 +67,7 @@ func StreamBackupToCommandStdin(cmd *exec.Cmd, backup Backup) error {
if err != nil {
return fmt.Errorf("failed to start command: %v", err)
}
err = downloadAndDecompressStream(backup, stdin)
err = DownloadAndDecompressStream(backup, stdin)
if err != nil {
return errors.Wrap(err, "failed to download and decompress stream")
}
@@ -79,7 +88,7 @@ func HandleBackupFetch(folder storage.Folder,
fetcher func(folder storage.Folder, backup Backup)) {
backupName, err := targetBackupSelector.Select(folder)
tracelog.ErrorLogger.FatalOnError(err)
tracelog.DebugLogger.Printf("HandleBackupFetch(%s, folder,)\n", backupName)
tracelog.DebugLogger.Printf("HandleBackupFetch(%s)\n", backupName)
backup, err := GetBackupByName(backupName, utility.BaseBackupPath, folder)
tracelog.ErrorLogger.FatalfOnError("Failed to fetch backup: %v\n", err)

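The new StreamFetcher type is the seam that lets a single fetch path serve both plain and split backups: GetBackupToCommandFetcher no longer calls downloadAndDecompressStream directly, but asks GetBackupStreamFetcher (not shown in this hunk) for the appropriate fetcher and pipes its output into the restore command's stdin. Below is a minimal sketch of the plain, single-stream case, written as if inside package internal and assuming only the exported DownloadAndDecompressStream above; the split-merge fetcher this PR adds is a separate implementation of the same signature.

// Sketch, not the wal-g implementation: the simplest possible StreamFetcher.
// It satisfies func(backup Backup, writeCloser io.WriteCloser) error by
// decompressing the single stored stream straight into the given writer.
func plainStreamFetcher(backup Backup, dst io.WriteCloser) error {
    return DownloadAndDecompressStream(backup, dst)
}

GetBackupToCommandFetcher then only has to connect cmd.StdinPipe() to whichever fetcher GetBackupStreamFetcher returns, which is exactly what the hunk above does.
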
8 changes: 8 additions & 0 deletions internal/backup_util.go
@@ -134,6 +134,14 @@ func SentinelNameFromBackup(backupName string) string {
return backupName + utility.SentinelSuffix
}

func MetadataNameFromBackup(backupName string) string {
return backupName + "/" + utility.MetadataFileName
}

func StreamMetadataNameFromBackup(backupName string) string {
return backupName + "/" + utility.StreamMetadataFileName
}

// UnwrapLatestModifier checks if LATEST is provided instead of backupName
// if so, replaces it with the name of the latest backup
func UnwrapLatestModifier(backupName string, folder storage.Folder) (string, error) {
13 changes: 13 additions & 0 deletions internal/config.go
@@ -77,6 +77,8 @@
PrefetchDir = "WALG_PREFETCH_DIR"
PgReadyRename = "PG_READY_RENAME"
SerializerTypeSetting = "WALG_SERIALIZER_TYPE"
StreamSplitterPartitions = "WALG_STREAM_SPLITTER_PARTITIONS"
StreamSplitterBlockSize = "WALG_STREAM_SPLITTER_BLOCK_SIZE"

MongoDBUriSetting = "MONGODB_URI"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
@@ -164,6 +166,11 @@
OplogArchiveTimeoutInterval: "60s",
OplogArchiveAfterSize: "16777216", // 32 << (10 * 2)
MongoDBLastWriteUpdateInterval: "3s",
StreamSplitterBlockSize: "1048576",
}

MysqlDefaultSettings = map[string]string{
StreamSplitterBlockSize: "1048576",
}

SQLServerDefaultSettings = map[string]string{
@@ -325,6 +332,8 @@
OplogPushWaitForBecomePrimary: true,
OplogPushPrimaryCheckInterval: true,
OplogPITRDiscoveryInterval: true,
StreamSplitterBlockSize: true,
StreamSplitterPartitions: true,
}

SQLServerAllowedSettings = map[string]bool{
@@ -347,6 +356,8 @@
MysqlBackupPrepareCmd: true,
MysqlTakeBinlogsFromMaster: true,
MysqlCheckGTIDs: true,
StreamSplitterPartitions: true,
StreamSplitterBlockSize: true,
}

RedisAllowedSettings = map[string]bool{
@@ -377,6 +388,8 @@ func ConfigureSettings(currentType string) {
dbSpecificDefaultSettings = PGDefaultSettings
case MONGO:
dbSpecificDefaultSettings = MongoDefaultSettings
case MYSQL:
dbSpecificDefaultSettings = MysqlDefaultSettings
case SQLSERVER:
dbSpecificDefaultSettings = SQLServerDefaultSettings
case GP:
18 changes: 18 additions & 0 deletions internal/configure.go
@@ -261,6 +261,24 @@ func ConfigureUploaderWithoutCompressMethod() (uploader *Uploader, err error) {
return uploader, err
}

func ConfigureSplitUploader() (uploader UploaderProvider, err error) {
folder, err := ConfigureFolder()
if err != nil {
return nil, errors.Wrap(err, "failed to configure folder")
}

compressor, err := ConfigureCompressor()
if err != nil {
return nil, errors.Wrap(err, "failed to configure compression")
}

var partitions = viper.GetInt(StreamSplitterPartitions)
var blockSize = viper.GetSizeInBytes(StreamSplitterBlockSize)

uploader = NewSplitStreamUploader(compressor, folder, partitions, int(blockSize))
return uploader, err
}

// ConfigureCrypter uses environment variables to create and configure a crypter.
// In case no configuration in environment variables found, return `<nil>` value.
func ConfigureCrypter() crypto.Crypter {
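ConfigureSplitUploader is where the new WALG_STREAM_SPLITTER_PARTITIONS and WALG_STREAM_SPLITTER_BLOCK_SIZE settings take effect: the configured values are handed to NewSplitStreamUploader along with the folder and compressor. The splitter itself is not part of this section; the sketch below only illustrates the general idea such an uploader can rely on (fixed-size blocks dealt round-robin across partitions, read back in the same order on restore) and is an assumption about the approach, not the wal-g code.

package splitsketch

import "io"

// splitStream deals fixed-size blocks from src across the partition writers
// in round-robin order. The last block may be short; whichever partition it
// lands in implicitly marks the end of the stream.
func splitStream(src io.Reader, blockSize int, parts []io.Writer) error {
    buf := make([]byte, blockSize)
    for i := 0; ; i = (i + 1) % len(parts) {
        n, err := io.ReadFull(src, buf)
        if n > 0 {
            if _, werr := parts[i].Write(buf[:n]); werr != nil {
                return werr
            }
        }
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return nil // source exhausted
        }
        if err != nil {
            return err
        }
    }
}

// mergeStreams is the inverse: it reads one block from each partition in the
// same round-robin order and writes them back out as a single stream.
func mergeStreams(parts []io.Reader, blockSize int, dst io.Writer) error {
    buf := make([]byte, blockSize)
    for i := 0; ; i = (i + 1) % len(parts) {
        n, err := io.ReadFull(parts[i], buf)
        if n > 0 {
            if _, werr := dst.Write(buf[:n]); werr != nil {
                return werr
            }
        }
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            return nil // a short or missing block means the original stream ended here
        }
        if err != nil {
            return err
        }
    }
}

Presumably the real uploader also compresses each partition and records the partition count and block size next to the backup (the new StreamMetadataNameFromBackup helper above points in that direction), so that restore can reassemble the stream without extra configuration.
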
2 changes: 1 addition & 1 deletion internal/databases/fdb/backup_fetch_handler.go
@@ -12,5 +12,5 @@ func HandleBackupFetch(ctx context.Context,
folder storage.Folder,
targetBackupSelector internal.BackupSelector,
restoreCmd *exec.Cmd) {
internal.HandleBackupFetch(folder, targetBackupSelector, internal.GetCommandStreamFetcher(restoreCmd))
internal.HandleBackupFetch(folder, targetBackupSelector, internal.GetBackupToCommandFetcher(restoreCmd))
}
17 changes: 0 additions & 17 deletions internal/databases/mongo/backup_fetch_handler.go
@@ -1,18 +1 @@
package mongo

import (
"context"
"os/exec"

"github.com/wal-g/wal-g/internal"
"github.com/wal-g/wal-g/pkg/storages/storage"
"github.com/wal-g/wal-g/utility"
)

func HandleBackupFetch(ctx context.Context, folder storage.Folder, backupName string, restoreCmd *exec.Cmd) error {
backup, err := internal.GetBackupByName(backupName, utility.BaseBackupPath, folder)
if err != nil {
return err
}
return internal.StreamBackupToCommandStdin(restoreCmd, backup)
}
4 changes: 3 additions & 1 deletion internal/databases/mysql/backup_fetch_handler.go
@@ -12,7 +12,9 @@ func HandleBackupFetch(folder storage.Folder,
targetBackupSelector internal.BackupSelector,
restoreCmd *exec.Cmd,
prepareCmd *exec.Cmd) {
internal.HandleBackupFetch(folder, targetBackupSelector, internal.GetCommandStreamFetcher(restoreCmd))
internal.HandleBackupFetch(folder, targetBackupSelector, internal.GetBackupToCommandFetcher(restoreCmd))

// Prepare Backup
if prepareCmd != nil {
err := prepareCmd.Run()
tracelog.ErrorLogger.FatalfOnError("failed to prepare fetched backup: %v", err)
1 change: 1 addition & 0 deletions internal/databases/mysql/backup_list_handler.go
@@ -33,6 +33,7 @@ type BackupDetail struct {
UserData interface{} `json:"user_data,omitempty"`
}

//nolint:gocritic,hugeParam
func NewBackupDetail(backupTime internal.BackupTime, sentinel StreamSentinelDto) BackupDetail {
return BackupDetail{
BackupName: backupTime.BackupName,
4 changes: 1 addition & 3 deletions internal/databases/mysql/backup_push_handler.go
@@ -10,9 +10,7 @@ import (
"github.com/wal-g/wal-g/utility"
)

func HandleBackupPush(uploader *internal.Uploader, backupCmd *exec.Cmd, isPermanent bool, userDataRaw string) {
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath)

func HandleBackupPush(uploader internal.UploaderProvider, backupCmd *exec.Cmd, isPermanent bool, userDataRaw string) {
db, err := getMySQLConnection()
tracelog.ErrorLogger.FatalOnError(err)
defer utility.LoggedClose(db, "")
8 changes: 4 additions & 4 deletions internal/databases/mysql/binlog_push_handler.go
@@ -29,9 +29,9 @@ type LogsCache struct {
}

//gocyclo:ignore
func HandleBinlogPush(uploader *internal.Uploader, untilBinlog string, checkGTIDs bool) {
rootFolder := uploader.UploadingFolder
uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(BinlogPath)
func HandleBinlogPush(uploader internal.UploaderProvider, untilBinlog string, checkGTIDs bool) {
rootFolder := uploader.Folder()
uploader.ChangeDirectory(BinlogPath)

db, err := getMySQLConnection()
tracelog.ErrorLogger.FatalOnError(err)
@@ -134,7 +134,7 @@ func getMySQLBinlogsFolder(db *sql.DB) (string, error) {
return path.Dir(logBinBasename), nil
}

func archiveBinLog(uploader *internal.Uploader, dataDir string, binLog string) error {
func archiveBinLog(uploader internal.UploaderProvider, dataDir string, binLog string) error {
tracelog.InfoLogger.Printf("Archiving %v\n", binLog)

filename := path.Join(dataDir, binLog)
1 change: 1 addition & 0 deletions internal/databases/mysql/mysql.go
@@ -169,6 +169,7 @@ type StreamSentinelDto struct {

IsPermanent bool `json:"IsPermanent,omitempty"`
UserData interface{} `json:"UserData,omitempty"`

//todo: add other fields from internal.GenericMetadata
}

21 changes: 21 additions & 0 deletions internal/extract.go
@@ -74,6 +74,27 @@ func (e EmptyWriteIgnorer) Write(p []byte) (int, error) {
return e.WriteCloser.Write(p)
}

type DevNullWriter struct {
io.WriteCloser
statPrinter sync.Once
totalBytes int64
}

func (e *DevNullWriter) Write(p []byte) (int, error) {
e.statPrinter.Do(func() {
go func() {
for {
time.Sleep(1 * time.Second)
tracelog.ErrorLogger.Printf("/dev/null size %d", e.totalBytes)
}
}()
})
e.totalBytes += int64(len(p))
return len(p), nil
}

var _ io.Writer = &DevNullWriter{}

// TODO : unit tests
// Extract exactly one tar bundle.
func extractOne(tarInterpreter TarInterpreter, source io.Reader) error {
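DevNullWriter is a measurement sink rather than a restore target: it discards everything written to it, while a goroutine started on the first Write logs the running byte count once per second. Note that totalBytes is written by Write and read by that goroutine without synchronization, so the periodic figure is best-effort. A small usage sketch, written as if inside package internal; the helper name and the timing wrapper are assumptions, not part of this diff.

// drainAndTime measures how fast src can produce data by discarding it into
// a DevNullWriter. io.Copy only ever calls Write, so the nil embedded
// WriteCloser inside the zero-value DevNullWriter is never touched.
func drainAndTime(src io.Reader) (time.Duration, int64, error) {
    sink := &DevNullWriter{}
    start := time.Now()
    n, err := io.Copy(sink, src)
    return time.Since(start), n, err
}

Pointing such a sink at the download-and-merge path gives a throughput number for the storage side alone, without a MySQL restore command in the way.
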
7 changes: 3 additions & 4 deletions internal/fetch_helper.go
@@ -9,11 +9,10 @@ import (
"os/user"
"path/filepath"

"github.com/wal-g/wal-g/internal/ioextensions"

"github.com/pkg/errors"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal/compression"
"github.com/wal-g/wal-g/internal/ioextensions"
"github.com/wal-g/wal-g/pkg/storages/storage"
"github.com/wal-g/wal-g/utility"
)
@@ -53,8 +52,8 @@ func DownloadFile(folder storage.Folder, filename, ext string, writeCloser io.Wr
return nil
}

func TryDownloadFile(folder storage.Folder, path string) (walFileReader io.ReadCloser, exists bool, err error) {
walFileReader, err = folder.ReadObject(path)
func TryDownloadFile(folder storage.Folder, path string) (fileReader io.ReadCloser, exists bool, err error) {
fileReader, err = folder.ReadObject(path)
if err == nil {
exists = true
return