Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add a variable to control the back off time and disable txn auto retry by default #10266

Merged
merged 7 commits into from May 8, 2019
@@ -1280,6 +1280,7 @@ func (s *testDBSuite8) TestColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("create table t2 (c1 int, c2 int, c3 int)")
s.tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
s.testAddColumn(c)
s.testDropColumn(c)
s.tk.MustExec("drop table t2")
@@ -346,6 +346,17 @@ func (s *testSuite2) TestSetVar(c *C) {
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))

tk.MustExec("set session tidb_back_off_weight = 3")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3"))
tk.MustExec("set session tidb_back_off_weight = 20")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("20"))
_, err = tk.Exec("set session tidb_back_off_weight = -1")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_back_off_weight = 0")
c.Assert(err, NotNil)
tk.MustExec("set global tidb_back_off_weight = 10")
tk.MustQuery("select @@global.tidb_back_off_weight;").Check(testkit.Rows("10"))
}

func (s *testSuite2) TestSetCharset(c *C) {
@@ -2467,6 +2467,7 @@ func (s *testSuite4) TestAutoIDInRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int not null auto_increment primary key)")

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("insert into t values ()")
tk.MustExec("insert into t values (),()")
@@ -18,6 +18,9 @@ type Variables struct {
// BackoffLockFast specifies the LockFast backoff base duration in milliseconds.
BackoffLockFast int

// BackOffWeight specifies the weight of the max back off time duration.
BackOffWeight int

This comment has been minimized.

Copy link
@zz-jason

zz-jason May 7, 2019

Member

backoff weight is a little confusion, how about directly set the max backoff time?

This comment has been minimized.

Copy link
@jackysp

jackysp May 7, 2019

Author Member

There are many kinds of backoffs. Maybe set them separately in the future.


// Hook is used for test to verify the variable take effect.
Hook func(name string, vars *Variables)
}
@@ -26,6 +29,7 @@ type Variables struct {
func NewVariables() *Variables {
return &Variables{
BackoffLockFast: DefBackoffLockFast,
BackOffWeight: DefBackOffWeight,
}
}

@@ -35,4 +39,5 @@ var DefaultVars = NewVariables()
// Default values
const (
DefBackoffLockFast = 100
DefBackOffWeight = 2
)
@@ -682,6 +682,7 @@ func runTestConcurrentUpdate(c *C) {
dbt.mustExec("drop table if exists test2")
dbt.mustExec("create table test2 (a int, b int)")
dbt.mustExec("insert test2 values (1, 1)")
dbt.mustExec("set @@tidb_disable_txn_auto_retry = 0")

txn1, err := dbt.db.Begin()
c.Assert(err, IsNil)
@@ -356,8 +356,8 @@ func (s *session) doCommit(ctx context.Context) error {

// mockCommitError and mockGetTSErrorInRetry use to test PR #8743.
failpoint.Inject("mockCommitError", func(val failpoint.Value) {
if val.(bool) && mockCommitErrorOnce {
mockCommitErrorOnce = false
if val.(bool) && kv.IsMockCommitErrorEnable() {
kv.MockCommitErrorDisable()
failpoint.Return(kv.ErrRetryable)
}
})
@@ -1632,6 +1632,7 @@ var builtinGlobalVariable = []string{
variable.TiDBHashAggPartialConcurrency,
variable.TiDBHashAggFinalConcurrency,
variable.TiDBBackoffLockFast,
variable.TiDBBackOffWeight,
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
@@ -95,14 +95,14 @@ func (s *testSessionSuite) TestGetTSFailDirtyState(c *C) {
func (s *testSessionSuite) TestGetTSFailDirtyStateInretry(c *C) {
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockCommitError"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry"), IsNil)
}()

tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int)")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockCommitError", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockGetTSErrorInRetry", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/mockGetTSErrorInRetry", `return(true)`), IsNil)
tk.MustExec("insert into t values (2)")
tk.MustQuery(`select * from t`).Check(testkit.Rows("2"))
}
@@ -293,6 +293,7 @@ func (s *testSessionSuite) TestRowLock(c *C) {
tk.MustExec("insert t values (12, 2, 3)")
tk.MustExec("insert t values (13, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=21 where c1=11")

@@ -507,6 +508,7 @@ func (s *testSessionSuite) TestRetryResetStmtCtx(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table retrytxn (a int unique, b int)")
tk.MustExec("insert retrytxn values (1, 1)")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("update retrytxn set b = b + 1 where a = 1")

@@ -665,6 +667,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) {
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set c2=? where c1=11;", 21)

@@ -881,6 +884,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("create table t (c2 int, c1 int not null auto_increment, PRIMARY KEY (c1))")
tk.MustExec("insert into t (c2) values (1), (2), (3), (4), (5)")

@@ -1308,6 +1312,9 @@ func (s *testSessionSuite) TestRetry(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustExec("SET SESSION autocommit=0;")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk2.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk3.MustExec("set @@tidb_disable_txn_auto_retry = 0")

var wg sync.WaitGroup
wg.Add(3)
@@ -1449,6 +1456,7 @@ func (s *testSessionSuite) TestResetCtx(c *C) {

tk.MustExec("create table t (i int auto_increment not null key);")
tk.MustExec("insert into t values (1);")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin;")
tk.MustExec("insert into t values (10);")
tk.MustExec("update t set i = i + row_count();")
@@ -1480,6 +1488,8 @@ func (s *testSessionSuite) TestUnique(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`CREATE TABLE test ( id int(11) UNSIGNED NOT NULL AUTO_INCREMENT, val int UNIQUE, PRIMARY KEY (id)); `)
tk.MustExec("begin;")
tk.MustExec("insert into test(id, val) values(1, 1);")
@@ -1764,6 +1774,7 @@ func (s *testSchemaSuite) TestSchemaCheckerSQL(c *C) {
tk.MustExec(`insert into t values(1, 1);`)

// The schema version is out of date in the first transaction, but the SQL can be retried.
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk.MustExec(`begin;`)
tk1.MustExec(`alter table t add index idx(c);`)
tk.MustExec(`insert into t values(2, 2);`)
@@ -1808,6 +1819,7 @@ func (s *testSchemaSuite) TestPrepareStmtCommitWhenSchemaChanged(c *C) {
tk1.MustExec("execute stmt using @a, @a")
tk1.MustExec("commit")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk.MustExec("alter table t drop column b")
tk1.MustExec("execute stmt using @a, @a")
@@ -1820,6 +1832,7 @@ func (s *testSchemaSuite) TestCommitWhenSchemaChanged(c *C) {
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (a int, b int)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("insert into t values (1, 1)")

@@ -1837,6 +1850,7 @@ func (s *testSchemaSuite) TestRetrySchemaChange(c *C) {
tk.MustExec("create table t (a int primary key, b int)")
tk.MustExec("insert into t values (1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 5 where a = 1")

@@ -1867,6 +1881,7 @@ func (s *testSchemaSuite) TestRetryMissingUnionScan(c *C) {
tk.MustExec("create table t (a int primary key, b int unique, c int)")
tk.MustExec("insert into t values (1, 1, 1)")

tk1.MustExec("set @@tidb_disable_txn_auto_retry = 0")
tk1.MustExec("begin")
tk1.MustExec("update t set b = 1, c = 2 where b = 2")
tk1.MustExec("update t set b = 1, c = 2 where a = 1")
@@ -2320,9 +2335,12 @@ func (s *testSessionSuite) TestKVVars(c *C) {
tk.MustExec("insert kvvars values (1, 1)")
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustExec("set @@tidb_backoff_lock_fast = 1")
tk2.MustExec("set @@tidb_back_off_weight = 100")
backoffVal := new(int64)
backOffWeightVal := new(int32)
tk2.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) {
atomic.StoreInt64(backoffVal, int64(vars.BackoffLockFast))
atomic.StoreInt32(backOffWeightVal, int32(vars.BackOffWeight))
}
wg := new(sync.WaitGroup)
wg.Add(2)
@@ -2345,7 +2363,14 @@ func (s *testSessionSuite) TestKVVars(c *C) {
wg.Done()
}()
wg.Wait()
for {
tk2.MustQuery("select * from kvvars")
if atomic.LoadInt32(backOffWeightVal) != 0 {
break
}
}
c.Assert(atomic.LoadInt64(backoffVal), Equals, int64(1))
c.Assert(atomic.LoadInt32(backOffWeightVal), Equals, int32(100))
}

func (s *testSessionSuite) TestCommitRetryCount(c *C) {
@@ -346,25 +346,11 @@ type txnFuture struct {
mockFail bool
}

// mockGetTSErrorInRetryOnce use to make sure gofail mockGetTSErrorInRetry only mock get TS error once.
var mockGetTSErrorInRetryOnce = true

func (tf *txnFuture) wait() (kv.Transaction, error) {
if tf.mockFail {
return nil, errors.New("mock get timestamp fail")
}

// mockGetTSErrorInRetry should wait mockCommitErrorOnce first, then will run into retry() logic.
// Then mockGetTSErrorInRetry will return retryable error when first retry.
// Before PR #8743, we don't cleanup txn after meet error such as error like: PD server timeout[try again later]
// This may cause duplicate data to be written.
failpoint.Inject("mockGetTSErrorInRetry", func(val failpoint.Value) {
if val.(bool) && mockGetTSErrorInRetryOnce && !mockCommitErrorOnce {
mockGetTSErrorInRetryOnce = false
failpoint.Return(nil, errors.Errorf("PD server timeout[try again later]"))
}
})

startTS, err := tf.future.Wait()
if err == nil {
return tf.store.BeginWithStartTS(startTS)
@@ -699,6 +699,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.IndexSerialScanConcurrency = tidbOptPositiveInt32(val, DefIndexSerialScanConcurrency)
case TiDBBackoffLockFast:
s.KVVars.BackoffLockFast = tidbOptPositiveInt32(val, kv.DefBackoffLockFast)
case TiDBBackOffWeight:
s.KVVars.BackOffWeight = tidbOptPositiveInt32(val, kv.DefBackOffWeight)
case TiDBConstraintCheckInPlace:
s.ConstraintCheckInPlace = TiDBOptOn(val)
case TiDBBatchInsert:
@@ -670,6 +670,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBHashAggPartialConcurrency, strconv.Itoa(DefTiDBHashAggPartialConcurrency)},
{ScopeGlobal | ScopeSession, TiDBHashAggFinalConcurrency, strconv.Itoa(DefTiDBHashAggFinalConcurrency)},
{ScopeGlobal | ScopeSession, TiDBBackoffLockFast, strconv.Itoa(kv.DefBackoffLockFast)},
{ScopeGlobal | ScopeSession, TiDBBackOffWeight, strconv.Itoa(kv.DefBackOffWeight)},
{ScopeGlobal | ScopeSession, TiDBRetryLimit, strconv.Itoa(DefTiDBRetryLimit)},
{ScopeGlobal | ScopeSession, TiDBDisableTxnAutoRetry, BoolToIntStr(DefTiDBDisableTxnAutoRetry)},
{ScopeGlobal | ScopeSession, TiDBConstraintCheckInPlace, BoolToIntStr(DefTiDBConstraintCheckInPlace)},
@@ -219,6 +219,12 @@ const (
// tidb_backoff_lock_fast is used for tikv backoff base time in milliseconds.
TiDBBackoffLockFast = "tidb_backoff_lock_fast"

// tidb_back_off_weight is used to control the max back off time in TiDB.
// The default maximum back off time is a small value.
// BackOffWeight could multiply it to let the user adjust the maximum time for retrying.
// Only positive integers can be accepted, which means that the maximum back off time can only grow.
TiDBBackOffWeight = "tidb_back_off_weight"

// tidb_ddl_reorg_worker_cnt defines the count of ddl reorg workers.
TiDBDDLReorgWorkerCount = "tidb_ddl_reorg_worker_cnt"

@@ -300,7 +306,7 @@ const (
DefTiDBMemQuotaDistSQL = 32 << 30 // 32GB.
DefTiDBGeneralLog = 0
DefTiDBRetryLimit = 10
DefTiDBDisableTxnAutoRetry = false
DefTiDBDisableTxnAutoRetry = true
DefTiDBConstraintCheckInPlace = false
DefTiDBHashJoinConcurrency = 5
DefTiDBProjectionConcurrency = 4
@@ -388,7 +388,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
TiDBHashAggFinalConcurrency,
TiDBDistSQLScanConcurrency,
TiDBIndexSerialScanConcurrency, TiDBDDLReorgWorkerCount,
TiDBBackoffLockFast,
TiDBBackoffLockFast, TiDBBackOffWeight,
TiDBDMLBatchSize, TiDBOptimizerSelectivityLevel:
v, err := strconv.Atoi(value)
if err != nil {
@@ -20,7 +20,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
)

func TestT(t *testing.T) {
@@ -513,7 +512,7 @@ func (s *testMockTiKVSuite) TestDeleteRange(c *C) {

func (s *testMockTiKVSuite) mustWriteWriteConflict(c *C, errs []error, i int) {
c.Assert(errs[i], NotNil)
c.Assert(strings.Contains(errs[i].Error(), util.WriteConflictMarker), IsTrue)
c.Assert(strings.Contains(errs[i].Error(), writeConflictMarker), IsTrue)
}

func (s *testMockTiKVSuite) TestRC(c *C) {
@@ -23,7 +23,6 @@ import (
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
)

@@ -35,6 +34,8 @@ const (
typeRollback
)

const writeConflictMarker = "write conflict"

type mvccValue struct {
valueType mvccValueType
startTS uint64
@@ -256,7 +257,7 @@ func (e *mvccEntry) Get(ts uint64, isoLevel kvrpcpb.IsolationLevel) ([]byte, err
func (e *mvccEntry) Prewrite(mutation *kvrpcpb.Mutation, startTS uint64, primary []byte, ttl uint64) error {
if len(e.values) > 0 {
if e.values[0].commitTS >= startTS {
return ErrRetryable(util.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}
}
if e.lock != nil {
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/goleveldb/leveldb/util"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/terror"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
@@ -572,7 +571,7 @@ func prewriteMutation(db *leveldb.DB, batch *leveldb.Batch, mutation *kvrpcpb.Mu
}
// Note that it's a write conflict here, even if the value is a rollback one.
if ok && dec1.value.commitTS >= startTS {
return ErrRetryable(tidbutil.WriteConflictMarker)
return ErrRetryable(writeConflictMarker)
}

op := mutation.GetOp()
@@ -24,7 +24,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/logutil"
@@ -40,7 +39,7 @@ const (
type brokenStore struct{}

func (s *brokenStore) Open(schema string) (kv.Storage, error) {
return nil, errors.New("try again later")
return nil, kv.ErrRetryable
}

func TestT(t *testing.T) {
@@ -662,5 +661,5 @@ func (s *testKVSuite) TestRetryOpenStore(c *C) {
}
c.Assert(err, NotNil)
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second), Commentf("elapse: %s", elapse))
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.