diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 800058e63b367..54ee4c7dd0e15 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -425,5 +425,5 @@ func TestGetExistedUserDBs(t *testing.T) { // // The above variables are in the file br/pkg/restore/systable_restore.go func TestMonitorTheSystemTableIncremental(t *testing.T) { - require.Equal(t, int64(196), session.CurrentBootstrapVersion) + require.Equal(t, int64(197), session.CurrentBootstrapVersion) } diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 34270324b9a72..2c1cc7967c2a3 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -217,7 +217,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) { } // Check if there is any running job works on the same table. sql := fmt.Sprintf("select job_id from mysql.tidb_ddl_job t1, (select table_ids from mysql.tidb_ddl_job where job_id = %d) t2 where "+ - "(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',', REPLACE(t1.table_ids, ',', '|'), ',') != 0)"+ + "(processing and CONCAT(',', t2.table_ids, ',') REGEXP CONCAT(',(', REPLACE(t1.table_ids, ',', '|'), '),') != 0)"+ "or (type = %d and processing)", job.ID, model.ActionFlashbackCluster) rows, err := sess.Execute(d.ctx, sql, "check conflict jobs") return len(rows) == 0, err diff --git a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go index 17d884bbcc5b4..b571f805fd6c2 100644 --- a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go +++ b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/auth" + "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/privilege/privileges" "github.com/pingcap/tidb/pkg/server" @@ -1694,3 +1695,91 @@ func TestUnusedIndexView(t *testing.T) { return result.Equal(expectedResult) }, 5*time.Second, 100*time.Millisecond) } + +func TestMDLViewIDConflict(t *testing.T) { + save := privileges.SkipWithGrant + privileges.SkipWithGrant = true + defer func() { + privileges.SkipWithGrant = save + }() + + s := new(clusterTablesSuite) + s.store, s.dom = testkit.CreateMockStoreAndDomain(t) + s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.startTime = time.Now() + defer s.httpServer.Close() + tk := s.newTestKitWithRoot(t) + + tk.MustExec("use test") + tk.MustExec("create table t(a int);") + tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tk.MustExec("insert into t values (1)") + + bigID := tbl.Meta().ID * 10 + bigTableName := "" + // set a hard limitation on 10000 to avoid using too much resource + for i := 0; i < 10000; i++ { + bigTableName = fmt.Sprintf("t%d", i) + tk.MustExec(fmt.Sprintf("create table %s(a int);", bigTableName)) + + tbl, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(bigTableName)) + require.NoError(t, err) + + require.LessOrEqual(t, tbl.Meta().ID, bigID) + if tbl.Meta().ID == bigID { + break + } + } + tk.MustExec("insert into t1 values (1)") + tk.MustExec(fmt.Sprintf("insert into %s values (1)", bigTableName)) + + // Now we have two table: t and `bigTableName`. The later one's ID is 10 times the former one. + // Then create two session to run TXNs on these two tables + txnTK1 := s.newTestKitWithRoot(t) + txnTK2 := s.newTestKitWithRoot(t) + txnTK1.MustExec("use test") + txnTK1.MustExec("BEGIN") + // this transaction will query `t` and one another table. Then the `related_table_ids` is `smallID|anotherID` + txnTK1.MustQuery("SELECT * FROM t").Check(testkit.Rows("1")) + txnTK1.MustQuery("SELECT * FROM t1").Check(testkit.Rows("1")) + txnTK2.MustExec("use test") + txnTK2.MustExec("BEGIN") + txnTK2.MustQuery("SELECT * FROM " + bigTableName).Check(testkit.Rows("1")) + + testTK := s.newTestKitWithRoot(t) + s.rpcserver, s.listenAddr = s.setUpRPCService(t, "127.0.0.1:0", testTK.Session().GetSessionManager()) + defer s.rpcserver.Stop() + testTK.MustQuery("select table_name from mysql.tidb_mdl_view").Check(testkit.Rows()) + + // run a DDL on the table with smallID + ddlTK1 := s.newTestKitWithRoot(t) + ddlTK1.MustExec("use test") + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + ddlTK1.MustExec("ALTER TABLE t ADD COLUMN b INT;") + wg.Done() + }() + ddlTK2 := s.newTestKitWithRoot(t) + ddlTK2.MustExec("use test") + wg.Add(1) + go func() { + ddlTK2.MustExec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN b INT;", bigTableName)) + wg.Done() + }() + + require.Eventually(t, func() bool { + rows := testTK.MustQuery("select table_ids from mysql.tidb_mdl_info").Rows() + return len(rows) == 2 + }, time.Second*10, time.Second) + + // it only contains the table with smallID + require.Eventually(t, func() bool { + rows := testTK.MustQuery("select table_name, query, start_time from mysql.tidb_mdl_view order by table_name").Rows() + return len(rows) == 2 + }, time.Second*10, time.Second) + txnTK1.MustExec("COMMIT") + txnTK2.MustExec("COMMIT") + wg.Wait() +} diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index e53976cd88ed5..6b0a90be4722f 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -472,7 +472,7 @@ const ( mysql.tidb_mdl_info, information_schema.cluster_tidb_trx WHERE tidb_ddl_job.job_id=tidb_mdl_info.job_id - AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), ',') != 0 + AND CONCAT(',', tidb_mdl_info.table_ids, ',') REGEXP CONCAT(',(', REPLACE(cluster_tidb_trx.related_table_ids, ',', '|'), '),') != 0 );` // CreatePlanReplayerStatusTable is a table about plan replayer status @@ -1089,11 +1089,15 @@ const ( // add column `target_scope` for 'mysql.tidb_global_task` table // add column `target_scope` for 'mysql.tidb_global_task_history` table version196 = 196 + + // version 197 + // replace `mysql.tidb_mdl_view` table + version197 = 197 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version196 +var currentBootstrapVersion int64 = version197 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. var internalSQLTimeout = owner.ManagerSessionTTL + 15 @@ -1255,6 +1259,7 @@ var ( upgradeToVer194, upgradeToVer195, upgradeToVer196, + upgradeToVer197, } ) @@ -3090,6 +3095,14 @@ func upgradeToVer196(s sessiontypes.Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.tidb_global_task_history ADD COLUMN target_scope VARCHAR(256) DEFAULT '' AFTER `step`;", infoschema.ErrColumnExists) } +func upgradeToVer197(s sessiontypes.Session, ver int64) { + if ver >= version197 { + return + } + + doReentrantDDL(s, CreateMDLView) +} + func writeOOMAction(s sessiontypes.Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,