Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support pessimistic transaction (experimental feature) #10297

Merged
merged 13 commits into from May 11, 2019

support pessimistic when autocommit = 0, and check unique conflict.

  • Loading branch information...
coocood committed Apr 29, 2019
commit 7c241fd8b332d67d830a8c036cf0a5e6321e5bc5
@@ -249,11 +249,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

txn, err := sctx.Txn(false)
if err != nil {
return nil, errors.Trace(err)
}
isPessimistic := txn.Valid() && txn.IsPessimistic()
isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handle for "select for update statement" in pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
@@ -274,6 +270,10 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}

var txnStartTS uint64
txn, err := sctx.Txn(false)
if err != nil {
return nil, errors.Trace(err)
This conversation was marked as resolved by lysu

This comment has been minimized.

Copy link
@lysu

lysu May 5, 2019

Member
Suggested change
return nil, errors.Trace(err)
return nil, err
}
if txn.Valid() {
txnStartTS = txn.StartTS()
}
@@ -420,7 +420,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, sctx sessionctx.Context, e Executor) error {
This conversation was marked as resolved by jackysp

This comment has been minimized.

Copy link
@jackysp

jackysp May 5, 2019

Member

This func is similar tohandlePessimisticSelectForUpdate.

txn, err := sctx.Txn(false)
txn, err := sctx.Txn(true)
if err != nil {
return err
}
@@ -303,12 +303,15 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// reverts to its previous state.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
// Call ctx.Txn(true) to active pending txn.
pTxnConf := config.GetGlobalConfig().PessimisticTxn
if pTxnConf.Enable && s.Pessimistic {
e.ctx.GetSessionVars().TxnCtx.IsPessimistic = true
}
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
pTxnConf := config.GetGlobalConfig().PessimisticTxn
if pTxnConf.Enable && (s.Pessimistic || pTxnConf.Default || e.ctx.GetSessionVars().PessimisticLock) {
if e.ctx.GetSessionVars().TxnCtx.IsPessimistic {
txn.SetOption(kv.Pessimistic, true)
}
return nil
@@ -1162,6 +1162,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
return &s.txn, err
}
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
if s.sessionVars.TxnCtx.IsPessimistic {
s.txn.SetOption(kv.Pessimistic, true)
}
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
@@ -1674,6 +1677,12 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
}
if !s.sessionVars.IsAutocommit() {
txnConf := config.GetGlobalConfig().PessimisticTxn
if txnConf.Enable && (txnConf.Default || s.sessionVars.PessimisticLock) {
s.sessionVars.TxnCtx.IsPessimistic = true
}
}
}

// RefreshTxnCtx implements context.RefreshTxnCtx interface.
@@ -109,6 +109,7 @@ type TransactionContext struct {
StartTS uint64
Shard *int64
TableDeltaMap map[int64]TableDelta
IsPessimistic bool

// For metrics.
CreateTime time.Time
@@ -540,10 +540,15 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = &pb.Mutation{
mut := &pb.Mutation{
Op: pb.Op_PessimisticLock,
Key: k,
}
conditionPair := c.txn.us.LookupConditionPair(k)
if conditionPair != nil && conditionPair.ShouldNotExist() {
mut.Assertion = pb.Assertion_NotExist
}
mutations[i] = mut
}

req := &tikvrpc.Request{
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.