Add backup parallel (#14543)
Add backup parallel

Approved by: @reusee, @XuPeng-SH, @daviszhen, @sukki37
LeftHandCold committed Feb 4, 2024
1 parent e4f57c7 commit 0b7dda3
Showing 6 changed files with 199 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/backup.go
@@ -125,7 +125,7 @@ func backupConfigs(ctx context.Context, cfg *Config) error {

var backupTae = func(ctx context.Context, config *Config) error {
fs := fileservice.SubPath(config.TaeDir, taeDir)
return BackupData(ctx, config.SharedFs, fs, "")
return BackupData(ctx, config.SharedFs, fs, "", int(config.Parallelism))
}

func backupHakeeper(ctx context.Context, config *Config) error {
2 changes: 1 addition & 1 deletion pkg/backup/backup_test.go
@@ -121,7 +121,7 @@ func TestBackupData(t *testing.T) {
for _, location := range files {
locations = append(locations, location)
}
err = execBackup(ctx, db.Opts.Fs, service, locations)
err = execBackup(ctx, db.Opts.Fs, service, locations, 1)
assert.Nil(t, err)
db.Opts.Fs = service
db.Restart(ctx)
240 changes: 182 additions & 58 deletions pkg/backup/tae.go
@@ -33,10 +33,14 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
"io"
"os"
"path"
runtime2 "runtime"
"strconv"
"strings"
"sync"
"time"
)

@@ -68,7 +72,7 @@ func getFileNames(ctx context.Context, retBytes [][][]byte) ([]string, error) {
return fileName, err
}

func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir string) error {
func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir string, count int) error {
v, ok := runtime.ProcessLevelRuntime().GetGlobalVariables(runtime.InternalSQLExecutor)
if !ok {
return moerr.NewNotSupported(ctx, "no implement sqlExecutor")
@@ -92,21 +96,156 @@ func BackupData(ctx context.Context, srcFs, dstFs fileservice.FileService, dir s
if err != nil {
return err
}
return execBackup(ctx, srcFs, dstFs, fileName)
return execBackup(ctx, srcFs, dstFs, fileName, count)
}

func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names []string) error {
func getParallelCount(count int) int {
if count > 0 && count < 512 {
return count
}
cupNum := runtime2.NumCPU()
if cupNum < 8 {
return 50
} else if cupNum < 16 {
return 80
} else if cupNum < 32 {
return 128
} else if cupNum < 64 {
return 256
}
return 512
}

// parallelCopyData copy data from srcFs to dstFs in parallel
func parallelCopyData(srcFs, dstFs fileservice.FileService,
files map[string]objectio.Location,
parallelCount int,
gcFileMap map[string]string,
) ([]*taeFile, error) {
var copyCount, skipCount, copySize int64
var printMutex, fileMutex sync.Mutex
stopPrint := false
defer func() {
printMutex.Lock()
if !stopPrint {
stopPrint = true
}
printMutex.Unlock()
}()
// record files
taeFileList := make([]*taeFile, 0, len(files))

jobScheduler := tasks.NewParallelJobScheduler(parallelCount)
defer jobScheduler.Stop()
go func() {
for {
printMutex.Lock()
if stopPrint {
printMutex.Unlock()
break
}
printMutex.Unlock()
fileMutex.Lock()
logutil.Info("backup", common.OperationField("copy file"),
common.AnyField("copy file size", copySize),
common.AnyField("copy file num", copyCount),
common.AnyField("skip file num", skipCount),
common.AnyField("total file num", len(files)))
fileMutex.Unlock()
time.Sleep(time.Second * 5)
}
}()

backupJobs := make([]*tasks.Job, len(files))
getJob := func(srcFs, dstFs fileservice.FileService, location objectio.Location) *tasks.Job {
job := new(tasks.Job)
job.Init(context.Background(), location.Name().String(), tasks.JTAny,
func(_ context.Context) *tasks.JobResult {

name := location.Name().String()
size := location.Extent().End() + objectio.FooterSize
checksum, err := CopyFile(context.Background(), srcFs, dstFs, location.Name().String(), "")
if err != nil {
if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) &&
isGC(gcFileMap, name) {
fileMutex.Lock()
skipCount++
fileMutex.Unlock()
return &tasks.JobResult{
Res: nil,
}
} else {
return &tasks.JobResult{
Err: err,
Res: nil,
}
}
}
fileMutex.Lock()
copyCount++
copySize += int64(size)
taeFileList = append(taeFileList, &taeFile{
path: name,
size: int64(size),
checksum: checksum,
})
fileMutex.Unlock()
return &tasks.JobResult{
Res: nil,
}
})
return job
}

idx := 0
for n := range files {
backupJobs[idx] = getJob(srcFs, dstFs, files[n])
idx++
}

for n := range backupJobs {
err := jobScheduler.Schedule(backupJobs[n])
if err != nil {
logutil.Infof("schedule job failed %v", err.Error())
return nil, err
}

}

for n := range backupJobs {

ret := backupJobs[n].WaitDone()
if ret.Err != nil {
logutil.Infof("wait job done failed %v", ret.Err.Error())
return nil, ret.Err
}
}

logutil.Info("backup", common.OperationField("copy file"),
common.AnyField("copy file size", copySize),
common.AnyField("copy file num", copyCount),
common.AnyField("skip file num", skipCount),
common.AnyField("total file num", len(files)))
return taeFileList, nil
}

func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names []string, count int) error {
backupTime := names[0]
trimInfo := names[1]
names = names[1:]
files := make(map[string]*fileservice.DirEntry, 0)
files := make(map[string]objectio.Location, 0)
table := gc.NewGCTable()
gcFileMap := make(map[string]string)
softDeletes := make(map[string]bool)
var loadDuration, copyDuration, reWriteDuration time.Duration
var oNames []objectio.ObjectName
var oNames []objectio.Location
parallelNum := getParallelCount(count)
logutil.Info("backup", common.OperationField("start backup"),
common.AnyField("backup time", backupTime),
common.AnyField("checkpoint num", len(names)),
common.AnyField("parallel num", parallelNum))
defer func() {
logutil.Info("backup", common.OperationField("exec backup"),
logutil.Info("backup", common.OperationField("end backup"),
common.AnyField("load checkpoint cost", loadDuration),
common.AnyField("copy file cost", copyDuration),
common.AnyField("rewrite checkpoint cost", reWriteDuration))
@@ -129,7 +268,7 @@ func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names
if err != nil {
return err
}
var oneNames []objectio.ObjectName
var oneNames []objectio.Location
var data *logtail.CheckpointData
if i == 0 {
oneNames, data, err = logtail.LoadCheckpointEntriesFromKey(ctx, srcFs, key, uint32(version), nil)
@@ -148,42 +287,12 @@ func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names
loadDuration += time.Since(now)
now = time.Now()
for _, oName := range oNames {
if files[oName.String()] == nil {
dentry, err := srcFs.StatFile(ctx, oName.String())
if err != nil {
if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) &&
isGC(gcFileMap, oName.String()) {
continue
} else {
return err
}
}
files[oName.String()] = dentry
}
}
// record files
taeFileList := make([]*taeFile, 0, len(files))
for _, dentry := range files {
if dentry.IsDir {
panic("not support dir")
}
checksum, err := CopyFile(ctx, srcFs, dstFs, dentry, "")
if err != nil {
if moerr.IsMoErrCode(err, moerr.ErrFileNotFound) &&
isGC(gcFileMap, dentry.Name) {
continue
} else {
return err
}

if files[oName.Name().String()] == nil {
files[oName.Name().String()] = oName
}
taeFileList = append(taeFileList, &taeFile{
path: dentry.Name,
size: dentry.Size,
checksum: checksum,
})
}

// trim checkpoint and block
var cnLoc, tnLoc, mergeStart, mergeEnd string
var end, start types.TS
var version uint64
Expand All @@ -205,6 +314,13 @@ func execBackup(ctx context.Context, srcFs, dstFs fileservice.FileService, names
}
}

// copy data
taeFileList, err := parallelCopyData(srcFs, dstFs, files, parallelNum, gcFileMap)
if err != nil {
return err
}

// copy checkpoint and gc meta
sizeList, err := CopyDir(ctx, srcFs, dstFs, "ckp", start)
if err != nil {
return err
Expand Down Expand Up @@ -281,7 +397,7 @@ func CopyDir(ctx context.Context, srcFs, dstFs fileservice.FileService, dir stri
logutil.Infof("[Backup] skip file %v", file.Name)
continue
}
checksum, err = CopyFile(ctx, srcFs, dstFs, &file, dir)
checksum, err = CopyFile(ctx, srcFs, dstFs, file.Name, dir)
if err != nil {
return nil, err
}
@@ -295,40 +411,48 @@ func CopyDir(ctx context.Context, srcFs, dstFs fileservice.FileService, dir stri
}

// CopyFile copy file from srcFs to dstFs and return checksum of the written file.
func CopyFile(ctx context.Context, srcFs, dstFs fileservice.FileService, dentry *fileservice.DirEntry, dstDir string) ([]byte, error) {
name := dentry.Name
func CopyFile(ctx context.Context, srcFs, dstFs fileservice.FileService, name, dstDir string) ([]byte, error) {
if dstDir != "" {
name = path.Join(dstDir, name)
}
var reader io.ReadCloser
ioVec := &fileservice.IOVector{
FilePath: name,
Entries: make([]fileservice.IOEntry, 1),
Policy: fileservice.SkipAllCache,
}
ioVec.Entries[0] = fileservice.IOEntry{
Offset: 0,
Size: dentry.Size,
Entries: []fileservice.IOEntry{
{
ReadCloserForRead: &reader,
Offset: 0,
Size: -1,
},
},
Policy: fileservice.SkipAllCache,
}

err := srcFs.Read(ctx, ioVec)
if err != nil {
return nil, err
}
defer reader.Close()
// hash
hasher := sha256.New()
hashingReader := io.TeeReader(reader, hasher)
dstIoVec := fileservice.IOVector{
FilePath: name,
Entries: make([]fileservice.IOEntry, 1),
Policy: fileservice.SkipAllCache,
}
dstIoVec.Entries[0] = fileservice.IOEntry{
Offset: 0,
Data: ioVec.Entries[0].Data,
Size: dentry.Size,
Entries: []fileservice.IOEntry{
{
ReaderForWrite: hashingReader,
Offset: 0,
Size: -1,
},
},
Policy: fileservice.SkipAllCache,
}

err = dstFs.Write(ctx, dstIoVec)
if err != nil {
return nil, err
}
checksum := sha256.Sum256(ioVec.Entries[0].Data)
return checksum[:], err
return hasher.Sum(nil), nil
}

func mergeGCFile(gcFiles []string, gcFileMap map[string]string) {
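The rewritten CopyFile above no longer takes a DirEntry or buffers the whole object: it requests a streaming reader from the source (Size: -1 with ReadCloserForRead), wraps it in io.TeeReader so a SHA-256 checksum accumulates as the bytes flow past, and hands that reader to the destination write (ReaderForWrite), so each object is read exactly once and never held fully in memory. A self-contained sketch of the same single-pass copy-and-hash technique on ordinary OS files; the fileservice IOVector plumbing is MatrixOne-specific and omitted here, and the file paths are hypothetical.

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"os"
)

// copyWithChecksum streams src into dst and returns the SHA-256 of the bytes copied,
// mirroring the io.TeeReader technique used by the new CopyFile.
func copyWithChecksum(srcPath, dstPath string) ([]byte, error) {
	src, err := os.Open(srcPath)
	if err != nil {
		return nil, err
	}
	defer src.Close()

	dst, err := os.Create(dstPath)
	if err != nil {
		return nil, err
	}
	defer dst.Close()

	hasher := sha256.New()
	// Every byte read from src is also fed to hasher before being written to dst.
	if _, err := io.Copy(dst, io.TeeReader(src, hasher)); err != nil {
		return nil, err
	}
	return hasher.Sum(nil), nil
}

func main() {
	sum, err := copyWithChecksum("source.bin", "backup.bin") // hypothetical paths
	if err != nil {
		fmt.Println("copy failed:", err)
		return
	}
	fmt.Printf("copied, sha256=%x\n", sum)
}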
4 changes: 4 additions & 0 deletions pkg/fileservice/local_etl_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,10 @@ func (l *LocalETLFS) ensureDir(nativePath string) error {

// create
if err := os.Mkdir(nativePath, 0755); err != nil {
if os.IsExist(err) {
// existed
return nil
}
return err
}

4 changes: 4 additions & 0 deletions pkg/fileservice/local_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,10 @@ func (l *LocalFS) ensureDir(nativePath string) error {

// create
if err := os.Mkdir(nativePath, 0755); err != nil {
if os.IsExist(err) {
// existed
return nil
}
return err
}

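The matching ensureDir changes in pkg/fileservice/local_etl_fs.go and pkg/fileservice/local_fs.go make directory creation idempotent: when os.Mkdir fails only because the path already exists (for example, created concurrently by another copy job), the error is treated as success. A standalone sketch of the same pattern; the directory name is hypothetical.

package main

import (
	"fmt"
	"os"
)

// ensureDir creates dir if needed and treats "already exists" as success,
// the tolerance added to LocalFS.ensureDir and LocalETLFS.ensureDir above.
func ensureDir(dir string) error {
	if err := os.Mkdir(dir, 0755); err != nil {
		if os.IsExist(err) {
			return nil // another goroutine or process created it first
		}
		return err
	}
	return nil
}

func main() {
	if err := ensureDir("backup-staging"); err != nil { // hypothetical directory name
		fmt.Println("mkdir failed:", err)
	}
}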
