This repository was archived by the owner on Jul 24, 2024. It is now read-only.
Merged
1 change: 1 addition & 0 deletions go.mod
@@ -31,6 +31,7 @@ require (
go.uber.org/multierr v1.5.0
go.uber.org/zap v1.15.0
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
google.golang.org/api v0.15.1
google.golang.org/grpc v1.26.0
)
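
The new golang.org/x/sync requirement pulls in the errgroup package, which the client changes below use to back up ranges concurrently. As a minimal, self-contained sketch of that pattern (illustrative only, not code from this PR): errgroup runs tasks, cancels the shared context after the first failure, and Wait returns that first error.

// Illustrative sketch of the errgroup pattern enabled by the new
// golang.org/x/sync dependency; task bodies and names are placeholders.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 3; i++ {
		i := i // capture the loop variable for the closure
		eg.Go(func() error {
			select {
			case <-ctx.Done():
				// Another task already failed; stop early.
				return ctx.Err()
			default:
				fmt.Println("task", i, "finished")
				return nil
			}
		})
	}
	// Wait blocks until all tasks return and yields the first non-nil error.
	if err := eg.Wait(); err != nil {
		fmt.Println("backup failed:", err)
	}
}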
207 changes: 92 additions & 115 deletions pkg/backup/client.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"github.com/pingcap/br/pkg/conn"
"github.com/pingcap/br/pkg/glue"
@@ -65,9 +66,8 @@ type Client struct {
mgr ClientMgr
clusterID uint64

backupMeta kvproto.BackupMeta
storage storage.ExternalStorage
backend *kvproto.StorageBackend
storage storage.ExternalStorage
backend *kvproto.StorageBackend

gcTTL int64
}
@@ -165,22 +165,35 @@ func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBacken
return nil
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
// BuildBackupMeta constructs the backup meta file from its components.
func BuildBackupMeta(
req *kvproto.BackupRequest,
files []*kvproto.File,
rawRanges []*kvproto.RawRange,
ddlJobs []*model.Job,
) (backupMeta kvproto.BackupMeta, err error) {
backupMeta.StartVersion = req.StartVersion
backupMeta.EndVersion = req.EndVersion
backupMeta.IsRawKv = req.IsRawKv
backupMeta.RawRanges = rawRanges
backupMeta.Files = files
backupMeta.Ddls, err = json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
err = errors.Trace(err)
return
}
return
}

bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context, backupMeta *kvproto.BackupMeta) error {
backupMetaData, err := proto.Marshal(backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
log.Debug("backup meta", zap.Reflect("meta", backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("size", len(backupMetaData)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}
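
With this change the backup meta is no longer accumulated as Client state: BuildBackupMeta assembles it from the request, the backed-up files, any raw ranges, and the DDL jobs, and SaveBackupMeta only serializes and writes whatever meta it is handed. A hedged sketch of how a caller might chain the two (the surrounding function, parameter names, and package aliases are assumptions, not part of this diff):

// Hypothetical caller-side flow after the refactor; only the two exported
// calls mirror the new API, everything else here is assumed glue code.
func buildAndSaveMeta(
	ctx context.Context,
	client *backup.Client,
	req kvproto.BackupRequest,
	files []*kvproto.File,
	ddlJobs []*model.Job,
) error {
	// Assemble the meta from its components (nil raw ranges for a KV backup).
	backupMeta, err := backup.BuildBackupMeta(&req, files, nil, ddlJobs)
	if err != nil {
		return errors.Trace(err)
	}
	// Persist it to the configured external storage.
	return client.SaveBackupMeta(ctx, &backupMeta)
}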

@@ -380,25 +393,46 @@ func (bc *Client) BackupRanges(
ctx context.Context,
ranges []rtree.Range,
req kvproto.BackupRequest,
concurrency uint,
updateCh glue.Progress,
) error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("Backup Ranges", zap.Duration("take", elapsed))
}()

) ([]*kvproto.File, error) {
errCh := make(chan error)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// we collect all files in a single goroutine to avoid thread safety issues.
filesCh := make(chan []*kvproto.File, concurrency)
allFiles := make([]*kvproto.File, 0, len(ranges))
allFilesCollected := make(chan struct{}, 1)
go func() {
init := time.Now()
start, cur := init, init
for files := range filesCh {
cur, start = start, time.Now()
allFiles = append(allFiles, files...)
summary.CollectSuccessUnit("backup ranges", 1, cur.Sub(start))
}
log.Info("Backup Ranges", zap.Duration("take", cur.Sub(init)))
allFilesCollected <- struct{}{}
}()

go func() {
defer close(filesCh)
workerPool := utils.NewWorkerPool(concurrency, "Ranges")
eg, ectx := errgroup.WithContext(ctx)
for _, r := range ranges {
err := bc.BackupRange(
ctx, r.StartKey, r.EndKey, req, updateCh)
if err != nil {
errCh <- err
return
}
sk, ek := r.StartKey, r.EndKey
workerPool.ApplyOnErrorGroup(eg, func() error {
files, err := bc.BackupRange(ectx, sk, ek, req, updateCh)
if err == nil {
filesCh <- files
}
return err
})
}
if err := eg.Wait(); err != nil {
errCh <- err
return
}
close(errCh)
}()
@@ -407,62 +441,35 @@ func (bc *Client) BackupRanges(
t := time.NewTicker(time.Second * 5)
defer t.Stop()

backupTS := req.EndVersion
// use lastBackupTS as safePoint if exists
if req.StartVersion > 0 {
backupTS = req.StartVersion
}

log.Info("current backup safePoint job",
zap.Uint64("backupTS", backupTS))

finished := false
for {
err := UpdateServiceSafePoint(ctx, bc.mgr.GetPDClient(), bc.GetGCTTL(), backupTS)
for err := range errCh {
if err != nil {
log.Error("update GC safePoint with TTL failed", zap.Error(err))
return err
}
err = CheckGCSafePoint(ctx, bc.mgr.GetPDClient(), backupTS)
if err != nil {
log.Error("check GC safePoint failed", zap.Error(err))
return err
}
if finished {
// Return error (if there is any) before finishing backup.
return err
}
select {
case err, ok := <-errCh:
if !ok {
// Before finish backup, we have to make sure
// the backup ts does not fall behind with GC safepoint.
finished = true
}
if err != nil {
return err
}
case <-t.C:
return nil, err
}
}

select {
case <-allFilesCollected:
return allFiles, nil
case <-ctx.Done():
return nil, errors.Trace(ctx.Err())
}
}
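
BackupRanges now fans out per-range work through a worker pool on an errgroup and funnels the resulting files to a single collector goroutine, so the shared slice never needs a lock. A stripped-down, self-contained sketch of that fan-out/collector pattern (illustrative names only, not this repository's code):

// Minimal sketch of the fan-out/collector pattern used above: workers send
// results on a channel and one goroutine owns the slice, so no mutex is needed.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	resultCh := make(chan int, 4)
	done := make(chan struct{})
	var all []int

	// Single collector goroutine: the only writer of the `all` slice.
	go func() {
		for r := range resultCh {
			all = append(all, r)
		}
		close(done)
	}()

	eg, _ := errgroup.WithContext(context.Background())
	for i := 0; i < 4; i++ {
		i := i
		eg.Go(func() error {
			resultCh <- i * i // stands in for one range's backup result
			return nil
		})
	}
	err := eg.Wait()
	close(resultCh) // all producers are done; let the collector drain and exit
	<-done
	fmt.Println(all, err)
}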

// BackupRange make a backup of the given key range.
// Returns an array of files backed up.
func (bc *Client) BackupRange(
ctx context.Context,
startKey, endKey []byte,
req kvproto.BackupRequest,
updateCh glue.Progress,
) (err error) {
) (files []*kvproto.File, err error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
log.Info("backup range finished", zap.Duration("take", elapsed))
key := "range start:" + hex.EncodeToString(startKey) + " end:" + hex.EncodeToString(endKey)
if err != nil {
summary.CollectFailureUnit(key, err)
} else {
summary.CollectSuccessUnit(key, 1, elapsed)
}
}()
log.Info("backup started",
@@ -476,7 +483,7 @@ func (bc *Client) BackupRange(
var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}

req.ClusterId = bc.clusterID
@@ -489,7 +496,7 @@ func (bc *Client) BackupRange(
var results rtree.RangeTree
results, err = push.pushBackup(req, allStores, updateCh)
if err != nil {
return err
return nil, err
}
log.Info("finish backup push down", zap.Int("Ok", results.Len()))

@@ -499,15 +506,10 @@ func (bc *Client) BackupRange(
ctx, startKey, endKey, req.StartVersion,
req.EndVersion, req.RateLimit, req.Concurrency, results, updateCh)
if err != nil {
return err
return nil, err
}

bc.backupMeta.StartVersion = req.StartVersion
bc.backupMeta.EndVersion = req.EndVersion
bc.backupMeta.IsRawKv = req.IsRawKv
if req.IsRawKv {
bc.backupMeta.RawRanges = append(bc.backupMeta.RawRanges,
&kvproto.RawRange{StartKey: startKey, EndKey: endKey, Cf: req.Cf})
log.Info("backup raw ranges",
zap.Stringer("startKey", utils.WrapKey(startKey)),
zap.Stringer("endKey", utils.WrapKey(endKey)),
@@ -520,14 +522,15 @@ func (bc *Client) BackupRange(

results.Ascend(func(i btree.Item) bool {
r := i.(*rtree.Range)
bc.backupMeta.Files = append(bc.backupMeta.Files, r.Files...)
files = append(files, r.Files...)
return true
})

// Check if there are duplicated files.
checkDupFiles(&results)
collectFileInfo(files)

return nil
return files, nil
}

func (bc *Client) findRegionLeader(
@@ -819,12 +822,12 @@ func SendBackup(
}

// ChecksumMatches tests whether the "local" checksum matches the checksum from TiKV.
func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
if len(local) != len(bc.backupMeta.Schemas) {
func ChecksumMatches(backupMeta *kvproto.BackupMeta, local []Checksum) (bool, error) {
if len(local) != len(backupMeta.Schemas) {
return false, nil
}

for i, schema := range bc.backupMeta.Schemas {
for i, schema := range backupMeta.Schemas {
localChecksum := local[i]
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
@@ -860,35 +863,30 @@ func (bc *Client) ChecksumMatches(local []Checksum) (bool, error) {
return true, nil
}

// ArchiveSize returns the total size of the archive (before encryption).
func (bc *Client) ArchiveSize() uint64 {
return utils.ArchiveSize(&bc.backupMeta)
}

// CollectFileInfo collects ungrouped file summary information, like kv count and size.
func (bc *Client) CollectFileInfo() {
for _, file := range bc.backupMeta.Files {
// collectFileInfo collects ungrouped file summary information, like kv count and size.
func collectFileInfo(files []*kvproto.File) {
for _, file := range files {
summary.CollectSuccessUnit(summary.TotalKV, 1, file.TotalKvs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, file.TotalBytes)
}
}

// CollectChecksums check data integrity by xor all(sst_checksum) per table
// it returns the checksum of all local files.
func (bc *Client) CollectChecksums() ([]Checksum, error) {
func CollectChecksums(backupMeta *kvproto.BackupMeta) ([]Checksum, error) {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectDuration("backup fast checksum", elapsed)
}()

dbs, err := utils.LoadBackupTables(&bc.backupMeta)
dbs, err := utils.LoadBackupTables(backupMeta)
if err != nil {
return nil, err
}

checksums := make([]Checksum, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
checksums := make([]Checksum, 0, len(backupMeta.Schemas))
for _, schema := range backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err = json.Unmarshal(schema.Db, dbInfo)
if err != nil {
@@ -924,37 +922,16 @@ func (bc *Client) CollectChecksums() ([]Checksum, error) {
return checksums, nil
}
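
ChecksumMatches and CollectChecksums are now free functions that take the backup meta explicitly instead of reading it off the Client. A hedged sketch of the verification step a caller might run afterwards (assuming the meta already carries both the backed-up files and the schema-level checksums; names are illustrative):

// Hypothetical post-backup verification; only the two function calls reflect
// the new API, the rest is assumed caller code.
checksums, err := backup.CollectChecksums(&backupMeta)
if err != nil {
	return err
}
matched, err := backup.ChecksumMatches(&backupMeta, checksums)
if err != nil {
	return err
}
if !matched {
	return errors.New("fast checksum mismatch between backup files and schemas")
}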

// CompleteMeta wait response of admin checksum from TiDB to complete backup meta.
func (bc *Client) CompleteMeta(backupSchemas *Schemas) error {
schemas, err := backupSchemas.FinishTableChecksum()
if err != nil {
return err
}
bc.backupMeta.Schemas = schemas
return nil
}

// CopyMetaFrom copies schema metadata directly from pending backupSchemas, without calculating checksum.
// use this when user skip the checksum generating.
func (bc *Client) CopyMetaFrom(backupSchemas *Schemas) {
schemas := make([]*kvproto.Schema, 0, len(backupSchemas.schemas))
for _, v := range backupSchemas.schemas {
schema := v
schemas = append(schemas, &schema)
}
bc.backupMeta.Schemas = schemas
}

// FilterSchema filter schema that doesn't have backup files
// FilterSchema filter in-place schemas that doesn't have backup files
// this is useful during incremental backup, no files in backup means no files to restore
// so we can skip some DDL in restore to speed up restoration.
func (bc *Client) FilterSchema() error {
dbs, err := utils.LoadBackupTables(&bc.backupMeta)
func FilterSchema(backupMeta *kvproto.BackupMeta) error {
dbs, err := utils.LoadBackupTables(backupMeta)
if err != nil {
return err
}
schemas := make([]*kvproto.Schema, 0, len(bc.backupMeta.Schemas))
for _, schema := range bc.backupMeta.Schemas {
schemas := make([]*kvproto.Schema, 0, len(backupMeta.Schemas))
for _, schema := range backupMeta.Schemas {
dbInfo := &model.DBInfo{}
err := json.Unmarshal(schema.Db, dbInfo)
if err != nil {
Expand All @@ -970,6 +947,6 @@ func (bc *Client) FilterSchema() error {
schemas = append(schemas, schema)
}
}
bc.backupMeta.Schemas = schemas
backupMeta.Schemas = schemas
return nil
}
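
FilterSchema likewise now mutates a caller-supplied meta. A short sketch of where it might slot into an incremental backup before the meta is saved (the isIncremental flag and the surrounding code are assumptions, not part of this diff):

// Hypothetical use during incremental backup: drop schemas that produced no
// files so restore can skip their DDLs, then persist the trimmed meta.
if isIncremental {
	if err := backup.FilterSchema(&backupMeta); err != nil {
		return err
	}
}
if err := client.SaveBackupMeta(ctx, &backupMeta); err != nil {
	return err
}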