Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: only retry for special message with 1105 (#330) #332

Merged
merged 2 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions pkg/retry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ var (
"binlog checksum mismatch, data may be corrupted",
"get event err EOF",
}

// Retryable1105Msgs list the error messages of some retryable error with 1105 code.
Retryable1105Msgs = []string{
"Information schema is out of date",
"Information schema is changed",
}
)

// IsRetryableError tells whether this error should retry
Expand All @@ -54,8 +60,14 @@ func IsRetryableError(err error) bool {
if ok {
switch mysqlErr.Number {
// ER_LOCK_DEADLOCK can retry to commit while meet deadlock
case tmysql.ErrUnknown, gmysql.ER_LOCK_DEADLOCK, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable, tmysql.ErrQueryInterrupted, tmysql.ErrWriteConflictInTiDB, tmysql.ErrTableLocked, tmysql.ErrWriteConflict:
case gmysql.ER_LOCK_DEADLOCK, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable, tmysql.ErrQueryInterrupted, tmysql.ErrWriteConflictInTiDB, tmysql.ErrTableLocked, tmysql.ErrWriteConflict:
return true
case tmysql.ErrUnknown:
for _, msg := range Retryable1105Msgs {
if strings.Contains(mysqlErr.Message, msg) {
return true
}
}
default:
return false
}
Expand All @@ -72,18 +84,3 @@ func IsConnectionError(err error) bool {
}
return false
}

// IsRetryableErrorFastFailFilter tells whether this error should retry,
// filtering some incompatible DDL error to achieve fast fail.
func IsRetryableErrorFastFailFilter(err error) bool {
err2 := errors.Cause(err) // check the original error
if mysqlErr, ok := err2.(*mysql.MySQLError); ok && mysqlErr.Number == tmysql.ErrUnknown {
for _, msg := range UnsupportedDDLMsgs {
if strings.Contains(mysqlErr.Message, msg) {
return false
}
}
}

return IsRetryableError(err)
}
2 changes: 1 addition & 1 deletion syncer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
sqlRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1)
return true
}
if retry.IsRetryableErrorFastFailFilter(err) {
if retry.IsRetryableError(err) {
tctx.L().Warn("execute statements", zap.Int("retry", retryTime),
zap.String("queries", utils.TruncateInterface(queries, -1)),
zap.String("arguments", utils.TruncateInterface(args, -1)))
Expand Down
5 changes: 4 additions & 1 deletion syncer/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func (s *testSyncerSuite) TestIsRetryableError(c *C) {
}{
{newMysqlErr(tmysql.ErrNoDB, "no baseConn error"), false},
{errors.New("unknown error"), false},
{newMysqlErr(tmysql.ErrUnknown, "i/o timeout"), true},
{newMysqlErr(tmysql.ErrDBCreateExists, "baseConn already exists"), false},
{driver.ErrBadConn, false},
{newMysqlErr(gmysql.ER_LOCK_DEADLOCK, "Deadlock found when trying to get lock; try restarting transaction"), true},
Expand All @@ -49,6 +48,10 @@ func (s *testSyncerSuite) TestIsRetryableError(c *C) {
{newMysqlErr(tmysql.ErrTiKVServerBusy, "tikv server busy"), true},
{newMysqlErr(tmysql.ErrResolveLockTimeout, "resolve lock timeout"), true},
{newMysqlErr(tmysql.ErrRegionUnavailable, "region unavailable"), true},
{newMysqlErr(tmysql.ErrUnknown, "i/o timeout"), false},
{newMysqlErr(tmysql.ErrUnknown, "can't drop column with index"), false},
{newMysqlErr(tmysql.ErrUnknown, "Information schema is out of date"), true},
{newMysqlErr(tmysql.ErrUnknown, "Information schema is changed"), true},
}

for _, t := range cases {
Expand Down
2 changes: 1 addition & 1 deletion tests/load_interrupt/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ function run() {
check_row_count 2

# only failed at the first two time, will retry later and success
export GO_FAILPOINTS='github.com/pingcap/dm/loader/LoadExecCreateTableFailed=3*return("1105")'
export GO_FAILPOINTS='github.com/pingcap/dm/loader/LoadExecCreateTableFailed=3*return("1213")' # ER_LOCK_DEADLOCK, retryable error code
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
Expand Down