Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add ddl job error count limit, exceed the limit should cancel the ddl job #9295

Merged
merged 14 commits into from Feb 27, 2019
@@ -23,11 +23,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/admin"
log "github.com/sirupsen/logrus"
)

@@ -537,10 +540,30 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

job.Error = toTError(err)
job.ErrorCount++
// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
This conversation was marked as resolved by crazycs520

This comment has been minimized.

Copy link
@winkyao

winkyao Feb 15, 2019

Member

I think a config item is better.

This comment has been minimized.

Copy link
@winkyao

winkyao Feb 15, 2019

Member

This Limit will not change in general.

This comment has been minimized.

Copy link
@crazycs520

crazycs520 Feb 18, 2019

Author Contributor

em... I think the variable in the config is some system machine variable such as log file, cache size, max_connection ...

The disadvantage of config is you should restart the TiDB or add an http api to change it, and the http api change only takes effect in one machine.
But the variable of tidb_ddl_error_count_limit is not a machine variable, it only takes effect in TiDB owner. So I think global variable is better.

If the variable is not changed in general, so why we need this variable? Just hardcode to const will be better. 😃

log.Errorf("[ddl-%s] load ddl global variable error: %v", w, err1)
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
log.Warnf("[ddl-%s] the job id %v error count exceed the limit, cancelling it now", w, job.ID)
This conversation was marked as resolved by crazycs520

This comment has been minimized.

Copy link
@zimulala

zimulala Feb 27, 2019

Member

Could we add the value of limit to log?

This comment has been minimized.

Copy link
@crazycs520

crazycs520 Feb 27, 2019

Author Contributor

done.

job.State = model.JobStateCancelling
This conversation was marked as resolved by crazycs520

This comment has been minimized.

Copy link
@zimulala

zimulala Feb 13, 2019

Member

Please add a log here.

This comment has been minimized.

Copy link
@crazycs520

crazycs520 Feb 13, 2019

Author Contributor

done.

}
}
return
}

func loadDDLVars(w *worker) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return util.LoadDDLVars(ctx)
}

func toTError(err error) *terror.Error {
originErr := errors.Cause(err)
tErr, ok := originErr.(*terror.Error)
@@ -458,3 +458,14 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) {
tk.MustExec("insert into t_recover values (4),(5),(6)")
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
}

func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) {
tk := testkit.NewTestKit(c, s.store)
gofail.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
_, err := tk.Exec("create table t (a int)")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
@@ -33,6 +33,11 @@ import (
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// gofail: var mockExceedErrorLimit bool
// if mockExceedErrorLimit {
// return ver, errors.New("mock do job error")
// }

schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
@@ -135,14 +135,30 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old
return errors.Trace(err)
}

const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" +
variable.TiDBDDLReorgWorkerCount + "', '" +
variable.TiDBDDLReorgBatchSize + "')"

// LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables.
func LoadDDLReorgVars(ctx sessionctx.Context) error {
return LoadGlobalVars(ctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize})
}

// LoadDDLVars loads ddl variable from mysql.global_variables.
func LoadDDLVars(ctx sessionctx.Context) error {
return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit})
}

const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)"

// LoadGlobalVars loads global variable from mysql.global_variables.
func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error {
if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL)
nameList := ""
for i, name := range varNames {
if i > 0 {
nameList += ", "
}
nameList += fmt.Sprintf("'%s'", name)
}
sql := fmt.Sprintf(loadGlobalVarsSQL, nameList)
rows, _, err := sctx.ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
@@ -595,6 +595,33 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) {
res.Check(testkit.Rows("1000"))
}

func (s *testSuite3) TestSetDDLErrorCountLimit(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
err := ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(variable.DefTiDBDDLErrorCountLimit))

tk.MustExec("set @@global.tidb_ddl_error_count_limit = -1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '-1'"))
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(0))
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_error_count_limit = %v", uint64(math.MaxInt64)+1))
tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '%d'", uint64(math.MaxInt64)+1)))
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(math.MaxInt64))
_, err = tk.Exec("set @@global.tidb_ddl_error_count_limit = invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))
tk.MustExec("set @@global.tidb_ddl_error_count_limit = 100")
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(100))
res := tk.MustQuery("select @@global.tidb_ddl_error_count_limit")
res.Check(testkit.Rows("100"))
}

// Test issue #9205, fix the precision problem for time type default values
// See https://github.com/pingcap/tidb/issues/9205 for details
func (s *testSuite3) TestIssue9205(c *C) {
@@ -1480,6 +1480,7 @@ var builtinGlobalVariable = []string{
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
variable.TiDBDDLErrorCountLimit,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
@@ -701,6 +701,8 @@ func SetLocalSystemVar(name string, val string) {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLErrorCountLimit:
SetDDLErrorCountLimit(tidbOptInt64(val, DefTiDBDDLErrorCountLimit))
}
}

@@ -678,6 +678,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)},
@@ -210,6 +210,9 @@ const (
// tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers.
TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size"

// tidb_ddl_error_count_limit defines the count of ddl error limit.
TiDBDDLErrorCountLimit = "tidb_ddl_error_count_limit"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
@@ -273,6 +276,7 @@ const (
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBDDLReorgBatchSize = 1024
DefTiDBDDLErrorCountLimit = 512
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
@@ -286,6 +290,7 @@ var (
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
This conversation was marked as resolved by crazycs520

This comment has been minimized.

Copy link
@xiekeyi98

xiekeyi98 Feb 15, 2019

Contributor

Why this variable is int64 and others are int32?

This comment has been minimized.

Copy link
@crazycs520

crazycs520 Feb 19, 2019

Author Contributor

Because the max value of ddlErrorCountlimit is be MaxInt64 currently.
em... both int64 and int32 are ok for me.

This comment has been minimized.

Copy link
@xiekeyi98

xiekeyi98 Feb 19, 2019

Contributor

Well, At beginning, I think we should keep consistent with other variables.
Now, I think what you say is reasonable.
This problem is not a big dial.

// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
@@ -63,6 +63,16 @@ func GetDDLReorgBatchSize() int32 {
return atomic.LoadInt32(&ddlReorgBatchSize)
}

// SetDDLErrorCountLimit sets ddlErrorCountlimit size.
func SetDDLErrorCountLimit(cnt int64) {
atomic.StoreInt64(&ddlErrorCountlimit, cnt)
}

// GetDDLErrorCountLimit gets ddlErrorCountlimit size.
func GetDDLErrorCountLimit() int64 {
return atomic.LoadInt64(&ddlErrorCountlimit)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
@@ -360,6 +370,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBDDLReorgBatchSize:
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBDDLErrorCountLimit:
return checkUInt64SystemVar(name, value, uint64(0), math.MaxInt64, vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
@@ -83,26 +83,27 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
// IsJobRollbackable checks whether the job can be rollback.
func IsJobRollbackable(job *model.Job) bool {
switch job.Type {
case model.ActionDropIndex:
// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization, otherwise will cause inconsistent between record and index.
if job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropColumn:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
}
return nil
return true

This comment has been minimized.

Copy link
@zimulala

zimulala Feb 19, 2019

Member

It seems some DDL jobs don't support rollback, and the function returns true. Do we need to wait for other DDL jobs support rollback?

This comment has been minimized.

Copy link
@crazycs520

crazycs520 Feb 27, 2019

Author Contributor

Mostly ddl rollback is supported.

}

// CancelJobs cancels the DDL jobs.
@@ -135,8 +136,8 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
if errs[i] != nil {
if !IsJobRollbackable(job) {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.