Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: do not keep history when the transaction retry is disabled #11192

Merged
merged 8 commits into from Jul 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/db_test.go
Expand Up @@ -3098,9 +3098,9 @@ func (s *testDBSuite2) TestLockTables(c *C) {
// Test lock table by other session in transaction and commit with retry.
tk.MustExec("unlock tables")
tk2.MustExec("unlock tables")
tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0")
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1")
tk.MustExec("set @@session.tidb_disable_txn_auto_retry=0")
tk2.MustExec("lock tables t1 write")
_, err = tk.Exec("commit")
c.Assert(terror.ErrorEqual(err, infoschema.ErrTableLocked), IsTrue, Commentf("err: %v\n", err))
Expand Down
61 changes: 48 additions & 13 deletions session/session.go
Expand Up @@ -134,10 +134,8 @@ var (
)

type stmtRecord struct {
stmtID uint32
st sqlexec.Statement
stmtCtx *stmtctx.StatementContext
params []interface{}
}

// StmtHistory holds all histories of statements in a txn.
Expand All @@ -146,12 +144,10 @@ type StmtHistory struct {
}

// Add appends a stmt to history list.
func (h *StmtHistory) Add(stmtID uint32, st sqlexec.Statement, stmtCtx *stmtctx.StatementContext, params ...interface{}) {
func (h *StmtHistory) Add(st sqlexec.Statement, stmtCtx *stmtctx.StatementContext) {
s := &stmtRecord{
stmtID: stmtID,
st: st,
stmtCtx: stmtCtx,
params: append(([]interface{})(nil), params...),
}
h.history = append(h.history, s)
}
Expand Down Expand Up @@ -451,14 +447,8 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
err := s.doCommit(ctx)
if err != nil {
commitRetryLimit := s.sessionVars.RetryLimit
if s.sessionVars.DisableTxnAutoRetry && !s.sessionVars.InRestrictedSQL {
// Do not retry non-autocommit transactions.
// For autocommit single statement transactions, the history count is always 1.
// For explicit transactions, the statement count is more than 1.
history := GetHistory(s)
if history.Count() > 1 {
commitRetryLimit = 0
}
if !s.sessionVars.TxnCtx.CouldRetry {
commitRetryLimit = 0
}
// Don't retry in BatchInsert mode. As a counter-example, insert into t1 select * from t2,
// BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement,
Expand Down Expand Up @@ -517,6 +507,13 @@ func (s *session) CommitTxn(ctx context.Context) error {
if commitDetail != nil {
s.sessionVars.StmtCtx.MergeExecDetails(nil, commitDetail)
}

failpoint.Inject("keepHistory", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(err)
}
})

s.sessionVars.TxnCtx.Cleanup()
s.recordTransactionCounter(err)
return err
Expand Down Expand Up @@ -1247,10 +1244,48 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
}
return &s.txn, nil
}

// isTxnRetryable (if returns true) means the transaction could retry.
// If the transaction is in pessimistic mode, do not retry.
// If the session is already in transaction, enable retry or internal SQL could retry.
// If not, the transaction could always retry, because it should be auto committed transaction.
// Anyway the retry limit is 0, the transaction could not retry.
func (s *session) isTxnRetryable() bool {
sessVars := s.sessionVars

// The pessimistic transaction no need to retry.
if sessVars.TxnCtx.IsPessimistic {
return false
}

// If retry limit is 0, the transaction could not retry.
if sessVars.RetryLimit == 0 {
return false
}

// If the session is not InTxn, it is an auto-committed transaction.
// The auto-committed transaction could always retry.
if !sessVars.InTxn() {
return true
}

// The internal transaction could always retry.
if sessVars.InRestrictedSQL {
return true
}

// If the retry is enabled, the transaction could retry.
if !sessVars.DisableTxnAutoRetry {
return true
}

return false
}

func (s *session) NewTxn(ctx context.Context) error {
if s.txn.Valid() {
txnID := s.txn.StartTS()
Expand Down
78 changes: 76 additions & 2 deletions session/session_test.go
Expand Up @@ -545,7 +545,7 @@ func (s *testSessionSuite) TestRetryCleanTxn(c *C) {
c.Assert(err, IsNil)
stmt, _ := session.Compile(context.TODO(), tk.Se, stmtNode)
executor.ResetContextOfStmt(tk.Se, stmtNode)
history.Add(0, stmt, tk.Se.GetSessionVars().StmtCtx)
history.Add(stmt, tk.Se.GetSessionVars().StmtCtx)
_, err = tk.Exec("commit")
c.Assert(err, NotNil)
txn, err := tk.Se.Txn(false)
Expand All @@ -559,6 +559,7 @@ func (s *testSessionSuite) TestReadOnlyNotInHistory(c *C) {
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1), (2), (3)")
tk.MustExec("set @@autocommit = 0")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustQuery("select * from history")
history := session.GetHistory(tk.Se)
c.Assert(history.Count(), Equals, 0)
Expand All @@ -572,6 +573,76 @@ func (s *testSessionSuite) TestReadOnlyNotInHistory(c *C) {
c.Assert(history.Count(), Equals, 0)
}

func (s *testSessionSuite) TestNoHistoryWhenDisableRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("set @@autocommit = 0")

// retry_limit = 0 will not add history.
tk.MustExec("set @@tidb_retry_limit = 0")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)

// Disable auto_retry will add history for auto committed only
tk.MustExec("set @@autocommit = 1")
tk.MustExec("set @@tidb_retry_limit = 10")
tk.MustExec("set @@tidb_disable_txn_auto_retry = 1")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/keepHistory", `1*return(true)->return(false)`), IsNil)
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 1)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/keepHistory"), IsNil)
tk.MustExec("begin")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)
tk.MustExec("commit")

// Enable auto_retry will add history for both.
tk.MustExec("set @@tidb_disable_txn_auto_retry = 0")
c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/keepHistory", `1*return(true)->return(false)`), IsNil)
tk.MustExec("insert history values (1)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/keepHistory"), IsNil)
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 1)
tk.MustExec("begin")
tk.MustExec("insert history values (1)")
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 2)
tk.MustExec("commit")
}

