This is an automated cherry-pick of #51897
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
wjhuang2016 authored and ti-chi-bot committed Apr 28, 2024
1 parent 0359bbc commit 1052e98
Showing 14 changed files with 268 additions and 13 deletions.
5 changes: 5 additions & 0 deletions pkg/ddl/tests/metadatalock/BUILD.bazel
@@ -8,10 +8,15 @@ go_test(
"mdl_test.go",
],
flaky = True,
<<<<<<< HEAD
shard_count = 32,
=======
shard_count = 36,
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
deps = [
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest/testutil",
"//pkg/errno",
"//pkg/server",
"//pkg/testkit",
98 changes: 98 additions & 0 deletions pkg/ddl/tests/metadatalock/mdl_test.go
@@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/failpoint"
ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
mysql "github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
@@ -811,6 +812,103 @@ func TestMDLPreparePlanCacheInvalid(t *testing.T) {
tk.MustQuery(`execute stmt_test_1 using @a;`).Check(testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 <nil>"))
}

func TestMDLPreparePlanCacheExecute(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'update t set a = ? where a = ?';`)
tk.MustExec(`set @a = 1, @b = 3;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)

tk.MustExec("begin")

ch := make(chan struct{})

var wg sync.WaitGroup
wg.Add(1)
go func() {
<-ch
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

tk.MustQuery("select * from t2")
tk.MustExec(`set @a = 2, @b=4;`)
tk.MustExec(`execute stmt_test_1 using @a, @b;`)
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
// The plan is from the cache; the metadata lock should still be added to block the DDL.
ch <- struct{}{}

time.Sleep(5 * time.Second)

tk.MustExec("commit")

wg.Wait()

tk.MustExec("admin check table t")
}

func TestMDLPreparePlanCacheExecute2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
defer ingesttestutil.InjectMockBackendMgr(t, store)()

sv := server.CreateMockServer(t, store)

sv.SetDomain(dom)
dom.InfoSyncer().SetSessionManager(sv)
defer sv.Close()

conn1 := server.CreateMockConn(t, sv)
tk := testkit.NewTestKitWithSession(t, store, conn1.Context().Session)
conn2 := server.CreateMockConn(t, sv)
tkDDL := testkit.NewTestKitWithSession(t, store, conn2.Context().Session)
tk.MustExec("use test")
tk.MustExec("set global tidb_enable_metadata_lock=1")
tk.MustExec("create table t(a int);")
tk.MustExec("create table t2(a int);")
tk.MustExec("insert into t values(1), (2), (3), (4);")

tk.MustExec(`prepare stmt_test_1 from 'select * from t where a = ?';`)
tk.MustExec(`set @a = 1;`)
tk.MustExec(`execute stmt_test_1 using @a;`)

tk.MustExec("begin")
tk.MustQuery("select * from t2")

var wg sync.WaitGroup
wg.Add(1)
go func() {
tkDDL.MustExec("alter table test.t add index idx(a);")
wg.Done()
}()

wg.Wait()

tk.MustExec(`set @a = 2;`)
tk.MustExec(`execute stmt_test_1 using @a;`)
// The plan should not be from cache because the schema has changed.
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("commit")

tk.MustExec("admin check table t")
}

func TestMDLDisable2Enable(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sv := server.CreateMockServer(t, store)
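For orientation, the core scenario exercised by the two new tests can be condensed into the sketch below. It reuses only the testkit and mock-server helpers already visible in this diff and assumes the same package and imports as mdl_test.go; the helper name, timing, and exact assertions are illustrative, not part of the change.

// sketchCachedPlanBlocksDDL (hypothetical name) shows the intended behavior:
// executing a statement from the plan cache inside an open transaction must
// still take a metadata lock on t, so a concurrent ADD INDEX waits for commit.
func sketchCachedPlanBlocksDDL(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	defer ingesttestutil.InjectMockBackendMgr(t, store)()
	sv := server.CreateMockServer(t, store)
	sv.SetDomain(dom)
	dom.InfoSyncer().SetSessionManager(sv)
	defer sv.Close()

	tk := testkit.NewTestKitWithSession(t, store, server.CreateMockConn(t, sv).Context().Session)
	tkDDL := testkit.NewTestKitWithSession(t, store, server.CreateMockConn(t, sv).Context().Session)

	tk.MustExec("use test")
	tk.MustExec("set global tidb_enable_metadata_lock=1")
	tk.MustExec("create table t(a int)")
	tk.MustExec(`prepare s from 'select * from t where a = ?'`)
	tk.MustExec("set @a = 1")
	tk.MustExec("execute s using @a") // first execution populates the plan cache

	tk.MustExec("begin")
	tk.MustExec("execute s using @a") // cache hit; the MDL on t must still be acquired
	tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

	done := make(chan struct{})
	go func() {
		tkDDL.MustExec("alter table test.t add index idx(a)") // should block on the MDL
		close(done)
	}()

	select {
	case <-done:
		t.Fatal("DDL finished while the transaction still held the metadata lock")
	case <-time.After(time.Second):
		// still blocked after a second, as expected
	}

	tk.MustExec("commit") // releases the MDL; the DDL can now proceed
	<-done
	tk.MustExec("admin check table t")
}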
1 change: 1 addition & 0 deletions pkg/domain/plan_replayer_dump.go
@@ -754,6 +754,7 @@ func dumpPlanReplayerExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanR
return err
}

// extractTableNames extracts table names from the given stmts.
func extractTableNames(ctx context.Context, sctx sessionctx.Context,
ExecStmts []ast.StmtNode, curDB model.CIStr) (map[tableNamePair]struct{}, error) {
tableExtractor := &tableNameExtractor{
9 changes: 9 additions & 0 deletions pkg/executor/executor.go
@@ -2003,7 +2003,16 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.DiskTracker.Detach()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID.Store(vars.ConnectionID)
<<<<<<< HEAD
vars.StmtCtx.TableStats = make(map[int64]interface{})
=======
vars.MemTracker.Killer = &vars.SQLKiller
vars.DiskTracker.Killer = &vars.SQLKiller
vars.SQLKiller.Reset()
vars.SQLKiller.ConnID = vars.ConnectionID
vars.StmtCtx.TableStats = make(map[int64]any)
sc.MDLRelatedTableIDs = make(map[int64]struct{})
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))

isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
9 changes: 8 additions & 1 deletion pkg/executor/prepared.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
@@ -117,7 +118,7 @@ func (e *PrepareExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}
}
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, nil)
stmt, p, paramCnt, err := plannercore.GeneratePlanCacheStmtWithAST(ctx, e.Ctx(), true, stmt0.Text(), stmt0, sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema())
if err != nil {
return err
}
@@ -205,9 +206,15 @@ func (e *DeallocateExec) Next(context.Context, *chunk.Chunk) error {
prepared := preparedObj.PreparedAst
delete(vars.PreparedStmtNameToID, e.Name)
if e.Ctx().GetSessionVars().EnablePreparedPlanCache {
<<<<<<< HEAD
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.Ctx(), preparedObj)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
=======
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(e.Ctx(), preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, preparedObj.SchemaVersion,
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), preparedObj.RelateVersion)
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
if err != nil {
return err
}
2 changes: 2 additions & 0 deletions pkg/meta/meta.go
@@ -886,6 +886,8 @@ func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
return errors.Trace(err)
}

tableInfo.Revision++

data, err := json.Marshal(tableInfo)
if err != nil {
return errors.Trace(err)
14 changes: 14 additions & 0 deletions pkg/parser/model/model.go
@@ -542,6 +542,20 @@ type TableInfo struct {
ExchangePartitionInfo *ExchangePartitionInfo `json:"exchange_partition_info"`

TTLInfo *TTLInfo `json:"ttl_info"`
<<<<<<< HEAD
=======

// Revision is the per-table schema version; it is increased whenever the table's schema changes.
Revision uint64 `json:"revision"`

DBID int64 `json:"-"`
}

// TableNameInfo provides metadata describing a table's name and ID.
type TableNameInfo struct {
ID int64 `json:"id"`
Name CIStr `json:"name"`
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
}

// SepAutoInc decides whether _rowid and auto_increment id use separate allocator.
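The Revision field added above is what makes per-table change detection possible: meta.UpdateTable (see pkg/meta/meta.go in this diff) bumps it on every table metadata update, and the plan cache records the revision of each table a cached statement touches. A minimal sketch of that comparison, using a hypothetical helper name and assuming TiDB's infoschema package, might look like:

// tableChanged (hypothetical helper) reports whether any remembered table
// revision differs from the revision currently published in the info schema.
// It mirrors the per-table check that planCachePreprocess performs below.
func tableChanged(is infoschema.InfoSchema, remembered map[int64]uint64) bool {
	for tableID, rev := range remembered {
		tbl, ok := is.TableByID(tableID)
		if !ok {
			return true // table dropped or re-created under a new ID: treat as changed
		}
		if tbl.Meta().Revision != rev {
			return true // some DDL went through meta.UpdateTable and bumped Revision
		}
	}
	return false
}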
41 changes: 40 additions & 1 deletion pkg/planner/core/plan_cache.go
@@ -102,8 +102,38 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
return errors.Trace(err)
}

<<<<<<< HEAD
// step 3: check schema version
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
=======
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
return plannererrors.ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
delete(stmt.RelateVersion, stmt.tbls[i].Meta().ID)
stmt.tbls[i] = tblByName
stmt.RelateVersion[tblByName.Meta().ID] = tblByName.Meta().Revision
}
newTbl, err := tryLockMDLAndUpdateSchemaIfNecessary(sctx.GetPlanCtx(), stmt.dbName[i], stmt.tbls[i], is)
if err != nil {
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
stmt.RelateVersion[newTbl.Meta().ID] = newTbl.Meta().Revision
}

// step 4: check schema version
if schemaNotMatch || stmt.SchemaVersion != is.SchemaMetaVersion() {
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
@@ -125,7 +155,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmtAst.SchemaVersion = is.SchemaMetaVersion()
}

// step 4: handle expiration
// step 5: handle expiration
// If the lastUpdateTime less than expiredTimeStamp4PC,
// it means other sessions have executed 'admin flush instance plan_cache'.
// So we need to clear the current session's plan cache.
Expand All @@ -136,6 +166,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
stmtAst.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}

return nil
}

@@ -188,7 +219,11 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
<<<<<<< HEAD
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
=======
stmt.StmtDB, stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
return nil, nil, err
}
}
@@ -333,7 +368,11 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
<<<<<<< HEAD
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
=======
stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load(), stmt.RelateVersion); err != nil {
>>>>>>> 70a825397f3 (*: add metadata lock when using the plan cache (#51897))
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
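Note that NewPlanCacheKey now also receives stmt.RelateVersion, the table-ID-to-revision map collected in planCachePreprocess, so the same statement text hashes to a different cache entry once any related table's revision moves. The real key construction is outside this diff; a hedged illustration of folding such a map into a key deterministically (hypothetical helper, standard library only) could look like:

// hashRelatedVersions (hypothetical helper) folds a tableID -> revision map
// into a running hash in a deterministic order, so equal maps always produce
// equal keys despite Go's randomized map iteration.
func hashRelatedVersions(h hash.Hash64, relateVersion map[int64]uint64) {
	ids := make([]int64, 0, len(relateVersion))
	for id := range relateVersion {
		ids = append(ids, id)
	}
	slices.Sort(ids) // stable order
	var buf [8]byte
	for _, id := range ids {
		binary.BigEndian.PutUint64(buf[:], uint64(id))
		h.Write(buf[:])
		binary.BigEndian.PutUint64(buf[:], relateVersion[id])
		h.Write(buf[:])
	}
}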
