diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index 65ff4288987a1..a80396f4fddd2 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
         "//br/pkg/storage",
         "//br/pkg/summary",
         "//br/pkg/utils",
+        "//br/pkg/version",
         "//ddl",
         "//distsql",
         "//kv",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 48d4b2656f999..2102211125f1d 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -35,6 +35,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/br/pkg/summary"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/pingcap/tidb/br/pkg/version"
     "github.com/pingcap/tidb/ddl"
     "github.com/pingcap/tidb/distsql"
     "github.com/pingcap/tidb/kv"
@@ -553,6 +554,15 @@ func BuildBackupRangeAndSchema(
         }
 
         for _, tableInfo := range tables {
+            if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
+                // Normally this shouldn't happen in a production environment,
+                // because a unit test guards against silent table info version bumps
+                // and the version is checked before a backup runs.
+                return nil, nil, nil, errors.Errorf("backup doesn't support table %s with version %d, maybe try a newer version of br",
+                    tableInfo.Name.String(),
+                    tableInfo.Version,
+                )
+            }
             if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
                 // Skip tables other than the given table.
                 continue
diff --git a/br/pkg/stream/rewrite_meta_rawkv.go b/br/pkg/stream/rewrite_meta_rawkv.go
index 7398abdbb2cb9..348fb8a7ea440 100644
--- a/br/pkg/stream/rewrite_meta_rawkv.go
+++ b/br/pkg/stream/rewrite_meta_rawkv.go
@@ -384,6 +384,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
     return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
 }
 
+func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
+    newKey, needWrite, err := sr.rewriteKeyForTable(
+        e.Key,
+        cf,
+        meta.ParseAutoIncrementIDKey,
+        meta.AutoIncrementIDKey,
+    )
+    if err != nil || !needWrite {
+        return nil, errors.Trace(err)
+    }
+
+    return &kv.Entry{Key: newKey, Value: e.Value}, nil
+}
+
 func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
     newKey, needWrite, err := sr.rewriteKeyForTable(
         e.Key,
@@ -533,6 +547,8 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
     } else if meta.IsDBkey(rawKey.Key) {
         if meta.IsTableKey(rawKey.Field) {
             return sr.rewriteEntryForTable(e, cf)
+        } else if meta.IsAutoIncrementIDKey(rawKey.Field) {
+            return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
         } else if meta.IsAutoTableIDKey(rawKey.Field) {
             return sr.rewriteEntryForAutoTableIDKey(e, cf)
         } else if meta.IsSequenceKey(rawKey.Field) {
diff --git a/br/pkg/stream/rewrite_meta_rawkv_test.go b/br/pkg/stream/rewrite_meta_rawkv_test.go
index d2cbe24e8295d..00b966ae061b4 100644
--- a/br/pkg/stream/rewrite_meta_rawkv_test.go
+++ b/br/pkg/stream/rewrite_meta_rawkv_test.go
@@ -7,6 +7,7 @@ import (
     "encoding/json"
     "testing"
 
+    "github.com/pingcap/tidb/meta"
     "github.com/pingcap/tidb/parser/model"
     filter "github.com/pingcap/tidb/util/table-filter"
     "github.com/stretchr/testify/require"
@@ -82,7 +83,80 @@ func TestRewriteValueForDB(t *testing.T) {
     require.Equal(t, newId, sr.DbMap[dbID].NewDBID)
 }
 
-func TestRewriteValueForTable(t *testing.T) {
+func TestRewriteKeyForTable(t *testing.T) {
+    var (
+        dbID    int64  = 1
+        tableID int64  = 57
+        ts      uint64 = 400036290571534337
+    )
+    cases := []struct {
+        encodeTableFn func(int64) []byte
+        decodeTableFn func([]byte) (int64, error)
+    }{
+        {
+            meta.TableKey,
+            meta.ParseTableKey,
+        },
+        {
+            meta.AutoIncrementIDKey,
+            meta.ParseAutoIncrementIDKey,
+        },
+        {
+            meta.AutoTableIDKey,
+            meta.ParseAutoTableIDKey,
+        },
+        {
+            meta.AutoRandomTableIDKey,
+            meta.ParseAutoRandomTableIDKey,
+        },
+        {
+            meta.SequenceKey,
+            meta.ParseSequenceKey,
+        },
+    }
+
+    for _, ca := range cases {
+        encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
+        // create schemasReplace.
+        sr := MockEmptySchemasReplace(nil)
+
+        newKey, needWrite, err := sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
+        require.Nil(t, err)
+        require.True(t, needWrite)
+        require.Equal(t, len(sr.DbMap), 1)
+        require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
+        downStreamDbID := sr.DbMap[dbID].NewDBID
+        downStreamTblID := sr.DbMap[dbID].TableMap[tableID].NewTableID
+
+        decodedKey, err := ParseTxnMetaKeyFrom(newKey)
+        require.Nil(t, err)
+        require.Equal(t, decodedKey.Ts, ts)
+
+        newDbID, err := meta.ParseDBKey(decodedKey.Key)
+        require.Nil(t, err)
+        require.Equal(t, newDbID, downStreamDbID)
+        newTblID, err := ca.decodeTableFn(decodedKey.Field)
+        require.Nil(t, err)
+        require.Equal(t, newTblID, downStreamTblID)
+
+        // rewrite it again and expect the same result.
+        newKey, needWrite, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
+        require.True(t, needWrite)
+        require.Nil(t, err)
+        decodedKey, err = ParseTxnMetaKeyFrom(newKey)
+        require.Nil(t, err)
+        require.Equal(t, decodedKey.Ts, sr.RewriteTS)
+
+        newDbID, err = meta.ParseDBKey(decodedKey.Key)
+        require.Nil(t, err)
+        require.Equal(t, newDbID, downStreamDbID)
+        newTblID, err = ca.decodeTableFn(decodedKey.Field)
+        require.Nil(t, err)
+        require.Equal(t, newTblID, downStreamTblID)
+    }
+}
+
+func TestRewriteTableInfo(t *testing.T) {
     var (
         dbId    int64 = 40
         tableID int64 = 100
diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go
index 190ad8b72ed9c..41bdcd2a96d49 100644
--- a/br/pkg/task/common.go
+++ b/br/pkg/task/common.go
@@ -28,6 +28,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/metautil"
     "github.com/pingcap/tidb/br/pkg/storage"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/sessionctx/variable"
     filter "github.com/pingcap/tidb/util/table-filter"
     "github.com/spf13/cobra"
@@ -103,6 +104,12 @@ const (
     flagFullBackupType = "type"
 )
 
+const (
+    // Once TableInfoVersion is updated, BR needs to check compatibility with the
+    // new TableInfoVersion. Both snapshot restore and PITR need to be checked.
+    CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
+)
+
 // FullBackupType type when doing full backup or restore
 type FullBackupType string
diff --git a/br/pkg/version/BUILD.bazel b/br/pkg/version/BUILD.bazel
index 8171081ae5df1..f969242b7aeaa 100644
--- a/br/pkg/version/BUILD.bazel
+++ b/br/pkg/version/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
         "//br/pkg/logutil",
         "//br/pkg/utils",
         "//br/pkg/version/build",
+        "//parser/model",
         "//sessionctx/variable",
         "//util/engine",
         "@com_github_coreos_go_semver//semver",
@@ -27,8 +28,10 @@ go_test(
     srcs = ["version_test.go"],
     embed = [":version"],
     flaky = True,
+    shard_count = 10,
     deps = [
         "//br/pkg/version/build",
+        "//parser/model",
         "//sessionctx/variable",
         "@com_github_coreos_go_semver//semver",
         "@com_github_data_dog_go_sqlmock//:go-sqlmock",
diff --git a/br/pkg/version/version.go b/br/pkg/version/version.go
index c49e3d1ada923..232bb28358f58 100644
--- a/br/pkg/version/version.go
+++ b/br/pkg/version/version.go
@@ -18,6 +18,7 @@ import (
     "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/utils"
     "github.com/pingcap/tidb/br/pkg/version/build"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/pingcap/tidb/util/engine"
     pd "github.com/tikv/pd/client"
@@ -36,6 +37,10 @@ var (
     checkpointSupportError error = nil
     // pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
     pitrSupportBatchKVFiles bool = false
+
+    // Once TableInfoVersion is updated, BR needs to check compatibility with the
+    // new TableInfoVersion. Both snapshot restore and PITR need to be checked.
+    CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
 )
 
 // NextMajorVersion returns the next major version.
diff --git a/br/pkg/version/version_test.go b/br/pkg/version/version_test.go
index 1fc654b3990b6..85a738efa6497 100644
--- a/br/pkg/version/version_test.go
+++ b/br/pkg/version/version_test.go
@@ -13,6 +13,7 @@ import (
     "github.com/coreos/go-semver/semver"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/tidb/br/pkg/version/build"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/sessionctx/variable"
     "github.com/stretchr/testify/require"
     pd "github.com/tikv/pd/client"
@@ -633,3 +634,11 @@ Check Table Before Drop: false`
     require.NoError(t, err)
     require.Equal(t, "5.7.25", versionStr)
 }
+
+func TestEnsureSupportVersion(t *testing.T) {
+    // Once this test fails, please check the compatibility carefully.
+    // *** Don't change this test lightly. ***
+    require.Equal(t,
+        CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
+        model.CurrLatestTableInfoVersion)
+}
diff --git a/meta/meta.go b/meta/meta.go
index 52905f1f50211..6c79ac23b6659 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -276,9 +276,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) {
 }
 
 func (*Meta) autoIncrementIDKey(tableID int64) []byte {
+    return AutoIncrementIDKey(tableID)
+}
+
+// AutoIncrementIDKey encodes the auto increment ID key of the table.
+func AutoIncrementIDKey(tableID int64) []byte {
     return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
 }
 
+// IsAutoIncrementIDKey checks whether the key is an auto increment ID key.
+func IsAutoIncrementIDKey(key []byte) bool {
+    return strings.HasPrefix(string(key), mIncIDPrefix+":")
+}
+
+// ParseAutoIncrementIDKey decodes the tableID from the auto increment ID key.
+func ParseAutoIncrementIDKey(key []byte) (int64, error) {
+    if !IsAutoIncrementIDKey(key) {
+        return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
+    }
+
+    tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
+    id, err := strconv.Atoi(tableID)
+    return int64(id), err
+}
+
 func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
     return AutoRandomTableIDKey(tableID)
 }
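
A minimal usage sketch, assuming this patch is applied: the newly exported helpers in the meta package compose as encode -> recognize -> parse. AutoIncrementIDKey builds the meta field key for a table's auto increment ID, IsAutoIncrementIDKey recognizes such a key, and ParseAutoIncrementIDKey recovers the table ID (this is what SchemasReplace relies on when rewriting the key to the downstream table ID). The main wrapper below is illustrative only and not part of the patch.

package main

import (
    "fmt"

    "github.com/pingcap/tidb/meta"
)

func main() {
    const tableID int64 = 57

    // Encode the auto increment ID field key for table 57.
    key := meta.AutoIncrementIDKey(tableID)

    // The key is recognized as an auto increment ID key...
    fmt.Println(meta.IsAutoIncrementIDKey(key)) // true

    // ...and decodes back to the original table ID.
    parsed, err := meta.ParseAutoIncrementIDKey(key)
    if err != nil {
        panic(err)
    }
    fmt.Println(parsed == tableID) // true
}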