func (s *testSessionSuite) TestNoRetryForCurrentTxn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1)")

// Firstly, disable retry.
tk.MustExec("set tidb_disable_txn_auto_retry = 1")
tk.MustExec("begin")
tk.MustExec("update history set a = 2")
// Enable retry now.
tk.MustExec("set tidb_disable_txn_auto_retry = 0")

tk1.MustExec("update history set a = 3")
c.Assert(tk.ExecToErr("commit"), NotNil)
}

func (s *testSessionSuite) TestRetryForCurrentTxn(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table history (a int)")
tk.MustExec("insert history values (1)")

// Firstly, enable retry.
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("update history set a = 2")
// Disable retry now.
tk.MustExec("set tidb_disable_txn_auto_retry = 1")

tk1.MustExec("update history set a = 3")
tk.MustExec("commit")
tk.MustQuery("select * from history").Check(testkit.Rows("2"))
}

// TestTruncateAlloc tests that the auto_increment ID does not reuse the old table's allocator.
func (s *testSessionSuite) TestTruncateAlloc(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
Expand Down Expand Up @@ -990,6 +1061,7 @@ func (s *testSessionSuite) TestBinaryReadOnly(c *C) {
id2, _, _, err := tk.Se.PrepareStmt("insert into t values (?)")
c.Assert(err, IsNil)
tk.MustExec("set autocommit = 0")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
_, err = tk.Se.ExecutePreparedStmt(context.Background(), id, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
c.Assert(session.GetHistory(tk.Se).Count(), Equals, 0)
Expand Down Expand Up @@ -2177,6 +2249,7 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
defer func() {
config.GetGlobalConfig().Performance.StmtCountLimit = saved
}()
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("begin")
tk.MustExec("insert into stmt_count_limit values (1)")
tk.MustExec("insert into stmt_count_limit values (2)")
Expand All @@ -2195,6 +2268,7 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
func (s *testSessionSuite) TestBatchCommit(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_batch_commit = 1")
tk.MustExec("set tidb_disable_txn_auto_retry = 0")
tk.MustExec("create table t (id int)")
saved := config.GetGlobalConfig().Performance
config.GetGlobalConfig().Performance.StmtCountLimit = 3
Expand Down Expand Up @@ -2440,7 +2514,7 @@ func (s *testSchemaSuite) TestDisableTxnAutoRetry(c *C) {
// session 1 starts a transaction early.
// execute a select statement to clear retry history.
tk1.MustExec("select 1")
tk1.Se.NewTxn(context.Background())
tk1.Se.PrepareTxnCtx(context.Background())
// session 2 update the value.
tk2.MustExec("update no_retry set id = 4")
// AutoCommit update will retry, so it would not fail.
Expand Down
17 changes: 10 additions & 7 deletions session/tidb.go
Expand Up @@ -176,13 +176,14 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV
return se.CommitTxn(ctx)
}

return checkStmtLimit(ctx, sctx, se, sessVars)
return checkStmtLimit(ctx, sctx, se)
}

func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session, sessVars *variable.SessionVars) error {
func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session) error {
// If the user insert, insert, insert ... but never commit, TiDB would OOM.
// So we limit the statement count in a transaction here.
var err error
sessVars := se.GetSessionVars()
history := GetHistory(sctx)
if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
if !sessVars.BatchCommit {
Expand All @@ -195,7 +196,7 @@ func checkStmtLimit(ctx context.Context, sctx sessionctx.Context, se *session, s
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
sessVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
return err
}
Expand All @@ -222,12 +223,14 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
sessVars.TxnCtx.StatementCount++
if !s.IsReadOnly(sessVars) {
if err == nil && !sessVars.TxnCtx.IsPessimistic {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
// All the history should be added here.
if err == nil && sessVars.TxnCtx.CouldRetry {
GetHistory(sctx).Add(s, sessVars.StmtCtx)
}

// Handle the stmt commit/rollback.
if txn, err1 := sctx.Txn(false); err1 == nil {
if txn.Valid() {
if err != nil {
Expand All @@ -240,8 +243,8 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
logutil.BgLogger().Error("get txn error", zap.Error(err1))
}
}

err = finishStmt(ctx, sctx, se, sessVars, err)

if se.txn.pending() {
// After run statement finish, txn state is still pending means the
// statement never need a Txn(), such as:
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/session.go
Expand Up @@ -104,6 +104,7 @@ type TransactionContext struct {
DirtyDB interface{}
Binlog interface{}
InfoSchema interface{}
CouldRetry bool
History interface{}
SchemaVersion int64
StartTS uint64
Expand Down Expand Up @@ -135,7 +136,7 @@ func (tc *TransactionContext) UpdateDeltaForTable(tableID int64, delta int64, co

// Cleanup clears up transaction info that no longer use.
func (tc *TransactionContext) Cleanup() {
//tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
// tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
tc.DirtyDB = nil
tc.Binlog = nil
tc.History = nil
Expand Down