kv/concurrency: batch intent resolution of pushed intents from same txn
Fixes cockroachdb#103126.

This commit extends the infrastructure introduced in cockroachdb#49218 for transaction
timestamp pushes. It avoids redundant txn pushes of PENDING transactions and
batches the resolution of PENDING intents. This breaks the O(num_intents) work
performed by high-priority scans (e.g. backups) over intent-heavy keyspaces into
something closer to O(num_ranges) work.

The commit accomplishes its goals by adding a second per-Range LRU cache of
transactions that are PENDING and are known to have been pushed to higher
timestamps (a minimal sketch of such a cache follows the list below). We use
this cache for two purposes:

1. when a non-locking read encounters a lock at a conflicting timestamp that
   is held by a txn already pushed above our read timestamp, we neither wait
   out the kv.lock_table.coordinator_liveness_push_delay (50 ms) nor push the
   transaction's record (an RPC to the leaseholder of the pushee's txn record
   range).
2. we use the existence of a transaction in the cache as an indication that
   it may have written multiple intents, so we begin deferring intent resolution
   to enable batching.
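
As a rough illustration of the data structure involved, here is a minimal
sketch of a bounded, mutex-protected cache of pushed transactions. All names
are hypothetical; the actual txnStatusCache added by this commit lives in
pkg/kv/kvserver/concurrency and differs in structure and eviction details.

```go
package concurrency

import (
	"sync"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// pushedTxnCache is a hypothetical, size-bounded LRU cache of transactions
// known to have been pushed (PENDING at a higher timestamp, or finalized).
type pushedTxnCache struct {
	mu       sync.Mutex
	capacity int
	txns     []*roachpb.Transaction // most recently touched entry at index 0
}

// add records a pushed transaction, moving an existing entry for the same
// txn ID to the front and evicting the least recently touched entry when
// the cache is full.
func (c *pushedTxnCache) add(txn *roachpb.Transaction) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for i, existing := range c.txns {
		if existing.ID == txn.ID {
			copy(c.txns[1:i+1], c.txns[:i]) // shift newer entries down
			c.txns[0] = txn
			return
		}
	}
	if len(c.txns) > 0 && len(c.txns) >= c.capacity {
		c.txns = c.txns[:len(c.txns)-1] // evict the least recently touched entry
	}
	c.txns = append([]*roachpb.Transaction{txn}, c.txns...)
}

// get returns the cached transaction with the given ID, if present.
func (c *pushedTxnCache) get(id uuid.UUID) (*roachpb.Transaction, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, existing := range c.txns {
		if existing.ID == id {
			return existing, true
		}
	}
	return nil, false
}
```

As the lock_table.go comments below note, the cache is kept per-Range rather
than per-Store only because separate concurrency.Manager instances currently
share no state.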

Together, these two changes make us much more effective at pushing transactions
with a large number of intents. The following example (from cockroachdb#103126) demonstrates
this:
```sql
-- SETUP: run in a 3-node GCP roachprod cluster

--- session 1 - write 100k intents
CREATE TABLE keys (k BIGINT NOT NULL PRIMARY KEY);
BEGIN; INSERT INTO keys SELECT generate_series(1, 100000);

--- session 2 - push intents with high-priority txn without uncertainty interval
BEGIN PRIORITY HIGH AS OF SYSTEM TIME '-1ms';
SELECT count(*) FROM keys;

--- BEFORE this PR and before cockroachdb#103265 (i.e. v23.1.2): takes ~7.1ms per intent
Time: 714.441s total

--- BEFORE this PR: takes ~1.5ms per intent
Time: 151.880s total

--- AFTER this PR: takes ~24μs per intent
Time: 2.405s
```
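
(For reference, the per-intent figures above are the total time divided by the
100,000 intents: 714.441 s / 100,000 ≈ 7.1 ms, 151.880 s / 100,000 ≈ 1.5 ms,
and 2.405 s / 100,000 ≈ 24 μs.)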

The change does have an unfortunate limitation. Deferred intent resolution
is only currently enabled for non-locking readers without uncertainty
intervals. Readers with uncertainty intervals must contend with the
possibility of pushing a conflicting intent up into their uncertainty
interval and causing more work for themselves, which is avoided with care
by the lockTableWaiter but difficult to coordinate through the
txnStatusCache. This limitation is acceptable because the most important
case here is optimizing the Export requests issued by backup.
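
For concreteness, the gating condition described above boils down to something
like the following. This is a standalone restatement using the same fields
consulted by the hasUncertaintyInterval helper added in lock_table.go below;
the function name and shape are illustrative, not the actual lockTable code.

```go
package concurrency

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// canDeferPushedIntentResolution sketches the fast-path check: a non-locking
// reader may skip waiting and defer resolution of a conflicting intent only
// if it has no uncertainty interval remaining and the intent's holder is
// already known to be pushed above the reader's timestamp.
func canDeferPushedIntentResolution(
	readTS hlc.Timestamp, reader, pushedHolder *roachpb.Transaction,
) bool {
	hasUncertaintyInterval := reader != nil &&
		reader.ReadTimestamp.Less(reader.GlobalUncertaintyLimit)
	if hasUncertaintyInterval {
		// Resolving the intent at the holder's pushed timestamp could land it
		// inside the reader's uncertainty interval and force a restart, so
		// readers with uncertainty intervals take the normal waiting path.
		return false
	}
	return pushedHolder != nil && readTS.Less(pushedHolder.WriteTimestamp)
}
```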

This limitation also hints at the long-term plan for this interaction,
which is that non-locking readers can ignore known pending intents without
the need to even resolve those intents (see cockroachdb#94730). This will require a
request-scoped cache of pending, pushed transactions, which does not have
the same problems with uncertainty intervals.

Release note (performance improvement): Backups no longer perform work
proportional to the number of pending intents that they encounter, so they are
over 100x faster when encountering long-running, bulk writing transactions.
nvanbenschoten committed Jul 13, 2023
1 parent 1c2a8cd commit 3582d97
Showing 14 changed files with 1,161 additions and 95 deletions.
21 changes: 11 additions & 10 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -624,10 +624,10 @@ type lockTable interface {
// evaluation of this request. It adds the lock and enqueues this requester
// in its wait-queue. It is required that request evaluation discover such
// locks before acquiring its own locks, since the request needs to repeat
// ScanAndEnqueue. When consultFinalizedTxnCache=true, and the transaction
// holding the lock is finalized, the lock is not added to the lock table
// and instead tracked in the list of locks to resolve in the
// lockTableGuard.
// ScanAndEnqueue. When consultTxnStatusCache=true, and the transaction
// holding the lock is known to be pushed or finalized, the lock is not added
// to the lock table and instead tracked in the list of locks to resolve in
// the lockTableGuard.
//
// The lease sequence is used to detect lease changes between the when
// request that found the lock started evaluating and when the discovered
@@ -644,7 +644,7 @@ type lockTable interface {
// true) or whether it was ignored because the lockTable is currently
// disabled (false).
AddDiscoveredLock(
intent *roachpb.Intent, seq roachpb.LeaseSequence, consultFinalizedTxnCache bool,
intent *roachpb.Intent, seq roachpb.LeaseSequence, consultTxnStatusCache bool,
guard lockTableGuard) (bool, error)

// AcquireLock informs the lockTable that a new lock was acquired or an
@@ -718,11 +718,12 @@ type lockTable interface {
// txn.WriteTimestamp.
UpdateLocks(*roachpb.LockUpdate) error

// TransactionIsFinalized informs the lock table that a transaction is
// finalized. This is used by the lock table in a best-effort manner to avoid
// waiting on locks of finalized transactions and telling the caller via
// lockTableGuard.ResolveBeforeEvaluation to resolve a batch of intents.
TransactionIsFinalized(*roachpb.Transaction)
// PushedTransactionUpdated informs the lock table that a transaction has been
// pushed and is either finalized or has been moved to a higher timestamp.
// This is used by the lock table in a best-effort manner to avoid waiting on
// locks of finalized or pushed transactions and telling the caller via
// lockTableGuard.ResolveBeforeScanning to resolve a batch of intents.
PushedTransactionUpdated(*roachpb.Transaction)

// QueryLockTableState returns detailed metadata on locks managed by the lockTable.
QueryLockTableState(span roachpb.Span, opts QueryLockTableOptions) ([]roachpb.LockStateInfo, QueryLockTableResumeState)
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -85,17 +85,18 @@ var MaxLockWaitQueueLength = settings.RegisterIntSetting(
},
)

// DiscoveredLocksThresholdToConsultFinalizedTxnCache sets a threshold as
// mentioned in the description string. The default of 200 is somewhat
// arbitrary but should suffice for small OLTP transactions. Given the default
// DiscoveredLocksThresholdToConsultTxnStatusCache sets a threshold as mentioned
// in the description string. The default of 200 is somewhat arbitrary but
// should suffice for small OLTP transactions. Given the default
// 10,000 lock capacity of the lock table, 200 is small enough to not matter
// much against the capacity, which is desirable. We have seen examples with
// discoveredCount > 100,000, caused by stats collection, where we definitely
// want to avoid adding these locks to the lock table, if possible.
var DiscoveredLocksThresholdToConsultFinalizedTxnCache = settings.RegisterIntSetting(
var DiscoveredLocksThresholdToConsultTxnStatusCache = settings.RegisterIntSetting(
settings.SystemOnly,
// NOTE: the name of this setting mentions "finalized" for historical reasons.
"kv.lock_table.discovered_locks_threshold_for_consulting_finalized_txn_cache",
"the maximum number of discovered locks by a waiter, above which the finalized txn cache"+
"the maximum number of discovered locks by a waiter, above which the txn status cache"+
"is consulted and resolvable locks are not added to the lock table -- this should be a small"+
"fraction of the maximum number of locks in the lock table",
200,
@@ -463,11 +464,11 @@ func (m *managerImpl) HandleWriterIntentError(
//
// Either way, there is no possibility of the request entering an infinite
// loop without making progress.
consultFinalizedTxnCache :=
int64(len(t.Intents)) > DiscoveredLocksThresholdToConsultFinalizedTxnCache.Get(&m.st.SV)
consultTxnStatusCache :=
int64(len(t.Intents)) > DiscoveredLocksThresholdToConsultTxnStatusCache.Get(&m.st.SV)
for i := range t.Intents {
intent := &t.Intents[i]
added, err := m.lt.AddDiscoveredLock(intent, seq, consultFinalizedTxnCache, g.ltg)
added, err := m.lt.AddDiscoveredLock(intent, seq, consultTxnStatusCache, g.ltg)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -86,7 +86,7 @@ import (
// debug-disable-txn-pushes
// debug-set-clock ts=<secs>
// debug-advance-clock ts=<secs>
// debug-set-discovered-locks-threshold-to-consult-finalized-txn-cache n=<count>
// debug-set-discovered-locks-threshold-to-consult-txn-status-cache n=<count>
// debug-set-max-locks n=<count>
// reset
func TestConcurrencyManagerBasic(t *testing.T) {
@@ -566,10 +566,10 @@ func TestConcurrencyManagerBasic(t *testing.T) {
c.manual.Advance(time.Duration(secs) * time.Second)
return ""

case "debug-set-discovered-locks-threshold-to-consult-finalized-txn-cache":
case "debug-set-discovered-locks-threshold-to-consult-txn-status-cache":
var n int
d.ScanArgs(t, "n", &n)
c.setDiscoveredLocksThresholdToConsultFinalizedTxnCache(n)
c.setDiscoveredLocksThresholdToConsultTxnStatusCache(n)
return ""

case "debug-set-max-locks":
@@ -944,8 +944,8 @@ func (c *cluster) disableTxnPushes() {
concurrency.LockTableDeadlockDetectionPushDelay.Override(context.Background(), &c.st.SV, time.Hour)
}

func (c *cluster) setDiscoveredLocksThresholdToConsultFinalizedTxnCache(n int) {
concurrency.DiscoveredLocksThresholdToConsultFinalizedTxnCache.Override(context.Background(), &c.st.SV, int64(n))
func (c *cluster) setDiscoveredLocksThresholdToConsultTxnStatusCache(n int) {
concurrency.DiscoveredLocksThresholdToConsultTxnStatusCache.Override(context.Background(), &c.st.SV, int64(n))
}

// reset clears all request state in the cluster. This avoids portions of tests
134 changes: 101 additions & 33 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -248,16 +248,14 @@ type lockTableImpl struct {
// instead of clearing everything.
minLocks int64

// finalizedTxnCache is a small LRU cache that tracks transactions that
// were pushed and found to be finalized (COMMITTED or ABORTED). It is
// used as an optimization to avoid repeatedly pushing the transaction
// record when cleaning up the intents of an abandoned transaction.
// txnStatusCache is a small LRU cache that tracks the status of
// transactions that have been successfully pushed.
//
// NOTE: it probably makes sense to maintain a single finalizedTxnCache
// NOTE: it probably makes sense to maintain a single txnStatusCache
// across all Ranges on a Store instead of an individual cache per
// Range. For now, we don't do this because we don't share any state
// between separate concurrency.Manager instances.
finalizedTxnCache txnCache
txnStatusCache txnStatusCache

// clock is used to track the lock hold and lock wait start times.
clock *hlc.Clock
@@ -654,6 +652,10 @@ func (g *lockTableGuardImpl) txnMeta() *enginepb.TxnMeta {
return &g.txn.TxnMeta
}

func (g *lockTableGuardImpl) hasUncertaintyInterval() bool {
return g.txn != nil && g.txn.ReadTimestamp.Less(g.txn.GlobalUncertaintyLimit)
}

func (g *lockTableGuardImpl) isSameTxn(txn *enginepb.TxnMeta) bool {
return g.txn != nil && g.txn.ID == txn.ID
}
@@ -677,7 +679,7 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
span = &spans[g.index]
resumingInSameSpan = true
}
// Locks that transition to free because of the finalizedTxnCache are GC'd
// Locks that transition to free because of the txnStatusCache are GC'd
// before returning. Note that these are only unreplicated locks. Replicated
// locks are handled via the g.toResolve.
var locksToGC [spanset.NumSpanScope][]*lockState
@@ -733,8 +735,20 @@ func (g *lockTableGuardImpl) findNextLockAfter(notify bool) {
// we iterate over all the elements of toResolve and only keep the ones
// where removing the lock via the call to updateLockInternal is not a
// noop.
//
// For pushed transactions that are not finalized, we disable this
// deduplication and allow all resolution attempts to adjust the lock's
// timestamp to go through. This is because updating the lock ahead of
// resolution risks rediscovery loops where the lock is continually
// rediscovered at a lower timestamp than that in the lock table.
for i := range g.toResolve {
if heldByTxn := g.lt.updateLockInternal(&g.toResolve[i]); heldByTxn {
var doResolve bool
if g.toResolve[i].Status.IsFinalized() {
doResolve = g.lt.updateLockInternal(&g.toResolve[i])
} else {
doResolve = true
}
if doResolve {
g.toResolve[j] = g.toResolve[i]
j++
}
@@ -1057,8 +1071,8 @@ func (l *lockState) SafeFormat(w redact.SafePrinter, _ rune) {
}

// safeFormat is a helper for SafeFormat and String methods.
// REQUIRES: l.mu is locked. finalizedTxnCache can be nil.
func (l *lockState) safeFormat(sb *redact.StringBuilder, finalizedTxnCache *txnCache) {
// REQUIRES: l.mu is locked. txnStatusCache can be nil.
func (l *lockState) safeFormat(sb *redact.StringBuilder, txnStatusCache *txnStatusCache) {
sb.Printf(" lock: %s\n", l.key)
if l.isEmptyLock() {
sb.SafeString(" empty\n")
@@ -1087,8 +1101,8 @@ func (l *lockState) safeFormat(sb *redact.StringBuilder, finalizedTxnCache *txnC
} else {
sb.SafeString("unrepl ")
}
if finalizedTxnCache != nil {
finalizedTxn, ok := finalizedTxnCache.get(h.txn.ID)
if txnStatusCache != nil {
finalizedTxn, ok := txnStatusCache.finalizedTxns.get(h.txn.ID)
if ok {
var statusStr string
switch finalizedTxn.Status {
@@ -1535,11 +1549,12 @@ func (l *lockState) clearLockHolder() {
// that case the channel is notified first and the call to tryActiveWait()
// happens later in lockTableGuard.CurState().
//
// It uses the finalizedTxnCache to decide that the caller does not need to
// wait on a lock of a transaction that is already finalized.
// It uses the txnStatusCache to decide that the caller does not need to wait on
// a lock of a transaction that is already finalized or is pending but pushed
// above the request's read timestamp (for non-locking readers).
//
// - For unreplicated locks, this method will silently remove the lock and
// proceed as normal.
// - For unreplicated locks, this method will silently remove (or update) the
// lock and proceed as normal.
//
// - For replicated locks the behavior is more complicated since we need to
// resolve the intent. We desire:
@@ -1606,12 +1621,13 @@ func (l *lockState) tryActiveWait(

var replicatedLockFinalizedTxn *roachpb.Transaction
if lockHolderTxn != nil {
finalizedTxn, ok := g.lt.finalizedTxnCache.get(lockHolderTxn.ID)
finalizedTxn, ok := g.lt.txnStatusCache.finalizedTxns.get(lockHolderTxn.ID)
if ok {
if l.holder.holder[lock.Replicated].txn == nil {
// Only held unreplicated. Release immediately.
l.clearLockHolder()
if l.lockIsFree() {
up := roachpb.MakeLockUpdate(finalizedTxn, roachpb.Span{Key: l.key})
_, gc := l.tryUpdateLockLocked(up)
if gc {
// Empty lock.
return false, true
}
@@ -1633,6 +1649,40 @@
if g.ts.Less(lockHolderTS) {
return false, false
}

// If the non-locking reader is reading at a higher timestamp than the lock
// holder, but it knows that the lock holder has been pushed above its read
// timestamp, it can proceed after rewriting the lock at its transaction's
// pushed timestamp. Intent resolution can be deferred to maximize batching
// opportunities.
//
// This fast-path is only enabled for readers without uncertainty intervals,
// as readers with uncertainty intervals must contend with the possibility
// of pushing a conflicting intent up into their uncertainty interval and
// causing more work for themselves, which is avoided with care by the
// lockTableWaiter but difficult to coordinate through the txnStatusCache.
// This limitation is acceptable because the most important case here is
// optimizing the Export requests issued by backup.
if !g.hasUncertaintyInterval() {
pushedTxn, ok := g.lt.txnStatusCache.pendingTxns.get(lockHolderTxn.ID)
if ok && g.ts.Less(pushedTxn.WriteTimestamp) {
up := roachpb.MakeLockUpdate(pushedTxn, roachpb.Span{Key: l.key})
if l.holder.holder[lock.Replicated].txn == nil {
// Only held unreplicated. Update lock directly in case other
// waiting readers can benefit from the pushed timestamp.
//
// TODO(arul): this case is only possible while non-locking reads
// block on Exclusive locks. Once non-locking reads start only
// blocking on intents, it can be removed and asserted against.
_, _ = l.tryUpdateLockLocked(up)
} else {
// Resolve to push the replicated intent.
g.toResolve = append(g.toResolve, up)
}
return false, false
}
}

g.mu.Lock()
_, alsoHasStrongerAccess := g.mu.locks[l]
g.mu.Unlock()
@@ -1831,7 +1881,7 @@ func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, sa spanset.SpanA
// Already locked by this txn.
return true
}
// NB: We do not look at the finalizedTxnCache in this optimistic evaluation
// NB: We do not look at the txnStatusCache in this optimistic evaluation
// path. A conflict with a finalized txn will be noticed when retrying
// pessimistically.

@@ -2194,9 +2244,14 @@ func removeIgnored(
func (l *lockState) tryUpdateLock(up *roachpb.LockUpdate) (heldByTxn, gc bool) {
l.mu.Lock()
defer l.mu.Unlock()
return l.tryUpdateLockLocked(*up)
}

// REQUIRES: l.mu is locked.
func (l *lockState) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bool) {
if l.isEmptyLock() {
// Already free. This can happen when an unreplicated lock is removed in
// tryActiveWait due to the txn being in the finalizedTxnCache.
// tryActiveWait due to the txn being in the txnStatusCache.
return false, true
}
if !l.isLockedBy(up.Txn.ID) {
@@ -2617,13 +2672,13 @@ func (t *lockTableImpl) Dequeue(guard lockTableGuard) {
//
// We discussed in
// https://github.com/cockroachdb/cockroach/issues/62470#issuecomment-818374388
// the possibility of consulting the finalizedTxnCache in AddDiscoveredLock,
// the possibility of consulting the txnStatusCache in AddDiscoveredLock,
// and not adding the lock if the txn is already finalized, and instead
// telling the caller to do batched intent resolution before calling
// ScanAndEnqueue.
// This reduces memory pressure on the lockTableImpl in the extreme case of
// huge numbers of discovered locks. Note that when there isn't memory
// pressure, the consultation of the finalizedTxnCache in the ScanAndEnqueue
// pressure, the consultation of the txnStatusCache in the ScanAndEnqueue
// achieves the same batched intent resolution. Additionally, adding the lock
// to the lock table allows it to coordinate the population of
// lockTableGuardImpl.toResolve for different requests that encounter the same
@@ -2635,11 +2690,11 @@ func (t *lockTableImpl) Dequeue(guard lockTableGuard) {
// For now we adopt the following heuristic: the caller calls DiscoveredLocks
// with the count of locks discovered, prior to calling AddDiscoveredLock for
// each of the locks. At that point a decision is made whether to consult the
// finalizedTxnCache eagerly when adding discovered locks.
// txnStatusCache eagerly when adding discovered locks.
func (t *lockTableImpl) AddDiscoveredLock(
intent *roachpb.Intent,
seq roachpb.LeaseSequence,
consultFinalizedTxnCache bool,
consultTxnStatusCache bool,
guard lockTableGuard,
) (added bool, _ error) {
t.enabledMu.RLock()
@@ -2664,13 +2719,26 @@ func (t *lockTableImpl) AddDiscoveredLock(
if err != nil {
return false, err
}
if consultFinalizedTxnCache {
finalizedTxn, ok := t.finalizedTxnCache.get(intent.Txn.ID)
if consultTxnStatusCache {
finalizedTxn, ok := t.txnStatusCache.finalizedTxns.get(intent.Txn.ID)
if ok {
g.toResolve = append(
g.toResolve, roachpb.MakeLockUpdate(finalizedTxn, roachpb.Span{Key: key}))
return true, nil
}

// If the discoverer is a non-locking read, also check whether the lock's
// holder is known to have been pushed above the reader's timestamp. See the
// comment in tryActiveWait for more details, including why we include the
// hasUncertaintyInterval condition.
if sa == spanset.SpanReadOnly && !g.hasUncertaintyInterval() {
pushedTxn, ok := g.lt.txnStatusCache.pendingTxns.get(intent.Txn.ID)
if ok && g.ts.Less(pushedTxn.WriteTimestamp) {
g.toResolve = append(
g.toResolve, roachpb.MakeLockUpdate(pushedTxn, roachpb.Span{Key: key}))
return true, nil
}
}
}
var l *lockState
tree := &t.locks[ss]
@@ -2951,14 +3019,14 @@ func stepToNextSpan(g *lockTableGuardImpl) *spanset.Span {
return nil
}

// TransactionIsFinalized implements the lockTable interface.
func (t *lockTableImpl) TransactionIsFinalized(txn *roachpb.Transaction) {
// PushedTransactionUpdated implements the lockTable interface.
func (t *lockTableImpl) PushedTransactionUpdated(txn *roachpb.Transaction) {
// TODO(sumeer): We don't take any action for requests that are already
// waiting on locks held by txn. They need to take some action, like
// pushing, and resume their scan, to notice the change to this txn. We
// could be more proactive if we knew which locks in lockTableImpl were held
// by txn.
t.finalizedTxnCache.add(txn)
t.txnStatusCache.add(txn)
}

// Enable implements the lockTable interface.
@@ -2989,9 +3057,9 @@ func (t *lockTableImpl) Clear(disable bool) {
}
// The numToClear=0 is arbitrary since it is unused when force=true.
t.tryClearLocks(true /* force */, 0)
// Also clear the finalized txn cache, since it won't be needed any time
// Also clear the txn status cache, since it won't be needed any time
// soon and consumes memory.
t.finalizedTxnCache.clear()
t.txnStatusCache.clear()
}

// QueryLockTableState implements the lockTable interface.
@@ -3098,7 +3166,7 @@ func (t *lockTableImpl) String() string {
for iter.First(); iter.Valid(); iter.Next() {
l := iter.Cur()
l.mu.Lock()
l.safeFormat(&sb, &t.finalizedTxnCache)
l.safeFormat(&sb, &t.txnStatusCache)
l.mu.Unlock()
}
tree.mu.RUnlock()