Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix(load): stop goroutines after restore returned (#744) (#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc committed Jun 17, 2020
1 parent 16d1fc9 commit 7e59da8
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 40 deletions.
8 changes: 5 additions & 3 deletions dm/worker/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,18 @@ func (w *Worker) Error(stName string) []*pb.SubTaskError {
return errs
}

// statusProcessResult returns a clone of *pb.ProcessResult, but omit the `Error` field, so no duplicated
// error message will be displayed in `query-status`, because the `Msg` field contains enough error information.
// statusProcessResult returns a clone of *pb.ProcessResult in which the `Msg` field is cleared for every error whose
// `Error` field is set, so that no duplicated error message is displayed in `query-status`; the `Error` field already contains enough error information.
func statusProcessResult(pr *pb.ProcessResult) *pb.ProcessResult {
if pr == nil {
return nil
}
result := proto.Clone(pr).(*pb.ProcessResult)
if result != nil {
for i := range result.Errors {
result.Errors[i].Error = nil
if result.Errors[i].Error != nil {
result.Errors[i].Msg = ""
}
}
}
return result
Expand Down
2 changes: 1 addition & 1 deletion dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (t *testSubTask) TestSubTaskNormalUsage(c *C) {
st.units = nil
st.Run()
c.Assert(st.Stage(), Equals, pb.Stage_Paused)
c.Assert(strings.Contains(st.Result().Errors[0].Msg, "has no dm units for mode"), IsTrue)
c.Assert(strings.Contains(st.Result().Errors[0].Error.String(), "has no dm units for mode"), IsTrue)

mockDumper := NewMockUnit(pb.UnitType_Dump)
mockLoader := NewMockUnit(pb.UnitType_Load)
Expand Down
30 changes: 13 additions & 17 deletions dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,25 +263,21 @@ func isResumableError(err *pb.ProcessError) bool {
return true
}

switch err.Type {
case pb.ErrorType_ExecSQL:
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
// not elegant code, because TiDB doesn't expose some error
for _, msg := range retry.UnsupportedDDLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
}
for _, msg := range retry.UnsupportedDMLMsgs {
if strings.Contains(strings.ToLower(err.Error.RawCause), strings.ToLower(msg)) {
return false
}
case pb.ErrorType_UnknownError:
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
}
}
if err.Error.ErrCode == int32(terror.ErrParserParseRelayLog.Code()) {
for _, msg := range retry.ParseRelayLogErrMsgs {
if strings.Contains(strings.ToLower(err.Error.Message), strings.ToLower(msg)) {
return false
}
}
}
Expand Down
31 changes: 15 additions & 16 deletions dm/worker/task_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,32 +292,31 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {

func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) {
testCases := []struct {
errorType pb.ErrorType
err error
resumable bool
}{
// only DM new error is checked
{pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true},
{pb.ErrorType_ExecSQL, &tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true},
{pb.ErrorType_ExecSQL, nil, true},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false},
{pb.ErrorType_ExecSQL, terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false},
{&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, true},
{&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, true},
{terror.ErrDBExecuteFailed.Generate("file test.t3.sql: execute statement failed: USE `test_abc`;: context canceled"), true},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported modify column length 20 is less than origin 40", tmysql.DefaultMySQLState}, "alter table t modify col varchar(20)"), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1105, "unsupported drop integer primary key", tmysql.DefaultMySQLState}, "alter table t drop column id"), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("Error 1062: Duplicate entry '5' for key 'PRIMARY'")), false},
{terror.ErrDBExecuteFailed.Delegate(errors.New("INSERT INTO `db`.`tbl` (`c1`,`c2`) VALUES (?,?);: Error 1406: Data too long for column 'c2' at row 1")), false},
{terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{1067, "Invalid default value for 'ct'", tmysql.DefaultMySQLState}, "CREATE TABLE `tbl` (`c1` int(11) NOT NULL,`ct` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '创建时间',PRIMARY KEY (`c1`)) ENGINE=InnoDB DEFAULT CHARSET=latin1"), false},
// real error is generated by `Delegate` and multiple `Annotatef`, we use `New` to simplify it
{pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false},
{pb.ErrorType_UnknownError, terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false},
{terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 555 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: binlog checksum mismatch, data may be corrupted"), false},
{terror.ErrParserParseRelayLog.New("parse relay log file bin.000018 from offset 500 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file bin.000018 from offset 0 in dir /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004: parse relay log file /home/tidb/deploy/relay_log/d2e831df-b4ec-11e9-9237-0242ac110008.000004/bin.000018: get event err EOF, need 1567488104 but got 316323"), false},
// unresumable terror codes
{pb.ErrorType_UnknownError, terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false},
{pb.ErrorType_UnknownError, terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false},
{terror.ErrSyncUnitDDLWrongSequence.Generate("wrong sequence", "right sequence"), false},
{terror.ErrSyncerShardDDLConflict.Generate("conflict DDL"), false},
// others
{pb.ErrorType_UnknownError, nil, true},
{pb.ErrorType_UnknownError, errors.New("unknown error"), true},
{nil, true},
{errors.New("unknown error"), true},
}

for _, tc := range testCases {
err := unit.NewProcessError(tc.errorType, tc.err)
err := unit.NewProcessError(pb.ErrorType_UnknownError, tc.err)
fmt.Printf("error: %v\n", err)
c.Assert(isResumableError(err), check.Equals, tc.resumable)
}
Expand Down
1 change: 1 addition & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {

err := l.Restore(newCtx)
close(l.runFatalChan) // Restore returned, all potential fatal sent to l.runFatalChan
cancel() // cancel the goroutines created in `Restore`.

failpoint.Inject("dontWaitWorkerExit", func(_ failpoint.Value) {
l.logCtx.L().Info("", zap.String("failpoint", "dontWaitWorkerExit"))
Expand Down
1 change: 1 addition & 0 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var (
"unsupported modify collate",
"unsupported drop integer primary key",
"Unsupported collation",
"Invalid default value for",
}

// UnsupportedDMLMsgs lists the error messages of some unrecoverable DML statements; it is used in task auto recovery
Expand Down
2 changes: 1 addition & 1 deletion tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function test_session_config(){
dmctl_start_task "$WORK_DIR/dm-task.yaml"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"'tidb_retry_limit' can't be set to the value" 2
"'tidb_retry_limit' can't be set to the value" 4
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 3
Expand Down
4 changes: 2 additions & 2 deletions tests/relay_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"no sub task started" 1 \
"ERROR" 1
"ERROR" 2

echo "start task and query status, task have error message"
task_conf="$cur/conf/dm-task.yaml"
Expand All @@ -58,7 +58,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -w 127.0.0.1:$WORKER1_PORT" \
"there aren't any data under relay log directory" 1 \
"ERROR" 1
"ERROR" 2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status" \
"\"taskName\": \"test\"" 1 \
Expand Down

0 comments on commit 7e59da8

Please sign in to comment.