forked from pingcap/br
/
schema.go
157 lines (142 loc) · 4.33 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
package utils
import (
"encoding/json"
"strings"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/br/pkg/logutil"
)
const (
	// LockFile is the file name of the lock placed in backup storage.
	LockFile = "backup.lock"
	// MetaFile is the file name of the backup meta (protobuf) file.
	MetaFile = "backupmeta"
	// MetaJSONFile is the file name of the JSON-encoded backup meta.
	MetaJSONFile = "backupmeta.json"
	// SavedMetaFile is the file name the meta file is saved under so it
	// can be recovered later.
	SavedMetaFile = "backupmeta.bak"
)
// Table wraps the schema and files of a table.
type Table struct {
	// DB is the schema of the database the table belongs to.
	DB *model.DBInfo
	// Info is the table schema.
	Info *model.TableInfo
	// Crc64Xor, TotalKvs and TotalBytes carry the table checksum; all
	// three being zero means no checksum was calculated (see NoChecksum).
	Crc64Xor   uint64
	TotalKvs   uint64
	TotalBytes uint64
	// Files are the backup files containing this table's data, including
	// files belonging to its partitions (see LoadBackupTables).
	Files []*backuppb.File
	// TiFlashReplicas is the TiFlash replica count recorded in the backup.
	TiFlashReplicas int
	// Stats is the table statistics; it may be nil for backups produced
	// by old versions that did not record stats.
	Stats *handle.JSONTable
}
// NoChecksum reports whether the table has no calculated checksum, i.e.
// all three checksum fields are still at their zero values.
func (tbl *Table) NoChecksum() bool {
	if tbl.Crc64Xor != 0 || tbl.TotalKvs != 0 || tbl.TotalBytes != 0 {
		return false
	}
	return true
}
// NeedAutoID checks whether the table needs backing up with an autoid.
func NeedAutoID(tblInfo *model.TableInfo) bool {
	// A table without PK-is-handle and without a common handle keeps an
	// implicit row ID, which requires the autoid to be backed up.
	if !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle {
		return true
	}
	// Otherwise the autoid is still needed when an auto-increment column exists.
	return tblInfo.GetAutoIncrementColInfo() != nil
}
// Database wraps the schema and tables of a database.
type Database struct {
	// Info is the database schema.
	Info *model.DBInfo
	// Tables are the tables of the database loaded from the backup meta.
	Tables []*Table
}
// GetTable returns the table of the database with the given name, or nil
// when no table of that name exists.
func (db *Database) GetTable(name string) *Table {
	for i := range db.Tables {
		if name == db.Tables[i].Info.Name.String() {
			return db.Tables[i]
		}
	}
	return nil
}
// LoadBackupTables loads schemas from BackupMeta and returns them as a map
// keyed by database name. Each table carries the backup files whose start
// key decodes to the table's ID or to the ID of one of its partitions.
//
// It returns an error when a schema blob cannot be unmarshalled, and
// panics when a file's start key does not decode to a valid table ID.
func LoadBackupTables(meta *backuppb.BackupMeta) (map[string]*Database, error) {
	// Group backup files by the table ID encoded in their start key.
	filesMap := make(map[int64][]*backuppb.File, len(meta.Schemas))
	for _, file := range meta.Files {
		tableID := tablecodec.DecodeTableID(file.GetStartKey())
		if tableID == 0 {
			log.Panic("tableID must not equal to 0", logutil.File(file))
		}
		filesMap[tableID] = append(filesMap[tableID], file)
	}
	databases := make(map[string]*Database)
	for _, schema := range meta.Schemas {
		// Parse the database schema.
		dbInfo := &model.DBInfo{}
		if err := json.Unmarshal(schema.Db, dbInfo); err != nil {
			return nil, errors.Trace(err)
		}
		// Initialize the database entry the first time its name is seen.
		db, ok := databases[dbInfo.Name.String()]
		if !ok {
			db = &Database{
				Info:   dbInfo,
				Tables: make([]*Table, 0),
			}
			databases[dbInfo.Name.String()] = db
		}
		// Parse the table schema.
		tableInfo := &model.TableInfo{}
		if err := json.Unmarshal(schema.Table, tableInfo); err != nil {
			return nil, errors.Trace(err)
		}
		// Stats may be nil in backups produced by old versions.
		var stats *handle.JSONTable
		if schema.Stats != nil {
			stats = &handle.JSONTable{}
			if err := json.Unmarshal(schema.Stats, stats); err != nil {
				return nil, errors.Trace(err)
			}
		}
		// Collect the files belonging to the table itself. Indexing a map
		// with a missing key yields nil, and appending nil adds nothing,
		// so no existence check is needed.
		tableFiles := make([]*backuppb.File, 0)
		tableFiles = append(tableFiles, filesMap[tableInfo.ID]...)
		// A partitioned table additionally owns the files of each
		// partition; iterating the definitions directly (instead of a
		// temporary set map) keeps the file order deterministic.
		if tableInfo.Partition != nil {
			for _, def := range tableInfo.Partition.Definitions {
				tableFiles = append(tableFiles, filesMap[def.ID]...)
			}
		}
		db.Tables = append(db.Tables, &Table{
			DB:              dbInfo,
			Info:            tableInfo,
			Crc64Xor:        schema.Crc64Xor,
			TotalKvs:        schema.TotalKvs,
			TotalBytes:      schema.TotalBytes,
			Files:           tableFiles,
			TiFlashReplicas: int(schema.TiflashReplicas),
			Stats:           stats,
		})
	}
	return databases, nil
}
// ArchiveSize returns the total size of the backup archive: the size of
// the encoded meta itself plus the sizes of every file it references.
func ArchiveSize(meta *backuppb.BackupMeta) uint64 {
	sum := uint64(meta.Size())
	for i := range meta.Files {
		sum += meta.Files[i].Size_
	}
	return sum
}
// EncloseName formats name in sql: it wraps the name in backquotes and
// doubles every backquote the name itself contains.
func EncloseName(name string) string {
	var b strings.Builder
	b.Grow(len(name) + 2)
	b.WriteByte('`')
	for _, r := range name {
		if r == '`' {
			b.WriteString("``")
		} else {
			b.WriteRune(r)
		}
	}
	b.WriteByte('`')
	return b.String()
}