
*: support pessimistic transaction (experimental feature) #10297

Merged: 13 commits, May 11, 2019.
@@ -76,6 +76,7 @@ type Config struct {
Binlog Binlog `toml:"binlog" json:"binlog"`
CompatibleKillQuery bool `toml:"compatible-kill-query" json:"compatible-kill-query"`
Plugin Plugin `toml:"plugin" json:"plugin"`
PessimisticTxn PessimisticTxn `toml:"pessimistic-txn" json:"pessimistic_txn"`
CheckMb4ValueInUTF8 bool `toml:"check-mb4-value-in-utf8" json:"check-mb4-value-in-utf8"`
// TreatOldVersionUTF8AsUTF8MB4 is used to treat the old-version table/column UTF8 charset as UTF8MB4. This is for compatibility.
// Dynamic modification is currently not supported, because it would require reloading all old-version schemas.
@@ -285,6 +286,18 @@ type Plugin struct {
Load string `toml:"load" json:"load"`
}

// PessimisticTxn is the config for pessimistic transaction.
type PessimisticTxn struct {
// Enable must be true for 'begin lock' or the session variable to start a pessimistic transaction.
Enable bool `toml:"enable" json:"enable"`
// Default makes a plain 'begin' start a pessimistic transaction when Enable is true.
Default bool `toml:"default" json:"default"`
// The maximum number of retries for a single statement in a pessimistic transaction.
MaxRetryCount uint `toml:"max-retry-count" json:"max-retry-count"`
// The pessimistic lock TTL in milliseconds.
TTL uint64 `toml:"ttl" json:"ttl"`
}

var defaultConf = Config{
Host: "0.0.0.0",
AdvertiseAddress: "",
@@ -368,6 +381,12 @@ var defaultConf = Config{
WriteTimeout: "15s",
Strategy: "range",
},
PessimisticTxn: PessimisticTxn{
Enable: false,
Default: false,
tiancaiamao (Contributor), Apr 29, 2019:
What's the purpose of Default?

coocood (Author, Member), Apr 30, 2019:
When Enable is false, even 'begin lock' can't start a pessimistic transaction.
When Default is true, a plain 'begin' starts a pessimistic transaction.

MaxRetryCount: 256,
TTL: 60 * 1000,
},
}

var globalConf = defaultConf
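To make the Enable/Default semantics coocood describes above concrete, here is a minimal sketch, not code from this PR; the helper name and the beginLock flag are hypothetical:

// isPessimisticBegin illustrates how the two config flags gate the
// transaction mode, per coocood's explanation above. Hypothetical helper,
// not part of this PR.
func isPessimisticBegin(cfg PessimisticTxn, beginLock bool) bool {
	if !cfg.Enable {
		// With Enable off, even "begin lock" cannot start a pessimistic transaction.
		return false
	}
	// With Enable on, "begin lock" always starts one, and a plain "begin"
	// starts one when Default is true.
	return beginLock || cfg.Default
}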
@@ -282,4 +282,17 @@ ignore-error = false
binlog-socket = ""

# The strategy for sending binlog to pump; currently the value can be "range" or "hash".
strategy = "range"

[pessimistic-txn]
# enable pessimistic transaction.
enable = false

# start pessimistic transaction by default.
default = false

# max retry count for a statement in a pessimistic transaction.
max-retry-count = 256

# default TTL in milliseconds for pessimistic lock.
ttl = 60000
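For reference, a minimal sketch of reading this section at runtime through config.GetGlobalConfig, the same accessor the retry loops below use; the standalone main function is illustrative only:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/config"
)

func main() {
	pessCfg := config.GetGlobalConfig().PessimisticTxn
	if pessCfg.Enable {
		// TTL is stored in milliseconds, matching the [pessimistic-txn] section above.
		ttl := time.Duration(pessCfg.TTL) * time.Millisecond
		fmt.Printf("pessimistic txns enabled, lock TTL %v, max retries %d\n", ttl, pessCfg.MaxRetryCount)
	}
}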
@@ -793,7 +793,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key so that we are notified if someone deletes or updates the row;
// in that case we should not backfill the index for it, otherwise the added index entry would be redundant.
- err := txn.LockKeys(idxRecord.key)
+ err := txn.LockKeys(context.Background(), 0, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync/atomic"
"time"
@@ -39,6 +40,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
@@ -151,8 +153,9 @@ type ExecStmt struct {

Ctx sessionctx.Context
// StartTime stands for the starting time when executing the statement.
StartTime time.Time
isPreparedStmt bool
isSelectForUpdate bool
}

// OriginText returns original statement as a string.
@@ -246,8 +249,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

isPessimistic := sctx.GetSessionVars().TxnCtx.IsPessimistic

// Special handling for a "select for update" statement in a pessimistic transaction.
if isPessimistic && a.isSelectForUpdate {
return a.handlePessimisticSelectForUpdate(ctx, sctx, e)
}

// If the executor doesn't return any result to the client, we execute it without delay.
if e.Schema().Len() == 0 {
if isPessimistic {
return nil, a.handlePessimisticDML(ctx, sctx, e)
}
return a.handleNoDelayExecutor(ctx, sctx, e)
} else if proj, ok := e.(*ProjectionExec); ok && proj.calculateNoDelay {
// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
@@ -257,9 +270,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}

var txnStartTS uint64
- txn, err1 := sctx.Txn(false)
- if err1 != nil {
- return nil, err
+ txn, err := sctx.Txn(false)
+ if err != nil {
+ return nil, errors.Trace(err)
lysu (Member), May 5, 2019, suggested change:
- return nil, errors.Trace(err)
+ return nil, err
}
if txn.Valid() {
txnStartTS = txn.StartTS()
@@ -271,6 +284,102 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

type chunkRowRecordSet struct {
rows []chunk.Row
idx int
fields []*ast.ResultField
e Executor
}

func (c *chunkRowRecordSet) Fields() []*ast.ResultField {
return c.fields
}

func (c *chunkRowRecordSet) Next(ctx context.Context, req *chunk.RecordBatch) error {
chk := req.Chunk
chk.Reset()
for !chk.IsFull() && c.idx < len(c.rows) {
chk.AppendRow(c.rows[c.idx])
c.idx++
}
return nil
}

func (c *chunkRowRecordSet) NewRecordBatch() *chunk.RecordBatch {
return chunk.NewRecordBatch(c.e.newFirstChunk())
}

func (c *chunkRowRecordSet) Close() error {
return nil
}
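For orientation, a hedged sketch of how a caller could drain this buffered record set; it uses only the record-set surface shown above plus chunk.RecordBatch's NumRows, and the helper name is hypothetical:

// drainRecordSet counts the rows produced by a record set such as the
// chunkRowRecordSet above. Illustrative helper, not part of this PR.
func drainRecordSet(ctx context.Context, rs sqlexec.RecordSet) (int, error) {
	req := rs.NewRecordBatch()
	total := 0
	for {
		if err := rs.Next(ctx, req); err != nil {
			return total, err
		}
		if req.NumRows() == 0 {
			// An empty batch signals the end of the buffered rows.
			return total, nil
		}
		total += req.NumRows()
	}
}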

func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
retryCnt := uint(0)
for {
rs, err := a.runPessimisticSelectForUpdate(ctx, sctx, e)
if err == nil {
return rs, nil
}
// Retry this "select for update" statement using a new startTS.
if !strings.Contains(err.Error(), tidbutil.WriteConflictMarker) {
jackysp (Member), May 5, 2019:
It would be better to also retry on "try again later" errors.

coocood (Author, Member), May 5, 2019:
I suppose "try again later" is handled by the tikv client.

return nil, err
}
if retryCnt >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return nil, errors.New("pessimistic max retry count reached")
}
retryCnt++
conflictTS := extractConflictTS(err.Error())
if conflictTS > txnCtx.GetForUpdateTS() {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
jackysp (Member), May 5, 2019:
Should we add a backoff for this, instead of returning when it meets an error?

coocood (Author, Member), May 5, 2019:
Do you mean retrying the timestamp fetch? I think that is already done in the pd-client.

jackysp (Member), May 6, 2019:
I think pd-client won't do it.

if err1 != nil {
return nil, err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err = a.buildExecutor(sctx)
if err != nil {
return nil, errors.Trace(err)
}
if err = e.Open(ctx); err != nil {
return nil, errors.Trace(err)
}
}
}
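On jackysp's backoff question above: a hedged sketch of what such a backoff could look like around the timestamp fetch, assuming the oracle.Oracle interface behind sctx.GetStore().GetOracle() whose GetTimestamp(ctx) call appears above; the merged PR leaves retrying to the pd-client instead:

// getTimestampWithBackoff retries GetTimestamp with exponential backoff.
// Illustrative only; not what the PR does.
func getTimestampWithBackoff(ctx context.Context, o oracle.Oracle) (uint64, error) {
	backoff := 10 * time.Millisecond
	var lastErr error
	for i := 0; i < 5; i++ {
		ts, err := o.GetTimestamp(ctx)
		if err == nil {
			return ts, nil
		}
		lastErr = err
		select {
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-time.After(backoff):
			// Double the wait between attempts.
			backoff *= 2
		}
	}
	return 0, lastErr
}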

func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
rs := &recordSet{
executor: e,
stmt: a,
}
defer func() {
terror.Log(rs.Close())
}()

var rows []chunk.Row
var err error
fields := rs.Fields()
req := rs.NewRecordBatch()
for {
err = rs.Next(ctx, req)
if err != nil {
// A 'write conflict' error here is handled by the caller, which retries the statement.
break
}
if req.NumRows() == 0 {
return &chunkRowRecordSet{rows: rows, fields: fields, e: e}, nil
lysu (Member), May 5, 2019:
Won't we hit OOM more easily when we "select for update" a large number of records?
}
iter := chunk.NewIterator4Chunk(req.Chunk)
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
rows = append(rows, r)
}
req.Chunk = chunk.Renew(req.Chunk, sctx.GetSessionVars().MaxChunkSize)
}
return nil, err
}
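One way to act on lysu's OOM concern above would be to cap how many rows the loop buffers. A minimal sketch with a hypothetical limit; the merged PR buffers without a cap:

// appendRowsWithLimit copies rows out of chk, failing once the buffer would
// exceed limit. Hypothetical guard, not part of this PR.
func appendRowsWithLimit(rows []chunk.Row, chk *chunk.Chunk, limit int) ([]chunk.Row, error) {
	iter := chunk.NewIterator4Chunk(chk)
	for r := iter.Begin(); r != iter.End(); r = iter.Next() {
		if len(rows) >= limit {
			return nil, errors.New("select for update buffered too many rows")
		}
		rows = append(rows, r)
	}
	return rows, nil
}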

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context()))
@@ -307,8 +416,92 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
if err != nil {
return nil, err
}
return nil, err
}

func (a *ExecStmt) handlePessimisticDML(ctx context.Context, sctx sessionctx.Context, e Executor) error {
jackysp (Member), May 5, 2019:
This func is similar to handlePessimisticSelectForUpdate.

txn, err := sctx.Txn(true)
if err != nil {
return err
}
txnCtx := sctx.GetSessionVars().TxnCtx
retryCnt := uint(0)
for {
_, err = a.handleNoDelayExecutor(ctx, sctx, e)
if err != nil {
return err
}
p := txn.(pessimisticTxn)
keys, err1 := p.KeysNeedToLock()
if err1 != nil {
return err1
}
if len(keys) == 0 {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, forUpdateTS, keys...)
if err == nil || !strings.Contains(err.Error(), tidbutil.WriteConflictMarker) {
return err
}
if retryCnt >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount {
return errors.New("pessimistic lock retry limit reached")
}
retryCnt++
conflictTS := extractConflictTS(err.Error())
if conflictTS == 0 {
logutil.Logger(ctx).Warn("failed to extract conflictTS from a conflict error")
}
logutil.Logger(ctx).Info("pessimistic write conflict, retry statement",
zap.Uint64("txn", txn.StartTS()),
zap.Uint64("forUpdateTS", forUpdateTS),
zap.Uint64("conflictTS", conflictTS))
if conflictTS > forUpdateTS {
txnCtx.SetForUpdateTS(conflictTS)
} else {
ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx)
if err1 != nil {
return err1
}
txnCtx.SetForUpdateTS(ts)
}
e, err = a.buildExecutor(sctx)
if err != nil {
return err
}

// Roll back the statement changes before retrying it.
sctx.StmtRollback()
sctx.GetSessionVars().StmtCtx.ResetForRetry()

if err = e.Open(ctx); err != nil {
return err
}
}
}

func extractConflictTS(errStr string) uint64 {
strs := strings.Split(errStr, "conflictTS=")
tiancaiamao (Contributor), Apr 29, 2019:
Depending on the error message format makes this code fragile.
Is it possible to add a test for this, in case the error message changes in the future?

coocood (Author, Member), Apr 30, 2019:
It's not easy to write the test, because the code that formats the error message is in the 'tikv' package.
We can log a warning when we fail to extract the conflictTS.

if len(strs) != 2 {
return 0
}
tsPart := strs[1]
length := strings.IndexByte(tsPart, ',')
if length < 0 {
return 0
}
tsStr := tsPart[:length]
ts, err := strconv.ParseUint(tsStr, 10, 64)
if err != nil {
return 0
}
return ts
}
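Following up on tiancaiamao's request above, a minimal regression-test sketch for extractConflictTS; the error strings here are made up, since only the "conflictTS=" fragment matters to the parser (assumes the standard testing import):

func TestExtractConflictTS(t *testing.T) {
	// Well-formed message: the parser reads the digits between
	// "conflictTS=" and the next comma.
	if got := extractConflictTS("write conflict, txnStartTS=1, conflictTS=42, key=k1"); got != 42 {
		t.Fatalf("want 42, got %d", got)
	}
	// Malformed messages must fall back to 0 so the caller can log a warning.
	for _, s := range []string{"", "conflictTS=", "conflictTS=abc,", "no marker"} {
		if ts := extractConflictTS(s); ts != 0 {
			t.Fatalf("want 0 for %q, got %d", s, ts)
		}
	}
}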

type pessimisticTxn interface {
kv.Transaction
// KeysNeedToLock returns the keys need to be locked.
KeysNeedToLock() ([]kv.Key, error)
}

// buildExecutor builds an executor from a plan; prepared statements may need an additional procedure.
@@ -354,14 +547,15 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
- err := executorExec.Build()
+ err := executorExec.Build(b)
if err != nil {
return nil, err
}
a.isPreparedStmt = true
a.Plan = executorExec.plan
e = executorExec.stmtExec
}
a.isSelectForUpdate = b.isSelectForUpdate
return e, nil
}

@@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
- err := txn.LockKeys(recordKey)
+ err := txn.LockKeys(ctx, 0, recordKey)
if err != nil {
return result, err
}
@@ -71,7 +71,8 @@ type executorBuilder struct {
is infoschema.InfoSchema
startTS uint64 // cached when the first time getStartTS() is called
// err is set when an error happens during the Executor building process.
err error
isSelectForUpdate bool
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema) *executorBuilder {
@@ -470,6 +471,10 @@ func (b *executorBuilder) buildDeallocate(v *plannercore.Deallocate) Executor {
}

func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor {
b.isSelectForUpdate = true
// Build 'select for update' using the 'for update' ts.
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()

tiancaiamao (Contributor), Apr 29, 2019:
Add a txn.IsPessimistic check?

tiancaiamao (Contributor), Apr 29, 2019:
This line seems unnecessary.

tiancaiamao (Contributor), Apr 29, 2019:
Why not set b.startTS at the beginning, when the builder is constructed?


src := b.build(v.Children()[0])
if b.err != nil {
return nil
@@ -590,6 +595,7 @@ func (b *executorBuilder) buildSet(v *plannercore.Set) Executor {
}

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selectExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
@@ -1254,6 +1260,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil
@@ -1335,6 +1342,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
b.startTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
selExec := b.build(v.SelectPlan)
if b.err != nil {
return nil