Skip to content

Commit

Permalink
ddl, meta: check flashback history time range (#37737)
Browse files Browse the repository at this point in the history
close #37585
  • Loading branch information
Defined2014 committed Sep 14, 2022
1 parent 563b426 commit 51e6536
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 27 deletions.
65 changes: 65 additions & 0 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
}
if err = CheckFlashbackHistoryTSRange(t, flashbackTS); err != nil {
return err
}

if err = gcutil.DisableGC(sess); err != nil {
return err
Expand Down Expand Up @@ -491,6 +494,15 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
return err
}
}
if job.IsDone() || job.IsSynced() {
gcSafePoint, err := gcutil.GetGCSafePoint(sess)
if err != nil {
return err
}
if err = UpdateFlashbackHistoryTSRanges(t, flashbackTS, t.StartTS, gcSafePoint); err != nil {
return err
}
}
if err = t.SetFlashbackClusterJobID(0); err != nil {
return err
}
Expand All @@ -503,3 +515,56 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {

return nil
}

// CheckFlashbackHistoryTSRange checks flashbackTS overlapped with history time ranges or not.
func CheckFlashbackHistoryTSRange(m *meta.Meta, flashbackTS uint64) error {
tsRanges, err := m.GetFlashbackHistoryTSRange()
if err != nil {
return err
}
for _, tsRange := range tsRanges {
if tsRange.StartTS <= flashbackTS && flashbackTS <= tsRange.EndTS {
return errors.Errorf("FlashbackTs overlapped, old range: [%s, %s], flashbackTS: %s",
oracle.GetTimeFromTS(tsRange.StartTS), oracle.GetTimeFromTS(tsRange.EndTS), oracle.GetTimeFromTS(flashbackTS))
}
}
return nil
}

// UpdateFlashbackHistoryTSRanges insert [startTS, endTS] into FlashbackHistoryTSRange.
func UpdateFlashbackHistoryTSRanges(m *meta.Meta, startTS uint64, endTS uint64, gcSafePoint uint64) error {
tsRanges, err := m.GetFlashbackHistoryTSRange()
if err != nil {
return err
}
if len(tsRanges) != 0 && tsRanges[len(tsRanges)-1].EndTS >= endTS {
// It's impossible, endTS should always greater than all TS in history TS ranges.
return errors.Errorf("Maybe TSO fallback, last flashback endTS: %d, now: %d", tsRanges[len(tsRanges)-1].EndTS, endTS)
}

newTsRange := make([]meta.TSRange, 0, len(tsRanges))

for _, tsRange := range tsRanges {
if tsRange.EndTS < gcSafePoint {
continue
}
if startTS > tsRange.EndTS {
// tsRange.StartTS < tsRange.EndTS < startTS.
// We should keep tsRange in slices.
newTsRange = append(newTsRange, tsRange)
} else if startTS < tsRange.StartTS {
// startTS < tsRange.StartTS < tsRange.EndTS.
// The remained ts ranges are useless, [startTS, endTS] will cover them, so break.
break
} else {
// tsRange.StartTS < startTS < tsRange.EndTS.
// It's impossible reach here, we checked it before start flashback cluster.
return errors.Errorf("It's an unreachable branch, flashbackTS (%d) in old ts range: [%d, %d]",
startTS, tsRange.StartTS, tsRange.EndTS)
}
}

// Store the new tsRange.
newTsRange = append(newTsRange, meta.TSRange{StartTS: startTS, EndTS: endTS})
return m.SetFlashbackHistoryTSRange(newTsRange)
}
60 changes: 60 additions & 0 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -211,6 +212,8 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
tk.MustExec("set global tidb_gc_enable = off")
tk.MustExec("set global tidb_super_read_only = on")

ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
Expand Down Expand Up @@ -265,3 +268,60 @@ func TestCancelFlashbackCluster(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
}

func TestFlashbackTimeRange(t *testing.T) {
store := testkit.CreateMockStore(t)

se, err := session.CreateSession4Test(store)
require.NoError(t, err)
txn, err := se.GetStore().Begin()
require.NoError(t, err)

m := meta.NewMeta(txn)
flashbackTime := oracle.GetTimeFromTS(m.StartTS).Add(-10 * time.Minute)

// No flashback history, shouldn't return err.
require.NoError(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime)))

// Insert a time range to flashback history ts ranges.
require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime), m.StartTS, 0))

historyTS, err := m.GetFlashbackHistoryTSRange()
require.NoError(t, err)
require.Len(t, historyTS, 1)
require.NoError(t, txn.Commit(context.Background()))

se, err = session.CreateSession4Test(store)
require.NoError(t, err)
txn, err = se.GetStore().Begin()
require.NoError(t, err)

m = meta.NewMeta(txn)
require.NoError(t, err)
// Flashback history time range is [m.StartTS - 10min, m.StartTS]
require.Error(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(5*time.Minute))))

// Check add insert a new time range
require.NoError(t, ddl.CheckFlashbackHistoryTSRange(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute))))
require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(-5*time.Minute)), m.StartTS, 0))

historyTS, err = m.GetFlashbackHistoryTSRange()
require.NoError(t, err)
// history time range still equals to 1, because overlapped
require.Len(t, historyTS, 1)

require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m, oracle.GoTimeToTS(flashbackTime.Add(15*time.Minute)), oracle.GoTimeToTS(flashbackTime.Add(20*time.Minute)), 0))
historyTS, err = m.GetFlashbackHistoryTSRange()
require.NoError(t, err)
require.Len(t, historyTS, 2)

// GCSafePoint updated will clean some history TS ranges
require.NoError(t, ddl.UpdateFlashbackHistoryTSRanges(m,
oracle.GoTimeToTS(flashbackTime.Add(25*time.Minute)),
oracle.GoTimeToTS(flashbackTime.Add(30*time.Minute)),
oracle.GoTimeToTS(flashbackTime.Add(22*time.Minute))))
historyTS, err = m.GetFlashbackHistoryTSRange()
require.NoError(t, err)
require.Len(t, historyTS, 1)
require.NoError(t, txn.Commit(context.Background()))
}
16 changes: 9 additions & 7 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func TestFlashbackWithSafeTs(t *testing.T) {
// Set GC safe point.
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

time.Sleep(time.Second)
ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)
testcases := []struct {
Expand All @@ -385,8 +386,15 @@ func TestFlashbackWithSafeTs(t *testing.T) {
compareWithSafeTS int
}{
{
name: "10 seconds ago to now, safeTS 5 secs ago",
name: "5 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
// Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped.
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
Expand All @@ -396,12 +404,6 @@ func TestFlashbackWithSafeTs(t *testing.T) {
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(-10 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "5 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs),
compareWithSafeTS: 0,
},
}
for _, testcase := range testcases {
t.Log(testcase.name)
Expand Down
72 changes: 52 additions & 20 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,27 @@ var (
//

var (
mMetaPrefix = []byte("m")
mNextGlobalIDKey = []byte("NextGlobalID")
mSchemaVersionKey = []byte("SchemaVersionKey")
mDBs = []byte("DBs")
mDBPrefix = "DB"
mTablePrefix = "Table"
mSequencePrefix = "SID"
mSeqCyclePrefix = "SequenceCycle"
mTableIDPrefix = "TID"
mIncIDPrefix = "IID"
mRandomIDPrefix = "TARID"
mBootstrapKey = []byte("BootstrapKey")
mSchemaDiffPrefix = "Diff"
mPolicies = []byte("Policies")
mPolicyPrefix = "Policy"
mPolicyGlobalID = []byte("PolicyGlobalID")
mPolicyMagicByte = CurrentMagicByteVer
mDDLTableVersion = []byte("DDLTableVersion")
mConcurrentDDL = []byte("concurrentDDL")
mInFlashbackCluster = []byte("InFlashbackCluster")
mMetaPrefix = []byte("m")
mNextGlobalIDKey = []byte("NextGlobalID")
mSchemaVersionKey = []byte("SchemaVersionKey")
mDBs = []byte("DBs")
mDBPrefix = "DB"
mTablePrefix = "Table"
mSequencePrefix = "SID"
mSeqCyclePrefix = "SequenceCycle"
mTableIDPrefix = "TID"
mIncIDPrefix = "IID"
mRandomIDPrefix = "TARID"
mBootstrapKey = []byte("BootstrapKey")
mSchemaDiffPrefix = "Diff"
mPolicies = []byte("Policies")
mPolicyPrefix = "Policy"
mPolicyGlobalID = []byte("PolicyGlobalID")
mPolicyMagicByte = CurrentMagicByteVer
mDDLTableVersion = []byte("DDLTableVersion")
mConcurrentDDL = []byte("concurrentDDL")
mInFlashbackCluster = []byte("InFlashbackCluster")
mFlashbackHistoryTSRange = []byte("FlashbackHistoryTSRange")
)

const (
Expand Down Expand Up @@ -608,6 +609,37 @@ func (m *Meta) GetFlashbackClusterJobID() (int64, error) {
return int64(binary.BigEndian.Uint64(val)), nil
}

// TSRange store a range time
type TSRange struct {
StartTS uint64
EndTS uint64
}

// SetFlashbackHistoryTSRange store flashback time range to TiKV
func (m *Meta) SetFlashbackHistoryTSRange(timeRange []TSRange) error {
timeRangeByte, err := json.Marshal(timeRange)
if err != nil {
return err
}
return errors.Trace(m.txn.Set(mFlashbackHistoryTSRange, timeRangeByte))
}

// GetFlashbackHistoryTSRange get flashback time range from TiKV
func (m *Meta) GetFlashbackHistoryTSRange() (timeRange []TSRange, err error) {
timeRangeByte, err := m.txn.Get(mFlashbackHistoryTSRange)
if err != nil {
return nil, err
}
if len(timeRangeByte) == 0 {
return []TSRange{}, nil
}
err = json.Unmarshal(timeRangeByte, &timeRange)
if err != nil {
return nil, err
}
return timeRange, nil
}

// SetConcurrentDDL set the concurrent DDL flag.
func (m *Meta) SetConcurrentDDL(b bool) error {
var data []byte
Expand Down

0 comments on commit 51e6536

Please sign in to comment.