Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Optimize pessimistic transaction by supporting locking with conflict #35588

Merged
merged 49 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
4ff1c14
update depencency
MyonKeminta Jun 16, 2022
6834395
Support aggressive locking
MyonKeminta Jun 21, 2022
776eb75
Refactory
MyonKeminta Jun 21, 2022
d53d09b
update client-go
MyonKeminta Jun 24, 2022
0e10f92
Fix missing conflict handling
MyonKeminta Jun 24, 2022
b6d92f4
update client-go
MyonKeminta Jun 24, 2022
96ed021
Skip deadlock related tests; remove too-verbose logs
MyonKeminta Jun 27, 2022
7b7d3d7
Add metrics
MyonKeminta Jun 28, 2022
5231f1b
Fix bucket settings
MyonKeminta Jun 29, 2022
f8e81e3
update client-go to fix transaction IsReadOnly didn't check aggressiv…
MyonKeminta Jun 29, 2022
7aadc23
update client-go
MyonKeminta Jul 27, 2022
90ed173
update client-go
MyonKeminta Jul 28, 2022
e64d13a
update client-go
MyonKeminta Aug 2, 2022
879477d
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Aug 3, 2022
94044ee
update client-go to fix some panics
MyonKeminta Aug 3, 2022
8cb818d
update client-go to fix the PessimisticLockNotFound issue
MyonKeminta Aug 4, 2022
4979123
support skipping resolve locks
MyonKeminta Aug 8, 2022
da6c7d4
Revert "executor: also collect unchanged unique keys for lock (#36498)"
MyonKeminta Aug 10, 2022
070f842
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Nov 28, 2022
65f200b
Fix bazel files
MyonKeminta Nov 28, 2022
a17a15a
Support locking with conflict in unistore
MyonKeminta Nov 28, 2022
c204143
Fix missing part of result filling in unitore
MyonKeminta Nov 29, 2022
20434bf
update client-go; fix build
MyonKeminta Nov 29, 2022
8e81a95
Fix bazel
MyonKeminta Nov 30, 2022
582b7ac
Fix lint
MyonKeminta Nov 30, 2022
ae1da53
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Nov 30, 2022
8303381
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Dec 7, 2022
f5c9eb8
Fix lint
MyonKeminta Dec 7, 2022
27ea2bb
Renaming; exit aggressive locking mode when trying to lock more than …
MyonKeminta Dec 12, 2022
c706bcc
Add tests
MyonKeminta Dec 21, 2022
e5f6602
Fix tests and temporarily diable some failed checks
MyonKeminta Dec 22, 2022
09f2e44
update client-go
MyonKeminta Dec 29, 2022
e71bf48
Fix test
MyonKeminta Dec 30, 2022
8a214d6
Update kvproto
MyonKeminta Jan 4, 2023
6f842f2
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Jan 4, 2023
d4a8f23
update client-go
MyonKeminta Jan 4, 2023
e819638
Address comments
MyonKeminta Jan 9, 2023
c3b55c1
Address comments
MyonKeminta Jan 10, 2023
e01610e
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Jan 10, 2023
f9629a4
Fix build
MyonKeminta Jan 10, 2023
ca141b6
Fix base reference in pessimistic txn providers
MyonKeminta Jan 10, 2023
8e1c744
Address comments
MyonKeminta Jan 10, 2023
1f95079
Fix tests
MyonKeminta Jan 10, 2023
0748d34
Fix the incorrect fix of nil check of txn in basePessimisticTxnCOntex…
MyonKeminta Jan 11, 2023
216f739
Address comments
MyonKeminta Jan 11, 2023
6c8fd71
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Jan 12, 2023
4aa76a7
Merge branch 'master' into m/pessimistic-lock-optimize
MyonKeminta Jan 12, 2023
3e8604d
Merge branch 'master' of https://github.com/pingcap/tidb into m/pessi…
MyonKeminta Jan 13, 2023
77c04bd
Merge branch 'master' into m/pessimistic-lock-optimize
ti-chi-bot Jan 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func TestAddColumn2(t *testing.T) {
require.NoError(t, err)
_, err = writeOnlyTable.AddRecord(tk.Session(), types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), table.IsUpdate)
require.NoError(t, err)
tk.Session().StmtCommit()
tk.Session().StmtCommit(ctx)
err = tk.Session().CommitTxn(ctx)
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,7 @@ func (s *session) begin() error {
}

