Skip to content

Commit

Permalink
br: pipeline backup schemas (#43003)
Browse files Browse the repository at this point in the history
close #43002
  • Loading branch information
Leavrth committed Apr 17, 2023
1 parent 8e7ebb3 commit 9cf0ed8
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 89 deletions.
138 changes: 96 additions & 42 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ func (bc *Client) BuildBackupRangeAndSchema(
isFullBackup bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
if bc.checkpointMeta == nil {
return BuildBackupRangeAndSchema(storage, tableFilter, backupTS, isFullBackup, true)
return BuildBackupRangeAndInitSchema(storage, tableFilter, backupTS, isFullBackup, true)
}
_, schemas, policies, err := BuildBackupRangeAndSchema(storage, tableFilter, backupTS, isFullBackup, false)
_, schemas, policies, err := BuildBackupRangeAndInitSchema(storage, tableFilter, backupTS, isFullBackup, false)
schemas.SetCheckpointChecksum(bc.checkpointMeta.CheckpointChecksum)
return bc.checkpointMeta.Ranges, schemas, policies, errors.Trace(err)
}
Expand Down Expand Up @@ -492,7 +492,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
// BuildBackupRangeAndSchema gets KV range and schema of tables.
// KV ranges are separated by Table IDs.
// Also, KV ranges are separated by Index IDs in the same table.
func BuildBackupRangeAndSchema(
func BuildBackupRangeAndInitSchema(
storage kv.Storage,
tableFilter filter.Filter,
backupTS uint64,
Expand Down Expand Up @@ -521,7 +521,7 @@ func BuildBackupRangeAndSchema(
}

ranges := make([]rtree.Range, 0)
backupSchemas := NewBackupSchemas()
schemasNum := 0
dbs, err := m.ListDatabases()
if err != nil {
return nil, nil, nil, errors.Trace(err)
Expand All @@ -533,14 +533,72 @@ func BuildBackupRangeAndSchema(
continue
}

tables, err := m.ListTables(dbInfo.ID)
hasTable := false
err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
return nil
}

schemasNum += 1
hasTable = true
if buildRange {
tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
return errors.Trace(err)
}
for _, r := range tableRanges {
// Add keyspace prefix to BackupRequest
startKey, endKey := storage.GetCodec().EncodeRange(r.StartKey, r.EndKey)
ranges = append(ranges, rtree.Range{
StartKey: startKey,
EndKey: endKey,
})
}
}

return nil
})

if err != nil {
return nil, nil, nil, errors.Trace(err)
}

if len(tables) == 0 {
if !hasTable {
log.Info("backup empty database", zap.Stringer("db", dbInfo.Name))
backupSchemas.AddSchema(dbInfo, nil)
schemasNum += 1
}
}

if schemasNum == 0 {
log.Info("nothing to backup")
return nil, nil, nil, nil
}
return ranges, NewBackupSchemas(func(storage kv.Storage, fn func(*model.DBInfo, *model.TableInfo)) error {
return BuildBackupSchemas(storage, tableFilter, backupTS, isFullBackup, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
fn(dbInfo, tableInfo)
})
}, schemasNum), policies, nil
}

