
Commit

For KeyIsLocked error reported for timeout, if a lock is recently updated, don't try to resolve it. (#758)

* update client-go; format

Signed-off-by: ekexium <eke@fastmail.com>

* feat: do not resolve lock if duration_to_last_updated is short

Signed-off-by: ekexium <eke@fastmail.com>

* adjust the threshold to 1200ms to allow small deviation

Signed-off-by: ekexium <eke@fastmail.com>

* fix: don't treat it as WriteConflict, simply retry

Signed-off-by: ekexium <eke@fastmail.com>

* update kvproto

Signed-off-by: ekexium <eke@fastmail.com>

* set the threshold to 300ms

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
ekexium committed Apr 13, 2023
1 parent f3e8703 commit 67e56a9
Showing 1 changed file with 110 additions and 47 deletions.
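The gist of the change, as a minimal standalone sketch (not the actual client-go API; the type and helper names below are illustrative): when a KeyIsLocked error comes back because the pessimistic lock wait timed out, locks whose DurationToLastUpdateMs is below a small threshold are not resolved, since their owner is very likely still alive, and the lock request is simply retried.

package main

import "fmt"

// lockInfo is a simplified stand-in for the lock information carried in a
// KeyIsLocked error (kvrpcpb.LockInfo in the real code).
type lockInfo struct {
	Key                    []byte
	DurationToLastUpdateMs uint64
}

// skipResolveThresholdMs mirrors the constant added by this commit: TiKV's
// default wait timeout is 1 second, so 300ms leaves room for small deviation.
const skipResolveThresholdMs = 300

// locksToResolve keeps only the locks that are worth resolving; recently
// updated locks are skipped so the caller can simply retry the lock request.
func locksToResolve(infos []lockInfo) []lockInfo {
	var out []lockInfo
	for _, l := range infos {
		if l.DurationToLastUpdateMs > 0 && l.DurationToLastUpdateMs < skipResolveThresholdMs {
			continue // lock owner is probably alive; do not resolve
		}
		out = append(out, l)
	}
	return out
}

func main() {
	locks := []lockInfo{
		{Key: []byte("k1"), DurationToLastUpdateMs: 50},   // fresh: skipped
		{Key: []byte("k2"), DurationToLastUpdateMs: 2000}, // stale: resolved
		{Key: []byte("k3"), DurationToLastUpdateMs: 0},    // unknown: resolved
	}
	for _, l := range locksToResolve(locks) {
		fmt.Printf("resolve lock on %s\n", l.Key)
	}
}

The 300ms threshold sits well below TiKV's default 1s wait timeout, so for hot-update workloads the client stops resolving locks that are being actively refreshed and instead just retries.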
157 changes: 110 additions & 47 deletions txnkv/transaction/pessimistic.go
@@ -100,7 +100,9 @@ type diagnosticContext struct {
reqDuration time.Duration
}

func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionPessimisticLock) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
convertMutationsToPb := func(committerMutations CommitterMutations) []*kvrpcpb.Mutation {
mutations := make([]*kvrpcpb.Mutation, committerMutations.Len())
c.txn.GetMemBuffer().RLock()
@@ -120,26 +122,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *

m := batch.mutations
mutations := convertMutationsToPb(m)
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
},
)
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
@@ -168,8 +172,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
for _, m := range mutations {
keys = append(keys, hex.EncodeToString(m.Key))
}
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
logutil.BgLogger().Info(
"[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys),
)
}
req.PessimisticLock().LockTtl = ttl
if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil {
@@ -221,7 +227,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
}

func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error) (finished bool, err error) {
func (action actionPessimisticLock) handleRegionError(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error,
) (finished bool, err error) {
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
@@ -242,7 +250,13 @@ func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *
return true, err
}

func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) (locks []*txnlock.Lock, finished bool, err error) {
// When handling a wait timeout, if the current lock was updated within this threshold, do not try to resolve it.
// The default wait timeout in TiKV is 1 second; 300ms should be appropriate for common hot-update workloads.
const skipResolveThresholdMs = 300

func (action actionPessimisticLock) handleKeyErrorForResolve(
c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError,
) (locks []*txnlock.Lock, finished bool, err error) {
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
@@ -253,17 +267,32 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs
return nil, true, errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock})
}

// Do not resolve the lock if it was updated recently, which indicates that the txn holding the lock is
// very likely still alive.
// This should only happen for wait timeout.
if lockInfo := keyErr.GetLocked(); lockInfo != nil &&
lockInfo.DurationToLastUpdateMs > 0 &&
lockInfo.DurationToLastUpdateMs < skipResolveThresholdMs {
continue
}

// Extract lock from key error
lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil {
return nil, true, err1
}
locks = append(locks, lock)
}
if len(locks) == 0 {
return nil, false, nil
}
return locks, false, nil
}

func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
Expand All @@ -283,7 +312,12 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
if len(keyErrs) == 0 {

if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}

if batch.isPrimary {
@@ -314,10 +348,14 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
}
return true, nil
}
locks, finished, err := action.handleKeyError(c, keyErrs)

locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
if len(locks) == 0 {
return false, nil
}

// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
@@ -360,7 +398,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
return false, nil
}

func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
@@ -376,7 +417,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
if len(mutationsPb) > 1 || len(lockResp.Results) > 1 {
panic("unreachable")
}
if batch.isPrimary && len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
if batch.isPrimary &&
len(lockResp.Results) > 0 &&
lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
}
@@ -422,11 +465,16 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c

if len(lockResp.Results) > 0 && !isMutationFailed {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}
}

locks, finished, err := action.handleKeyError(c, keyErrs)
locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
@@ -477,9 +525,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return false, nil
}

// If the failedMutations is not empty and the error is not KeyIsLocked, the function should have already
// returned before. So this is an unreachable path.
return true, errors.New("Pessimistic lock response corrupted")
// This can happen when the KeyIsLocked errors were caused by a wait timeout and we decided not to
// resolve the locks. In that case, simply retry.
return false, nil
}

if len(locks) != 0 {
@@ -497,16 +545,20 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return true, nil
}

func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (actionPessimisticRollback) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
forUpdateTS := c.forUpdateTS
if c.maxLockedWithConflictTS > forUpdateTS {
forUpdateTS = c.maxLockedWithConflictTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
},
)
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
@@ -528,7 +580,10 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
return nil
}

func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticLockMutations(
bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode,
mutations CommitterMutations,
) error {
if c.sessionID > 0 {
if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
Expand All @@ -537,19 +592,27 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
for _, action := range strings.Split(v, ",") {
if action == "delay" {
duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration),
)
time.Sleep(duration)
} else if action == "fail" {
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS),
)
return errors.New("injected failure at pessimistic lock")
}
}
}
}
}
return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations)
return c.doActionOnMutations(
bo,
actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()},
mutations,
)
}

func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
