diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index 7950d46d20ff..af093f4f4be0 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/redact" ) // Manager is a structure that sequences incoming requests and provides @@ -432,6 +433,10 @@ type Request struct { // passed to SequenceReq. Only supplied to SequenceReq if the method is // not also passed an exiting Guard. LockSpans *lockspanset.LockSpanSet + + // The SafeFormatter capable of formatting the request. This is used to enrich + // logging with request level information when latches conflict. + BaFmt redact.SafeFormatter } // Guard is returned from Manager.SequenceReq. The guard is passed back in to @@ -510,7 +515,7 @@ type latchManager interface { // WaitFor waits for conflicting latches on the specified spans without adding // any latches itself. Fast path for operations that only require flushing out // old operations without blocking any new ones. - WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) *Error + WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter) *Error // Poison a guard's latches, allowing waiters to fail fast. Poison(latchGuard) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go index 1e8aa222fbb2..3ee5558dad5f 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go @@ -260,7 +260,7 @@ func (m *managerImpl) sequenceReqWithGuard( // them. if shouldWaitOnLatchesWithoutAcquiring(g.Req) { log.Event(ctx, "waiting on latches without acquiring") - return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy) + return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy, g.Req.BaFmt) } // Provide the manager with an opportunity to intercept the request. It diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 462df780626b..01eb99ddc62d 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -194,25 +194,33 @@ func TestConcurrencyManagerBasic(t *testing.T) { if d.HasArg("max-lock-wait-queue-length") { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } - + ba := &kvpb.BatchRequest{} pp := scanPoisonPolicy(t, d) // Each kvpb.Request is provided on an indented line. 
reqs, reqUnions := scanRequests(t, d, c) + ba.Txn = txn + ba.Timestamp = ts + ba.UserPriority = priority + ba.ReadConsistency = readConsistency + ba.WaitPolicy = waitPolicy + ba.LockTimeout = lockTimeout + ba.Requests = reqUnions latchSpans, lockSpans := c.collectSpans(t, txn, ts, waitPolicy, reqs) c.requestsByName[reqName] = concurrency.Request{ - Txn: txn, - Timestamp: ts, - NonTxnPriority: priority, - ReadConsistency: readConsistency, - WaitPolicy: waitPolicy, - LockTimeout: lockTimeout, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + NonTxnPriority: ba.UserPriority, + ReadConsistency: ba.ReadConsistency, + WaitPolicy: ba.WaitPolicy, + LockTimeout: ba.LockTimeout, + Requests: ba.Requests, MaxLockWaitQueueLength: maxLockWaitQueueLength, - Requests: reqUnions, LatchSpans: latchSpans, LockSpans: lockSpans, PoisonPolicy: pp, + BaFmt: ba, } return "" diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go index d40c249eae32..482aedba2877 100644 --- a/pkg/kv/kvserver/concurrency/latch_manager.go +++ b/pkg/kv/kvserver/concurrency/latch_manager.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/redact" ) // latchManagerImpl implements the latchManager interface. @@ -25,7 +26,7 @@ type latchManagerImpl struct { } func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { - lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy) + lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy, req.BaFmt) if err != nil { return nil, kvpb.NewError(err) } @@ -33,7 +34,7 @@ func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard } func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard { - lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy) + lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy, req.BaFmt) return lg } @@ -52,9 +53,9 @@ func (m *latchManagerImpl) WaitUntilAcquired( } func (m *latchManagerImpl) WaitFor( - ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, + ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, ) *Error { - err := m.m.WaitFor(ctx, ss, pp) + err := m.m.WaitFor(ctx, ss, pp, baFmt) if err != nil { return kvpb.NewError(err) } diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go index 0b02f6a225eb..ce5d2587ed7e 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" @@ -321,21 +322,26 @@ func TestLockTableBasic(t *testing.T) { d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength) } latchSpans, lockSpans := scanSpans(t, d, ts) + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts + ba.WaitPolicy = waitPolicy req := Request{ - Timestamp: ts, - WaitPolicy: waitPolicy, + Timestamp: ba.Timestamp, + WaitPolicy: ba.WaitPolicy, MaxLockWaitQueueLength: maxLockWaitQueueLength, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } if txnMeta != nil { // 
Update the transaction's timestamp, if necessary. The transaction // may have needed to move its timestamp for any number of reasons. txnMeta.WriteTimestamp = ts - req.Txn = &roachpb.Transaction{ + ba.Txn = &roachpb.Transaction{ TxnMeta: *txnMeta, ReadTimestamp: ts, } + req.Txn = ba.Txn } requestsByName[reqName] = req return "" @@ -874,10 +880,13 @@ func TestLockTableMaxLocks(t *testing.T) { latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: k}, hlc.Timestamp{WallTime: 1}) lockSpans.Add(lock.Intent, roachpb.Span{Key: k}) } + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 1} req := Request{ - Timestamp: hlc.Timestamp{WallTime: 1}, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } reqs = append(reqs, req) ltg, err := lt.ScanAndEnqueue(req, nil) @@ -1011,10 +1020,13 @@ func TestLockTableMaxLocksWithMultipleNotRemovableRefs(t *testing.T) { } latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, hlc.Timestamp{WallTime: 1}) lockSpans.Add(lock.Intent, roachpb.Span{Key: key}) + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 1} req := Request{ - Timestamp: hlc.Timestamp{WallTime: 1}, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } ltg, err := lt.ScanAndEnqueue(req, nil) require.Nil(t, err) @@ -1111,7 +1123,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error { // cancellation, the code makes sure to release latches when returning // early due to error. Otherwise other requests will get stuck and // group.Wait() will not return until the test times out. - lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error) + lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error, item.request.BaFmt) if err != nil { return err } @@ -1519,11 +1531,15 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) { ReadTimestamp: ts, } } + ba := &kvpb.BatchRequest{} + ba.Txn = txn + ba.Timestamp = ts request := &Request{ - Txn: txn, - Timestamp: ts, + Txn: ba.Txn, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } items = append(items, workloadItem{request: request}) if txn != nil { @@ -1601,10 +1617,13 @@ func TestLockTableConcurrentRequests(t *testing.T) { lockSpans := &lockspanset.LockSpanSet{} onlyReads := txnMeta == nil && rng.Intn(2) != 0 numKeys := rng.Intn(len(keys)-1) + 1 + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts request := &Request{ - Timestamp: ts, + Timestamp: ba.Timestamp, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, } if txnMeta != nil { request.Txn = &roachpb.Transaction{ @@ -1691,7 +1710,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { var err error firstIter := true for { - if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil { doneCh <- err return } @@ -1738,7 +1757,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) { return } // Release locks. 
- if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil { + if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil { doneCh <- err return } @@ -1766,11 +1785,14 @@ func createRequests(index int, numOutstanding int, numKeys int, numReadKeys int) ts := hlc.Timestamp{WallTime: 10} latchSpans := &spanset.SpanSet{} lockSpans := &lockspanset.LockSpanSet{} + ba := &kvpb.BatchRequest{} + ba.Timestamp = ts wi := benchWorkItem{ Request: Request{ Timestamp: ts, LatchSpans: latchSpans, LockSpans: lockSpans, + BaFmt: ba, }, } for i := 0; i < numKeys; i++ { diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go index 3ad717c9abc7..770158634cc7 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter_test.go @@ -151,9 +151,13 @@ func TestLockTableWaiterWithTxn(t *testing.T) { if synthetic { txn.ReadTimestamp = txn.ReadTimestamp.WithSynthetic(true) } + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp return Request{ Txn: &txn, - Timestamp: txn.ReadTimestamp, + Timestamp: ba.Timestamp, + BaFmt: ba, } } @@ -241,9 +245,13 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) { reqHeaderTS := hlc.Timestamp{WallTime: 10} makeReq := func() Request { + ba := &kvpb.BatchRequest{} + ba.Timestamp = reqHeaderTS + ba.UserPriority = roachpb.NormalUserPriority return Request{ - Timestamp: reqHeaderTS, - NonTxnPriority: roachpb.NormalUserPriority, + Timestamp: ba.Timestamp, + NonTxnPriority: ba.UserPriority, + BaFmt: ba, } } @@ -441,10 +449,15 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) { makeReq := func() Request { txn := makeTxnProto("request") txn.GlobalUncertaintyLimit = uncertaintyLimit + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp + ba.WaitPolicy = lock.WaitPolicy_Error return Request{ Txn: &txn, - Timestamp: txn.ReadTimestamp, - WaitPolicy: lock.WaitPolicy_Error, + Timestamp: ba.Timestamp, + WaitPolicy: ba.WaitPolicy, + BaFmt: ba, } } makeHighPriReq := func() Request { @@ -608,17 +621,26 @@ func TestLockTableWaiterWithLockTimeout(t *testing.T) { const lockTimeout = 1 * time.Millisecond makeReq := func() Request { txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp + ba.LockTimeout = lockTimeout return Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, - LockTimeout: lockTimeout, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + LockTimeout: ba.LockTimeout, + BaFmt: ba, } } if !txn { makeReq = func() Request { + ba := &kvpb.BatchRequest{} + ba.Timestamp = hlc.Timestamp{WallTime: 10} + ba.LockTimeout = lockTimeout return Request{ - Timestamp: hlc.Timestamp{WallTime: 10}, - LockTimeout: lockTimeout, + Timestamp: ba.Timestamp, + LockTimeout: ba.LockTimeout, + BaFmt: ba, } } } @@ -793,9 +815,13 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) { err2 := kvpb.NewErrorf("error2") txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp req := Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + BaFmt: ba, } // Test with both synchronous and asynchronous pushes. 
@@ -849,9 +875,13 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) { defer w.stopper.Stop(ctx) txn := makeTxnProto("request") + ba := &kvpb.BatchRequest{} + ba.Txn = &txn + ba.Timestamp = txn.ReadTimestamp req := Request{ - Txn: &txn, - Timestamp: txn.ReadTimestamp, + Txn: ba.Txn, + Timestamp: ba.Timestamp, + BaFmt: ba, } keyA := roachpb.Key("keyA") pusheeTxn := makeTxnProto("pushee") diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier index 5ba3236e77d1..d03f34a0447c 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/barrier @@ -53,7 +53,7 @@ sequence req=barrier2 ---- [2] sequence barrier2: sequencing request [2] sequence barrier2: waiting on latches without acquiring -[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@15.000000000,1 +[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@15.000000000,1 for request Get [‹"c"›] [2] sequence barrier2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=read1 @@ -96,7 +96,7 @@ sequence req=barrier1 ---- [2] sequence barrier1: sequencing request [2] sequence barrier1: waiting on latches without acquiring -[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@10.000000000,1 +[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@10.000000000,1 for request Get [‹"c"›] [2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal finish req=read1 @@ -143,7 +143,7 @@ sequence req=barrier1 ---- [2] sequence barrier1: sequencing request [2] sequence barrier1: waiting on latches without acquiring -[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by write latch ‹c›@10.000000000,1 +[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by write latch ‹c›@10.000000000,1 for request Put [‹"c"›], [txn: 00000001] [2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic index c028e249dc64..8671437d8b8e 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic @@ -143,7 +143,7 @@ sequence req=req4 ---- [3] sequence req4: sequencing request [3] sequence req4: acquiring latches -[3] sequence req4: waiting to acquire write latch ‹k›@10.000000000,1, held by read latch ‹k{-2}›@14.000000000,1 +[3] sequence req4: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000001], held by read latch ‹k{-2}›@14.000000000,1 for request Get [‹"k"›], Scan [‹"k"›,‹"k2"›), [txn: 00000003] [3] sequence req4: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -245,13 +245,13 @@ sequence req=req7 ---- [4] sequence req7: sequencing request [4] sequence req7: acquiring latches -[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch ‹{a-m}›@14.000000000,1 +[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1 for request Put [‹"k"›], held by read latch ‹{a-m}›@14.000000000,1 for request Scan 
[‹"a"›,‹"m"›) [4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal finish req=req5 ---- [-] finish req5: finishing request -[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1, held by read latch ‹{c-z}›@16.000000000,1 +[4] sequence req7: waiting to acquire write latch ‹k›@12.000000000,1 for request Put [‹"k"›], held by read latch ‹{c-z}›@16.000000000,1 for request Scan [‹"c"›,‹"z"›) [4] sequence req7: blocked on select in spanlatch.(*Manager).waitForSignal finish req=req6 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic index c160357a2d20..e298dbe01180 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic @@ -172,7 +172,7 @@ sequence req=req6 eval-kind=pess-after-opt ---- [8] sequence req6: re-sequencing request after optimistic sequencing failed [8] sequence req6: optimistic failed, so waiting for latches -[8] sequence req6: waiting to acquire read latch ‹{a-e}›@12.000000000,1, held by write latch ‹d›@10.000000000,1 +[8] sequence req6: waiting to acquire read latch ‹{a-e}›@12.000000000,1 for request Scan [‹"a"›,‹"e"›), [txn: 00000002], held by write latch ‹d›@10.000000000,1 for request Put [‹"d"›], [txn: 00000003] [8] sequence req6: blocked on select in spanlatch.(*Manager).waitForSignal # req4 finishing releases the latch and allows req6 to proceed. diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err index 837c614aec83..4262d08ce0e6 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err @@ -27,7 +27,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -38,7 +38,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect index bb5b400b322a..4732751b7827 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_err_indirect @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: 
waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -37,7 +37,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint index 32980a60e81a..227175415eaf 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_disjoint @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 poison-policy=wait @@ -37,7 +37,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal poison req=putc diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping index a3e2ccea768b..f76b7e71a10a 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/poison_policy_wait_overlapping @@ -26,7 +26,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=put2 ts=11,0 poison-policy=wait @@ -37,7 +37,7 @@ sequence req=put2 ---- [3] sequence put2: sequencing request [3] sequence put2: acquiring latches -[3] sequence put2: waiting to acquire write latch ‹c›@11.000000000,0, held by write latch ‹c›@10.000000000,0 +[3] sequence put2: waiting to acquire write latch ‹c›@11.000000000,0 for request Put [‹"c"›], held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [3] sequence put2: 
blocked on select in spanlatch.(*Manager).waitForSignal poison req=put1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index 423ebd1d5775..d71378787f95 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -195,7 +195,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [7] sequence req2: lock wait-queue event: done waiting [7] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [7] sequence req2: acquiring latches -[7] sequence req2: waiting to acquire read latch ‹k2›@10.000000000,1, held by write latch ‹k2›@10.000000000,1 +[7] sequence req2: waiting to acquire read latch ‹k2›@10.000000000,1 for request Put [‹"k"›], Get [‹"k2"›], [txn: 00000002], held by write latch ‹k2›@10.000000000,1 for request ResolveIntent [‹"k"›], ResolveIntent [‹"k2"›] [7] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal on-lock-updated req=reqRes1 txn=txn1 key=k2 status=committed @@ -309,7 +309,7 @@ on-lock-updated req=reqRes2 txn=txn2 key=k status=committed [13] sequence req3: lock wait-queue event: done waiting [13] sequence req3: conflicted with ‹00000002-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [13] sequence req3: acquiring latches -[13] sequence req3: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[13] sequence req3: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000003], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [13] sequence req3: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes2 @@ -467,7 +467,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [4] sequence req2: lock wait-queue event: done waiting [4] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [4] sequence req2: acquiring latches -[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [4] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 @@ -687,7 +687,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [8] sequence req2: lock wait-queue event: done waiting [8] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [8] sequence req2: acquiring latches -[8] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[8] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [8] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 @@ -845,7 +845,7 @@ on-lock-updated req=reqRes1 txn=txn1 key=k status=committed [4] sequence req2: lock wait-queue event: done waiting [4] sequence req2: conflicted with ‹00000001-0000-0000-0000-000000000000› on ‹"k"› for 0.000s [4] sequence req2: acquiring latches -[4] sequence req2: waiting to acquire write latch ‹k›@10.000000000,1, held by write latch ‹k›@10.000000000,1 +[4] sequence req2: 
waiting to acquire write latch ‹k›@10.000000000,1 for request Put [‹"k"›], [txn: 00000002], held by write latch ‹k›@10.000000000,1 for request ResolveIntent [‹"k"›] [4] sequence req2: blocked on select in spanlatch.(*Manager).waitForSignal finish req=reqRes1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches index 9781a62be7e0..47a83eba4aa8 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/shared_locks_latches @@ -176,7 +176,7 @@ sequence req=req10 ---- [10] sequence req10: sequencing request [10] sequence req10: acquiring latches -[10] sequence req10: waiting to acquire write latch ‹a›@9.000000000,1, held by read latch ‹a›@9223372036.854775807,2147483647 +[10] sequence req10: waiting to acquire write latch ‹a›@9.000000000,1 for request Get(Exclusive,Unreplicated) [‹"a"›], [txn: 00000003], held by read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000001] [10] sequence req10: blocked on select in spanlatch.(*Manager).waitForSignal # exclusive_lock(ts) == shared_lock(ts) @@ -188,7 +188,7 @@ sequence req=req11 ---- [11] sequence req11: sequencing request [11] sequence req11: acquiring latches -[11] sequence req11: waiting to acquire write latch ‹b›@10.000000000,1, held by read latch ‹b›@9223372036.854775807,2147483647 +[11] sequence req11: waiting to acquire write latch ‹b›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"b"›], [txn: 00000001], held by read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001] [11] sequence req11: blocked on select in spanlatch.(*Manager).waitForSignal # exclusive_lock(ts) > shared_lock(ts) @@ -200,7 +200,7 @@ sequence req=req12 ---- [12] sequence req12: sequencing request [12] sequence req12: acquiring latches -[12] sequence req12: waiting to acquire write latch ‹c›@11.000000000,1, held by read latch ‹c›@9223372036.854775807,2147483647 +[12] sequence req12: waiting to acquire write latch ‹c›@11.000000000,1 for request Get(Exclusive,Unreplicated) [‹"c"›], [txn: 00000002], held by read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000001] [12] sequence req12: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -286,7 +286,7 @@ sequence req=req16 ---- [16] sequence req16: sequencing request [16] sequence req16: acquiring latches -[16] sequence req16: waiting to acquire write latch ‹a›@9.000000000,1, held by read latch ‹a›@9223372036.854775807,2147483647 +[16] sequence req16: waiting to acquire write latch ‹a›@9.000000000,1 for request Put [‹"a"›], [txn: 00000003], held by read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000001] [16] sequence req16: blocked on select in spanlatch.(*Manager).waitForSignal # write(ts) == shared_lock(ts) @@ -298,7 +298,7 @@ sequence req=req17 ---- [17] sequence req17: sequencing request [17] sequence req17: acquiring latches -[17] sequence req17: waiting to acquire write latch ‹b›@10.000000000,1, held by read latch ‹b›@9223372036.854775807,2147483647 +[17] sequence req17: waiting to acquire write latch ‹b›@10.000000000,1 for request Put [‹"b"›], [txn: 00000001], held by read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001] 
[17] sequence req17: blocked on select in spanlatch.(*Manager).waitForSignal # write(ts) > shared_lock(ts) @@ -310,7 +310,7 @@ sequence req=req18 ---- [18] sequence req18: sequencing request [18] sequence req18: acquiring latches -[18] sequence req18: waiting to acquire write latch ‹c›@11.000000000,1, held by read latch ‹c›@9223372036.854775807,2147483647 +[18] sequence req18: waiting to acquire write latch ‹c›@11.000000000,1 for request Put [‹"c"›], [txn: 00000002], held by read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000001] [18] sequence req18: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -470,7 +470,7 @@ sequence req=req26 ---- [26] sequence req26: sequencing request [26] sequence req26: acquiring latches -[26] sequence req26: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647, held by write latch ‹a›@10.000000000,1 +[26] sequence req26: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000003], held by write latch ‹a›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"a"›], [txn: 00000001] [26] sequence req26: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) == exclusive_lock(ts) @@ -482,7 +482,7 @@ sequence req=req27 ---- [27] sequence req27: sequencing request [27] sequence req27: acquiring latches -[27] sequence req27: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647, held by write latch ‹b›@10.000000000,1 +[27] sequence req27: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001], held by write latch ‹b›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"b"›], [txn: 00000001] [27] sequence req27: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) > exclusive_lock(ts) @@ -494,7 +494,7 @@ sequence req=req28 ---- [28] sequence req28: sequencing request [28] sequence req28: acquiring latches -[28] sequence req28: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647, held by write latch ‹c›@10.000000000,1 +[28] sequence req28: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000002], held by write latch ‹c›@10.000000000,1 for request Get(Exclusive,Unreplicated) [‹"c"›], [txn: 00000001] [28] sequence req28: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -580,7 +580,7 @@ sequence req=req32 ---- [32] sequence req32: sequencing request [32] sequence req32: acquiring latches -[32] sequence req32: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647, held by write latch ‹a›@10.000000000,1 +[32] sequence req32: waiting to acquire read latch ‹a›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"a"›], [txn: 00000003], held by write latch ‹a›@10.000000000,1 for request Put [‹"a"›], [txn: 00000001] [32] sequence req32: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) == write(ts) @@ -592,7 +592,7 @@ sequence req=req33 ---- [33] sequence req33: sequencing request [33] sequence req33: acquiring latches -[33] sequence req33: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647, held by write latch ‹b›@10.000000000,1 +[33] sequence req33: waiting to acquire read latch ‹b›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"b"›], [txn: 00000001], held by write latch 
‹b›@10.000000000,1 for request Put [‹"b"›], [txn: 00000001] [33] sequence req33: blocked on select in spanlatch.(*Manager).waitForSignal # shared_lock(ts) > write(ts) @@ -604,7 +604,7 @@ sequence req=req34 ---- [34] sequence req34: sequencing request [34] sequence req34: acquiring latches -[34] sequence req34: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647, held by write latch ‹c›@10.000000000,1 +[34] sequence req34: waiting to acquire read latch ‹c›@9223372036.854775807,2147483647 for request Get(Shared,Unreplicated) [‹"c"›], [txn: 00000002], held by write latch ‹c›@10.000000000,1 for request Put [‹"c"›], [txn: 00000001] [34] sequence req34: blocked on select in spanlatch.(*Manager).waitForSignal debug-latch-manager @@ -667,7 +667,7 @@ sequence req=req36 ---- [36] sequence req36: sequencing request [36] sequence req36: acquiring latches -[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0, held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 +[36] sequence req36: waiting to acquire write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 for request Scan(Shared,Replicated) [‹"a"›,‹"f"›), [txn: 00000002], held by write latch ‹/Local/RangeID/1/r/ReplicatedSharedLocksTransactionLatch/"00000002-0000-0000-0000-000000000000"›@0,0 for request Get(Shared,Replicated) [‹"c"›], [txn: 00000002] [36] sequence req36: blocked on select in spanlatch.(*Manager).waitForSignal new-request name=req37 txn=txn1 ts=11,1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability index 4a799a67e786..340a8d1616ed 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/slow_latch_observability @@ -28,7 +28,7 @@ sequence req=readbf ---- [2] sequence readbf: sequencing request [2] sequence readbf: acquiring latches -[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1, held by write latch ‹c›@10.000000000,0 +[2] sequence readbf: waiting to acquire read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›), held by write latch ‹c›@10.000000000,0 for request Put [‹"c"›] [2] sequence readbf: blocked on select in spanlatch.(*Manager).waitForSignal new-request txn=none name=pute ts=11,0 @@ -39,7 +39,7 @@ sequence req=pute ---- [3] sequence pute: sequencing request [3] sequence pute: acquiring latches -[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0, held by read latch ‹{b-f}›@11.000000000,1 +[3] sequence pute: waiting to acquire write latch ‹e›@11.000000000,0 for request Put [‹"e"›], held by read latch ‹{b-f}›@11.000000000,1 for request Scan [‹"b"›,‹"f"›) [3] sequence pute: blocked on select in spanlatch.(*Manager).waitForSignal finish req=putc diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 7e5a89a83485..8f838f31ce2c 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -477,6 +477,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( Requests: ba.Requests, LatchSpans: latchSpans, // nil if g != nil LockSpans: lockSpans, // nil if g != nil + BaFmt: ba, }, requestEvalKind) if pErr != nil { if poisonErr := (*poison.PoisonedError)(nil); 
errors.As(pErr.GoError(), &poisonErr) { diff --git a/pkg/kv/kvserver/spanlatch/BUILD.bazel b/pkg/kv/kvserver/spanlatch/BUILD.bazel index 4f9e5f5b319b..edbfe23adcc5 100644 --- a/pkg/kv/kvserver/spanlatch/BUILD.bazel +++ b/pkg/kv/kvserver/spanlatch/BUILD.bazel @@ -41,6 +41,7 @@ go_test( shard_count = 16, deps = [ "//pkg/keys", + "//pkg/kv/kvpb", "//pkg/kv/kvserver/concurrency/poison", "//pkg/kv/kvserver/spanset", "//pkg/roachpb", @@ -49,6 +50,7 @@ go_test( "//pkg/util/leaktest", "//pkg/util/timeutil", # keep "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go index 32fe84e42081..2192c1517baf 100644 --- a/pkg/kv/kvserver/spanlatch/manager.go +++ b/pkg/kv/kvserver/spanlatch/manager.go @@ -85,7 +85,7 @@ func Make(stopper *stop.Stopper, slowReqs *metric.Gauge) Manager { // latches are stored in the Manager's btrees. They represent the latching // of a single key span. type latch struct { - *signals + g *Guard id uint64 span roachpb.Span ts hlc.Timestamp @@ -102,8 +102,12 @@ func (la *latch) String() string { } // SafeFormat implements the redact.SafeFormatter interface. -func (la *latch) SafeFormat(w redact.SafePrinter, _ rune) { +func (la *latch) SafeFormat(w redact.SafePrinter, verb rune) { w.Printf("%s@%s", la.span, la.ts) + if la.g != nil && la.g.baFmt != nil { + w.Printf(" for request ") + la.g.baFmt.SafeFormat(w, verb) + } } //go:generate ../../../util/interval/generic/gen.sh *latch spanlatch @@ -126,7 +130,8 @@ type signals struct { // Manager.Acquire and accepted by Manager.Release. type Guard struct { signals - pp poison.Policy + pp poison.Policy + baFmt redact.SafeFormatter // latches [spanset.NumSpanScope][spanset.NumSpanAccess][]latch, but half the size. latchesPtrs [spanset.NumSpanScope][spanset.NumSpanAccess]unsafe.Pointer latchesLens [spanset.NumSpanScope][spanset.NumSpanAccess]int32 @@ -183,10 +188,11 @@ func allocGuardAndLatches(nLatches int) (*Guard, []latch) { return new(Guard), make([]latch, nLatches) } -func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { +func newGuard(spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter) *Guard { nLatches := spans.Len() guard, latches := allocGuardAndLatches(nLatches) guard.pp = pp + guard.baFmt = baFmt for s := spanset.SpanScope(0); s < spanset.NumSpanScope; s++ { for a := spanset.SpanAccess(0); a < spanset.NumSpanAccess; a++ { ss := spans.GetSpans(a, s) @@ -199,7 +205,7 @@ func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { for i := range ssLatches { latch := &latches[i] latch.span = ss[i].Span - latch.signals = &guard.signals + latch.g = guard latch.ts = ss[i].Timestamp // latch.setID() in Manager.insert, under lock. } @@ -222,9 +228,9 @@ func newGuard(spans *spanset.SpanSet, pp poison.Policy) *Guard { // // It returns a Guard which must be provided to Release. func (m *Manager) Acquire( - ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, ) (*Guard, error) { - lg, snap := m.sequence(spans, pp) + lg, snap := m.sequence(spans, pp, baFmt) defer snap.close() err := m.wait(ctx, lg, snap) @@ -247,8 +253,10 @@ func (m *Manager) Acquire( // // The method returns a Guard which must be provided to the // CheckOptimisticNoConflicts, Release methods. 
-func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *Guard { - lg, snap := m.sequence(spans, pp) +func (m *Manager) AcquireOptimistic( + spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) *Guard { + lg, snap := m.sequence(spans, pp, baFmt) lg.snap = &snap return lg } @@ -256,10 +264,12 @@ func (m *Manager) AcquireOptimistic(spans *spanset.SpanSet, pp poison.Policy) *G // WaitFor waits for conflicting latches on the spans without adding // any latches itself. Fast path for operations that only require past latches // to be released without blocking new latches. -func (m *Manager) WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) error { +func (m *Manager) WaitFor( + ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) error { // The guard is only used to store latches by this request. These latches // are not actually inserted using insertLocked. - lg := newGuard(spans, pp) + lg := newGuard(spans, pp, baFmt) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -355,8 +365,10 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err // for each of the specified spans into the manager's interval trees, and // unlocks the manager. The role of the method is to sequence latch acquisition // attempts. -func (m *Manager) sequence(spans *spanset.SpanSet, pp poison.Policy) (*Guard, snapshot) { - lg := newGuard(spans, pp) +func (m *Manager) sequence( + spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter, +) (*Guard, snapshot) { + lg := newGuard(spans, pp, baFmt) m.mu.Lock() snap := m.snapshotLocked(spans) @@ -527,7 +539,7 @@ func (m *Manager) iterAndWait( ) error { for it.FirstOverlap(wait); it.Valid(); it.NextOverlap(wait) { held := it.Cur() - if held.done.signaled() { + if held.g.done.signaled() { continue } if ignore(wait.ts, held.ts) { @@ -549,10 +561,10 @@ func (m *Manager) waitForSignal( wait, held *latch, ) error { log.Eventf(ctx, "waiting to acquire %s latch %s, held by %s latch %s", waitType, wait, heldType, held) - poisonCh := held.poison.signalChan() + poisonCh := held.g.poison.signalChan() for { select { - case <-held.done.signalChan(): + case <-held.g.done.signalChan(): return nil case <-poisonCh: // The latch we're waiting on was poisoned. If we continue to wait, we have to @@ -564,7 +576,7 @@ func (m *Manager) waitForSignal( return poison.NewPoisonedError(held.span, held.ts) case poison.Policy_Wait: log.Eventf(ctx, "encountered poisoned latch; continuing to wait") - wait.poison.signal() + wait.g.poison.signal() // No need to self-poison multiple times. 
poisonCh = nil default: diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go index be8cc806cce1..11870cad0bfa 100644 --- a/pkg/kv/kvserver/spanlatch/manager_test.go +++ b/pkg/kv/kvserver/spanlatch/manager_test.go @@ -18,8 +18,10 @@ import ( "strings" "testing" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -27,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/stretchr/testify/require" ) @@ -94,7 +97,7 @@ func testLatchBlocks(t *testing.T, a Attempt) { // MustAcquire is like Acquire, except it can't return context cancellation // errors. func (m *Manager) MustAcquire(spans *spanset.SpanSet) *Guard { - lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error) + lg, err := m.Acquire(context.Background(), spans, poison.Policy_Error, nil) if err != nil { panic(err) } @@ -122,7 +125,7 @@ func (m *Manager) MustAcquireChExt( ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, ) Attempt { errCh := make(chan error, 1) - lg, snap := m.sequence(spans, pp) + lg, snap := m.sequence(spans, pp, nil) go func() { err := m.wait(ctx, lg, snap) if err != nil { @@ -607,13 +610,13 @@ func TestLatchManagerOptimistic(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error) + lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)), poison.Policy_Error) lg1, err := m.WaitUntilAcquired(context.Background(), lg1) require.NoError(t, err) // Optimistic acquire encounters conflict in some cases. - lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error) + lg2 := m.AcquireOptimistic(spans("a", "e", read, zeroTS), poison.Policy_Error, nil) require.False(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "e", read, zeroTS))) require.True(t, m.CheckOptimisticNoConflicts(lg2, spans("a", "d", read, zeroTS))) waitUntilAcquiredCh := func(g *Guard) Attempt { @@ -630,7 +633,7 @@ func TestLatchManagerOptimistic(t *testing.T) { testLatchSucceeds(t, a2) // Optimistic acquire encounters conflict. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil) require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) m.Release(lg2) // There is still a conflict even though lg2 has been released. @@ -642,7 +645,7 @@ func TestLatchManagerOptimistic(t *testing.T) { // Optimistic acquire for read below write encounters no conflict. 
oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2} lg4 := m.MustAcquire(spans("c", "e", write, twoTS)) - lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error) + lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS))) require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS))) lg5, err = m.WaitUntilAcquired(context.Background(), lg5) @@ -656,14 +659,14 @@ func TestLatchManagerWaitFor(t *testing.T) { var m Manager // Acquire latches, no conflict. - lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error) + lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error, nil) require.NoError(t, err) // See if WaitFor waits for above latch. waitForCh := func() Attempt { errCh := make(chan error) go func() { - errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error) + errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error, nil) }() return Attempt{errCh: errCh} } @@ -674,7 +677,7 @@ func TestLatchManagerWaitFor(t *testing.T) { // Optimistic acquire should _not_ encounter conflict - as WaitFor should // not lay any latches. - lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error) + lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil) require.True(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS))) lg3, err = m.WaitUntilAcquired(context.Background(), lg3) require.NoError(t, err) @@ -722,7 +725,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) { b.ResetTimer() for i := range spans { - lg, snap := m.sequence(&spans[i], poison.Policy_Error) + lg, snap := m.sequence(&spans[i], poison.Policy_Error, nil) snap.close() if len(lgBuf) == cap(lgBuf) { m.Release(<-lgBuf) @@ -741,3 +744,40 @@ func randBytes(n int) []byte { } return b } + +// TestSizeOfLatch tests the size of the latch struct. +func TestSizeOfLatch(t *testing.T) { + var la latch + size := int(unsafe.Sizeof(la)) + require.Equal(t, 96, size) +} + +// TestSizeOfLatchGuard tests the size of the latch Guard struct. +func TestSizeOfLatchGuard(t *testing.T) { + var lg Guard + size := int(unsafe.Sizeof(lg)) + require.Equal(t, 112, size) +} + +// TestLatchStringAndSafeformat tests the output of latch.SafeFormat. +func TestLatchStringAndSafeformat(t *testing.T) { + gr := &kvpb.GetRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: roachpb.Key("a"), + }, + } + ba := &kvpb.BatchRequest{} + ba.Add(gr) + guard := new(Guard) + guard.baFmt = ba + la := &latch{ + g: guard, + span: span(11), + ts: hlc.Timestamp{WallTime: 10}, + next: nil, + prev: nil, + } + require.EqualValues(t, `00011{-\x00}@0.000000010,0 for request Get ["a"]`, la.String()) + require.EqualValues(t, `‹00011{-\x00}›@0.000000010,0 for request Get [‹"a"›]`, redact.Sprint(la)) + require.EqualValues(t, `‹×›@0.000000010,0 for request Get [‹×›]`, redact.Sprint(la).Redact()) +}
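
The change above threads a redact.SafeFormatter (the BatchRequest) from Request through the latch Guard into each latch, so the "waiting to acquire ... for request ..." log lines can carry request details while staying redaction-safe. The following is a minimal, self-contained sketch of that pattern, not CockroachDB code: fakeRequest and fakeLatch are hypothetical stand-ins for kvpb.BatchRequest and spanlatch.latch, and it assumes only the redact API already exercised in this diff (SafeFormatter, SafePrinter.Printf, redact.SafeString, redact.Sprint, RedactableString.Redact).

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// fakeRequest is a hypothetical stand-in for *kvpb.BatchRequest. The method
// name is printed as a safe string; the key is left as unsafe user data.
type fakeRequest struct {
	method string
	key    string
}

// SafeFormat implements redact.SafeFormatter.
func (r *fakeRequest) SafeFormat(w redact.SafePrinter, _ rune) {
	w.Printf("%s [%s]", redact.SafeString(r.method), r.key)
}

// fakeLatch is a hypothetical stand-in for spanlatch.latch. It always prints
// its span and timestamp, and appends request details only when a formatter
// was attached.
type fakeLatch struct {
	span  string
	ts    string
	baFmt redact.SafeFormatter // nil when no request info is available
}

// SafeFormat implements redact.SafeFormatter.
func (l *fakeLatch) SafeFormat(w redact.SafePrinter, verb rune) {
	w.Printf("%s@%s", l.span, redact.SafeString(l.ts))
	if l.baFmt != nil {
		w.Printf(" for request ")
		l.baFmt.SafeFormat(w, verb)
	}
}

func main() {
	la := &fakeLatch{
		span:  "{a-f}",
		ts:    "10.000000000,1",
		baFmt: &fakeRequest{method: "Get", key: "c"},
	}
	s := redact.Sprint(la)
	fmt.Println(s)          // e.g. ‹{a-f}›@10.000000000,1 for request Get [‹c›]
	fmt.Println(s.Redact()) // unsafe operands collapse to ‹×›
}

Keeping the formatter behind a nil-able interface preserves the old behavior for callers that do not supply one (as the test helpers that pass nil above do): such latches still format as just span@timestamp.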