Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: extract data reorg code to separate functions #33679

Merged
merged 6 commits into from
Apr 2, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 52 additions & 43 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,51 +1009,11 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return ver, errors.Trace(err)
done, ver, err := doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
if !done {
return ver, err
}

// Inject a failpoint so that we can pause here and do verification on other components.
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tblInfo.Name, oldCol.Name)
}, false)
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
if kv.IsTxnRetryableError(err) {
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()

oldIdxIDs := getOldIndexIDs(tblInfo, oldCol) // used by GC delete range.

err = adjustTableInfoAfterModifyColumnWithData(tblInfo, pos, oldCol, changingCol, colName, changingIdxs)
Expand All @@ -1080,6 +1040,55 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
}

// Inject a failpoint so that we can pause here and do verification on other components.
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
}, false)
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.IsTxnRetryableError(err) {
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, nil
}

func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast.ColumnPosition,
oldCol, changingCol *model.ColumnInfo, newName model.CIStr, changingIdxs []*model.IndexInfo) (err error) {
if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L {
Expand Down
73 changes: 41 additions & 32 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,40 +557,11 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return ver, errors.Trace(err)
done, ver, err := doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if !done {
return ver, err
}

err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tblInfo.Name, indexInfo.Name)
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// if timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(t, job, tblInfo, indexInfo, err)
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
}
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()

indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
Expand All @@ -612,6 +583,44 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
reorgInfo, err := getReorgInfo(w.jobContext, d, t, job, tbl, elements)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
}

err = w.runReorgJob(t, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onCreateIndex",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name)
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// if timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
ver, err = convertAddIdxJob2RollbackJob(t, job, tbl.Meta(), indexInfo, err)
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
}
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, errors.Trace(err)
}

func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexInfo, err := checkDropIndex(t, job)
if err != nil {
Expand Down