Skip to content

Commit

Permalink
restore: rewrite auto increment id after pitr (#46521) (#46634)
Browse files Browse the repository at this point in the history
close #46520
  • Loading branch information
ti-chi-bot committed Sep 4, 2023
1 parent e2ba276 commit 7c75786
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 1 deletion.
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//br/pkg/storage",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/version",
"//ddl",
"//distsql",
"//kv",
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -553,6 +554,15 @@ func BuildBackupRangeAndSchema(
}

for _, tableInfo := range tables {
if tableInfo.Version > version.CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION {
// normally this shouldn't happen in a production env.
// because we had a unit test to avoid table info version update silencly.
// and had version check before run backup.
return nil, nil, nil, errors.Errorf("backup doesn't not support table %s with version %d, maybe try a new version of br",
tableInfo.Name.String(),
tableInfo.Version,
)
}
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
continue
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,20 @@ func (sr *SchemasReplace) rewriteEntryForTable(e *kv.Entry, cf string) (*kv.Entr
return &kv.Entry{Key: newKey, Value: result.NewValue}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoIncrementIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, needWrite, err := sr.rewriteKeyForTable(
e.Key,
cf,
meta.ParseAutoIncrementIDKey,
meta.AutoIncrementIDKey,
)
if err != nil || !needWrite {
return nil, errors.Trace(err)
}

return &kv.Entry{Key: newKey, Value: e.Value}, nil
}

func (sr *SchemasReplace) rewriteEntryForAutoTableIDKey(e *kv.Entry, cf string) (*kv.Entry, error) {
newKey, needWrite, err := sr.rewriteKeyForTable(
e.Key,
Expand Down Expand Up @@ -533,6 +547,8 @@ func (sr *SchemasReplace) RewriteKvEntry(e *kv.Entry, cf string) (*kv.Entry, err
} else if meta.IsDBkey(rawKey.Key) {
if meta.IsTableKey(rawKey.Field) {
return sr.rewriteEntryForTable(e, cf)
} else if meta.IsAutoIncrementIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoIncrementIDKey(e, cf)
} else if meta.IsAutoTableIDKey(rawKey.Field) {
return sr.rewriteEntryForAutoTableIDKey(e, cf)
} else if meta.IsSequenceKey(rawKey.Field) {
Expand Down
76 changes: 75 additions & 1 deletion br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"testing"

"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -82,7 +83,80 @@ func TestRewriteValueForDB(t *testing.T) {
require.Equal(t, newId, sr.DbMap[dbID].NewDBID)
}

func TestRewriteValueForTable(t *testing.T) {
func TestRewriteKeyForTable(t *testing.T) {
var (
dbID int64 = 1
tableID int64 = 57
ts uint64 = 400036290571534337
)
cases := []struct {
encodeTableFn func(int64) []byte
decodeTableFn func([]byte) (int64, error)
}{
{
meta.TableKey,
meta.ParseTableKey,
},
{
meta.AutoIncrementIDKey,
meta.ParseAutoIncrementIDKey,
},
{
meta.AutoTableIDKey,
meta.ParseAutoTableIDKey,
},
{
meta.AutoRandomTableIDKey,
meta.ParseAutoRandomTableIDKey,
},
{
meta.SequenceKey,
meta.ParseSequenceKey,
},
}

for _, ca := range cases {
encodedKey := encodeTxnMetaKey(meta.DBkey(dbID), ca.encodeTableFn(tableID), ts)
// create schemasReplace.
sr := MockEmptySchemasReplace(nil)

newKey, needWrite, err := sr.rewriteKeyForTable(encodedKey, DefaultCF, ca.decodeTableFn, ca.encodeTableFn)
require.Nil(t, err)
require.True(t, needWrite)
require.Equal(t, len(sr.DbMap), 1)
require.Equal(t, len(sr.DbMap[dbID].TableMap), 1)
downStreamDbID := sr.DbMap[dbID].NewDBID
downStreamTblID := sr.DbMap[dbID].TableMap[tableID].NewTableID

decodedKey, err := ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, ts)

newDbID, err := meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err := ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)

// rewrite it again, and get the same result.
newKey, needWrite, err = sr.rewriteKeyForTable(encodedKey, WriteCF, ca.decodeTableFn, ca.encodeTableFn)
require.True(t, needWrite)
require.Nil(t, err)
decodedKey, err = ParseTxnMetaKeyFrom(newKey)
require.Nil(t, err)
require.Equal(t, decodedKey.Ts, sr.RewriteTS)

newDbID, err = meta.ParseDBKey(decodedKey.Key)
require.Nil(t, err)
require.Equal(t, newDbID, downStreamDbID)
newTblID, err = ca.decodeTableFn(decodedKey.Field)
require.Nil(t, err)
require.Equal(t, newTblID, downStreamTblID)
}
}

func TestRewriteTableInfo(t *testing.T) {
var (
dbId int64 = 40
tableID int64 = 100
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -103,6 +104,12 @@ const (
flagFullBackupType = "type"
)

const (
// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// FullBackupType type when doing full backup or restore
type FullBackupType string

Expand Down
3 changes: 3 additions & 0 deletions br/pkg/version/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//br/pkg/logutil",
"//br/pkg/utils",
"//br/pkg/version/build",
"//parser/model",
"//sessionctx/variable",
"//util/engine",
"@com_github_coreos_go_semver//semver",
Expand All @@ -27,8 +28,10 @@ go_test(
srcs = ["version_test.go"],
embed = [":version"],
flaky = True,
shard_count = 10,
deps = [
"//br/pkg/version/build",
"//parser/model",
"//sessionctx/variable",
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
Expand All @@ -36,6 +37,10 @@ var (
checkpointSupportError error = nil
// pitrSupportBatchKVFiles specifies whether TiKV-server supports batch PITR.
pitrSupportBatchKVFiles bool = false

// Once TableInfoVersion updated. BR need to check compatibility with
// new TableInfoVersion. both snapshot restore and pitr need to be checked.
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION = model.TableInfoVersion5
)

// NextMajorVersion returns the next major version.
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -633,3 +634,11 @@ Check Table Before Drop: false`
require.NoError(t, err)
require.Equal(t, "5.7.25", versionStr)
}

func TestEnsureSupportVersion(t *testing.T) {
// Once this test failed. please check the compatibility carefully.
// *** Don't change this test simply. ***
require.Equal(t,
CURRENT_BACKUP_SUPPORT_TABLE_INFO_VERSION,
model.CurrLatestTableInfoVersion)
}
21 changes: 21 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,30 @@ func ParseAutoTableIDKey(key []byte) (int64, error) {
}

func (*Meta) autoIncrementIDKey(tableID int64) []byte {
return AutoIncrementIDKey(tableID)
}

// AutoIncrementIDKey decodes the auto inc table key.
func AutoIncrementIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mIncIDPrefix, tableID))
}

// IsAutoIncrementIDKey checks whether the key is auto increment key.
func IsAutoIncrementIDKey(key []byte) bool {
return strings.HasPrefix(string(key), mIncIDPrefix+":")
}

// ParseAutoIncrementIDKey decodes the tableID from the auto tableID key.
func ParseAutoIncrementIDKey(key []byte) (int64, error) {
if !IsAutoIncrementIDKey(key) {
return 0, ErrInvalidString.GenWithStack("fail to parse autoIncrementKey")
}

tableID := strings.TrimPrefix(string(key), mIncIDPrefix+":")
id, err := strconv.Atoi(tableID)
return int64(id), err
}

func (*Meta) autoRandomTableIDKey(tableID int64) []byte {
return AutoRandomTableIDKey(tableID)
}
Expand Down

0 comments on commit 7c75786

Please sign in to comment.