Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix a bug that cancel add index ddl job when workers are not run, which cause tidb ddl hang up #8171

Merged
merged 25 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,16 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
return colInfo, position, nil
}

func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -254,7 +263,11 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
}
default:
err = ErrInvalidTableState.GenWithStack("invalid table state %v", tblInfo.State)
}
Expand Down
70 changes: 65 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,15 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
}

var checkErr error
var c3IdxInfo *model.IndexInfo
hook := &ddl.TestDDLCallback{}
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
// let hook.OnJobUpdatedExported has chance to cancel the job.
// the hook.OnJobUpdatedExported is called when the job is updated, runReorgJob will wait ddl.ReorgWaitTimeout, then return the ddl.runDDLJob.
// After that ddl call d.hook.OnJobUpdated(job), so that we can canceled the job in this test case.
ddl.ReorgWaitTimeout = 50 * time.Millisecond
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
var checkErr error
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, s, hook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)
Expand Down Expand Up @@ -459,6 +459,65 @@ LOOP:
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

// TestCancelAddIndex1 tests canceling ddl job when the add index worker is not started.
func (s *testDBSuite) TestCancelAddIndex1(c *C) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")

for i := 0; i < 50; i++ {
s.mustExec(c, "insert into t values (?, ?)", i, i)
}

var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer == 0 {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}

if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}

checkErr = hookCtx.Txn(true).Commit(context.Background())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
rs, err := s.tk.Exec("alter table t add index idx_c2(c2)")
if rs != nil {
rs.Close()
}
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")

s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
t := s.testGetTable(c, "t")
for _, idx := range t.Indices() {
c.Assert(strings.EqualFold(idx.Meta().Name.L, "idx_c2"), IsFalse)
}
s.mustExec(c, "alter table t add index idx_c2(c2)")
s.mustExec(c, "alter table t drop index idx_c2")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down Expand Up @@ -3377,7 +3436,7 @@ func (s *testDBSuite) TestPartitionCancelAddIndex(c *C) {
hook := &ddl.TestDDLCallback{}
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 10 * time.Millisecond
hook.OnJobUpdatedExported, c3IdxInfo = backgroundExecOnJobUpdatedExported(c, s, hook, checkErr)
hook.OnJobUpdatedExported, c3IdxInfo, checkErr = backgroundExecOnJobUpdatedExported(c, s, hook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create index c3_index on t1 (c3)", done)
Expand Down Expand Up @@ -3427,7 +3486,8 @@ LOOP:
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback, checkErr error) (func(*model.Job), *model.IndexInfo) {
func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLCallback) (func(*model.Job), *model.IndexInfo, error) {
var checkErr error
first := true
ddl.ReorgWaitTimeout = 10 * time.Millisecond
c3IdxInfo := &model.IndexInfo{}
Expand Down Expand Up @@ -3480,7 +3540,7 @@ func backgroundExecOnJobUpdatedExported(c *C, s *testDBSuite, hook *ddl.TestDDLC
checkErr = errors.Trace(err)
}
}
return hook.OnJobUpdatedExported, c3IdxInfo
return hook.OnJobUpdatedExported, c3IdxInfo, checkErr
}

func (s *testDBSuite) TestColumnModifyingDefinition(c *C) {
Expand Down
15 changes: 15 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ func testCreateIndex(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
return job
}

func testAddColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo, args []interface{}) *model.Job {
job := &model.Job{
SchemaID: dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddColumn,
Args: args,
BinlogInfo: &model.HistoryInfo{},
}
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
v := getSchemaVer(c, ctx)
checkHistoryJobArgs(c, ctx, job.ID, &historyJobArgs{ver: v, tbl: tblInfo})
return job
}

func buildDropIdxJob(dbInfo *model.DBInfo, tblInfo *model.TableInfo, indexName string) *model.Job {
return &model.Job{
SchemaID: dbInfo.ID,
Expand Down
12 changes: 2 additions & 10 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition:
Expand Down Expand Up @@ -453,16 +454,7 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl-%s] run the cancelling DDL job %s", w, job)
w.reorgCtx.notifyReorgCancel()
} else {
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
return convertJob2RollbackJob(w, d, t, job)
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
119 changes: 94 additions & 25 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -285,7 +286,7 @@ func doDDLJobErrWithSchemaState(ctx sessionctx.Context, d *ddl, c *C, schemaID,
}
err := d.doDDLJob(ctx, job)
// TODO: Add the detail error check.
c.Assert(err, NotNil)
c.Assert(err, NotNil, Commentf("err:%v", err))
testCheckJobCancelled(c, d, job, state)

return job
Expand All @@ -311,7 +312,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
return checkErr
}
// It only tests cancel one DDL job.
if errs[0] != test.cancelRetErrs[0] {
if !terror.ErrorEqual(errs[0], test.cancelRetErrs[0]) {
checkErr = errors.Trace(errs[0])
return checkErr
}
Expand All @@ -333,22 +334,52 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
errs := []error{err}
noErrs := []error{nil}
tests := []testCancelJob{
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: errs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 1}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 2}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 3}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddIndex, jobIDs: []int64{firstID + 4}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 4)}, cancelState: model.StatePublic, ddlRetErr: err},

// TODO: after fix drop index and create table rollback bug, the below test cases maybe need to change.
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 5}, cancelRetErrs: errs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 6}, cancelRetErrs: errs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 7}, cancelRetErrs: errs, cancelState: model.StateDeleteReorganization, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},

{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},
{act: model.ActionDropIndex, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StateNone, ddlRetErr: err},
// TODO: add create table back after we fix the cancel bug.
//{act: model.ActionCreateTable, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},

{act: model.ActionAddColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: noErrs, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 12)}, cancelState: model.StatePublic, ddlRetErr: err},
}

return tests
}

func (s *testDDLSuite) checkAddIdx(c *C, d *ddl, schemaID int64, tableID int64, idxName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, idxInfo := range changedTable.Meta().Indices {
if idxInfo.Name.O == idxName {
found = true
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
var found bool
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
found = true
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -378,47 +409,60 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
var checkErr error
var test *testCancelJob
tc.onJobUpdated = func(job *model.Job) {
if job.State == model.JobStateSynced || job.State == model.JobStateCancelled || job.State == model.JobStateCancelling {
return
}
if checkErr != nil {
return
}

hookCtx := mock.NewContext()
hookCtx.Store = store
var err error
err = hookCtx.NewTxn()
if err != nil {
checkErr = errors.Trace(err)
var err1 error
err1 = hookCtx.NewTxn()
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
checkCancelState(hookCtx.Txn(true), job, test)
err = hookCtx.Txn(true).Commit(context.Background())
if err != nil {
checkErr = errors.Trace(err)
checkErr = checkCancelState(hookCtx.Txn(true), job, test)
if checkErr != nil {
return
}
err1 = hookCtx.Txn(true).Commit(context.Background())
if err1 != nil {
checkErr = errors.Trace(err1)
return
}
}
d.SetHook(tc)

// for adding index
test = &tests[0]
validArgs := []interface{}{false, model.NewCIStr("idx"),
idxOrigName := "idx"
validArgs := []interface{}{false, model.NewCIStr(idxOrigName),
[]*ast.IndexColName{{
Column: &ast.ColumnName{Name: model.NewCIStr("c1")},
Length: -1,
}}, nil}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)

// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[1]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &test.cancelState)
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[2]
// When the job satisfies this test case, the option will be rollback, so the job's schema state is none.
cancelState := model.StateNone
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddIndex, validArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, false)
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil)
s.checkAddIdx(c, d, dbInfo.ID, tblInfo.ID, idxOrigName, true)

// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}
Expand All @@ -435,10 +479,35 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
testDropIndex(c, ctx, d, dbInfo, tblInfo, "idx")
c.Check(errors.ErrorStack(checkErr), Equals, "")

// for creating table
// for add column
test = &tests[8]
tblInfo = testTableInfo(c, d, "t1", 3)
testCreateTable(c, ctx, d, dbInfo, tblInfo)
addingColName := "colA"

newColumnDef := &ast.ColumnDef{
Name: &ast.ColumnName{Name: model.NewCIStr(addingColName)},
Tp: &types.FieldType{Tp: mysql.TypeLonglong},
Options: []*ast.ColumnOption{},
}
col, _, err := buildColumnAndConstraint(ctx, 2, newColumnDef, nil)
addColumnArgs := []interface{}{col, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 0}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[9]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[10]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionAddColumn, addColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, false)

test = &tests[11]
testAddColumn(c, ctx, d, dbInfo, tblInfo, addColumnArgs)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)
}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
Loading