func BuildBackupSchemas(
storage kv.Storage,
tableFilter filter.Filter,
backupTS uint64,
isFullBackup bool,
fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo),
) error {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

dbs, err := m.ListDatabases()
if err != nil {
return errors.Trace(err)
}

for _, dbInfo := range dbs {
// skip system databases
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
continue
}

Expand All @@ -550,10 +608,11 @@ func BuildBackupRangeAndSchema(
dbInfo.PlacementPolicyRef = nil
}

for _, tableInfo := range tables {
hasTable := false
err = m.IterTables(dbInfo.ID, func(tableInfo *model.TableInfo) error {
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
continue
return nil
}

logger := log.L().With(
Expand All @@ -573,7 +632,7 @@ func BuildBackupRangeAndSchema(
globalAutoID, err = autoIDAccess.RowID().Get()
}
if err != nil {
return nil, nil, nil, errors.Trace(err)
return errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID + 1
if !isFullBackup {
Expand All @@ -590,7 +649,7 @@ func BuildBackupRangeAndSchema(
var globalAutoRandID int64
globalAutoRandID, err = autoIDAccess.RandomID().Get()
if err != nil {
return nil, nil, nil, errors.Trace(err)
return errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID + 1
logger.Debug("change table AutoRandID",
Expand All @@ -609,59 +668,54 @@ func BuildBackupRangeAndSchema(
}
tableInfo.Indices = tableInfo.Indices[:n]

backupSchemas.AddSchema(dbInfo, tableInfo)
fn(dbInfo, tableInfo)
hasTable = true

if buildRange {
tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
for _, r := range tableRanges {
ranges = append(ranges, rtree.Range{
StartKey: r.StartKey,
EndKey: r.EndKey,
})
}
}
return nil
})

if err != nil {
return errors.Trace(err)
}
}

if backupSchemas.Len() == 0 {
log.Info("nothing to backup")
return nil, nil, nil, nil
if !hasTable {
log.Info("backup empty database", zap.Stringer("db", dbInfo.Name))
fn(dbInfo, nil)
}
}
return ranges, backupSchemas, policies, nil

return nil
}

// BuildFullSchema builds a full backup schemas for databases and tables.
func BuildFullSchema(storage kv.Storage, backupTS uint64) (*Schemas, error) {
func BuildFullSchema(storage kv.Storage, backupTS uint64, fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo)) error {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

newBackupSchemas := NewBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

for _, db := range dbs {
tables, err := m.ListTables(db.ID)
hasTable := false
err = m.IterTables(db.ID, func(table *model.TableInfo) error {
// add table
fn(db, table)
hasTable = true
return nil
})
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

// Back up the database itself if it contains no tables.
if len(tables) == 0 {
newBackupSchemas.AddSchema(db, nil)
}

for _, table := range tables {
// add table
newBackupSchemas.AddSchema(db, table)
if !hasTable {
fn(db, nil)
}
}

return newBackupSchemas, nil
return nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
Expand Down
48 changes: 21 additions & 27 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package backup
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -41,18 +40,22 @@ type schemaInfo struct {
stats *handle.JSONTable
}

// iterFuncTp is the signature of a schema iterator: given a storage, it calls
// the supplied callback for each (database, table) pair it visits and returns
// any error it hit while iterating.
// NOTE(review): the iterator built in BuildBackupRangeAndInitSchema passes a
// nil *model.TableInfo for a database without matching tables — confirm that
// every implementation follows the same convention.
type iterFuncTp func(kv.Storage, func(*model.DBInfo, *model.TableInfo)) error

// Schemas is the task for backing up schemas.
type Schemas struct {
// name -> schema
schemas map[string]*schemaInfo
iterFunc iterFuncTp

size int

// checkpoint: table id -> checksum
checkpointChecksum map[int64]*checkpoint.ChecksumItem
}

func NewBackupSchemas() *Schemas {
func NewBackupSchemas(iterFunc iterFuncTp, size int) *Schemas {
return &Schemas{
schemas: make(map[string]*schemaInfo),
iterFunc: iterFunc,
size: size,
checkpointChecksum: nil,
}
}
Expand All @@ -61,23 +64,6 @@ func (ss *Schemas) SetCheckpointChecksum(checkpointChecksum map[int64]*checkpoin
ss.checkpointChecksum = checkpointChecksum
}

func (ss *Schemas) AddSchema(
dbInfo *model.DBInfo, tableInfo *model.TableInfo,
) {
if tableInfo == nil {
ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{
dbInfo: dbInfo,
}
return
}
name := fmt.Sprintf("%s.%s",
utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
ss.schemas[name] = &schemaInfo{
tableInfo: tableInfo,
dbInfo: dbInfo,
}
}

// BackupSchemas backs up table info, including checksum and stats.
func (ss *Schemas) BackupSchemas(
ctx context.Context,
Expand All @@ -102,10 +88,15 @@ func (ss *Schemas) BackupSchemas(
startAll := time.Now()
op := metautil.AppendSchema
metaWriter.StartWriteMetasAsync(ctx, op)
for _, s := range ss.schemas {
schema := s
// Because schema.dbInfo is a pointer that many tables point to.
// Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations.
err := ss.iterFunc(store, func(dbInfo *model.DBInfo, tableInfo *model.TableInfo) {
// Fields of `dbInfo` may be modified below, which would affect later iterations,
// so copy `dbInfo` into a fresh `newDBInfo` on every call.
newDBInfo := *dbInfo
schema := &schemaInfo{
tableInfo: tableInfo,
dbInfo: &newDBInfo,
}

if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
Expand Down Expand Up @@ -176,6 +167,9 @@ func (ss *Schemas) BackupSchemas(
}
return nil
})
})
if err != nil {
return errors.Trace(err)
}
if err := errg.Wait(); err != nil {
return errors.Trace(err)
Expand All @@ -187,7 +181,7 @@ func (ss *Schemas) BackupSchemas(

// Len returns the number of schemas.
func (ss *Schemas) Len() int {
return len(ss.schemas)
return ss.size
}

func (s *schemaInfo) calculateChecksum(
Expand Down
16 changes: 8 additions & 8 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
// Table t1 does not exist.
testFilter, err := filter.Parse([]string{"test.t1"})
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(
m.Storage, testFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.NotNil(t, backupSchemas)

// Database does not exist.
fooFilter, err := filter.Parse([]string{"foo.t1"})
require.NoError(t, err)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, fooFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.Nil(t, backupSchemas)
Expand All @@ -124,7 +124,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
require.NoError(t, err)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, noFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.NotNil(t, backupSchemas)
Expand All @@ -136,7 +136,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk.MustExec("create placement policy fivereplicas followers=4;")

var policies []*backuppb.PlacementPolicy
_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, testFilter, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk.MustExec("insert into t2 values (10);")
tk.MustExec("insert into t2 values (11);")

_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
_, backupSchemas, policies, err = backup.BuildBackupRangeAndInitSchema(
m.Storage, noFilter, math.MaxUint64, true, true)
require.NoError(t, err)
require.Equal(t, 2, backupSchemas.Len())
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
f, err := filter.Parse([]string{"test.t3"})
require.NoError(t, err)

_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -253,7 +253,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
// recover the statistics.
tk.MustExec("analyze table t3;")

_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err = backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -294,7 +294,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {

f, err := filter.Parse([]string{"mysql.systable*"})
require.NoError(t, err)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false, true)
_, backupSchemas, _, err := backup.BuildBackupRangeAndInitSchema(m.Storage, f, math.MaxUint64, false, true)
require.NoError(t, err)
require.Equal(t, systemTablesCount, backupSchemas.Len())

Expand Down

0 comments on commit 9cf0ed8

Please sign in to comment.