func (s *session) commit() error {
s.StmtCommit()
s.StmtCommit(context.Background())
return s.CommitTxn(context.Background())
}

Expand All @@ -1546,12 +1546,12 @@ func (s *session) txn() (kv.Transaction, error) {
}

func (s *session) rollback() {
s.StmtRollback()
s.StmtRollback(context.Background(), false)
s.RollbackTxn(context.Background())
}

func (s *session) reset() {
s.StmtRollback()
s.StmtRollback(context.Background(), false)
}

func (s *session) execute(ctx context.Context, query string, label string) ([]chunk.Row, error) {
Expand Down
2 changes: 1 addition & 1 deletion ddl/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestDDLStatsInfo(t *testing.T) {
require.NoError(t, err)
_, err = m.AddRecord(ctx, types.MakeDatums(3, 3))
require.NoError(t, err)
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
require.NoError(t, ctx.CommitTxn(context.Background()))

job := buildCreateIdxJob(dbInfo, tblInfo, true, "idx", "c1")
Expand Down
56 changes: 50 additions & 6 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ var (
totalQueryProcHistogramInternal = metrics.TotalQueryProcHistogram.WithLabelValues(metrics.LblInternal)
totalCopProcHistogramInternal = metrics.TotalCopProcHistogram.WithLabelValues(metrics.LblInternal)
totalCopWaitHistogramInternal = metrics.TotalCopWaitHistogram.WithLabelValues(metrics.LblInternal)

selectForUpdateFirstAttemptDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("select-for-update", "first-attempt")
selectForUpdateRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("select-for-update", "retry")
dmlFirstAttemptDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "first-attempt")
dmlRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "retry")
)

// processinfoSetter is the interface use to set current running process info.
Expand Down Expand Up @@ -593,7 +598,7 @@ func (a *ExecStmt) handleStmtForeignKeyTrigger(ctx context.Context, e Executor)
// change first.
// Since `UnionScanExec` use `SnapshotIter` and `SnapshotGetter` to read txn mem-buffer, if we don't do `StmtCommit`,
// then the fk cascade executor can't read the mem-buffer changed by the ExecStmt.
a.Ctx.StmtCommit()
a.Ctx.StmtCommit(ctx)
}
err := a.handleForeignKeyTrigger(ctx, e, 1)
if err != nil {
Expand Down Expand Up @@ -684,7 +689,7 @@ func (a *ExecStmt) handleForeignKeyCascade(ctx context.Context, fkc *FKCascadeEx
}
// Call `StmtCommit` uses to flush the fk cascade executor change into txn mem-buffer,
// then the later fk cascade executors can see the mem-buffer changes.
a.Ctx.StmtCommit()
a.Ctx.StmtCommit(ctx)
err = a.handleForeignKeyTrigger(ctx, e, depth+1)
if err != nil {
return err
Expand Down Expand Up @@ -858,15 +863,34 @@ func (a *ExecStmt) handlePessimisticSelectForUpdate(ctx context.Context, e Execu
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
}

txnManager := sessiontxn.GetTxnManager(a.Ctx)
err := txnManager.OnHandlePessimisticStmtStart(ctx)
if err != nil {
return nil, err
}

isFirstAttempt := true

for {
startTime := time.Now()
rs, err := a.runPessimisticSelectForUpdate(ctx, e)

if isFirstAttempt {
selectForUpdateFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
selectForUpdateRetryDuration.Observe(time.Since(startTime).Seconds())
}

e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
return nil, err
}
if e == nil {
return rs, nil
}

failpoint.Inject("pessimisticSelectForUpdateRetry", nil)
}
}

