Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Support the operation of cancel DDL jobs #4753

Merged
merged 18 commits into from Oct 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions ast/misc.go
Expand Up @@ -568,6 +568,7 @@ const (
AdminShowDDL = iota + 1
AdminCheckTable
AdminShowDDLJobs
AdminCancelDDLJobs
)

// AdminStmt is the struct for Admin statement.
Expand All @@ -576,6 +577,7 @@ type AdminStmt struct {

Tp AdminStmtType
Tables []*TableName
JobIDs []int64
}

// Accept implements Node Accept interface.
Expand Down
37 changes: 26 additions & 11 deletions ddl/ddl.go
Expand Up @@ -64,7 +64,8 @@ var (
errInvalidWorker = terror.ClassDDL.New(codeInvalidWorker, "invalid worker")
// errNotOwner means we are not owner and can't handle DDL jobs.
errNotOwner = terror.ClassDDL.New(codeNotOwner, "not Owner")
errInvalidDDLJob = terror.ClassDDL.New(codeInvalidDDLJob, "invalid ddl job")
errInvalidDDLJob = terror.ClassDDL.New(codeInvalidDDLJob, "invalid DDL job")
errCancelledDDLJob = terror.ClassDDL.New(codeCancelledDDLJob, "cancelled DDL job")
errInvalidJobFlag = terror.ClassDDL.New(codeInvalidJobFlag, "invalid job flag")
errRunMultiSchemaChanges = terror.ClassDDL.New(codeRunMultiSchemaChanges, "can't run multi schema change")
errWaitReorgTimeout = terror.ClassDDL.New(codeWaitReorgTimeout, "wait for reorganization timeout")
Expand Down Expand Up @@ -177,6 +178,8 @@ type DDL interface {
WorkerVars() *variable.SessionVars
// SetHook sets the hook. It's exported for testing.
SetHook(h Callback)
// GetHook gets the hook. It's exported for testing.
GetHook() Callback
}

// Event is an event that a ddl operation happened.
Expand Down Expand Up @@ -226,6 +229,8 @@ type ddl struct {
reorgDoneCh chan error
// reorgRowCount is for reorganization, it is used to simulate a job's row count.
reorgRowCount int64
// notifyCancelReorgJob is for reorganization, it is used to notify the backfilling goroutine if the DDL job is cancelled.
notifyCancelReorgJob chan struct{}

quitCh chan struct{}
wait sync.WaitGroup
Expand Down Expand Up @@ -290,16 +295,17 @@ func newDDL(ctx goctx.Context, etcdCli *clientv3.Client, store kv.Storage,
syncer = NewSchemaSyncer(etcdCli, id)
}
d := &ddl{
infoHandle: infoHandle,
hook: hook,
store: store,
uuid: id,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
infoHandle: infoHandle,
hook: hook,
store: store,
uuid: id,
lease: lease,
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
notifyCancelReorgJob: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
}
d.workerVars.BinlogClient = binloginfo.GetPumpClient()

Expand Down Expand Up @@ -502,6 +508,14 @@ func (d *ddl) SetHook(h Callback) {
d.hook = h
}

// GetHook returns the callback currently stored in d.hook, reading it while
// holding the read side of d.hookMu. It implements the DDL.GetHook interface.
func (d *ddl) GetHook() Callback {
	d.hookMu.RLock()
	current := d.hook
	d.hookMu.RUnlock()

	return current
}

// WorkerVars returns the session variables stored in d.workerVars.
func (d *ddl) WorkerVars() *variable.SessionVars {
return d.workerVars
}
Expand All @@ -525,6 +539,7 @@ const (
codeUnknownTypeLength = 9
codeUnknownFractionLength = 10
codeInvalidJobVersion = 11
codeCancelledDDLJob = 12

codeInvalidDBState = 100
codeInvalidTableState = 101
Expand Down
100 changes: 100 additions & 0 deletions ddl/ddl_db_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/inspectkv"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
Expand Down Expand Up @@ -289,6 +291,104 @@ LOOP:
s.mustExec(c, "drop table t1")
}

// TestCancelAddIndex checks that an "add index" DDL job can be cancelled while
// the worker is backfilling index entries: the CREATE INDEX statement must
// fail with the cancelled-DDL-job error and the index must not appear on the
// table afterwards.
func (s *testDBSuite) TestCancelAddIndex(c *C) {
	s.mustExec(c, "use test_db")
	s.mustExec(c, "drop table if exists t1")
	s.mustExec(c, "create table t1 (c1 int, c2 int, c3 int, primary key(c1))")
	// Insert defaultBatchSize*2 rows; defaultBatchSize is equal to
	// ddl.defaultBatchSize.
	rowCount := defaultBatchSize * 2
	for row := 0; row < rowCount; row++ {
		s.mustExec(c, "insert into t1 values (?, ?, ?)", row, row, row)
	}

	s.tk.Se.NewTxn()
	historyJobs, err := inspectkv.GetHistoryDDLJobs(s.tk.Se.Txn())
	c.Assert(err, IsNil)
	// The ID passed to CancelJobs is taken from the last entry of the
	// history job list.
	lastJob := historyJobs[len(historyJobs)-1]
	jobIDs := []int64{lastJob.ID}

	// checkErr records the first failure seen inside the hook; it is asserted
	// once the background statement finishes.
	var checkErr error

	// cancelJob cancels jobIDs in a fresh transaction on s.store and commits
	// that transaction.
	cancelJob := func() error {
		hookCtx := mock.NewContext()
		hookCtx.Store = s.store
		if err := hookCtx.NewTxn(); err != nil {
			return errors.Trace(err)
		}
		errs, err := inspectkv.CancelJobs(hookCtx.Txn(), jobIDs)
		if err != nil {
			return errors.Trace(err)
		}
		// It only tests cancelling one DDL job.
		if errs[0] != nil {
			return errors.Trace(errs[0])
		}
		if err := hookCtx.Txn().Commit(); err != nil {
			return errors.Trace(err)
		}
		return nil
	}

	skippedFirst := false
	hook := &ddl.TestDDLCallback{}
	hook.OnJobUpdatedExported = func(job *model.Job) {
		// Only react to an add-index job that is in the write-reorganization
		// state with a non-zero SnapshotVer; per the original test, that is
		// when the worker starts to backfill indexes.
		if job.Type != model.ActionAddIndex ||
			job.SchemaState != model.StateWriteReorganization ||
			job.SnapshotVer == 0 {
			return
		}
		// The first time the job matches, the worker hasn't finished a batch
		// of backfilling yet, so let that update pass.
		if !skippedFirst {
			skippedFirst = true
			return
		}
		// Keep the first recorded failure and stop issuing further cancels.
		if checkErr != nil {
			return
		}
		if err := cancelJob(); err != nil {
			checkErr = err
		}
	}
	originHook := s.dom.DDL().GetHook()
	s.dom.DDL().SetHook(hook)
	defer s.dom.DDL().SetHook(originHook)

	done := make(chan error, 1)
	go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)

	rounds := 0
	ticker := time.NewTicker(s.lease / 2)
	defer ticker.Stop()
waitCancel:
	for {
		select {
		case err := <-done:
			c.Assert(checkErr, IsNil)
			c.Assert(err, NotNil)
			c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
			break waitCancel
		case <-ticker.C:
			// Modify the table for at most 10 ticks; later ticks do nothing
			// and the loop keeps waiting on done.
			if rounds < 10 {
				step := 10
				// Delete some rows, and add some data.
				for i := rowCount; i < rowCount+step; i++ {
					victim := rand.Intn(rowCount)
					s.mustExec(c, "delete from t1 where c1 = ?", victim)
					s.mustExec(c, "insert into t1 values (?, ?, ?)", i+10, i, i)
				}
				rowCount += step
				rounds++
			}
		}
	}

	// The cancelled index must not be present among the table's indices.
	tbl := s.testGetTable(c, "t1")
	for _, idx := range tbl.Indices() {
		c.Assert(strings.EqualFold(idx.Meta().Name.L, "c3_index"), IsFalse)
	}

	s.mustExec(c, "drop table t1")
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
Expand Down
22 changes: 21 additions & 1 deletion ddl/ddl_worker.go
Expand Up @@ -69,6 +69,13 @@ func asyncNotify(ch chan struct{}) {
}
}

// cleanNotify removes at most one pending element from ch without blocking.
// If nothing is ready to be received, it returns immediately.
func cleanNotify(ch chan struct{}) {
	select {
	default:
		// Nothing pending; leave the channel as it is.
	case <-ch:
		// One pending notification was discarded.
	}
}

func (d *ddl) isOwner() bool {
isOwner := d.ownerManager.IsOwner()
log.Debugf("[ddl] it's the job owner %v, self id %s", isOwner, d.uuid)
Expand Down Expand Up @@ -248,8 +255,21 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
if job.IsFinished() {
return
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
// If the value of SnapshotVer isn't zero, it means the worker is backfilling the indexes.
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
log.Infof("[ddl] run the cancelling DDL job %s", job)
asyncNotify(d.notifyCancelReorgJob)
} else {
job.State = model.JobCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
}

if job.State != model.JobRollback {
if !job.IsRollingback() && !job.IsCancelling() {
job.State = model.JobRunning
}

Expand Down