Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Jan 11, 2023
1 parent 0748d34 commit 216f739
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 35 deletions.
13 changes: 5 additions & 8 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,11 @@ func (t *mockTxn) Mem() uint64 {
return 0
}

func (t *mockTxn) StartAggressiveLocking() {}
func (t *mockTxn) RetryAggressiveLocking(_ context.Context) {}
func (t *mockTxn) CancelAggressiveLocking(_ context.Context) {}
func (t *mockTxn) DoneAggressiveLocking(_ context.Context) {}

func (t *mockTxn) IsInAggressiveLockingMode() bool {
return false
}
func (t *mockTxn) StartAggressiveLocking() error { return nil }
func (t *mockTxn) RetryAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) CancelAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) DoneAggressiveLocking(_ context.Context) error { return nil }
func (t *mockTxn) IsInAggressiveLockingMode() bool { return false }

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
Expand Down
8 changes: 4 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ type AssertionProto interface {

// AggressiveLockingController is the interface that defines aggressive locking related operations.
type AggressiveLockingController interface {
StartAggressiveLocking()
RetryAggressiveLocking(ctx context.Context)
CancelAggressiveLocking(ctx context.Context)
DoneAggressiveLocking(ctx context.Context)
StartAggressiveLocking() error
RetryAggressiveLocking(ctx context.Context) error
CancelAggressiveLocking(ctx context.Context) error
DoneAggressiveLocking(ctx context.Context) error
IsInAggressiveLockingMode() bool
}

Expand Down
55 changes: 37 additions & 18 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error {

if txn.enterAggressiveLockingOnValid {
txn.enterAggressiveLockingOnValid = false
txn.Transaction.StartAggressiveLocking()
err = txn.Transaction.StartAggressiveLocking()
if err != nil {
return err
}
}

// The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them.
Expand Down Expand Up @@ -471,64 +474,80 @@ func (txn *LazyTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn fu
}

// StartAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization.
func (txn *LazyTxn) StartAggressiveLocking() {
func (txn *LazyTxn) StartAggressiveLocking() error {
if txn.Valid() {
txn.Transaction.StartAggressiveLocking()
return txn.Transaction.StartAggressiveLocking()
} else if txn.pending() {
txn.enterAggressiveLockingOnValid = true
} else {
panic("trying to start aggressive locking on a transaction in invalid state")
err := errors.New("trying to start aggressive locking on a transaction in invalid state")
logutil.BgLogger().Error("unexpected error when starting aggressive locking", zap.Error(err), zap.Stringer("txn", txn))
return err
}
return nil
}

// RetryAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization.
func (txn *LazyTxn) RetryAggressiveLocking(ctx context.Context) {
func (txn *LazyTxn) RetryAggressiveLocking(ctx context.Context) error {
if txn.Valid() {
txn.Transaction.RetryAggressiveLocking(ctx)
return txn.Transaction.RetryAggressiveLocking(ctx)
} else if !txn.pending() {
panic("trying to retry aggressive locking on a transaction in invalid state")
err := errors.New("trying to retry aggressive locking on a transaction in invalid state")
logutil.BgLogger().Error("unexpected error when retrying aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn))
return err
}
return nil
}

// CancelAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization.
func (txn *LazyTxn) CancelAggressiveLocking(ctx context.Context) {
func (txn *LazyTxn) CancelAggressiveLocking(ctx context.Context) error {
if txn.Valid() {
txn.Transaction.CancelAggressiveLocking(ctx)
return txn.Transaction.CancelAggressiveLocking(ctx)
} else if txn.pending() {
if txn.enterAggressiveLockingOnValid {
txn.enterAggressiveLockingOnValid = false
} else {
panic("trying to cancel aggressive locking when it's not started")
err := errors.New("trying to cancel aggressive locking when it's not started")
logutil.BgLogger().Error("unexpected error when cancelling aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn))
return err
}
} else {
panic("trying to cancel aggressive locking on a transaction in invalid state")
err := errors.New("trying to cancel aggressive locking on a transaction in invalid state")
logutil.BgLogger().Error("unexpected error when cancelling aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn))
return err
}
return nil
}

// DoneAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization.
func (txn *LazyTxn) DoneAggressiveLocking(ctx context.Context) {
func (txn *LazyTxn) DoneAggressiveLocking(ctx context.Context) error {
if txn.Valid() {
txn.Transaction.DoneAggressiveLocking(ctx)
return txn.Transaction.DoneAggressiveLocking(ctx)
} else if txn.pending() {
if txn.enterAggressiveLockingOnValid {
txn.enterAggressiveLockingOnValid = false
} else {
panic("trying to finish aggressive locking when it's not started")
err := errors.New("trying to finish aggressive locking when it's not started")
logutil.BgLogger().Error("unexpected error when finishing aggressive locking")
return err
}
} else {
panic("trying to cancel aggressive locking on a transaction in invalid state")
err := errors.New("trying to cancel aggressive locking on a transaction in invalid state")
logutil.BgLogger().Error("unexpected error when finishing aggressive locking")
return err
}
return nil
}

// IsInAggressiveLockingMode wraps the inner transaction to support using aggressive locking with lazy initialization.
func (txn *LazyTxn) IsInAggressiveLockingMode() bool {
if txn.Valid() {
return txn.Transaction.IsInAggressiveLockingMode()
}
if txn.pending() {
} else if txn.pending() {
return txn.enterAggressiveLockingOnValid
} else {
return false
}
return false
}

func (txn *LazyTxn) reset() {
Expand Down
16 changes: 12 additions & 4 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,9 @@ func (p *basePessimisticTxnContextProvider) OnHandlePessimisticStmtStart(ctx con
return err
}
if p.sctx.GetSessionVars().PessimisticTransactionAggressiveLocking && p.txn != nil {
p.txn.StartAggressiveLocking()
if err := p.txn.StartAggressiveLocking(); err != nil {
return err
}
}
return nil
}
Expand All @@ -525,7 +527,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtRetry(ctx context.Context) err
return err
}
if p.txn != nil && p.txn.IsInAggressiveLockingMode() {
p.txn.RetryAggressiveLocking(ctx)
if err := p.txn.RetryAggressiveLocking(ctx); err != nil {
return err
}
}
return nil
}
Expand All @@ -536,7 +540,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtCommit(ctx context.Context) er
return err
}
if p.txn != nil && p.txn.IsInAggressiveLockingMode() {
p.txn.DoneAggressiveLocking(ctx)
if err := p.txn.DoneAggressiveLocking(ctx); err != nil {
return err
}
}
return nil
}
Expand All @@ -547,7 +553,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtRollback(ctx context.Context,
return err
}
if !isForPessimisticRetry && p.txn != nil && p.txn.IsInAggressiveLockingMode() {
p.txn.CancelAggressiveLocking(ctx)
if err := p.txn.CancelAggressiveLocking(ctx); err != nil {
return err
}
}
return nil
}
27 changes: 26 additions & 1 deletion store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (txn *tikvTxn) exitAggressiveLockingIfInapplicable(ctx context.Context, key
// Then the previously-locked keys during execution in this statement (if any) will be turned into the state
// as if they were locked in normal way.
// Note that the issue https://github.com/pingcap/tidb/issues/35682 also exists here.
txn.DoneAggressiveLocking(ctx)
txn.KVTxn.DoneAggressiveLocking(ctx)
}
}

Expand All @@ -382,6 +382,31 @@ func (txn *tikvTxn) generateWriteConflictForLockedWithConflict(lockCtx *kv.LockC
return nil
}

// StartAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController.
// TODO: Update the methods' signatures in client-go to avoid this adaptor functions.
func (txn *tikvTxn) StartAggressiveLocking() error {
txn.KVTxn.StartAggressiveLocking()
return nil
}

// RetryAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController.
func (txn *tikvTxn) RetryAggressiveLocking(ctx context.Context) error {
txn.KVTxn.RetryAggressiveLocking(ctx)
return nil
}

// CancelAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController.
func (txn *tikvTxn) CancelAggressiveLocking(ctx context.Context) error {
txn.KVTxn.CancelAggressiveLocking(ctx)
return nil
}

// DoneAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController.
func (txn *tikvTxn) DoneAggressiveLocking(ctx context.Context) error {
txn.KVTxn.DoneAggressiveLocking(ctx)
return nil
}

// TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed.
type TiDBKVFilter struct{}

Expand Down

0 comments on commit 216f739

Please sign in to comment.