
*: support pessimistic transaction (experimental feature) #10297

Merged
merged 13 commits, May 11, 2019
@@ -76,6 +76,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic_txn"`
CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
// TreatOldVersionUTF8AsUTF8MB4 is used to treat the UTF8 charset of old-version tables/columns as UTF8MB4. This is for compatibility.
// Dynamic modification is currently not supported, because it would require reloading all old-version schemas.
@@ -285,6 +286,18 @@ type Plugin struct {
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// Enable must be true for 'begin lock' or the session variable to start a pessimistic transaction.
Enable bool `toml:"enable" json:"enable"`
// Default, when Enable is also true, makes a plain 'begin' start a pessimistic transaction.
Default bool `toml:"default" json:"default"`
// The maximum number of retries for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock TTL in milliseconds.
TTL uint64 `toml:"ttl" json:"ttl"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
@@ -368,6 +381,12 @@ var defaultConf = Config{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
Enable: false,
Default: false,
This conversation was marked as resolved by tiancaiamao

@tiancaiamao (Contributor) commented on Apr 29, 2019:

What's the purpose of Default?

@coocood (Author, Member) commented on Apr 30, 2019:

When Enable is false, even begin lock can't start a pessimistic transaction.
When Default is true, begin starts a pessimistic transaction.
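
To make the two flags concrete, a hypothetical sketch of how they combine (usePessimistic and beginLock are made-up names, not the PR's actual code):

	// usePessimistic reports whether a new transaction should be pessimistic.
	func usePessimistic(cfg PessimisticTxn, beginLock bool) bool {
		if !cfg.Enable {
			return false // even "begin lock" stays optimistic
		}
		// With Enable on, "begin lock" always opts in; Default also opts in a plain "begin".
		return cfg.Default || beginLock
	}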

MaxRetryCount: 256,
TTL: 60 * 1000,
},
}

var globalConf = defaultConf
@@ -282,4 +282,17 @@ ignore-error = false
binlog-socket = ""

# The strategy for sending binlog to pump; the value can be "range" or "hash" for now.
strategy = "range"
strategy = "range"

[pessimistic-txn]
# enable pessimistic transactions.
enable = false

# make a plain 'begin' start a pessimistic transaction by default.
default = false

# max retry count for a single statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for a pessimistic lock.
ttl = 60000
@@ -793,7 +793,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key so that we are notified if someone deletes or updates the row,
// in which case we should not backfill its index; otherwise the added index entry would be redundant.
err := txn.LockKeys(idxRecord.key)
err := txn.LockKeys(context.Background(), 0, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
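
The LockKeys call sites in this diff now take a context and a for-update timestamp. Inferred from those call sites alone (the exact signature lives in the kv package, so treat this as an assumption):

	// forUpdateTS == 0 marks an internal, non-pessimistic lock, as used by the
	// DDL index-backfill and index-recovery paths.
	LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv.Key) error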
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -39,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
@@ -151,8 +153,10 @@ type ExecStmt struct {

Ctx sessionctx.Context
// StartTime stands for the starting time when executing the statement.
StartTime time.Time
isPreparedStmt bool
StartTime time.Time
isPreparedStmt bool
isSelectForUpdate bool
retryCount uint
}

// OriginText returns the original statement as a string.
@@ -219,7 +223,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}()
}

e, err := a.buildExecutor(sctx)
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
@@ -246,19 +250,29 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handling for the "select for update" statement in a pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
return a.handlePessimisticSelectForUpdate(ctx, e)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
return a.handleNoDelayExecutor(ctx, sctx, e)
if isPessimistic {
return nil, a.handlePessimisticDML(ctx, e)
}
return a.handleNoDelayExecutor(ctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
// the Projection has two expressions and two columns in the schema, but we should
// not return the result of the two expressions.
return a.handleNoDelayExecutor(ctx, sctx, e)
return a.handleNoDelayExecutor(ctx, e)
}

var txnStartTS uint64
txn, err1 := sctx.Txn(false)
if err1 != nil {
txn, err := sctx.Txn(false)
if err != nil {
return nil, err
}
if txn.Valid() {
@@ -271,7 +285,81 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
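// chunkRowRecordSet replays rows that were fully buffered in memory by the
// pessimistic SELECT FOR UPDATE path through the ordinary RecordSet interface.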
type chunkRowRecordSet struct {
rows []chunk.Row
idx int
fields []*ast.ResultField
e Executor
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}

func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
chk := req.Chunk
chk.Reset()
for !chk.IsFull() && c.idx < len(c.rows) {
chk.AppendRow(c.rows[c.idx])
c.idx++
}
return nil
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
}

func (c *chunkRowRecordSet) Close() error {
return nil
}

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
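	// handlePessimisticLockError returns a nil executor once the statement has
	// succeeded, and a rebuilt executor when a write conflict requires re-running
	// the statement, so this loop retries until success or a fatal error.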
for {
rs, err := a.runPessimisticSelectForUpdate(ctx, e)
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return nil, err
}
if e == nil {
return rs, nil
}
}
}

func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
rs := &recordSet{
executor: e,
stmt: a,
}
defer func() {
terror.Log(rs.Close())
}()

var rows []chunk.Row
var err error
fields := rs.Fields()
req := rs.NewRecordBatch()
for {
err = rs.Next(ctx, req)
if err != nil {
// Handle 'write conflict' error.
break
}
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil
This conversation was marked as resolved by lysu

@lysu (Member) commented on May 5, 2019:

Won't it be easier to hit OOM when we "select for update" a large number of records?

}
iter := chunk.NewIterator4Chunk(req.Chunk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
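// Renew the chunk instead of resetting it: the rows buffered above still
// reference this chunk's memory, so it must not be reused.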
req.Chunk = chunk.Renew(req.Chunk, a.Ctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) {
sctx := a.Ctx
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
defer span1.Finish()
@@ -307,12 +395,114 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
if err != nil {
return nil, err
}
return nil, err
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
sctx := a.Ctx
txn, err := sctx.Txn(true)
if err != nil {
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
for {
_, err = a.handleNoDelayExecutor(ctx, e)
if err != nil {
return err
}
keys, err1 := txn.(pessimisticTxn).KeysNeedToLock()
if err1 != nil {
return err1
}
if len(keys) == 0 {
return nil
}
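// Lock the keys this statement wrote at the current for-update TS; a write
// conflict here is retried below with a newer TS instead of failing the txn.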
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return err
}
if e == nil {
return nil
}
}
}

// handlePessimisticLockError updates the for-update TS and rebuilds the executor if err is a write conflict.
func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (Executor, error) {
if err == nil {
return nil, nil
}
errStr := err.Error()
if !strings.Contains(errStr, tidbutil.WriteConflictMarker) {
return nil, err
}
if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic lock retry limit reached")
}
a.retryCount++
conflictTS := extractConflictTS(errStr)
if conflictTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictTS from a conflict error")
}
sctx := a.Ctx
txnCtx := sctx.GetSessionVars().TxnCtx
forUpdateTS := txnCtx.GetForUpdateTS()
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictTS", conflictTS))
if conflictTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return nil, err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err := a.buildExecutor()
if err != nil {
return nil, err
}
// Roll back the statement's changes before retrying it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return nil, err
}
return e, nil
}

func extractConflictTS(errStr string) uint64 {
strs := strings.Split(errStr, "conflictTS=")
This conversation was marked as resolved by tiancaiamao

@tiancaiamao (Contributor) commented on Apr 29, 2019:

Depending on the error message format makes this code fragile.
Is it possible to add a test for this, in case the error message changes in the future?

@coocood (Author, Member) commented on Apr 30, 2019:

It's not easy to write the test because the code that formats the error message is in the 'tikv' package.
We can log a warning when we fail to extract conflictTS.

if len(strs) != 2 {
return 0
}
tsPart := strs[1]
length := strings.IndexByte(tsPart, ',')
if length < 0 {
return 0
}
tsStr := tsPart[:length]
ts, err := strconv.ParseUint(tsStr, 10, 64)
if err != nil {
return 0
}
return ts
}
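
As a follow-up to the review thread above, a minimal sketch of the test tiancaiamao asks for, assuming the tikv message keeps the "conflictTS=<ts>," form this parser expects (the sample message is made up; needs the standard "testing" import):

	func TestExtractConflictTS(t *testing.T) {
		// A made-up message in the assumed format.
		errStr := "write conflict: txnStartTS=407012160078462976, conflictTS=407012160078462977, key=..."
		if ts := extractConflictTS(errStr); ts != 407012160078462977 {
			t.Fatalf("unexpected ts %d", ts)
		}
		// A message without the marker should yield 0.
		if ts := extractConflictTS("some other error"); ts != 0 {
			t.Fatalf("expected 0, got %d", ts)
		}
	}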

return nil, nil
type pessimisticTxn interface {
kv.Transaction
// KeysNeedToLock returns the keys that need to be locked.
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor builds an executor from the plan; a prepared statement may need an additional build step.
func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx := a.Ctx
if _, ok := a.Plan.(*plannercore.Execute); !ok {
// Do not sync transaction for Execute statement, because the real optimization work is done in
// "ExecuteExec.Build".
@@ -354,14 +544,15 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {

// ExecuteExec is not a real Executor; we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build()
err := executorExec.Build(b)
if err != nil {
return nil, err
}
a.isPreparedStmt = true
a.Plan = executorExec.plan
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.isSelectForUpdate
return e, nil
}

@@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(recordKey)
err := txn.LockKeys(ctx, 0, recordKey)
if err != nil {
return result, err
}