Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: add a batch commit session variable for the large transaction #8293

Merged
merged 16 commits into from
Dec 10, 2018
2 changes: 2 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ metrics-interval = 15
[performance]
# Max CPUs to use, 0 use number of CPUs in the machine.
max-procs = 0

# Max memory size to use, 0 use the total usable memory in the machine.
max-memory = 0

# StmtCountLimit limits the max count of statement inside a transaction.
stmt-count-limit = 5000

Expand Down
9 changes: 9 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery("select @@session.tidb_query_log_max_len;").Check(testkit.Rows("20"))
_, err = tk.Exec("set global tidb_query_log_max_len = 20")
c.Assert(err, NotNil)

tk.MustExec("set tidb_batch_commit = 0")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("0"))
tk.MustExec("set tidb_batch_commit = 1")
tk.MustQuery("select @@session.tidb_batch_commit;").Check(testkit.Rows("1"))
_, err = tk.Exec("set global tidb_batch_commit = 0")
c.Assert(err, NotNil)
_, err = tk.Exec("set global tidb_batch_commit = 2")
c.Assert(err, NotNil)
}

func (s *testSuite) TestSetCharset(c *C) {
Expand Down
57 changes: 57 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2030,6 +2030,63 @@ func (s *testSessionSuite) TestStatementCountLimit(c *C) {
c.Assert(err, NotNil)
}

func (s *testSessionSuite) TestBatchCommit(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set tidb_batch_commit = 1")
tk.MustExec("create table t (id int)")
saved := config.GetGlobalConfig().Performance
config.GetGlobalConfig().Performance.StmtCountLimit = 3
defer func() {
config.GetGlobalConfig().Performance = saved
}()
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("SET SESSION autocommit = 1")
tk.MustExec("begin")
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (2)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("rollback")
tk1.MustQuery("select * from t").Check(testkit.Rows())

// The above rollback will not make the session in transaction.
tk.MustExec("insert into t values (1)")
tk1.MustQuery("select * from t").Check(testkit.Rows("1"))
tk.MustExec("delete from t")

tk.MustExec("begin")
tk.MustExec("insert into t values (5)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (6)")
tk1.MustQuery("select * from t").Check(testkit.Rows())
tk.MustExec("insert into t values (7)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))

// The session is still in transaction.
tk.MustExec("insert into t values (8)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (9)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("insert into t values (10)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7"))
tk.MustExec("commit")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10"))

// The above commit will not make the session in transaction.
tk.MustExec("insert into t values (11)")
tk1.MustQuery("select * from t").Check(testkit.Rows("5", "6", "7", "8", "9", "10", "11"))

tk.MustExec("delete from t")
tk.MustExec("SET SESSION autocommit = 0")
tk.MustExec("insert into t values (1)")
tk.MustExec("insert into t values (2)")
tk.MustExec("insert into t values (3)")
tk.MustExec("rollback")
tk1.MustExec("insert into t values (4)")
tk1.MustExec("insert into t values (5)")
tk.MustQuery("select * from t").Check(testkit.Rows("4", "5"))
}

func (s *testSessionSuite) TestCastTimeToDate(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set time_zone = '-8:00'")
Expand Down
21 changes: 15 additions & 6 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
var rs sqlexec.RecordSet
se := sctx.(*session)
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
se.GetSessionVars().TxnCtx.StatementCount++
sessVars.TxnCtx.StatementCount++
if !s.IsReadOnly() {
if err == nil {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
Expand All @@ -167,7 +168,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
}
}
}
if !se.sessionVars.InTxn() {
if !sessVars.InTxn() {
if err != nil {
log.Info("RollbackTxn for ddl/autocommit error.")
err1 := se.RollbackTxn(ctx)
Expand All @@ -180,10 +181,18 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
// So we limit the statement count in a transaction here.
history := GetHistory(sctx)
if history.Count() > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
if !sessVars.BatchCommit {
err1 := se.RollbackTxn(ctx)
terror.Log(errors.Trace(err1))
return rs, errors.Errorf("statement count %d exceeds the transaction limitation, autocommit = %t",
history.Count(), sctx.GetSessionVars().IsAutocommit())
}
err = se.NewTxn(ctx)
// The transaction does not committed yet, we need to keep it in transaction.
// The last history could not be "commit"/"rollback" statement.
// It means it is impossible to start a new transaction at the end of the transaction.
// Because after the server executed "commit"/"rollback" statement, the session is out of the transaction.
se.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if the last statement is a rollback, will there be a corner case that this line put the session into a new transaction while it should be rollbacked ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rollback is the same as commit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will update the comment and add a rollback test case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s1:
set @@autocommit = 0;
insert into t value (1);
rollback;

s2:
insert into t values (1);
insert into t values (2);

s1:
select  * from t;    // what it see ?

@jackysp

Copy link
Member Author

@jackysp jackysp Nov 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the statement count limit is 1 for this case?

}
}
if se.txn.pending() {
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ type SessionVars struct {
// BatchDelete indicates if we should split delete data into multiple batches.
BatchDelete bool

// BatchCommit indicates if we should split the transaction into multiple batches.
BatchCommit bool

// IDAllocator is provided by kvEncoder, if it is provided, we will use it to alloc auto id instead of using
// Table.alloc.
IDAllocator autoid.Allocator
Expand Down Expand Up @@ -633,6 +636,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.BatchInsert = TiDBOptOn(val)
case TiDBBatchDelete:
s.BatchDelete = TiDBOptOn(val)
case TiDBBatchCommit:
s.BatchCommit = TiDBOptOn(val)
case TiDBDMLBatchSize:
s.DMLBatchSize = tidbOptPositiveInt32(val, DefDMLBatchSize)
case TiDBCurrentTS, TiDBConfig:
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBSkipUTF8Check, boolToIntStr(DefSkipUTF8Check)},
{ScopeSession, TiDBBatchInsert, boolToIntStr(DefBatchInsert)},
{ScopeSession, TiDBBatchDelete, boolToIntStr(DefBatchDelete)},
{ScopeSession, TiDBBatchCommit, boolToIntStr(DefBatchCommit)},
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ const (
// split data into multiple batches and use a single txn for each batch. This will be helpful when deleting large data.
TiDBBatchDelete = "tidb_batch_delete"

// tidb_batch_commit is used to enable/disable auto-split the transaction.
// If set this option on, the transaction will be committed when it reaches stmt-count-limit and starts a new transaction.
TiDBBatchCommit = "tidb_batch_commit"

// tidb_dml_batch_size is used to split the insert/delete data into small batches.
// It only takes effort when tidb_batch_insert/tidb_batch_delete is on.
// Its default value is 20000. When the row size is large, 20k rows could be larger than 100MB.
Expand Down Expand Up @@ -243,6 +247,7 @@ const (
DefOptInSubqToJoinAndAgg = true
DefBatchInsert = false
DefBatchDelete = false
DefBatchCommit = false
DefCurretTS = 0
DefMaxChunkSize = 32
DefDMLBatchSize = 20000
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
case AutocommitVar, TiDBSkipUTF8Check, TiDBOptAggPushDown,
TiDBOptInSubqToJoinAndAgg,
TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming,
TiDBBatchDelete, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction:
if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" {
return value, nil
}
Expand Down