Expand Down Expand Up @@ -962,18 +986,39 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) (err er
err = ErrLazyUniquenessCheckFailure.GenWithStackByArgs(err.Error())
}
}()

txnManager := sessiontxn.GetTxnManager(a.Ctx)
err = txnManager.OnHandlePessimisticStmtStart(ctx)
if err != nil {
return err
}

isFirstAttempt := true

for {
startPointGetLocking := time.Now()
if !isFirstAttempt {
failpoint.Inject("pessimisticDMLRetry", nil)
}

startTime := time.Now()
_, err = a.handleNoDelayExecutor(ctx, e)
if !txn.Valid() {
return err
}

if isFirstAttempt {
dmlFirstAttemptDuration.Observe(time.Since(startTime).Seconds())
isFirstAttempt = false
} else {
dmlRetryDuration.Observe(time.Since(startTime).Seconds())
}

if err != nil {
// It is possible the DML has point get plan that locks the key.
e, err = a.handlePessimisticLockError(ctx, err)
if err != nil {
if ErrDeadlock.Equal(err) {
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startPointGetLocking).Seconds())
metrics.StatementDeadlockDetectDuration.Observe(time.Since(startTime).Seconds())
}
return err
}
Expand Down Expand Up @@ -1071,7 +1116,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
return nil, err
}
// Rollback the statement change before retry it.
a.Ctx.StmtRollback()
a.Ctx.StmtRollback(ctx, true)
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()

Expand Down Expand Up @@ -1957,7 +2002,6 @@ func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) {
}
return sqlDigest, planDigest
}

