diff --git a/ddl/fail_db_test.go b/ddl/fail_db_test.go index 360df99dac420..0895f10906e00 100644 --- a/ddl/fail_db_test.go +++ b/ddl/fail_db_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/ddl" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" @@ -269,11 +270,13 @@ func (s *testDBSuite) TestAddIndexWorkerNum(c *C) { // Split table to multi region. s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount) + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter() lastSetWorkerCnt := originDDLAddIndexWorkerCnt atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) ddl.TestCheckWorkerNumber = lastSetWorkerCnt - defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt) + defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", originDDLAddIndexWorkerCnt)) gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`) defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum") @@ -291,7 +294,7 @@ LOOP: c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) case <-ddl.TestCheckWorkerNumCh: lastSetWorkerCnt = int32(rand.Intn(8) + 8) - tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt)) atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt) checkNum++ } diff --git a/ddl/index.go b/ddl/index.go index 53310b804d1a3..ccaf5a9412d62 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -1052,6 +1053,17 @@ var ( TestCheckWorkerNumber = int32(16) ) +func loadDDLReorgVars(w *worker) error { + // Get sessionctx from context resource pool. + var ctx sessionctx.Context + ctx, err := w.sessPool.get() + if err != nil { + return errors.Trace(err) + } + defer w.sessPool.put(ctx) + return ddlutil.LoadDDLReorgVars(ctx) +} + // addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition. // For a partitioned table, it should be handled partition by partition. // @@ -1092,6 +1104,9 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I } // For dynamic adjust add index worker number. + if err := loadDDLReorgVars(w); err != nil { + log.Error(err) + } workerCnt = variable.GetDDLReorgWorkerCounter() // If only have 1 range, we can only start 1 worker. if len(kvRanges) < int(workerCnt) { diff --git a/ddl/util/util.go b/ddl/util/util.go index 9ec18dbe970a6..68d3e2861d02d 100644 --- a/ddl/util/util.go +++ b/ddl/util/util.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/sqlexec" "golang.org/x/net/context" @@ -128,3 +129,23 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old _, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) return errors.Trace(err) } + +const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" + + variable.TiDBDDLReorgWorkerCount + "', '" + + variable.TiDBDDLReorgBatchSize + "')" + +// LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables. +func LoadDDLReorgVars(ctx sessionctx.Context) error { + if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { + rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL) + if err != nil { + return errors.Trace(err) + } + for _, row := range rows { + varName := row.GetString(0) + varValue := row.GetString(1) + variable.SetLocalSystemVar(varName, varValue) + } + } + return nil +} diff --git a/executor/ddl_test.go b/executor/ddl_test.go index 5a455108b5a97..e8b24397ef504 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/meta/autoid" plannercore "github.com/pingcap/tidb/planner/core" @@ -447,24 +448,32 @@ func (s *testSuite) TestMaxHandleAddIndex(c *C) { func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") + err := ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount)) - tk.MustExec("set tidb_ddl_reorg_worker_cnt = 1") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1") + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(1)) - tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100)) - _, err := tk.Exec("set tidb_ddl_reorg_worker_cnt = invalid_val") + _, err = tk.Exec("set @@global.tidb_ddl_reorg_worker_cnt = invalid_val") c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) - tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100)) - _, err = tk.Exec("set tidb_ddl_reorg_worker_cnt = -1") + _, err = tk.Exec("set @@global.tidb_ddl_reorg_worker_cnt = -1") c.Assert(terror.ErrorEqual(err, variable.ErrWrongValueForVar), IsTrue, Commentf("err %v", err)) - tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100") - res := tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt") + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") + res := tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") - res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgWorkerCount))) + res.Check(testkit.Rows("100")) tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100") res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt") res.Check(testkit.Rows("100")) @@ -473,28 +482,39 @@ func (s *testSuite) TestSetDDLReorgWorkerCnt(c *C) { func (s *testSuite) TestSetDDLReorgBatchSize(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") + err := ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize)) - tk.MustExec("set tidb_ddl_reorg_batch_size = 1") + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1") tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'")) + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize)) - tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1)) + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1)) tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", variable.MaxDDLReorgBatchSize+1))) + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize)) - _, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val") + _, err = tk.Exec("set @@global.tidb_ddl_reorg_batch_size = invalid_val") c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err)) - tk.MustExec("set tidb_ddl_reorg_batch_size = 100") + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 100") + err = ddlutil.LoadDDLReorgVars(tk.Se) + c.Assert(err, IsNil) c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100)) - tk.MustExec("set tidb_ddl_reorg_batch_size = -1") + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = -1") tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'")) - tk.MustExec("set tidb_ddl_reorg_batch_size = 100") - res := tk.MustQuery("select @@tidb_ddl_reorg_batch_size") + tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 100") + res := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") res.Check(testkit.Rows("100")) res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") - res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize))) + res.Check(testkit.Rows(fmt.Sprintf("%v", 100))) tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000") res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size") res.Check(testkit.Rows("1000")) + + // If do not LoadDDLReorgVars, the local variable will be the last loaded value. + c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100)) } diff --git a/session/session.go b/session/session.go index 604b2429d131e..230f17a434a94 100644 --- a/session/session.go +++ b/session/session.go @@ -749,8 +749,9 @@ func (s *session) SetGlobalSysVar(name, value string) error { if err != nil { return errors.Trace(err) } + name = strings.ToLower(name) sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`, - mysql.SystemDB, mysql.GlobalVariablesTable, strings.ToLower(name), sVal) + mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal) _, _, err = s.ExecRestrictedSQL(s, sql) return errors.Trace(err) } @@ -1352,9 +1353,7 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab variable.TiDBHashAggPartialConcurrency + quoteCommaQuote + variable.TiDBHashAggFinalConcurrency + quoteCommaQuote + variable.TiDBBackoffLockFast + quoteCommaQuote + - variable.TiDBDDLReorgWorkerCount + quoteCommaQuote + variable.TiDBOptInSubqUnFolding + quoteCommaQuote + - variable.TiDBDDLReorgBatchSize + quoteCommaQuote + variable.TiDBDistSQLScanConcurrency + quoteCommaQuote + variable.TiDBMaxChunkSize + quoteCommaQuote + variable.TiDBRetryLimit + quoteCommaQuote + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 497a6249fc085..6974be2a6fb93 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -597,10 +597,6 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.OptimizerSelectivityLevel = tidbOptPositiveInt32(val, DefTiDBOptimizerSelectivityLevel) case TiDBEnableTablePartition: s.EnableTablePartition = TiDBOptOn(val) - case TiDBDDLReorgWorkerCount: - SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) - case TiDBDDLReorgBatchSize: - SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) case TiDBDDLReorgPriority: s.setDDLReorgPriority(val) case TiDBForcePriority: @@ -610,6 +606,16 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return nil } +// SetLocalSystemVar sets values of the local variables which in "server" scope. +func SetLocalSystemVar(name string, val string) { + switch name { + case TiDBDDLReorgWorkerCount: + SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount))) + case TiDBDDLReorgBatchSize: + SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize))) + } +} + // special session variables. const ( SQLModeVar = "sql_mode" diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 72b9c77e7baa1..3971b9aacebb0 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -664,8 +664,8 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBSlowLogThreshold, strconv.Itoa(logutil.DefaultSlowThreshold)}, {ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)}, {ScopeSession, TiDBConfig, ""}, - {ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, - {ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, + {ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)}, + {ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)}, {ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"}, {ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]}, } diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 013b46c44e4b1..909701f84d023 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -200,10 +200,6 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, types.NewIntDatum(1)) c.Assert(v.OptimizerSelectivityLevel, Equals, 1) - c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount)) - SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(1)) - c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(1)) - err = SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(-1)) c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue)