Skip to content

Commit

Permalink
tikv: keep using origin goroutine when doing retry in commit phase (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot committed Apr 29, 2020
1 parent facd660 commit cab6200
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions store/tikv/2pc.go
Expand Up @@ -50,7 +50,7 @@ type twoPhaseCommitAction interface {
}

type actionPrewrite struct{}
type actionCommit struct{}
type actionCommit struct{ retry bool }
type actionCleanup struct{}
type actionPessimisticLock struct {
*kv.LockCtx
Expand Down Expand Up @@ -463,7 +463,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
firstIsPrimary = true
}

_, actionIsCommit := action.(actionCommit)
actionCommit, actionIsCommit := action.(actionCommit)
_, actionIsCleanup := action.(actionCleanup)
_, actionIsPessimiticLock := action.(actionPessimisticLock)

Expand Down Expand Up @@ -499,7 +499,7 @@ func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCo
}
batches = batches[1:]
}
if actionIsCommit {
if actionIsCommit && !actionCommit.retry {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potential data race in unit test since `CommitMaxBackoff` will be updated
Expand Down Expand Up @@ -527,16 +527,25 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
return nil
}

if len(batches) == 1 {
e := action.handleSingleBatch(c, bo, batches[0])
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
noNeedFork := len(batches) == 1
if !noNeedFork {
if ac, ok := action.(actionCommit); ok && ac.retry {
noNeedFork = true
}
return errors.Trace(e)
}
if noNeedFork {
for _, b := range batches {
e := action.handleSingleBatch(c, bo, b)
if e != nil {
logutil.BgLogger().Debug("2PC doActionOnBatches failed",
zap.Uint64("conn", c.connID),
zap.Stringer("action type", action),
zap.Error(e),
zap.Uint64("txnStartTS", c.startTS))
return errors.Trace(e)
}
}
return nil
}
rateLim := len(batches)
// Set rateLim here for the large transaction.
Expand Down Expand Up @@ -998,7 +1007,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch
return errors.Trace(err)
}
// re-split keys and commit again.
err = c.commitMutations(bo, batch.mutations)
err = c.doActionOnMutations(bo, actionCommit{retry: true}, batch.mutations)
return errors.Trace(err)
}
if resp.Resp == nil {
Expand Down

0 comments on commit cab6200

Please sign in to comment.