From a6372a6d78e0d83140655f011384e7639a5f7b90 Mon Sep 17 00:00:00 2001 From: akulin Date: Tue, 9 Jul 2019 12:46:43 +0500 Subject: [PATCH] mongodb extension (#303) * Implementation of operations to manage MongoDB backups: - stream-push - stream-fetch - oplog-push - delete - backup-list * Added docker configuration and integration tests for push commands * make subfolder for oplog.bson to restore more comfortable * integration test for delete command * Integration tests improvements: -test oplog deletion -remove oplog dump dir after test * move HandleStreamFetch and DownloadAndDecompressStream to internal helper * descriptions of internal functions * -race option for build in docker --- Makefile | 15 ++- cmd/mongo/backup_list.go | 27 +++++ cmd/mongo/delete.go | 71 +++++++++++ cmd/mongo/mongo.go | 37 ++++++ cmd/mongo/oplog_push.go | 29 +++++ cmd/mongo/stream_fetch.go | 29 +++++ cmd/mongo/stream_push.go | 28 +++++ cmd/mysql/delete.go | 21 +--- docker-compose.yml | 24 ++++ docker/mongo/Dockerfile | 6 + docker/mongo/mongodb.conf | 17 +++ docker/mongo_tests/Dockerfile | 23 ++++ .../scripts/run_integration_tests.sh | 7 ++ .../mongo_tests/scripts/tests/delete_test.sh | 81 +++++++++++++ .../scripts/tests/oplog_push_test.sh | 49 ++++++++ .../tests/pushing_and_fetching_stream_test.sh | 44 +++++++ internal/backup.go | 19 ++- internal/backup_push_handler.go | 7 +- internal/databases/mongo/mongo.go | 28 +++++ .../databases/mongo/oplog_push_handler.go | 24 ++++ .../databases/mongo/stream_fetch_handler.go | 58 +++++++++ .../databases/mongo/stream_push_handler.go | 32 +++++ internal/databases/mysql/mysql.go | 11 +- .../databases/mysql/stream_fetch_handler.go | 14 +-- .../databases/mysql/stream_push_handler.go | 43 ++----- internal/stream_fetch_helper.go | 112 ++++++++++++++++++ internal/stream_push_helper.go | 44 +++++++ main/mongo/main.go | 9 ++ utility/utility.go | 19 ++- 29 files changed, 847 insertions(+), 81 deletions(-) create mode 100644 cmd/mongo/backup_list.go create mode 100644 cmd/mongo/delete.go create mode 100644 cmd/mongo/mongo.go create mode 100644 cmd/mongo/oplog_push.go create mode 100644 cmd/mongo/stream_fetch.go create mode 100644 cmd/mongo/stream_push.go create mode 100644 docker/mongo/Dockerfile create mode 100644 docker/mongo/mongodb.conf create mode 100644 docker/mongo_tests/Dockerfile create mode 100755 docker/mongo_tests/scripts/run_integration_tests.sh create mode 100755 docker/mongo_tests/scripts/tests/delete_test.sh create mode 100755 docker/mongo_tests/scripts/tests/oplog_push_test.sh create mode 100755 docker/mongo_tests/scripts/tests/pushing_and_fetching_stream_test.sh create mode 100644 internal/databases/mongo/mongo.go create mode 100644 internal/databases/mongo/oplog_push_handler.go create mode 100644 internal/databases/mongo/stream_fetch_handler.go create mode 100644 internal/databases/mongo/stream_push_handler.go create mode 100644 internal/stream_fetch_helper.go create mode 100644 internal/stream_push_helper.go create mode 100644 main/mongo/main.go diff --git a/Makefile b/Makefile index 8dc756458..f93ca2c7c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ MAIN_PG_PATH := main/pg MAIN_MYSQL_PATH := main/mysql MAIN_REDIS_PATH := main/redis +MAIN_MONGO_PATH := main/mongo DOCKER_COMMON := golang ubuntu s3 CMD_FILES = $(wildcard wal-g/*.go) PKG_FILES = $(wildcard internal/**/*.go internal/**/**/*.go internal/*.go) @@ -14,7 +15,7 @@ ifdef GOTAGS override GOTAGS := -tags $(GOTAGS) endif -test: install deps lint unittest pg_build mysql_build redis_build unlink_brotli pg_integration_test mysql_integration_test redis_integration_test +test: install deps lint unittest pg_build mysql_build redis_build mongo_build unlink_brotli pg_integration_test mysql_integration_test redis_integration_test mongo_integration_test pg_test: install deps pg_build lint unittest unlink_brotli pg_integration_test @@ -52,6 +53,18 @@ mysql_clean: mysql_install: mysql_build mv $(MAIN_MYSQL_PATH)/wal-g $(GOBIN)/wal-g +mongo_test: install deps mongo_build lint unittest unlink_brotli mongo_integration_test + +mongo_build: $(CMD_FILES) $(PKG_FILES) + (cd $(MAIN_MONGO_PATH) && go build -o wal-g $(GOTAGS) -ldflags "-s -w -X github.com/wal-g/wal-g/cmd.BuildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X github.com/wal-g/wal-g/cmd.GitRevision=`git rev-parse --short HEAD` -X github.com/wal-g/wal-g/cmd.WalgVersion=`git tag -l --points-at HEAD`") + +mongo_install: mongo_build + mv $(MAIN_MONGO_PATH)/wal-g $(GOBIN)/wal-g + +mongo_integration_test: + docker-compose build $(DOCKER_COMMON) mongo mongo_tests + docker-compose up --exit-code-from mongo_tests mongo_tests + redis_test: install deps redis_build lint unittest unlink_brotli redis_integration_test redis_build: $(CMD_FILES) $(PKG_FILES) diff --git a/cmd/mongo/backup_list.go b/cmd/mongo/backup_list.go new file mode 100644 index 000000000..38884be89 --- /dev/null +++ b/cmd/mongo/backup_list.go @@ -0,0 +1,27 @@ +package mongo + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/tracelog" +) + +const BackupListShortDescription = "Prints available backups" + +// backupListCmd represents the backupList command +var backupListCmd = &cobra.Command{ + Use: "backup-list", + Short: BackupListShortDescription, // TODO : improve description + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + folder, err := internal.ConfigureFolder() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + internal.HandleBackupList(folder) + }, +} + +func init() { + MongoCmd.AddCommand(backupListCmd) +} diff --git a/cmd/mongo/delete.go b/cmd/mongo/delete.go new file mode 100644 index 000000000..95786a158 --- /dev/null +++ b/cmd/mongo/delete.go @@ -0,0 +1,71 @@ +package mongo + +import ( + "github.com/spf13/cobra" + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/storages/storage" + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" +) + +var confirmed = false + +// deleteCmd represents the delete command +var deleteCmd = &cobra.Command{ + Use: "delete", + Short: "Clears old backups and oplog", +} + +var deleteBeforeCmd = &cobra.Command{ + Use: "before backup_name|timestamp", // TODO : improve description + Example: internal.DeleteBeforeExamples, + Args: internal.DeleteBeforeArgsValidator, + Run: runDeleteBefore, +} + +var deleteRetainCmd = &cobra.Command{ + Use: "retain backup_count", // TODO : improve description + Example: internal.DeleteRetainExamples, + ValidArgs: internal.StringModifiers, + Args: internal.DeleteRetainArgsValidator, + Run: runDeleteRetain, +} + +func runDeleteBefore(cmd *cobra.Command, args []string) { + folder, err := internal.ConfigureFolder() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + + internal.HandleDeleteBefore(folder, args, confirmed, isFullBackup, GetLessFunc(folder)) +} + +func runDeleteRetain(cmd *cobra.Command, args []string) { + folder, err := internal.ConfigureFolder() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + + internal.HandleDeleteRetain(folder, args, confirmed, isFullBackup, GetLessFunc(folder)) +} + +func isFullBackup(object storage.Object) bool { + return true +} + +func init() { + MongoCmd.AddCommand(deleteCmd) + deleteCmd.AddCommand(deleteBeforeCmd, deleteRetainCmd) + deleteCmd.PersistentFlags().BoolVar(&confirmed, internal.ConfirmFlag, false, "Confirms backup deletion") +} + +func GetLessFunc(folder storage.Folder) func(object1, object2 storage.Object) bool { + return func(object1, object2 storage.Object) bool { + time1, ok1 := utility.TryFetchTimeRFC3999(object1.GetName()) + time2, ok2 := utility.TryFetchTimeRFC3999(object2.GetName()) + if !ok1 || !ok2 { + return object2.GetLastModified().After(object1.GetLastModified()) + } + return time1 < time2 + } +} diff --git a/cmd/mongo/mongo.go b/cmd/mongo/mongo.go new file mode 100644 index 000000000..255e1d04f --- /dev/null +++ b/cmd/mongo/mongo.go @@ -0,0 +1,37 @@ +package mongo + +import ( + "fmt" + "os" + "strings" + + "github.com/spf13/cobra" + "github.com/wal-g/wal-g/internal" +) + +var MongoDBShortDescription = "MongoDB backup tool" + +// These variables are here only to show current version. They are set in makefile during build process +var WalgVersion = "devel" +var GitRevision = "devel" +var BuildDate = "devel" + +var MongoCmd = &cobra.Command{ + Use: "wal-g", + Short: MongoDBShortDescription, // TODO : improve description + Version: strings.Join([]string{WalgVersion, GitRevision, BuildDate, "MongoDB"}, "\t"), +} + +func Execute() { + if err := MongoCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} + +func init() { + cobra.OnInitialize(internal.InitConfig, internal.Configure) + + MongoCmd.PersistentFlags().StringVar(&internal.CfgFile, "config", "", "config file (default is $HOME/.wal-g.yaml)") + MongoCmd.InitDefaultVersionFlag() +} diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go new file mode 100644 index 000000000..5d20cd2f2 --- /dev/null +++ b/cmd/mongo/oplog_push.go @@ -0,0 +1,29 @@ +package mongo + +import ( + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/databases/mongo" + "github.com/wal-g/wal-g/internal/tracelog" + + "github.com/spf13/cobra" +) + +const oplogPushShortDescription = "" + +// oplogPushCmd represents the cron command +var oplogPushCmd = &cobra.Command{ + Use: "oplog-push", + Short: oplogPushShortDescription, + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + uploader, err := internal.ConfigureUploader() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + mongo.HandleOplogPush(&mongo.Uploader{Uploader: uploader}) + }, +} + +func init() { + MongoCmd.AddCommand(oplogPushCmd) +} diff --git a/cmd/mongo/stream_fetch.go b/cmd/mongo/stream_fetch.go new file mode 100644 index 000000000..355b67f47 --- /dev/null +++ b/cmd/mongo/stream_fetch.go @@ -0,0 +1,29 @@ +package mongo + +import ( + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/databases/mongo" + "github.com/wal-g/wal-g/internal/tracelog" + + "github.com/spf13/cobra" +) + +const StreamFetchShortDescription = "" + +// streamFetchCmd represents the streamFetch command +var streamFetchCmd = &cobra.Command{ + Use: "stream-fetch backup-name", + Short: StreamFetchShortDescription, + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + folder, err := internal.ConfigureFolder() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + internal.HandleStreamFetch(args[0], folder, mongo.FetchBackupStreamAndOplog) + }, +} + +func init() { + MongoCmd.AddCommand(streamFetchCmd) +} diff --git a/cmd/mongo/stream_push.go b/cmd/mongo/stream_push.go new file mode 100644 index 000000000..02bc2b100 --- /dev/null +++ b/cmd/mongo/stream_push.go @@ -0,0 +1,28 @@ +package mongo + +import ( + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/databases/mongo" + "github.com/wal-g/wal-g/internal/tracelog" + + "github.com/spf13/cobra" +) + +const StreamPushShortDescription = "" + +// streamPushCmd represents the streamPush command +var streamPushCmd = &cobra.Command{ + Use: "stream-push", + Short: StreamPushShortDescription, + Run: func(cmd *cobra.Command, args []string) { + uploader, err := internal.ConfigureUploader() + if err != nil { + tracelog.ErrorLogger.FatalError(err) + } + mongo.HandleStreamPush(&mongo.Uploader{Uploader: uploader}) + }, +} + +func init() { + MongoCmd.AddCommand(streamPushCmd) +} diff --git a/cmd/mysql/delete.go b/cmd/mysql/delete.go index 491061328..f1bb6b76d 100644 --- a/cmd/mysql/delete.go +++ b/cmd/mysql/delete.go @@ -1,20 +1,17 @@ package mysql import ( + "path" + "strings" + "github.com/spf13/cobra" "github.com/wal-g/wal-g/internal" "github.com/wal-g/wal-g/internal/databases/mysql" "github.com/wal-g/wal-g/internal/storages/storage" "github.com/wal-g/wal-g/internal/tracelog" "github.com/wal-g/wal-g/utility" - "path" - "regexp" - "strings" ) -var patternTimeRFC3339 = "[0-9]{8}T[0-9]{6}Z" -var regexpTimeRFC3339 = regexp.MustCompile(patternTimeRFC3339) -var maxCountOfRFC3339 = 1 var confirmed = false // deleteCmd represents the delete command @@ -72,11 +69,11 @@ func IsFullBackup(folder storage.Folder, object storage.Object) bool { func GetLessFunc(folder storage.Folder) func(object1, object2 storage.Object) bool { return func(object1, object2 storage.Object) bool { - time1, ok := tryFetchTimeRFC3999(object1) + time1, ok := utility.TryFetchTimeRFC3999(object1.GetName()) if !ok { return binlogLess(folder, object1, object2) } - time2, ok := tryFetchTimeRFC3999(object2) + time2, ok := utility.TryFetchTimeRFC3999(object2.GetName()) if !ok { return binlogLess(folder, object1, object2) } @@ -84,14 +81,6 @@ func GetLessFunc(folder storage.Folder) func(object1, object2 storage.Object) bo } } -func tryFetchTimeRFC3999(object storage.Object) (string, bool) { - found_lsn := regexpTimeRFC3339.FindAllString(object.GetName(), maxCountOfRFC3339) - if len(found_lsn) > 0 { - return regexpTimeRFC3339.FindAllString(object.GetName(), maxCountOfRFC3339)[0], true - } - return "", false -} - func binlogLess(folder storage.Folder, object1, object2 storage.Object) bool { binlogName1, ok := tryFetchBinlogName(folder, object1) if !ok { diff --git a/docker-compose.yml b/docker-compose.yml index 87925177f..f27e34995 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,6 +38,10 @@ services: && mkdir -p /export/mysqlfullbucket && mkdir -p /export/mysqlbinlogpushbucket && mkdir -p /export/mysqldeleteendtoendbucket + && mkdir -p /export/mongostreampushbucket + && mkdir -p /export/mongooplogpushbucket + && mkdir -p /export/mongodeletebeforebucket + && mkdir -p /export/mongodeleteretainbucket && mkdir -p /export/redisbucket && /usr/bin/minio server /export' @@ -96,6 +100,26 @@ services: context: . image: wal-g/redis_tests container_name: wal-g_redis_tests + env_file: + - docker/common/common_walg.env + depends_on: + - s3 + links: + - s3 + + mongo: + build: + dockerfile: docker/mongo/Dockerfile + context: . + image: wal-g/mongo + container_name: wal-g_mongo + + mongo_tests: + build: + dockerfile: docker/mongo_tests/Dockerfile + context: . + image: wal-g/mongo_tests + container_name: wal-g_mongo_tests env_file: - docker/common/common_walg.env depends_on: diff --git a/docker/mongo/Dockerfile b/docker/mongo/Dockerfile new file mode 100644 index 000000000..b6b2a1231 --- /dev/null +++ b/docker/mongo/Dockerfile @@ -0,0 +1,6 @@ +FROM wal-g/ubuntu:latest + +RUN apt-get update -y +RUN apt-get install mongodb -y + +COPY docker/mongo/mongodb.conf /etc/mongodb.conf diff --git a/docker/mongo/mongodb.conf b/docker/mongo/mongodb.conf new file mode 100644 index 000000000..aa797c528 --- /dev/null +++ b/docker/mongo/mongodb.conf @@ -0,0 +1,17 @@ +# mongodb.conf + +# Where to store the data. +dbpath=/var/lib/mongodb + +#where to log +logpath=/var/log/mongodb/mongodb.log + +logappend=true + +bind_ip = 127.0.0.1 +#port = 27017 + +journal=true + +master = true + diff --git a/docker/mongo_tests/Dockerfile b/docker/mongo_tests/Dockerfile new file mode 100644 index 000000000..44b50e645 --- /dev/null +++ b/docker/mongo_tests/Dockerfile @@ -0,0 +1,23 @@ +FROM wal-g/golang:latest as build + +WORKDIR /go/src/github.com/wal-g/wal-g + +RUN apt-get update && \ + apt-get install --yes --no-install-recommends --no-install-suggests + +COPY vendor/ vendor/ +COPY internal/ internal/ +COPY cmd/ cmd/ +COPY main/ main/ +COPY utility/ utility/ + +RUN cd main/mongo && \ + go build -race -o wal-g -ldflags "-s -w -X main.BuildDate=`date -u +%Y.%m.%d_%H:%M:%S`" + +FROM wal-g/mongo:latest + +COPY --from=build /go/src/github.com/wal-g/wal-g/main/mongo/wal-g /usr/bin + +COPY docker/mongo_tests/scripts/ /tmp + +CMD /tmp/run_integration_tests.sh diff --git a/docker/mongo_tests/scripts/run_integration_tests.sh b/docker/mongo_tests/scripts/run_integration_tests.sh new file mode 100755 index 000000000..a30f4a68b --- /dev/null +++ b/docker/mongo_tests/scripts/run_integration_tests.sh @@ -0,0 +1,7 @@ +#!/bin/sh +set -e -x + +for i in /tmp/tests/*; do + .$i; + echo "${i} success" +done diff --git a/docker/mongo_tests/scripts/tests/delete_test.sh b/docker/mongo_tests/scripts/tests/delete_test.sh new file mode 100755 index 000000000..f89602820 --- /dev/null +++ b/docker/mongo_tests/scripts/tests/delete_test.sh @@ -0,0 +1,81 @@ +#!/bin/sh +set -e -x + +add_test_data() { + mongo --eval "for (var i = 0; i < 10; i++) { db.getSiblingDB('test').testData.save({x: i}) }" +} + +test_delete_command() { + DELETE_COMMAND=$1 + OPLOG_DUMP_DIR=/tmp/oplog_dump + + mkdir -p $WALG_MONGO_OPLOG_DST + mkdir -p $OPLOG_DUMP_DIR + + service mongodb start + + for i in $(seq 1 5); + do + sleep 1 + add_test_data + mongodump --archive --oplog | wal-g stream-push + + if [ $i -eq 3 ]; + then + mongoexport -d test -c testData | sort > /tmp/export1.json + fi + sleep 1 + mongodump -d local -c oplog.\$main --out $OPLOG_DUMP_DIR + cat $OPLOG_DUMP_DIR/local/oplog.\$main.bson | wal-g oplog-push + done + + wal-g backup-list + + $DELETE_COMMAND + + wal-g backup-list + + pkill -9 mongod + + rm -rf /var/lib/mongodb/* + service mongodb start + + first_backup_name=`wal-g backup-list | head -n 2 | tail -n 1 | cut -f 1 -d " "` + + wal-g stream-fetch $first_backup_name | mongorestore --archive --oplogReplay + + mongoexport -d test -c testData | sort > /tmp/export2.json + + pkill -9 mongod + + diff /tmp/export1.json /tmp/export2.json + + oplogCount=`ls $WALG_MONGO_OPLOG_DST | wc -l` + if [ $oplogCount -ne 3 ] + then + echo "Expected oplog count is 3. Actual: $oplogCount" + exit 1 + fi + + rm -rf $OPLOG_DUMP_DIR + rm -rf $WALG_MONGO_OPLOG_DST + rm /tmp/export?.json +} + +export WALG_MONGO_OPLOG_DST=/tmp/fetched_oplogs + +delete_before_name() { + backup_name=`wal-g backup-list | tail -n 3 | head -n 1 | cut -f 1 -d " "` + + wal-g delete before $backup_name --confirm +} + +export WALE_S3_PREFIX=s3://mongodeletebeforebucket +test_delete_command delete_before_name + +delete_retain() { + wal-g delete retain 3 --confirm +} + +export WALE_S3_PREFIX=s3://mongodeleteretainbucket +test_delete_command delete_retain diff --git a/docker/mongo_tests/scripts/tests/oplog_push_test.sh b/docker/mongo_tests/scripts/tests/oplog_push_test.sh new file mode 100755 index 000000000..2e6d3213b --- /dev/null +++ b/docker/mongo_tests/scripts/tests/oplog_push_test.sh @@ -0,0 +1,49 @@ +#!/bin/sh +set -e -x + +export WALE_S3_PREFIX=s3://mongooplogpushbucket +export WALG_MONGO_OPLOG_DST=/tmp/fetched_oplogs +OPLOG_DUMP_DIR=/tmp/oplog_dump + +mkdir -p $WALG_MONGO_OPLOG_DST +mkdir -p $OPLOG_DUMP_DIR + +add_test_data() { + mongo --eval "for (var i = 0; i < 10; i++) { db.getSiblingDB('test').testData.save({x: i}) }" +} + +service mongodb start + +sleep 1 + +add_test_data + +mongodump --archive --oplog | wal-g stream-push + +add_test_data +mongoexport -d test -c testData | sort > /tmp/export1.json + +mongodump -d local -c oplog.\$main --out $OPLOG_DUMP_DIR +cat $OPLOG_DUMP_DIR/local/oplog.\$main.bson | wal-g oplog-push + +sleep 1 +export WALG_MONGO_OPLOG_END_TS=`date --rfc-3339=ns | sed 's/ /T/'` + +pkill -9 mongod +rm -rf /var/lib/mongodb/* +service mongodb start + +wal-g stream-fetch LATEST | mongorestore --archive --oplogReplay + +ls $WALG_MONGO_OPLOG_DST +mongorestore --oplogReplay $WALG_MONGO_OPLOG_DST/`ls $WALG_MONGO_OPLOG_DST | head -n 1` + +mongoexport -d test -c testData | sort > /tmp/export2.json + +pkill -9 mongod + +diff /tmp/export1.json /tmp/export2.json + +rm -rf $WALG_MONGO_OPLOG_DST +rm -rf $OPLOG_DUMP_DIR +rm /tmp/export?.json diff --git a/docker/mongo_tests/scripts/tests/pushing_and_fetching_stream_test.sh b/docker/mongo_tests/scripts/tests/pushing_and_fetching_stream_test.sh new file mode 100755 index 000000000..2342750ba --- /dev/null +++ b/docker/mongo_tests/scripts/tests/pushing_and_fetching_stream_test.sh @@ -0,0 +1,44 @@ +#!/bin/sh +set -e -x + +export WALE_S3_PREFIX=s3://mongostreampushbucket +export WALG_MONGO_OPLOG_DST=/tmp/fetched_oplogs + +mkdir -p $WALG_MONGO_OPLOG_DST + +add_test_data() { + mongo --eval "for (var i = 0; i < 10; i++) { db.getSiblingDB('test').testData.save({x: i}) }" +} + +service mongodb start + +for i in $(seq 1 5); +do + sleep 1 + add_test_data + mongodump --archive --oplog | wal-g stream-push + + if [ $i -eq 3 ]; + then + mongoexport -d test -c testData | sort > /tmp/export1.json + fi +done + +pkill -9 mongod +rm -rf /var/lib/mongodb/* +service mongodb start + +wal-g backup-list + +backup_name=`wal-g backup-list | tail -n 3 | head -n 1 | cut -f 1 -d " "` + +wal-g stream-fetch $backup_name | mongorestore --archive --oplogReplay + +mongoexport -d test -c testData | sort > /tmp/export2.json + +pkill -9 mongod + +diff /tmp/export1.json /tmp/export2.json + +rm -rf $WALG_MONGO_OPLOG_DST +rm /tmp/export?.json diff --git a/internal/backup.go b/internal/backup.go index dbc0bdeb4..31dd7d9f7 100644 --- a/internal/backup.go +++ b/internal/backup.go @@ -71,18 +71,27 @@ func (backup *Backup) GetTarNames() ([]string, error) { func (backup *Backup) FetchSentinel() (BackupSentinelDto, error) { sentinelDto := BackupSentinelDto{} + sentinelDtoData, err := backup.FetchSentinelData() + if err != nil { + return sentinelDto, err + } + + err = json.Unmarshal(sentinelDtoData, &sentinelDto) + return sentinelDto, errors.Wrap(err, "failed to unmarshal sentinel") +} + +// TODO : unit tests +func (backup *Backup) FetchSentinelData() ([]byte, error) { backupReaderMaker := NewStorageReaderMaker(backup.BaseBackupFolder, backup.GetStopSentinelPath()) backupReader, err := backupReaderMaker.Reader() if err != nil { - return sentinelDto, err + return make([]byte, 0), err } sentinelDtoData, err := ioutil.ReadAll(backupReader) if err != nil { - return sentinelDto, errors.Wrap(err, "failed to fetch sentinel") + return sentinelDtoData, errors.Wrap(err, "failed to fetch sentinel") } - - err = json.Unmarshal(sentinelDtoData, &sentinelDto) - return sentinelDto, errors.Wrap(err, "failed to unmarshal sentinel") + return sentinelDtoData, nil } func (backup *Backup) FetchMeta() (ExtendedMetadataDto, error) { diff --git a/internal/backup_push_handler.go b/internal/backup_push_handler.go index 09ed028dc..683673714 100644 --- a/internal/backup_push_handler.go +++ b/internal/backup_push_handler.go @@ -4,11 +4,12 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/spf13/viper" "os" "path/filepath" "time" + "github.com/spf13/viper" + "github.com/pkg/errors" "github.com/wal-g/wal-g/internal/storages/storage" "github.com/wal-g/wal-g/internal/tracelog" @@ -216,10 +217,10 @@ func UploadMetadata(uploader *Uploader, sentinelDto *BackupSentinelDto, backupNa } // TODO : unit tests -func UploadSentinel(uploader *Uploader, sentinelDto *BackupSentinelDto, backupName string) error { +func UploadSentinel(uploader *Uploader, sentinelDto interface{}, backupName string) error { sentinelName := backupName + utility.SentinelSuffix - dtoBody, err := json.Marshal(*sentinelDto) + dtoBody, err := json.Marshal(sentinelDto) if err != nil { return NewSentinelMarshallingError(sentinelName, err) } diff --git a/internal/databases/mongo/mongo.go b/internal/databases/mongo/mongo.go new file mode 100644 index 000000000..3707cb1ca --- /dev/null +++ b/internal/databases/mongo/mongo.go @@ -0,0 +1,28 @@ +package mongo + +import ( + "path" + "time" + + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/utility" +) + +const ( + OplogPrefix = "oplog_" + OplogPath = "oplog_" + utility.VersionStr + "/" + OplogEndTs = "WALG_MONGO_OPLOG_END_TS" + OplogDst = "WALG_MONGO_OPLOG_DST" +) + +func getStreamName(backupName string, extension string) string { + return utility.SanitizePath(path.Join(backupName, "stream.")) + extension +} + +type Uploader struct { + *internal.Uploader +} + +type StreamSentinelDto struct { + StartLocalTime time.Time +} diff --git a/internal/databases/mongo/oplog_push_handler.go b/internal/databases/mongo/oplog_push_handler.go new file mode 100644 index 000000000..0e2e86f79 --- /dev/null +++ b/internal/databases/mongo/oplog_push_handler.go @@ -0,0 +1,24 @@ +package mongo + +import ( + "os" + + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" +) + +func HandleOplogPush(uploader *Uploader) { + uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(OplogPath) + if !internal.FileIsPiped(os.Stdin) { + tracelog.ErrorLogger.Fatal("Use stdin\n") + } + oplogName := OplogPrefix + utility.TimeNowCrossPlatformUTC().Format("20060102T150405Z") + dstPath := oplogName + "." + uploader.Compressor.FileExtension() + err := uploader.PushStreamToDestination(os.Stdin, dstPath) + if err != nil { + tracelog.ErrorLogger.Fatalf("%+v\n", err) + } + + tracelog.InfoLogger.Println("Oplog file " + dstPath + " was uploaded") +} diff --git a/internal/databases/mongo/stream_fetch_handler.go b/internal/databases/mongo/stream_fetch_handler.go new file mode 100644 index 000000000..2082c2e2b --- /dev/null +++ b/internal/databases/mongo/stream_fetch_handler.go @@ -0,0 +1,58 @@ +package mongo + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/storages/storage" + "github.com/wal-g/wal-g/internal/tracelog" +) + +func FetchBackupStreamAndOplog(folder storage.Folder, backup *internal.Backup) error { + streamSentinel, err := fetchStreamSentinel(backup) + if err != nil { + return err + } + oplogsAreDone := make(chan error) + go fetchOplogs(folder, streamSentinel.StartLocalTime, oplogsAreDone) + err = internal.DownloadAndDecompressStream(backup) + if err != nil { + return err + } + tracelog.DebugLogger.Println("Waiting for oplogs") + err = <-oplogsAreDone + return err +} + +func fetchStreamSentinel(backup *internal.Backup) (StreamSentinelDto, error) { + sentinelDto := StreamSentinelDto{} + sentinelDtoData, err := backup.FetchSentinelData() + if err != nil { + return sentinelDto, errors.Wrap(err, "failed to fetch sentinel") + } + err = json.Unmarshal(sentinelDtoData, &sentinelDto) + return sentinelDto, errors.Wrap(err, "failed to unmarshal sentinel") +} + +func fetchOplogs(folder storage.Folder, startTime time.Time, oplogAreDone chan error) { + endTS, oplogDstFolder, err := getOplogConfigs() + if err != nil { + oplogAreDone <- err + return + } + oplogFolder := folder.GetSubFolder(OplogPath) + logsToFetch, err := internal.GetOperationLogsCoveringInterval(oplogFolder, startTime, endTS) + if err != nil { + oplogAreDone <- err + return + } + + oplogAreDone <- internal.DownloadOplogFiles(logsToFetch, oplogFolder, oplogDstFolder, "oplog.bson") +} + +func getOplogConfigs() (*time.Time, string, error) { + return internal.GetOperationLogsSettings(OplogEndTs, OplogDst) +} diff --git a/internal/databases/mongo/stream_push_handler.go b/internal/databases/mongo/stream_push_handler.go new file mode 100644 index 000000000..da667794a --- /dev/null +++ b/internal/databases/mongo/stream_push_handler.go @@ -0,0 +1,32 @@ +package mongo + +import ( + "io" + "os" + + "github.com/wal-g/wal-g/internal" + + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" +) + +func HandleStreamPush(uploader *Uploader) { + if !internal.FileIsPiped(os.Stdin) { + tracelog.ErrorLogger.Fatal("Use stdin\n") + } + uploader.UploadingFolder = uploader.UploadingFolder.GetSubFolder(utility.BaseBackupPath) + err := uploader.UploadStream(os.Stdin) + if err != nil { + tracelog.ErrorLogger.Fatalf("%+v\n", err) + } +} + +// TODO : unit tests +// UploadStream compresses a stream and uploads it. +func (uploader *Uploader) UploadStream(stream io.Reader) error { + timeStart := utility.TimeNowCrossPlatformLocal() + backupName, err := uploader.PushStream(stream) + internal.UploadSentinel(uploader.Uploader, &StreamSentinelDto{StartLocalTime: timeStart}, backupName) + + return err +} diff --git a/internal/databases/mysql/mysql.go b/internal/databases/mysql/mysql.go index 4c6948e99..670bf8587 100644 --- a/internal/databases/mysql/mysql.go +++ b/internal/databases/mysql/mysql.go @@ -6,12 +6,13 @@ import ( "database/sql" "encoding/json" "fmt" - "github.com/spf13/viper" "io/ioutil" "path" "strings" "time" + "github.com/spf13/viper" + "github.com/go-sql-driver/mysql" "github.com/pkg/errors" "github.com/wal-g/wal-g/internal" @@ -39,13 +40,7 @@ type Backup struct { // TODO : unit tests func (backup *Backup) FetchStreamSentinel() (StreamSentinelDto, error) { sentinelDto := StreamSentinelDto{} - backupReaderMaker := internal.NewStorageReaderMaker(backup.BaseBackupFolder, - backup.GetStopSentinelPath()) - backupReader, err := backupReaderMaker.Reader() - if err != nil { - return sentinelDto, err - } - sentinelDtoData, err := ioutil.ReadAll(backupReader) + sentinelDtoData, err := backup.Backup.FetchSentinelData() if err != nil { return sentinelDto, errors.Wrap(err, "failed to fetch sentinel") } diff --git a/internal/databases/mysql/stream_fetch_handler.go b/internal/databases/mysql/stream_fetch_handler.go index 346672697..33e8eb082 100644 --- a/internal/databases/mysql/stream_fetch_handler.go +++ b/internal/databases/mysql/stream_fetch_handler.go @@ -2,7 +2,6 @@ package mysql import ( "fmt" - "github.com/spf13/viper" "os" "path" "path/filepath" @@ -136,18 +135,7 @@ func BinlogShouldBeFetched(sentinel StreamSentinelDto, binlogName string, endTS } func GetBinlogConfigs() (endTS *time.Time, dstFolder string, err error) { - if viper.IsSet(BinlogEndTsSetting) { - endTSStr := viper.GetString(BinlogEndTsSetting) - t, err := time.Parse(time.RFC3339, endTSStr) - if err != nil { - return nil, "", err - } - endTS = &t - } - if !viper.IsSet(BinlogDstSetting) { - return endTS, dstFolder, internal.NewUnsetRequiredSettingError(BinlogDstSetting) - } - return endTS, viper.GetString(BinlogDstSetting), nil + return internal.GetOperationLogsSettings(BinlogEndTsSetting, BinlogDstSetting) } func ExtractBinlogName(object storage.Object, folder storage.Folder) string { diff --git a/internal/databases/mysql/stream_push_handler.go b/internal/databases/mysql/stream_push_handler.go index e7016576c..1deef0fb0 100644 --- a/internal/databases/mysql/stream_push_handler.go +++ b/internal/databases/mysql/stream_push_handler.go @@ -1,15 +1,14 @@ package mysql import ( - "bytes" "database/sql" - "encoding/json" - "github.com/wal-g/wal-g/internal" - "github.com/wal-g/wal-g/internal/tracelog" - "github.com/wal-g/wal-g/utility" "io" "os" "strings" + + "github.com/wal-g/wal-g/internal" + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" ) func HandleStreamPush(uploader *Uploader) { @@ -19,16 +18,14 @@ func HandleStreamPush(uploader *Uploader) { tracelog.ErrorLogger.Fatalf("%+v\n", err) } defer utility.LoggedClose(db, "") - backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format("20060102T150405Z") - stat, _ := os.Stdin.Stat() var stream io.Reader = os.Stdin - if (stat.Mode() & os.ModeCharDevice) == 0 { + if internal.FileIsPiped(os.Stdin) { tracelog.InfoLogger.Println("Data is piped from stdin") } else { tracelog.ErrorLogger.Println("WARNING: stdin is terminal: operating in test mode!") stream = strings.NewReader("testtesttest") } - err = uploader.UploadStream(backupName, db, stream) + err = uploader.UploadStream(db, stream) if err != nil { tracelog.ErrorLogger.Fatalf("%+v\n", err) } @@ -36,39 +33,17 @@ func HandleStreamPush(uploader *Uploader) { // TODO : unit tests // UploadFile compresses a file and uploads it. -func (uploader *Uploader) UploadStream(fileName string, db *sql.DB, stream io.Reader) error { +func (uploader *Uploader) UploadStream(db *sql.DB, stream io.Reader) error { binlogStart := getMySQLCurrentBinlogFile(db) tracelog.DebugLogger.Println("Binlog start file", binlogStart) timeStart := utility.TimeNowCrossPlatformLocal() - compressor := uploader.Compressor - compressed := internal.CompressAndEncrypt(stream, compressor, internal.ConfigureCrypter()) - backup := Backup{internal.NewBackup(uploader.UploadingFolder, fileName)} + fileName, err := uploader.PushStream(stream) - dstPath := getStreamName(&backup, compressor.FileExtension()) - tracelog.DebugLogger.Println("Upload path", dstPath) - - err := uploader.Upload(dstPath, compressed) - tracelog.InfoLogger.Println("FILE PATH:", dstPath) binlogEnd := getMySQLCurrentBinlogFile(db) tracelog.DebugLogger.Println("Binlog end file", binlogEnd) - uploadStreamSentinel(&StreamSentinelDto{BinLogStart: binlogStart, BinLogEnd: binlogEnd, StartLocalTime: timeStart}, uploader, fileName+utility.SentinelSuffix) + internal.UploadSentinel(uploader.Uploader, &StreamSentinelDto{BinLogStart: binlogStart, BinLogEnd: binlogEnd, StartLocalTime: timeStart}, fileName) return err } - -func uploadStreamSentinel(sentinelDto *StreamSentinelDto, uploader *Uploader, name string) error { - dtoBody, err := json.Marshal(*sentinelDto) - if err != nil { - return err - } - - uploadingErr := uploader.Upload(name, bytes.NewReader(dtoBody)) - if uploadingErr != nil { - tracelog.ErrorLogger.Printf("upload: could not upload '%s'\n", name) - tracelog.ErrorLogger.Fatalf("StorageTarBall finish: json failed to upload") - return uploadingErr - } - return nil -} diff --git a/internal/stream_fetch_helper.go b/internal/stream_fetch_helper.go new file mode 100644 index 000000000..36a4ecf1d --- /dev/null +++ b/internal/stream_fetch_helper.go @@ -0,0 +1,112 @@ +package internal + +import ( + "fmt" + "os" + "path" + "sort" + "time" + + "github.com/wal-g/wal-g/internal/compression" + "github.com/wal-g/wal-g/internal/storages/storage" + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" +) + +// GetOperationLogsSettings reads from the environment variables fetch settings +func GetOperationLogsSettings(OperationLogEndTsSetting string, operationLogsDstSetting string) (endTS *time.Time, dstFolder string, err error) { + endTSStr, ok := GetSetting(OperationLogEndTsSetting) + if ok { + t, err := time.Parse(time.RFC3339, endTSStr) + if err != nil { + return nil, "", err + } + endTS = &t + } + dstFolder, ok = GetSetting(operationLogsDstSetting) + if !ok { + return endTS, dstFolder, NewUnsetRequiredSettingError(operationLogsDstSetting) + } + return endTS, dstFolder, nil +} + +// HandleStreamFetch is invoked to perform wal-g stream-fetch +func HandleStreamFetch(backupName string, folder storage.Folder, + fetchBackup func(storage.Folder, *Backup) error) { + backup, err := GetBackupByName(backupName, folder) + if err != nil { + tracelog.ErrorLogger.Fatalf("Unable to get backup %+v\n", err) + } + if !FileIsPiped(os.Stdout) { + tracelog.ErrorLogger.Fatalf("stdout is a terminal") + } + err = fetchBackup(folder, backup) + if err != nil { + tracelog.ErrorLogger.Fatalf("%+v\n", err) + } +} + +// DownloadAndDecompressStream downloads, decompresses and writes stream to stdout +func DownloadAndDecompressStream(backup *Backup) error { + for _, decompressor := range compression.Decompressors { + archiveReader, exists, err := TryDownloadWALFile(backup.BaseBackupFolder, getStreamName(backup.Name, decompressor.FileExtension())) + if err != nil { + return err + } + if !exists { + continue + } + + err = DecompressWALFile(&EmptyWriteIgnorer{WriteCloser: os.Stdout}, archiveReader, decompressor) + if err != nil { + return err + } + utility.LoggedClose(os.Stdout, "") + return nil + } + return NewArchiveNonExistenceError(fmt.Sprintf("Archive '%s' does not exist.\n", backup.Name)) +} + +// GetOperationLogsCoveringInterval lists the operation logs that cover the interval +func GetOperationLogsCoveringInterval(folder storage.Folder, start time.Time, end *time.Time) ([]storage.Object, error) { + oplogFiles, _, err := folder.ListFolder() + if err != nil { + return nil, err + } + + sort.Slice(oplogFiles, func(i, j int) bool { + return oplogFiles[i].GetLastModified().After(oplogFiles[j].GetLastModified()) + }) + + var logsToFetch []storage.Object + + for _, oplogFile := range oplogFiles { + if oplogFile.GetLastModified().After(start) { + logsToFetch = append(logsToFetch, oplogFile) + if end != nil && oplogFile.GetLastModified().After(*end) { + break + } + } + } + return logsToFetch, err +} + +// DownloadOplogFiles downloads files to specified folder +func DownloadOplogFiles(oplogFiles []storage.Object, oplogFolder storage.Folder, oplogDstFolder string, logFileName string) error { + for _, oplogFile := range oplogFiles { + oplogName := utility.TrimFileExtension(oplogFile.GetName()) + oplogFileSubFolder := path.Join(oplogDstFolder, oplogName) + _, err := NewDiskDataFolder(oplogFileSubFolder) + if err != nil { + return err + } + oplogFilePath := path.Join(oplogFileSubFolder, logFileName) + err = DownloadWALFileTo(oplogFolder, oplogName, oplogFilePath) + if err != nil { + return err + } + tracelog.InfoLogger.Println("Operation log file " + oplogFile.GetName() + " fetched to " + oplogFilePath) + } + + return nil +} diff --git a/internal/stream_push_helper.go b/internal/stream_push_helper.go new file mode 100644 index 000000000..62b0f2450 --- /dev/null +++ b/internal/stream_push_helper.go @@ -0,0 +1,44 @@ +package internal + +import ( + "io" + "os" + "path" + + "github.com/wal-g/wal-g/internal/tracelog" + "github.com/wal-g/wal-g/utility" +) + +const ( + StreamPrefix = "stream_" +) + +// TODO : unit tests +// PushStream compresses a stream and push it +func (uploader *Uploader) PushStream(stream io.Reader) (string, error) { + backupName := StreamPrefix + utility.TimeNowCrossPlatformUTC().Format("20060102T150405Z") + dstPath := getStreamName(backupName, uploader.Compressor.FileExtension()) + err := uploader.PushStreamToDestination(stream, dstPath) + + return backupName, err +} + +// TODO : unit tests +// PushStreamToDestination compresses a stream and push it to specifyed destination +func (uploader *Uploader) PushStreamToDestination(stream io.Reader, dstPath string) error { + compressed := CompressAndEncrypt(stream, uploader.Compressor, ConfigureCrypter()) + err := uploader.Upload(dstPath, compressed) + tracelog.InfoLogger.Println("FILE PATH:", dstPath) + + return err +} + +// FileIsPiped Check if file is piped +func FileIsPiped(stream *os.File) bool { + stat, _ := stream.Stat() + return (stat.Mode() & os.ModeCharDevice) == 0 +} + +func getStreamName(backupName string, extension string) string { + return utility.SanitizePath(path.Join(backupName, "stream.")) + extension +} diff --git a/main/mongo/main.go b/main/mongo/main.go new file mode 100644 index 000000000..2c3a900e0 --- /dev/null +++ b/main/mongo/main.go @@ -0,0 +1,9 @@ +package main + +import ( + "github.com/wal-g/wal-g/cmd/mongo" +) + +func main() { + mongo.Execute() +} diff --git a/utility/utility.go b/utility/utility.go index e6b48588f..83963405d 100644 --- a/utility/utility.go +++ b/utility/utility.go @@ -95,6 +95,11 @@ func GetFileExtension(filePath string) string { return ext } +// TODO : unit tests +func TrimFileExtension(filePath string) string { + return strings.TrimSuffix(filePath, "."+GetFileExtension(filePath)) +} + func GetFileRelativePath(fileAbsPath string, directoryPath string) string { return strings.TrimPrefix(fileAbsPath, directoryPath) } @@ -171,9 +176,21 @@ func TimeNowCrossPlatformLocal() time.Time { return CeilTimeUpToMicroseconds(time.Now()) } +var patternTimeRFC3339 = "[0-9]{8}T[0-9]{6}Z" +var regexpTimeRFC3339 = regexp.MustCompile(patternTimeRFC3339) + +// TODO : unit tests +func TryFetchTimeRFC3999(name string) (string, bool) { + times := regexpTimeRFC3339.FindAllString(name, 1) + if len(times) > 0 { + return regexpTimeRFC3339.FindAllString(name, 1)[0], true + } + return "", false +} + func ConcatByteSlices(a []byte, b []byte) []byte { result := make([]byte, len(a)+len(b)) copy(result, a) copy(result[len(a):], b) return result -} \ No newline at end of file +}