kv: include conflicting request information in latch manager traces/logs
This commit logs the latch waiter's and the latch holder's request information
when latches conflict. This is achieved by adding a redact.SafeFormatter from
the batch request to concurrency.Request and passing it down to the latch
Guard. Since the Guard struct embeds the signal struct, we store a pointer to
the latch guard in the latch struct instead.

Fixes: cockroachdb#114601

Release note: None
lyang24 committed Jan 6, 2024
1 parent a8d8066 commit ba13697
Showing 20 changed files with 222 additions and 101 deletions.
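The mechanism behind the new trace lines is the redact.SafeFormatter interface from github.com/cockroachdb/redact: any value implementing it can be printed into traces and logs with user data wrapped in redaction markers. The following self-contained illustration is not code from this commit; reqSummary is a made-up stand-in for kvpb.BatchRequest, showing how such a formatter yields request summaries like "Get [‹c›]":

package main

import (
	"fmt"

	"github.com/cockroachdb/redact"
)

// reqSummary is a toy stand-in for the batch request: any type implementing
// redact.SafeFormatter can be carried on concurrency.Request and rendered
// into latch traces with user data kept inside redaction markers.
type reqSummary struct {
	method string // safe metadata, e.g. "Get"
	key    string // user data, must remain redactable
}

func (r reqSummary) SafeFormat(w redact.SafePrinter, _ rune) {
	w.Printf("%s [%s]", redact.SafeString(r.method), r.key)
}

func main() {
	s := redact.Sprint(reqSummary{method: "Get", key: "c"})
	fmt.Println(s)          // Get [‹c›]
	fmt.Println(s.Redact()) // Get [‹×›]
}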
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/redact"
)

// Manager is a structure that sequences incoming requests and provides
@@ -432,6 +433,10 @@ type Request struct {
// passed to SequenceReq. Only supplied to SequenceReq if the method is
not also passed an existing Guard.
LockSpans *lockspanset.LockSpanSet

// BaFmt is a SafeFormatter capable of formatting the request. It is used to
// enrich logging with request-level information when latches conflict.
BaFmt redact.SafeFormatter
}

// Guard is returned from Manager.SequenceReq. The guard is passed back in to
@@ -510,7 +515,7 @@ type latchManager interface {
// WaitFor waits for conflicting latches on the specified spans without adding
// any latches itself. Fast path for operations that only require flushing out
// old operations without blocking any new ones.
WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy) *Error
WaitFor(ctx context.Context, spans *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter) *Error

// Poison a guard's latches, allowing waiters to fail fast.
Poison(latchGuard)
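The corresponding changes inside pkg/kv/kvserver/spanlatch are not included in the hunks shown on this page. Purely as an illustration of how the waiter's and holder's formatters could be folded into the existing "waiting to acquire ... held by ..." event, here is a hypothetical sketch; latchInfo and conflictEvent are invented names, not the commit's actual spanlatch code:

import "github.com/cockroachdb/redact"

// latchInfo is a hypothetical view of one side of a latch conflict: the
// access kind, the span and timestamp the latch covers, and the optional
// request formatter carried down from concurrency.Request.BaFmt.
type latchInfo struct {
	access redact.SafeString    // "read" or "write"
	span   string               // e.g. "{a-f}"; user data, hence redactable
	ts     redact.SafeString    // e.g. "15.000000000,1"
	baFmt  redact.SafeFormatter // may be nil if the caller supplied none
}

// conflictEvent builds a trace message naming both sides of the conflict. The
// "for request" detail is appended only when a formatter is available, so
// callers that pass nil keep the previous message shape.
func conflictEvent(waiter, holder latchInfo) redact.RedactableString {
	var b redact.StringBuilder
	b.Printf("waiting to acquire %s latch %s@%s", waiter.access, waiter.span, waiter.ts)
	if waiter.baFmt != nil {
		b.Printf(" for request %v", waiter.baFmt)
	}
	b.Printf(", held by %s latch %s@%s", holder.access, holder.span, holder.ts)
	if holder.baFmt != nil {
		b.Printf(" for request %v", holder.baFmt)
	}
	return b.RedactableString()
}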
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -260,7 +260,7 @@ func (m *managerImpl) sequenceReqWithGuard(
// them.
if shouldWaitOnLatchesWithoutAcquiring(g.Req) {
log.Event(ctx, "waiting on latches without acquiring")
return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy)
return nil, m.lm.WaitFor(ctx, g.Req.LatchSpans, g.Req.PoisonPolicy, g.Req.BaFmt)
}

// Provide the manager with an opportunity to intercept the request. It
24 changes: 16 additions & 8 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -194,25 +194,33 @@ func TestConcurrencyManagerBasic(t *testing.T) {
if d.HasArg("max-lock-wait-queue-length") {
d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength)
}

ba := &kvpb.BatchRequest{}
pp := scanPoisonPolicy(t, d)

// Each kvpb.Request is provided on an indented line.
reqs, reqUnions := scanRequests(t, d, c)
ba.Txn = txn
ba.Timestamp = ts
ba.UserPriority = priority
ba.ReadConsistency = readConsistency
ba.WaitPolicy = waitPolicy
ba.LockTimeout = lockTimeout
ba.Requests = reqUnions
latchSpans, lockSpans := c.collectSpans(t, txn, ts, waitPolicy, reqs)

c.requestsByName[reqName] = concurrency.Request{
Txn: txn,
Timestamp: ts,
NonTxnPriority: priority,
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
LockTimeout: lockTimeout,
Txn: ba.Txn,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
ReadConsistency: ba.ReadConsistency,
WaitPolicy: ba.WaitPolicy,
LockTimeout: ba.LockTimeout,
Requests: ba.Requests,
MaxLockWaitQueueLength: maxLockWaitQueueLength,
Requests: reqUnions,
LatchSpans: latchSpans,
LockSpans: lockSpans,
PoisonPolicy: pp,
BaFmt: ba,
}
return ""

9 changes: 5 additions & 4 deletions pkg/kv/kvserver/concurrency/latch_manager.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanlatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/redact"
)

// latchManagerImpl implements the latchManager interface.
@@ -25,15 +26,15 @@ type latchManagerImpl struct {
}

func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) {
lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy)
lg, err := m.m.Acquire(ctx, req.LatchSpans, req.PoisonPolicy, req.BaFmt)
if err != nil {
return nil, kvpb.NewError(err)
}
return lg, nil
}

func (m *latchManagerImpl) AcquireOptimistic(req Request) latchGuard {
lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy)
lg := m.m.AcquireOptimistic(req.LatchSpans, req.PoisonPolicy, req.BaFmt)
return lg
}

@@ -52,9 +53,9 @@ func (m *latchManagerImpl) WaitUntilAcquired(
}

func (m *latchManagerImpl) WaitFor(
ctx context.Context, ss *spanset.SpanSet, pp poison.Policy,
ctx context.Context, ss *spanset.SpanSet, pp poison.Policy, baFmt redact.SafeFormatter,
) *Error {
err := m.m.WaitFor(ctx, ss, pp)
err := m.m.WaitFor(ctx, ss, pp, baFmt)
if err != nil {
return kvpb.NewError(err)
}
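Putting the pieces together, the wiring inside the concurrency package now looks roughly like the condensed sketch below. It mirrors the test hunks that follow rather than quoting any one of them; txn, ts, latchSpans, lockSpans, pp, ctx and lm are assumed to be in scope:

ba := &kvpb.BatchRequest{}
ba.Txn = txn
ba.Timestamp = ts
req := Request{
	Txn:          ba.Txn,
	Timestamp:    ba.Timestamp,
	LatchSpans:   latchSpans,
	LockSpans:    lockSpans,
	PoisonPolicy: pp,
	// BaFmt is what gets printed as "... for request ..." when this request
	// conflicts on a latch.
	BaFmt: ba,
}
// Both the acquiring path and the wait-without-acquiring fast path forward
// the formatter to the spanlatch manager; lg is handed back for release once
// the request completes.
lg, pErr := lm.Acquire(ctx, req)
pErr = lm.WaitFor(ctx, req.LatchSpans, req.PoisonPolicy, req.BaFmt)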
44 changes: 33 additions & 11 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison"
@@ -321,21 +322,26 @@ func TestLockTableBasic(t *testing.T) {
d.ScanArgs(t, "max-lock-wait-queue-length", &maxLockWaitQueueLength)
}
latchSpans, lockSpans := scanSpans(t, d, ts)
ba := &kvpb.BatchRequest{}
ba.Timestamp = ts
ba.WaitPolicy = waitPolicy
req := Request{
Timestamp: ts,
WaitPolicy: waitPolicy,
Timestamp: ba.Timestamp,
WaitPolicy: ba.WaitPolicy,
MaxLockWaitQueueLength: maxLockWaitQueueLength,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
}
if txnMeta != nil {
// Update the transaction's timestamp, if necessary. The transaction
// may have needed to move its timestamp for any number of reasons.
txnMeta.WriteTimestamp = ts
req.Txn = &roachpb.Transaction{
ba.Txn = &roachpb.Transaction{
TxnMeta: *txnMeta,
ReadTimestamp: ts,
}
req.Txn = ba.Txn
}
requestsByName[reqName] = req
return ""
@@ -874,10 +880,13 @@ func TestLockTableMaxLocks(t *testing.T) {
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: k}, hlc.Timestamp{WallTime: 1})
lockSpans.Add(lock.Intent, roachpb.Span{Key: k})
}
ba := &kvpb.BatchRequest{}
ba.Timestamp = hlc.Timestamp{WallTime: 1}
req := Request{
Timestamp: hlc.Timestamp{WallTime: 1},
Timestamp: ba.Timestamp,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
}
reqs = append(reqs, req)
ltg, err := lt.ScanAndEnqueue(req, nil)
@@ -1011,10 +1020,13 @@ func TestLockTableMaxLocksWithMultipleNotRemovableRefs(t *testing.T) {
}
latchSpans.AddMVCC(spanset.SpanReadWrite, roachpb.Span{Key: key}, hlc.Timestamp{WallTime: 1})
lockSpans.Add(lock.Intent, roachpb.Span{Key: key})
ba := &kvpb.BatchRequest{}
ba.Timestamp = hlc.Timestamp{WallTime: 1}
req := Request{
Timestamp: hlc.Timestamp{WallTime: 1},
Timestamp: ba.Timestamp,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
}
ltg, err := lt.ScanAndEnqueue(req, nil)
require.Nil(t, err)
@@ -1111,7 +1123,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
// cancellation, the code makes sure to release latches when returning
// early due to error. Otherwise other requests will get stuck and
// group.Wait() will not return until the test times out.
lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error)
lg, err = e.lm.Acquire(context.Background(), item.request.LatchSpans, poison.Policy_Error, item.request.BaFmt)
if err != nil {
return err
}
@@ -1519,11 +1531,15 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
ReadTimestamp: ts,
}
}
ba := &kvpb.BatchRequest{}
ba.Txn = txn
ba.Timestamp = ts
request := &Request{
Txn: txn,
Timestamp: ts,
Txn: ba.Txn,
Timestamp: ba.Timestamp,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
}
items = append(items, workloadItem{request: request})
if txn != nil {
@@ -1601,10 +1617,13 @@ func TestLockTableConcurrentRequests(t *testing.T) {
lockSpans := &lockspanset.LockSpanSet{}
onlyReads := txnMeta == nil && rng.Intn(2) != 0
numKeys := rng.Intn(len(keys)-1) + 1
ba := &kvpb.BatchRequest{}
ba.Timestamp = ts
request := &Request{
Timestamp: ts,
Timestamp: ba.Timestamp,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
}
if txnMeta != nil {
request.Txn = &roachpb.Transaction{
@@ -1691,7 +1710,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
var err error
firstIter := true
for {
if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil {
if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil {
doneCh <- err
return
}
@@ -1738,7 +1757,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
return
}
// Release locks.
if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error); err != nil {
if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil {
doneCh <- err
return
}
@@ -1766,11 +1785,14 @@ func createRequests(index int, numOutstanding int, numKeys int, numReadKeys int)
ts := hlc.Timestamp{WallTime: 10}
latchSpans := &spanset.SpanSet{}
lockSpans := &lockspanset.LockSpanSet{}
ba := &kvpb.BatchRequest{}
ba.Timestamp = ts
wi := benchWorkItem{
Request: Request{
Timestamp: ts,
LatchSpans: latchSpans,
LockSpans: lockSpans,
BaFmt: ba,
},
}
for i := 0; i < numKeys; i++ {
58 changes: 44 additions & 14 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
@@ -151,9 +151,13 @@ func TestLockTableWaiterWithTxn(t *testing.T) {
if synthetic {
txn.ReadTimestamp = txn.ReadTimestamp.WithSynthetic(true)
}
ba := &kvpb.BatchRequest{}
ba.Txn = &txn
ba.Timestamp = txn.ReadTimestamp
return Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
Timestamp: ba.Timestamp,
BaFmt: ba,
}
}

@@ -241,9 +245,13 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) {

reqHeaderTS := hlc.Timestamp{WallTime: 10}
makeReq := func() Request {
ba := &kvpb.BatchRequest{}
ba.Timestamp = reqHeaderTS
ba.UserPriority = roachpb.NormalUserPriority
return Request{
Timestamp: reqHeaderTS,
NonTxnPriority: roachpb.NormalUserPriority,
Timestamp: ba.Timestamp,
NonTxnPriority: ba.UserPriority,
BaFmt: ba,
}
}

@@ -441,10 +449,15 @@ func TestLockTableWaiterWithErrorWaitPolicy(t *testing.T) {
makeReq := func() Request {
txn := makeTxnProto("request")
txn.GlobalUncertaintyLimit = uncertaintyLimit
ba := &kvpb.BatchRequest{}
ba.Txn = &txn
ba.Timestamp = txn.ReadTimestamp
ba.WaitPolicy = lock.WaitPolicy_Error
return Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
WaitPolicy: lock.WaitPolicy_Error,
Timestamp: ba.Timestamp,
WaitPolicy: ba.WaitPolicy,
BaFmt: ba,
}
}
makeHighPriReq := func() Request {
@@ -608,17 +621,26 @@ func TestLockTableWaiterWithLockTimeout(t *testing.T) {
const lockTimeout = 1 * time.Millisecond
makeReq := func() Request {
txn := makeTxnProto("request")
ba := &kvpb.BatchRequest{}
ba.Txn = &txn
ba.Timestamp = txn.ReadTimestamp
ba.LockTimeout = lockTimeout
return Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
LockTimeout: lockTimeout,
Txn: ba.Txn,
Timestamp: ba.Timestamp,
LockTimeout: ba.LockTimeout,
BaFmt: ba,
}
}
if !txn {
makeReq = func() Request {
ba := &kvpb.BatchRequest{}
ba.Timestamp = hlc.Timestamp{WallTime: 10}
ba.LockTimeout = lockTimeout
return Request{
Timestamp: hlc.Timestamp{WallTime: 10},
LockTimeout: lockTimeout,
Timestamp: ba.Timestamp,
LockTimeout: ba.LockTimeout,
BaFmt: ba,
}
}
}
@@ -793,9 +815,13 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
err2 := kvpb.NewErrorf("error2")

txn := makeTxnProto("request")
ba := &kvpb.BatchRequest{}
ba.Txn = &txn
ba.Timestamp = txn.ReadTimestamp
req := Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
Txn: ba.Txn,
Timestamp: ba.Timestamp,
BaFmt: ba,
}

// Test with both synchronous and asynchronous pushes.
@@ -849,9 +875,13 @@ func TestLockTableWaiterDeferredIntentResolverError(t *testing.T) {
defer w.stopper.Stop(ctx)

txn := makeTxnProto("request")
ba := &kvpb.BatchRequest{}
ba.Txn = &txn
ba.Timestamp = txn.ReadTimestamp
req := Request{
Txn: &txn,
Timestamp: txn.ReadTimestamp,
Txn: ba.Txn,
Timestamp: ba.Timestamp,
BaFmt: ba,
}
keyA := roachpb.Key("keyA")
pusheeTxn := makeTxnProto("pushee")
@@ -53,7 +53,7 @@ sequence req=barrier2
----
[2] sequence barrier2: sequencing request
[2] sequence barrier2: waiting on latches without acquiring
[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@15.000000000,1
[2] sequence barrier2: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@15.000000000,1 for request Get [‹"c"›]
[2] sequence barrier2: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=read1
@@ -96,7 +96,7 @@ sequence req=barrier1
----
[2] sequence barrier1: sequencing request
[2] sequence barrier1: waiting on latches without acquiring
[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by read latch ‹c›@10.000000000,1
[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by read latch ‹c›@10.000000000,1 for request Get [‹"c"›]
[2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal

finish req=read1
@@ -143,7 +143,7 @@ sequence req=barrier1
----
[2] sequence barrier1: sequencing request
[2] sequence barrier1: waiting on latches without acquiring
[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0, held by write latch ‹c›@10.000000000,1
[2] sequence barrier1: waiting to acquire write latch ‹{a-f}›@0,0 for request Barrier [‹"a"›,‹"f"›), held by write latch ‹c›@10.000000000,1 for request Put [‹"c"›], [txn: 00000001]
[2] sequence barrier1: blocked on select in spanlatch.(*Manager).waitForSignal

debug-latch-manager