func convertStatusIntoString(sctx sessionctx.Context, statsLoadStatus map[model.TableItemID]string) map[string]map[string]string {
if len(statsLoadStatus) < 1 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (e *DeleteExec) doBatchDelete(ctx context.Context) error {
return ErrBatchInsertFail.GenWithStack("BatchDelete failed with error: %v", err)
}
e.memTracker.Consume(-int64(txn.Size()))
e.ctx.StmtCommit()
e.ctx.StmtCommit(ctx)
if err := sessiontxn.NewTxnInStmt(ctx, e.ctx); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.GenWithStack("BatchDelete failed with error: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4379,7 +4379,7 @@ func TestAdminShowDDLJobs(t *testing.T) {
require.NoError(t, err)
err = meta.NewMeta(txn).AddHistoryDDLJob(job, true)
require.NoError(t, err)
tk.Session().StmtCommit()
tk.Session().StmtCommit(context.Background())

re = tk.MustQuery("admin show ddl jobs 1")
row = re.Rows()[0]
Expand Down
2 changes: 1 addition & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ func (e *InsertValues) doBatchInsert(ctx context.Context) error {
return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err)
}
e.memTracker.Consume(-int64(txn.Size()))
e.ctx.StmtCommit()
e.ctx.StmtCommit(ctx)
if err := sessiontxn.NewTxnInStmt(ctx, e.ctx); err != nil {
// We should return a special error for batch insert.
return ErrBatchInsertFail.GenWithStack("BatchInsert failed with error: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
var err error
defer func() {
if err != nil {
e.Ctx.StmtRollback()
e.Ctx.StmtRollback(ctx, false)
}
}()
err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
Expand All @@ -327,7 +327,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
failpoint.Inject("commitOneTaskErr", func() error {
return errors.New("mock commit one task error")
})
e.Ctx.StmtCommit()
e.Ctx.StmtCommit(ctx)
// Make sure process stream routine never use invalid txn
e.txnInUse.Lock()
defer e.txnInUse.Unlock()
Expand All @@ -353,7 +353,7 @@ func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
e.ForceQuit()
}
if err != nil {
e.ctx.StmtRollback()
e.ctx.StmtRollback(ctx, false)
}
}()
var tasks uint64
Expand Down
4 changes: 2 additions & 2 deletions executor/writetest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1896,7 +1896,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, t *testing.T, tk *t
}
ld.SetMessage()
require.Equal(t, tt.expectedMsg, tk.Session().LastMessage())
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
txn, err := ctx.Txn(true)
require.NoError(t, err)
err = txn.Commit(context.Background())
Expand Down Expand Up @@ -2353,7 +2353,7 @@ func TestLoadDataIntoPartitionedTable(t *testing.T) {
require.NoError(t, err)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
ctx.StmtCommit()
ctx.StmtCommit(context.Background())
txn, err := ctx.Txn(true)
require.NoError(t, err)
err = txn.Commit(context.Background())
Expand Down
6 changes: 6 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ func (t *mockTxn) Mem() uint64 {
return 0
}

func (t *mockTxn) StartAggressiveLocking() error { return nil }
func (t *mockTxn) RetryAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) CancelAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) DoneAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) IsInAggressiveLockingMode() bool { return false }

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
return &mockTxn{
Expand Down
10 changes: 10 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type LockCtx = tikvstore.LockCtx
type Transaction interface {
RetrieverMutator
AssertionProto
AggressiveLockingController
// Size returns sum of keys and values length.
Size() int
// Mem returns the memory consumption of the transaction.
Expand Down Expand Up @@ -281,6 +282,15 @@ type AssertionProto interface {
SetAssertion(key []byte, assertion ...FlagsOp) error
}

// AggressiveLockingController is the interface that defines aggressive locking related operations.
type AggressiveLockingController interface {
StartAggressiveLocking() error
RetryAggressiveLocking(ctx context.Context) error
CancelAggressiveLocking(ctx context.Context) error
DoneAggressiveLocking(ctx context.Context) error
IsInAggressiveLockingMode() bool
}

// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Expand Down
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func RegisterMetrics() {
prometheus.MustRegister(ReadFromTableCacheCounter)
prometheus.MustRegister(LoadTableCacheDurationHistogram)
prometheus.MustRegister(NonTransactionalDMLCount)
prometheus.MustRegister(PessimisticDMLDurationByAttempt)
prometheus.MustRegister(MemoryUsage)
prometheus.MustRegister(StatsCacheLRUCounter)
prometheus.MustRegister(StatsCacheLRUGauge)
Expand Down
9 changes: 9 additions & 0 deletions metrics/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ var (
Help: "Counter of setting tidb_constraint_check_in_place to false, note that it doesn't count the default value set by tidb config",
},
)

PessimisticDMLDurationByAttempt = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "session",
Name: "transaction_pessimistic_dml_duration_by_attempt",
Help: "Bucketed histogram of duration of pessimistic DMLs, distinguished by first attempt and retries",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 28), // 1ms ~ 1.5days
}, []string{LblType, LblPhase})
)

// Label constants.
Expand Down
6 changes: 3 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,10 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
}
s.txn.onStmtEnd()
if err != nil {
s.StmtRollback()
s.StmtRollback(ctx, false)
break
}
s.StmtCommit()
s.StmtCommit(ctx)
}
logutil.Logger(ctx).Warn("transaction association",
zap.Uint64("retrying txnStartTS", s.GetSessionVars().TxnCtx.StartTS),
Expand Down Expand Up @@ -2355,7 +2355,7 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
if rs != nil {
if se.GetSessionVars().StmtCtx.IsExplainAnalyzeDML {
if !sessVars.InTxn() {
se.StmtCommit()
se.StmtCommit(ctx)
if err := se.CommitTxn(ctx); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,9 @@ func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.St
// Handle the stmt commit/rollback.
if se.txn.Valid() {
if meetsErr != nil {
se.StmtRollback()
se.StmtRollback(ctx, false)
} else {
se.StmtCommit()
se.StmtCommit(ctx)
}
}
}
Expand Down