Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Update job state name #4818

Merged
merged 2 commits into from Oct 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 19 additions & 19 deletions ddl/column.go
Expand Up @@ -104,21 +104,21 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
offset := 0
err = job.DecodeArgs(col, pos, &offset)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

columnInfo := findCol(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnExists.GenByArgs(col.Name)
}
} else {
columnInfo, offset, err = d.createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
// Set offset arg to job.
Expand Down Expand Up @@ -156,7 +156,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
d.asyncNotifyEvent(&Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
default:
Expand All @@ -176,17 +176,17 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

colInfo := findCol(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("column %s doesn't exist", colName)
}
if err = isDroppableColumn(tblInfo, colName); err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand Down Expand Up @@ -226,7 +226,7 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
d.asyncNotifyEvent(&Event{Tp: model.ActionDropColumn, TableInfo: tblInfo, ColumnInfo: colInfo})
default:
Expand Down Expand Up @@ -258,7 +258,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
if columnInfo.DefaultValue != nil {
colMeta.defaultVal, err = table.GetColDefaultValue(ctx, columnInfo)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
log.Errorf("[ddl] fatal: this case shouldn't happen, column %v err %v", columnInfo, err)
return errors.Trace(err)
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func (d *ddl) onSetDefaultValue(t *meta.Meta, job *model.Job) (ver int64, _ erro
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -400,7 +400,7 @@ func (d *ddl) onModifyColumn(t *meta.Meta, job *model.Job) (ver int64, _ error)
pos := &ast.ColumnPosition{}
err := job.DecodeArgs(newCol, oldColName, pos)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -416,7 +416,7 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo

oldCol := findCol(tblInfo.Columns, oldName.L)
if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(oldName, tblInfo.Name)
}

Expand All @@ -425,13 +425,13 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
if pos.Tp == ast.ColumnPositionAfter {
if oldName.L == pos.RelativeColumn.Name.L {
// `alter table tableName modify column b int after b` will return ver,ErrColumnNotExists.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(oldName, tblInfo.Name)
}

relative := findCol(tblInfo.Columns, pos.RelativeColumn.Name.L)
if relative == nil || relative.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(pos.RelativeColumn, tblInfo.Name)
}

Expand Down Expand Up @@ -482,11 +482,11 @@ func (d *ddl) doModifyColumn(t *meta.Meta, job *model.Job, col *model.ColumnInfo
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
}
Expand All @@ -498,7 +498,7 @@ func (d *ddl) updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInf
}
oldCol := findCol(tblInfo.Columns, oldColName.L)
if oldCol == nil || oldCol.State != model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrColumnNotExists.GenByArgs(newCol.Name, tblInfo.Name)
}
*oldCol = *newCol
Expand All @@ -507,11 +507,11 @@ func (d *ddl) updateColumn(t *meta.Meta, job *model.Job, newCol *model.ColumnInf
job.SchemaState = model.StatePublic
ver, err = updateTableInfo(t, job, tblInfo, originalState)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_test.go
Expand Up @@ -89,7 +89,7 @@ func checkEqualTable(c *C, t1, t2 *model.TableInfo) {
}

func checkHistoryJob(c *C, job *model.Job) {
c.Assert(job.State, Equals, model.JobSynced)
c.Assert(job.State, Equals, model.JobStateSynced)
}

func checkHistoryJobArgs(c *C, ctx context.Context, id int64, args *historyJobArgs) {
Expand Down
14 changes: 7 additions & 7 deletions ddl/ddl_worker.go
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) handleUpdateJobError(t *meta.Meta, job *model.Job, err error) erro
job.BinlogInfo.Clean()
job.Error = toTError(err)
job.SchemaState = model.StateNone
job.State = model.JobCancelled
job.State = model.JobStateCancelled
err = d.finishDDLJob(t, job)
}
return errors.Trace(err)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (d *ddl) handleDDLJobQueue() error {

if job.IsDone() {
binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query)
job.State = model.JobSynced
job.State = model.JobStateSynced
err = d.finishDDLJob(t, job)
return errors.Trace(err)
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (d *ddl) handleDDLJobQueue() error {
// Here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
if job.State == model.JobStateRunning || job.State == model.JobStateDone {
d.waitSchemaChanged(nil, waitTime, schemaVer)
}
if job.IsSynced() {
Expand Down Expand Up @@ -262,15 +262,15 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
log.Infof("[ddl] run the cancelling DDL job %s", job)
asyncNotify(d.notifyCancelReorgJob)
} else {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
job.Error = errCancelledDDLJob
job.ErrorCount++
return
}
}

if !job.IsRollingback() && !job.IsCancelling() {
job.State = model.JobRunning
job.State = model.JobStateRunning
}

var err error
Expand Down Expand Up @@ -305,14 +305,14 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) (ver int64) {
ver, err = d.onSetDefaultValue(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobCancelled
job.State = model.JobStateCancelled
err = errInvalidDDLJob.Gen("invalid ddl job %v", job)
}

// Save errors in job, so that others can know errors happened.
if err != nil {
// If job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
if job.State != model.JobStateCancelled {
log.Errorf("[ddl] run DDL job err %v", errors.ErrorStack(err))
} else {
log.Infof("[ddl] the DDL job is normal to cancel because %v", errors.ErrorStack(err))
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Expand Up @@ -257,7 +257,7 @@ func checkCancelState(txn kv.Transaction, job *model.Job, test *testCancelJob) e
// If the action is adding index and the state is writing reorganization, it wants to test the case of cancelling the job when backfilling indexes.
// When the job satisfies this case of addIndexFirstReorg, the worker hasn't started to backfill indexes.
if test.cancelState == job.SchemaState && !addIndexFirstReorg {
if job.SchemaState == model.StateNone && job.State != model.JobDone {
if job.SchemaState == model.StateNone && job.State != model.JobStateDone {
// If the schema state is none, we only test the job is finished.
} else {
errs, err := inspectkv.CancelJobs(txn, test.jobIDs)
Expand Down
10 changes: 5 additions & 5 deletions ddl/foreign_key.go
Expand Up @@ -30,7 +30,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ err
var fkInfo model.FKInfo
err = job.DecodeArgs(&fkInfo)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
fkInfo.ID = allocateIndexID(tblInfo)
Expand All @@ -48,7 +48,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ err
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
default:
Expand All @@ -70,7 +70,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
)
err = job.DecodeArgs(&fkName)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

Expand All @@ -82,7 +82,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
}

if !found {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, infoschema.ErrForeignKeyNotExists.GenByArgs(fkName)
}

Expand All @@ -106,7 +106,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
return ver, nil
default:
Expand Down
4 changes: 2 additions & 2 deletions ddl/foreign_key_test.go
Expand Up @@ -134,7 +134,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
var hookErr error
tc := &TestDDLCallback{}
tc.onJobUpdated = func(job *model.Job) {
if job.State != model.JobDone {
if job.State != model.JobStateDone {
return
}
mu.Lock()
Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
checkOK = false
mu.Unlock()
tc.onJobUpdated = func(job *model.Job) {
if job.State != model.JobDone {
if job.State != model.JobStateDone {
return
}
mu.Lock()
Expand Down
18 changes: 9 additions & 9 deletions ddl/index.go
Expand Up @@ -198,20 +198,20 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
)
err = job.DecodeArgs(&unique, &indexName, &idxColNames, &indexOption)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errDupKeyName.Gen("index already exist %s", indexName)
}

if indexInfo == nil {
indexInfo, err = buildIndexInfo(tblInfo, indexName, idxColNames, model.StateNone)
if err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if indexOption != nil {
Expand Down Expand Up @@ -304,7 +304,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}
// Finish this job.
job.State = model.JobDone
job.State = model.JobStateDone
job.BinlogInfo.AddTableInfo(ver, tblInfo)
default:
err = ErrInvalidIndexState.Gen("invalid index state %v", tblInfo.State)
Expand All @@ -314,7 +314,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
}

func (d *ddl) convert2RollbackJob(t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (ver int64, _ error) {
job.State = model.JobRollingback
job.State = model.JobStateRollingback
job.Args = []interface{}{indexInfo.Name}
// If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
// Its work is the same as drop index job do.
Expand Down Expand Up @@ -344,13 +344,13 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

var indexName model.CIStr
if err = job.DecodeArgs(&indexName); err != nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
if indexInfo == nil {
job.State = model.JobCancelled
job.State = model.JobStateCancelled
return ver, ErrCantDropFieldOrKey.Gen("index %s doesn't exist", indexName)
}

Expand Down Expand Up @@ -391,9 +391,9 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {

// Finish this job.
if job.IsRollingback() {
job.State = model.JobRollbackDone
job.State = model.JobStateRollbackDone
} else {
job.State = model.JobDone
job.State = model.JobStateDone
d.asyncNotifyEvent(&Event{Tp: model.ActionDropIndex, TableInfo: tblInfo, IndexInfo: indexInfo})
}
job.BinlogInfo.AddTableInfo(ver, tblInfo)
Expand Down