Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: support checkpoint #224

Merged
merged 65 commits into from Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
52cf7c5
update chunk struct
WangXiangUSTC Mar 24, 2019
34d1730
save chunk information after generate
WangXiangUSTC Mar 24, 2019
510ae13
add a column
WangXiangUSTC Mar 25, 2019
3f4d938
add ExecSQLWithRetry function
WangXiangUSTC Mar 25, 2019
b3c7ac9
add function to load chunk info
WangXiangUSTC Mar 25, 2019
9913444
add function to get config hash
WangXiangUSTC Mar 25, 2019
c455da0
remove struct checkjob
WangXiangUSTC Mar 25, 2019
4a1a8fa
add ignore state
WangXiangUSTC Mar 25, 2019
17ed0a3
move some code into function checkChunkDataEqual
WangXiangUSTC Mar 25, 2019
4f98312
update chunk info after check
WangXiangUSTC Mar 25, 2019
1b52129
update summary info every 10 second
WangXiangUSTC Mar 26, 2019
975bf1d
modify update chunk
WangXiangUSTC Mar 26, 2019
bbc4529
refine code
WangXiangUSTC Mar 26, 2019
8b32443
add unit test
WangXiangUSTC Mar 26, 2019
67b9b1f
remove useless code
WangXiangUSTC Mar 26, 2019
f087f6c
minor update
WangXiangUSTC Mar 26, 2019
6a73fdb
add comment
WangXiangUSTC Mar 26, 2019
b4fa0c2
minor fix
WangXiangUSTC Mar 27, 2019
4971247
use json marshal in chunk.String
WangXiangUSTC Mar 28, 2019
f9d2cd3
add ignore err and retryable err function
WangXiangUSTC Mar 28, 2019
1ded757
add config use-checkpoint
WangXiangUSTC Mar 28, 2019
463975a
Update pkg/diff/chunk.go
IANTHEREAL Mar 29, 2019
d5fc18f
Update pkg/dbutil/common.go
IANTHEREAL Mar 29, 2019
f16a4e5
add comment for state
WangXiangUSTC Mar 29, 2019
2c15e52
move some function to checkpoint.go
WangXiangUSTC Mar 29, 2019
12c32d8
format
WangXiangUSTC Mar 29, 2019
8815bbc
add unit test for retryable error
WangXiangUSTC Mar 29, 2019
cce0c95
add test for ignore error
WangXiangUSTC Mar 29, 2019
09e9b27
minor update
WangXiangUSTC Mar 29, 2019
3e2e5a7
Update pkg/diff/checkpoint.go
IANTHEREAL Apr 1, 2019
1a12d55
Update pkg/diff/checkpoint.go
IANTHEREAL Apr 1, 2019
71da44e
Update pkg/diff/checkpoint.go
IANTHEREAL Apr 1, 2019
bb9d67e
check rows.Err()
WangXiangUSTC Apr 1, 2019
e71f457
refine table struct
WangXiangUSTC Apr 2, 2019
c5da565
add comment for config_hash
WangXiangUSTC Apr 2, 2019
1143889
update chunk table's pk
WangXiangUSTC Apr 2, 2019
07da364
Update pkg/diff/diff.go
IANTHEREAL Apr 2, 2019
90d4185
add comment for loadFromCheckPoint
WangXiangUSTC Apr 2, 2019
8723b3d
delete checkpoint in one transcation
WangXiangUSTC Apr 2, 2019
d93f0c3
use errors.Trace(err)
WangXiangUSTC Apr 2, 2019
501e1f6
add timeout in function parameter
WangXiangUSTC Apr 2, 2019
9e425f5
update isRetryableError
WangXiangUSTC Apr 2, 2019
cadb711
fix some fix sql may not write
WangXiangUSTC Apr 2, 2019
a5ff32e
add function to get summary and chunk info
WangXiangUSTC Apr 2, 2019
164a8e8
add unit test for checkpoint
WangXiangUSTC Apr 2, 2019
8243962
minor update
WangXiangUSTC Apr 2, 2019
fea3d82
merge master and resolve conflicts
WangXiangUSTC Apr 2, 2019
c4d8ca3
minor update
WangXiangUSTC Apr 2, 2019
4811b35
add warn log when chunks is empty
WangXiangUSTC Apr 2, 2019
a108c7a
minor update on ErrDupKeyName
WangXiangUSTC Apr 2, 2019
6b5f538
address comment
WangXiangUSTC Apr 2, 2019
8213953
remove ctx withtime in checkpoint function
WangXiangUSTC Apr 3, 2019
5e01383
update function name
WangXiangUSTC Apr 3, 2019
abf621e
update state comment
WangXiangUSTC Apr 3, 2019
18ce1e1
update table name
WangXiangUSTC Apr 3, 2019
06ee87d
print slow log although exec sql failed
WangXiangUSTC Apr 3, 2019
b28fdb2
format code
WangXiangUSTC Apr 3, 2019
0b340af
don't sleep at the last retry time && judge ctx.Done
WangXiangUSTC Apr 4, 2019
fda47a9
return err when value is invalid && remove defer in loop
WangXiangUSTC Apr 4, 2019
d3e2c3e
address comment
WangXiangUSTC Apr 4, 2019
a4f5261
return not found error when no chunk found
WangXiangUSTC Apr 4, 2019
4272d43
minor update
WangXiangUSTC Apr 4, 2019
dd73053
remove prepare function
WangXiangUSTC Apr 4, 2019
1c68b9a
close channel after no chunk send to channel
WangXiangUSTC Apr 9, 2019
5f20455
address comment
WangXiangUSTC Apr 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod1
Expand Up @@ -17,6 +17,7 @@ require (
github.com/pingcap/tidb v0.0.0-20190320062740-9071c7b5b9ed
github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-mysql v0.0.0-20190312052122-c6ab05a85eb8
go.uber.org/atomic v1.3.2
go.uber.org/zap v1.9.1
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e
Expand Down
6 changes: 6 additions & 0 deletions go.sum1
Expand Up @@ -144,10 +144,16 @@ github.com/prometheus/procfs v0.0.0-20180408092902-8b1c2da0d56d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07 h1:oI+RNwuC9jF2g2lP0u0cVEEZrc/AYBCuFdvwrLWM/6Q=
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
github.com/siddontang/go-mysql v0.0.0-20190312052122-c6ab05a85eb8 h1:8puKTg/UOIQ+ZiowY1ywmGsI08sWqrKD7HJ/j165CUM=
github.com/siddontang/go-mysql v0.0.0-20190312052122-c6ab05a85eb8/go.mod h1:/b8ZcWjAShCcHp2dWpjb1vTlNyiG03UeHEQr2jteOpI=
github.com/sirupsen/logrus v0.0.0-20170323161349-3bcb09397d6d/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.1.1/go.mod h1:zrgwTnHtNr00buQ1vSptGe8m1f/BbgsPukg8qsT7A+A=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
135 changes: 135 additions & 0 deletions pkg/dbutil/common.go
Expand Up @@ -20,12 +20,19 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/types"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
)

