From 0b7dda3e786fe8a76a09ac744893240288ce854a Mon Sep 17 00:00:00 2001 From: GreatRiver <2552853833@qq.com> Date: Mon, 5 Feb 2024 00:26:03 +0800 Subject: [PATCH] Add backup parallel (#14543) Add backup parallel Approved by: @reusee, @XuPeng-SH, @daviszhen, @sukki37 --- pkg/backup/backup.go | 2 +- pkg/backup/backup_test.go | 2 +- pkg/backup/tae.go | 240 +++++++++++++++++++++------- pkg/fileservice/local_etl_fs.go | 4 + pkg/fileservice/local_fs.go | 4 + pkg/vm/engine/tae/logtail/backup.go | 14 +- 6 files changed, 199 insertions(+), 67 deletions(-) diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 4826353beb4b..2f96e05b9bca 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -125,7 +125,7 @@ func backupConfigs(ctx context.Context, cfg *Config) error { var backupTae = func(ctx context.Context, config *Config) error { fs := fileservice.SubPath(config.TaeDir, taeDir) - return BackupData(ctx, config.SharedFs, fs, "") + return BackupData(ctx, config.SharedFs, fs, "", int(config.Parallelism)) } func backupHakeeper(ctx context.Context, config *Config) error { diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 9acf77a6a07a..9e008b0c365b 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -121,7 +121,7 @@ func TestBackupData(t *testing.T) { for _, location := range files { locations = append(locations, location) } - err = execBackup(ctx, db.Opts.Fs, service, locations) + err = execBackup(ctx, db.Opts.Fs, service, locations, 1) assert.Nil(t, err) db.Opts.Fs = service db.Restart(ctx) diff --git a/pkg/backup/tae.go b/pkg/backup/tae.go index fbb18396baae..f4133461a900 100644 --- a/pkg/backup/tae.go +++ b/pkg/backup/tae.go @@ -33,10 +33,14 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks" + "io" "os" "path" + runtime2 "runtime" "strconv" "strings" + "sync" "time" ) @@ -68,7 +72,7 @@ func getFileNames(ctx context.Context, retBytes [][][]byte) ([]string, error) { return fileName, err } -func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir string) error { +func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir string, count int) error { v, ok := runtime.ProcessLevelRuntime().GetGlobalVariables(runtime.InternalSQLExecutor) if !ok { return moerr.NewNotSupported(ctx, "no implement sqlExecutor") @@ -92,21 +96,156 @@ func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir s if err != nil { return err } - return execBackup(ctx, srcFs, dstFs, fileName) + return execBackup(ctx, srcFs, dstFs, fileName, count) } -func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names []string) error { +func getParallelCount(count int) int { + if count > 0 && count < 512 { + return count + } + cupNum := runtime2.NumCPU() + if cupNum < 8 { + return 50 + } else if cupNum < 16 { + return 80 + } else if cupNum < 32 { + return 128 + } else if cupNum < 64 { + return 256 + } + return 512 +} + +// parallelCopyData copy data from srcFs to dstFs in parallel +func parallelCopyData(srcFs, dstFs fileservice.FileService, + files map[string]objectio.Location, + parallelCount int, + gcFileMap map[string]string, +) ([]*taeFile, error) { + var copyCount, skipCount, copySize int64 + var printMutex, fileMutex sync.Mutex + stopPrint := false + defer func() { + printMutex.Lock() + if 
!stopPrint { + stopPrint = true + } + printMutex.Unlock() + }() + // record files + taeFileList := make([]*taeFile, 0, len(files)) + + jobScheduler := tasks.NewParallelJobScheduler(parallelCount) + defer jobScheduler.Stop() + go func() { + for { + printMutex.Lock() + if stopPrint { + printMutex.Unlock() + break + } + printMutex.Unlock() + fileMutex.Lock() + logutil.Info("backup", common.OperationField("copy file"), + common.AnyField("copy file size", copySize), + common.AnyField("copy file num", copyCount), + common.AnyField("skip file num", skipCount), + common.AnyField("total file num", len(files))) + fileMutex.Unlock() + time.Sleep(time.Second * 5) + } + }() + + backupJobs := make([]*tasks.Job, len(files)) + getJob := func(srcFs, dstFs fileservice.FileService, location objectio.Location) *tasks.Job { + job := new(tasks.Job) + job.Init(context.Background(), location.Name().String(), tasks.JTAny, + func(_ context.Context) *tasks.JobResult { + + name := location.Name().String() + size := location.Extent().End() + objectio.FooterSize + checksum, err := CopyFile(context.Background(), srcFs, dstFs, location.Name().String(), "") + if err != nil { + if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) && + isGC(gcFileMap, name) { + fileMutex.Lock() + skipCount++ + fileMutex.Unlock() + return &tasks.JobResult{ + Res: nil, + } + } else { + return &tasks.JobResult{ + Err: err, + Res: nil, + } + } + } + fileMutex.Lock() + copyCount++ + copySize += int64(size) + taeFileList = append(taeFileList, &taeFile{ + path: name, + size: int64(size), + checksum: checksum, + }) + fileMutex.Unlock() + return &tasks.JobResult{ + Res: nil, + } + }) + return job + } + + idx := 0 + for n := range files { + backupJobs[idx] = getJob(srcFs, dstFs, files[n]) + idx++ + } + + for n := range backupJobs { + err := jobScheduler.Schedule(backupJobs[n]) + if err != nil { + logutil.Infof("schedule job failed %v", err.Error()) + return nil, err + } + + } + + for n := range backupJobs { + + ret := backupJobs[n].WaitDone() + if ret.Err != nil { + logutil.Infof("wait job done failed %v", ret.Err.Error()) + return nil, ret.Err + } + } + + logutil.Info("backup", common.OperationField("copy file"), + common.AnyField("copy file size", copySize), + common.AnyField("copy file num", copyCount), + common.AnyField("skip file num", skipCount), + common.AnyField("total file num", len(files))) + return taeFileList, nil +} + +func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names []string, count int) error { backupTime := names[0] trimInfo := names[1] names = names[1:] - files := make(map[string]*fileservice.DirEntry, 0) + files := make(map[string]objectio.Location, 0) table := gc.NewGCTable() gcFileMap := make(map[string]string) softDeletes := make(map[string]bool) var loadDuration, copyDuration, reWriteDuration time.Duration - var oNames []objectio.ObjectName + var oNames []objectio.Location + parallelNum := getParallelCount(count) + logutil.Info("backup", common.OperationField("start backup"), + common.AnyField("backup time", backupTime), + common.AnyField("checkpoint num", len(names)), + common.AnyField("parallel num", parallelNum)) defer func() { - logutil.Info("backup", common.OperationField("exec backup"), + logutil.Info("backup", common.OperationField("end backup"), common.AnyField("load checkpoint cost", loadDuration), common.AnyField("copy file cost", copyDuration), common.AnyField("rewrite checkpoint cost", reWriteDuration)) @@ -129,7 +268,7 @@ func execBackup(ctx context.Context, srcFs, dstFs 
fileservice.FileService, names if err != nil { return err } - var oneNames []objectio.ObjectName + var oneNames []objectio.Location var data *logtail.CheckpointData if i == 0 { oneNames, data, err = logtail.LoadCheckpointEntriesFromKey(ctx, srcFs, key, uint32(version), nil) @@ -148,42 +287,12 @@ func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names loadDuration += time.Since(now) now = time.Now() for _, oName := range oNames { - if files[oName.String()] == nil { - dentry, err := srcFs.StatFile(ctx, oName.String()) - if err != nil { - if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) && - isGC(gcFileMap, oName.String()) { - continue - } else { - return err - } - } - files[oName.String()] = dentry - } - } - // record files - taeFileList := make([]*taeFile, 0, len(files)) - for _, dentry := range files { - if dentry.IsDir { - panic("not support dir") - } - checksum, err := CopyFile(ctx, srcFs, dstFs, dentry, "") - if err != nil { - if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) && - isGC(gcFileMap, dentry.Name) { - continue - } else { - return err - } - + if files[oName.Name().String()] == nil { + files[oName.Name().String()] = oName } - taeFileList = append(taeFileList, &taeFile{ - path: dentry.Name, - size: dentry.Size, - checksum: checksum, - }) } + // trim checkpoint and block var cnLoc, tnLoc, mergeStart, mergeEnd string var end, start types.TS var version uint64 @@ -205,6 +314,13 @@ func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names } } + // copy data + taeFileList, err := parallelCopyData(srcFs, dstFs, files, parallelNum, gcFileMap) + if err != nil { + return err + } + + // copy checkpoint and gc meta sizeList, err := CopyDir(ctx, srcFs, dstFs, "ckp", start) if err != nil { return err @@ -281,7 +397,7 @@ func CopyDir(ctx context.Context, srcFs, dstFs fileservice.FileService, dir stri logutil.Infof("[Backup] skip file %v", file.Name) continue } - checksum, err = CopyFile(ctx, srcFs, dstFs, &file, dir) + checksum, err = CopyFile(ctx, srcFs, dstFs, file.Name, dir) if err != nil { return nil, err } @@ -295,40 +411,48 @@ func CopyDir(ctx context.Context, srcFs, dstFs fileservice.FileService, dir stri } // CopyFile copy file from srcFs to dstFs and return checksum of the written file. 
-func CopyFile(ctx context.Context, srcFs, dstFs fileservice.FileService, dentry *fileservice.DirEntry, dstDir string) ([]byte, error) { - name := dentry.Name +func CopyFile(ctx context.Context, srcFs, dstFs fileservice.FileService, name, dstDir string) ([]byte, error) { if dstDir != "" { name = path.Join(dstDir, name) } + var reader io.ReadCloser ioVec := &fileservice.IOVector{ FilePath: name, - Entries: make([]fileservice.IOEntry, 1), - Policy: fileservice.SkipAllCache, - } - ioVec.Entries[0] = fileservice.IOEntry{ - Offset: 0, - Size: dentry.Size, + Entries: []fileservice.IOEntry{ + { + ReadCloserForRead: &reader, + Offset: 0, + Size: -1, + }, + }, + Policy: fileservice.SkipAllCache, } + err := srcFs.Read(ctx, ioVec) if err != nil { return nil, err } + defer reader.Close() + // hash + hasher := sha256.New() + hashingReader := io.TeeReader(reader, hasher) dstIoVec := fileservice.IOVector{ FilePath: name, - Entries: make([]fileservice.IOEntry, 1), - Policy: fileservice.SkipAllCache, - } - dstIoVec.Entries[0] = fileservice.IOEntry{ - Offset: 0, - Data: ioVec.Entries[0].Data, - Size: dentry.Size, + Entries: []fileservice.IOEntry{ + { + ReaderForWrite: hashingReader, + Offset: 0, + Size: -1, + }, + }, + Policy: fileservice.SkipAllCache, } + err = dstFs.Write(ctx, dstIoVec) if err != nil { return nil, err } - checksum := sha256.Sum256(ioVec.Entries[0].Data) - return checksum[:], err + return hasher.Sum(nil), nil } func mergeGCFile(gcFiles []string, gcFileMap map[string]string) { diff --git a/pkg/fileservice/local_etl_fs.go b/pkg/fileservice/local_etl_fs.go index 6da622800b54..4ecbb40c9dc1 100644 --- a/pkg/fileservice/local_etl_fs.go +++ b/pkg/fileservice/local_etl_fs.go @@ -521,6 +521,10 @@ func (l *LocalETLFS) ensureDir(nativePath string) error { // create if err := os.Mkdir(nativePath, 0755); err != nil { + if os.IsExist(err) { + // existed + return nil + } return err } diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go index 8267cd72c0a8..321538536658 100644 --- a/pkg/fileservice/local_fs.go +++ b/pkg/fileservice/local_fs.go @@ -825,6 +825,10 @@ func (l *LocalFS) ensureDir(nativePath string) error { // create if err := os.Mkdir(nativePath, 0755); err != nil { + if os.IsExist(err) { + // existed + return nil + } return err } diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 9820d7a93241..69d9a5e8ecf5 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -430,17 +430,17 @@ func LoadCheckpointEntriesFromKey( location objectio.Location, version uint32, softDeletes *map[string]bool, -) ([]objectio.ObjectName, *CheckpointData, error) { - locations := make([]objectio.ObjectName, 0) +) ([]objectio.Location, *CheckpointData, error) { + locations := make([]objectio.Location, 0) data, err := getCheckpointData(ctx, fs, location, version) if err != nil { return nil, nil, err } - locations = append(locations, location.Name()) + locations = append(locations, location) for _, location = range data.locations { - locations = append(locations, location.Name()) + locations = append(locations, location) } for i := 0; i < data.bats[ObjectInfoIDX].Length(); i++ { var objectStats objectio.ObjectStats @@ -456,7 +456,7 @@ func LoadCheckpointEntriesFromKey( panic(fmt.Sprintf("object %v is not deleted", objectStats.ObjectName().String())) } - locations = append(locations, objectStats.ObjectName()) + locations = append(locations, objectStats.ObjectLocation()) if !deletedAt.IsEmpty() { if softDeletes != nil { if 
!(*softDeletes)[objectStats.ObjectName().String()] { @@ -488,7 +488,7 @@ func LoadCheckpointEntriesFromKey( data.bats[BLKMetaInsertIDX].GetVectorByName(catalog.BlockMeta_MetaLoc).Get(i).([]byte)) panic(fmt.Sprintf("block %v deltaLoc is empty", metaLoc.String())) } - locations = append(locations, deltaLoc.Name()) + locations = append(locations, deltaLoc) } for i := 0; i < data.bats[BLKCNMetaInsertIDX].Length(); i++ { metaLoc := objectio.Location( @@ -509,7 +509,7 @@ func LoadCheckpointEntriesFromKey( if deltaLoc.IsEmpty() { panic(fmt.Sprintf("block %v deltaLoc is empty", deltaLoc.String())) } - locations = append(locations, deltaLoc.Name()) + locations = append(locations, deltaLoc) } return locations, data, nil }
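The rewritten CopyFile above streams the source object through an io.TeeReader so the SHA-256 checksum is computed while the bytes are written to the destination, instead of buffering the whole object in an IOVector entry and hashing it afterwards. For reference, a minimal standalone sketch of that pattern using only the Go standard library follows; copyWithChecksum is a hypothetical helper for illustration and is not part of this patch.

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"strings"
)

// copyWithChecksum streams src into dst while feeding the same bytes
// into a SHA-256 hasher, mirroring the TeeReader pattern used by the
// new CopyFile: no full in-memory buffer and no second read pass.
func copyWithChecksum(dst io.Writer, src io.Reader) (written int64, checksum []byte, err error) {
	hasher := sha256.New()
	// Every byte read from src is also written to hasher.
	tee := io.TeeReader(src, hasher)
	written, err = io.Copy(dst, tee)
	if err != nil {
		return written, nil, err
	}
	return written, hasher.Sum(nil), nil
}

func main() {
	var dst strings.Builder
	n, sum, err := copyWithChecksum(&dst, strings.NewReader("backup payload"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("copied %d bytes, sha256=%x\n", n, sum)
}

The same streaming approach is what allows parallelCopyData to run many copies concurrently, with the concurrency bounded by getParallelCount, without holding many whole objects in memory at once.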