Expand All @@ -35,6 +42,15 @@ const (

// ImplicitColID is ID implicit column in TiDB
ImplicitColID = -1

// DefaultRetryTime is the default maximum number of attempts when executing sql
DefaultRetryTime = 10

// DefaultTimeout is the default timeout for execute sql
DefaultTimeout time.Duration = 5 * time.Second

// SlowWarnLog is the execution duration above which a warn log is printed for the sql
SlowWarnLog = 100 * time.Millisecond
)

var (
Expand Down Expand Up @@ -617,3 +633,122 @@ func ReplacePlaceholder(str string, args []string) string {
newStr := strings.Replace(str, "?", "'%s'", -1)
return fmt.Sprintf(newStr, utils.StringsToInterfaces(args)...)
}

// ExecSQLWithRetry executes sql with retry
func ExecSQLWithRetry(ctx context.Context, db *sql.DB, sql string, args ...interface{}) (err error) {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < DefaultRetryTime; i++ {
startTime := time.Now()
_, err = db.ExecContext(ctx, sql, args...)
takeDuration := time.Since(startTime)
if takeDuration > SlowWarnLog {
log.Warn("exec sql slow", zap.String("sql", sql), zap.Reflect("args", args), zap.Duration("take", takeDuration))
}
if err == nil {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

if ignoreError(err) {
csuzhangxc marked this conversation as resolved.
Show resolved Hide resolved
log.Debug("ignore execute sql error", zap.Error(err))
return nil
}

if !isRetryableError(err) {
return errors.Trace(err)
}
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved

log.Warn("exe sql failed, will try again", zap.String("sql", sql), zap.Reflect("args", args), zap.Error(err))

if i == DefaultRetryTime-1 {
break
}

select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-time.After(10 * time.Millisecond):
}
}

return errors.Trace(err)
}

// ExecuteSQLs executes some sqls in one transaction
func ExecuteSQLs(ctx context.Context, db *sql.DB, sqls []string, args [][]interface{}) error {
txn, err := db.Begin()
if err != nil {
log.Error("exec sqls begin", zap.Error(err))
return errors.Trace(err)
}

for i := range sqls {
startTime := time.Now()

_, err = txn.ExecContext(ctx, sqls[i], args[i]...)
if err != nil {
log.Error("exec sql", zap.String("sql", sqls[i]), zap.Reflect("args", args[i]), zap.Error(err))
rerr := txn.Rollback()
if rerr != nil {
log.Error("rollback", zap.Error(err))
}
return errors.Trace(err)
}

takeDuration := time.Since(startTime)
if takeDuration > SlowWarnLog {
log.Warn("exec sql slow", zap.String("sql", sqls[i]), zap.Reflect("args", args[i]), zap.Duration("take", takeDuration))
}
}

err = txn.Commit()
if err != nil {
log.Error("exec sqls commit", zap.Error(err))
return errors.Trace(err)
}

return nil
}

// isRetryableError reports whether the statement that produced err may be
// executed again. Only the root cause of err is inspected, and only a
// *mysql.MySQLError whose number is in the list below is considered retryable.
func isRetryableError(err error) bool {
	// check the original error
	mysqlErr, isMySQLErr := errors.Cause(err).(*mysql.MySQLError)
	if !isMySQLErr {
		return false
	}

	switch mysqlErr.Number {
	case tmysql.ErrUnknown,
		// ER_LOCK_DEADLOCK can retry to commit while meet deadlock
		gmysql.ER_LOCK_DEADLOCK,
		tmysql.ErrPDServerTimeout,
		tmysql.ErrTiKVServerTimeout,
		tmysql.ErrTiKVServerBusy,
		tmysql.ErrResolveLockTimeout,
		tmysql.ErrRegionUnavailable:
		return true
	}

	return false
}

// ignoreError reports whether err can be ignored, i.e. the statement that
// produced it can be treated as if it had succeeded.
func ignoreError(err error) bool {
	// TODO: now only ignore some ddl error, add some dml error later
	return ignoreDDLError(err)
}

// ignoreDDLError reports whether the root cause of err is a *mysql.MySQLError
// whose number equals one of the infoschema/ddl error codes listed below.
func ignoreDDLError(err error) bool {
	mysqlErr, isMySQLErr := errors.Cause(err).(*mysql.MySQLError)
	if !isMySQLErr {
		return false
	}

	// Convert the raw error number so it can be compared with terror codes.
	switch terror.ErrCode(mysqlErr.Number) {
	case infoschema.ErrDatabaseExists.Code(),
		infoschema.ErrDatabaseDropExists.Code(),
		infoschema.ErrTableExists.Code(),
		infoschema.ErrTableDropExists.Code(),
		infoschema.ErrColumnExists.Code(),
		infoschema.ErrIndexExists.Code(),
		ddl.ErrDupKeyName.Code():
		return true
	}

	return false
}
60 changes: 60 additions & 0 deletions pkg/dbutil/common_test.go
Expand Up @@ -14,7 +14,14 @@
package dbutil

import (
"database/sql/driver"

"github.com/go-sql-driver/mysql"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
gmysql "github.com/siddontang/go-mysql/mysql"
)

func (*testDBSuite) TestReplacePlaceholder(c *C) {
Expand Down Expand Up @@ -69,3 +76,56 @@ func (*testDBSuite) TestTableName(c *C) {
c.Assert(tableName, Equals, testCase.expectTableName)
}
}

// newMysqlErr builds a *mysql.MySQLError with the given error number and message.
func newMysqlErr(number uint16, message string) *mysql.MySQLError {
	mysqlErr := new(mysql.MySQLError)
	mysqlErr.Number = number
	mysqlErr.Message = message
	return mysqlErr
}

// TestIsRetryableError checks the verdict of isRetryableError for MySQL errors
// with various error numbers as well as for errors that are not MySQL errors.
func (s *testDBSuite) TestIsRetryableError(c *C) {
	type retryCase struct {
		err         error
		isRetryable bool
	}

	testCases := []retryCase{
		{err: newMysqlErr(tmysql.ErrNoDB, "no db error"), isRetryable: false},
		{err: errors.New("unknown error"), isRetryable: false},
		{err: newMysqlErr(tmysql.ErrUnknown, "i/o timeout"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrDBCreateExists, "db already exists"), isRetryable: false},
		{err: driver.ErrBadConn, isRetryable: false},
		{err: newMysqlErr(gmysql.ER_LOCK_DEADLOCK, "Deadlock found when trying to get lock; try restarting transaction"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrPDServerTimeout, "pd server timeout"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrTiKVServerTimeout, "tikv server timeout"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrTiKVServerBusy, "tikv server busy"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrResolveLockTimeout, "resolve lock timeout"), isRetryable: true},
		{err: newMysqlErr(tmysql.ErrRegionUnavailable, "region unavailable"), isRetryable: true},
	}

	for i := range testCases {
		tc := testCases[i]
		c.Logf("err %v, expected %v", tc.err, tc.isRetryable)
		c.Assert(isRetryableError(tc.err), Equals, tc.isRetryable)
	}
}

// TestIsIgnoreError checks the verdict of ignoreError for MySQL errors carrying
// infoschema error codes, for an unrelated MySQL error number and for an error
// that is not a MySQL error.
func (s *testDBSuite) TestIsIgnoreError(c *C) {
	type ignoreCase struct {
		err       error
		canIgnore bool
	}

	testCases := []ignoreCase{
		{err: newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database, database exists"), canIgnore: true},
		{err: newMysqlErr(uint16(infoschema.ErrDatabaseDropExists.Code()), "Can't drop database, database doesn't exists"), canIgnore: true},
		{err: newMysqlErr(uint16(infoschema.ErrTableExists.Code()), "Can't create table, table exists"), canIgnore: true},
		{err: newMysqlErr(uint16(infoschema.ErrTableDropExists.Code()), "Can't drop table, table dosen't exists"), canIgnore: true},
		{err: newMysqlErr(uint16(infoschema.ErrColumnExists.Code()), "Duplicate column name"), canIgnore: true},
		{err: newMysqlErr(uint16(infoschema.ErrIndexExists.Code()), "Duplicate Index"), canIgnore: true},

		{err: newMysqlErr(uint16(999), "fake error"), canIgnore: false},
		{err: errors.New("unknown error"), canIgnore: false},
	}

	for i := range testCases {
		tc := testCases[i]
		c.Logf("err %v, expected %v", tc.err, tc.canIgnore)
		c.Assert(ignoreError(tc.err), Equals, tc.canIgnore)
	}
}