From 5c9ab2a9f2461f75066bd0de131d8a5450092fe4 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 4 Aug 2021 15:29:41 -0400 Subject: [PATCH 1/2] kv: include RangeID in rangefeed goroutine stacks This commit includes the RangeID in each of a rangefeed processor and its registrations' associated goroutine stacks. This is a cheap and easy way to get better observability into the ranges that have active rangefeeds. It also tells us where those goroutines are spending their time. This will also become easier to use in Go 1.17, which improved the format of stack traces. --- .../kvclient/kvcoord/dist_sender_rangefeed.go | 4 +-- pkg/kv/kvserver/rangefeed/processor.go | 14 ++++++---- pkg/kv/kvserver/rangefeed/registry.go | 2 +- pkg/kv/kvserver/rangefeed/registry_test.go | 26 +++++++++---------- pkg/kv/kvserver/replica_rangefeed.go | 9 +++++++ pkg/kv/kvserver/replica_send.go | 2 +- 6 files changed, 35 insertions(+), 22 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 6215a2228b6e..c63d7aaaf5de 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed( case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the decriptor from the cache. + // Evict the descriptor from the cache. rangeInfo.token.Evict(ctx) return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh) default: @@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed( } // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed -// RPC call. Results will be send on the provided channel. Returns the timestamp +// RPC call. Results will be sent on the provided channel. 
Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish // the rangefeed with a larger starting timestamp, reflecting the fact that all // values up to the last checkpoint have already been observed. Returns the diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 0d56a46f0e63..bc1f835116ed 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error { // Config encompasses the configuration required to create a Processor. type Config struct { log.AmbientContext - Clock *hlc.Clock - Span roachpb.RSpan + Clock *hlc.Clock + RangeID roachpb.RangeID + Span roachpb.RSpan TxnPusher TxnPusher // PushTxnsInterval specifies the interval at which a Processor will push @@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { - p.run(ctx, rtsIterFunc, stopper) + p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { pErr := roachpb.NewError(err) p.reg.DisconnectWithErr(all, pErr) @@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor // run is called from Start and runs the rangefeed. func (p *Processor) run( - ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper, + ctx context.Context, + _forStacks roachpb.RangeID, + rtsIterFunc IteratorConstructor, + stopper *stop.Stopper, ) { defer close(p.stoppedC) ctx, cancelOutputLoops := context.WithCancel(ctx) @@ -256,7 +260,7 @@ func (p *Processor) run( // Run an output loop for the registry. 
runOutputLoop := func(ctx context.Context) { - r.runOutputLoop(ctx) + r.runOutputLoop(ctx, p.RangeID) select { case p.unregC <- &r: case <-p.stoppedC: diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index d6041ad62faf..cff3d8ce6af5 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -264,7 +264,7 @@ func (r *registration) outputLoop(ctx context.Context) error { } } -func (r *registration) runOutputLoop(ctx context.Context) { +func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) { r.mu.Lock() ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx) r.mu.Unlock() diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index c52983006bcf..ced4abaabdd9 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) { noCatchupReg.publish(ev1) noCatchupReg.publish(ev2) require.Equal(t, len(noCatchupReg.buf), 2) - go noCatchupReg.runOutputLoop(context.Background()) + go noCatchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, noCatchupReg.waitForCaughtUp()) require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) noCatchupReg.disconnect(nil) @@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) { catchupReg.publish(ev1) catchupReg.publish(ev2) require.Equal(t, len(catchupReg.buf), 2) - go catchupReg.runOutputLoop(context.Background()) + go catchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, catchupReg.waitForCaughtUp()) events := catchupReg.stream.Events() require.Equal(t, 6, len(events)) @@ -171,7 +171,7 @@ func TestRegistrationBasic(t *testing.T) { disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) disconnectReg.publish(ev1) disconnectReg.publish(ev2) - go disconnectReg.runOutputLoop(context.Background()) + go 
disconnectReg.runOutputLoop(context.Background(), 0) require.NoError(t, disconnectReg.waitForCaughtUp()) discErr := roachpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) @@ -183,7 +183,7 @@ func TestRegistrationBasic(t *testing.T) { for i := 0; i < cap(overflowReg.buf)+3; i++ { overflowReg.publish(ev1) } - go overflowReg.runOutputLoop(context.Background()) + go overflowReg.runOutputLoop(context.Background(), 0) err = <-overflowReg.errC require.Equal(t, newErrBufferCapacityExceeded(), err) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) @@ -192,7 +192,7 @@ func TestRegistrationBasic(t *testing.T) { streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamErr := fmt.Errorf("stream error") streamErrReg.stream.SetSendErr(streamErr) - go streamErrReg.runOutputLoop(context.Background()) + go streamErrReg.runOutputLoop(context.Background(), 0) streamErrReg.publish(ev1) err = <-streamErrReg.errC require.Equal(t, streamErr.Error(), err.GoError().Error()) @@ -200,7 +200,7 @@ func TestRegistrationBasic(t *testing.T) { // Stream Context Canceled. 
streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamCancelReg.stream.Cancel() - go streamCancelReg.runOutputLoop(context.Background()) + go streamCancelReg.runOutputLoop(context.Background(), 0) require.NoError(t, streamCancelReg.waitForCaughtUp()) err = <-streamCancelReg.errC require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error()) @@ -337,10 +337,10 @@ func TestRegistryBasic(t *testing.T) { rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */) rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */) - go rAB.runOutputLoop(context.Background()) - go rBC.runOutputLoop(context.Background()) - go rCD.runOutputLoop(context.Background()) - go rAC.runOutputLoop(context.Background()) + go rAB.runOutputLoop(context.Background(), 0) + go rBC.runOutputLoop(context.Background(), 0) + go rCD.runOutputLoop(context.Background(), 0) + go rAC.runOutputLoop(context.Background(), 0) defer rAB.disconnect(nil) defer rBC.disconnect(nil) defer rCD.disconnect(nil) @@ -446,11 +446,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { reg := makeRegistry() rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */) - go rNoDiff.runOutputLoop(context.Background()) + go rNoDiff.runOutputLoop(context.Background(), 0) reg.Register(&rNoDiff.registration) rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) - go rWithDiff.runOutputLoop(context.Background()) + go rWithDiff.runOutputLoop(context.Background(), 0) reg.Register(&rWithDiff.registration) key := roachpb.Key("a") @@ -498,7 +498,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { reg := makeRegistry() r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false) - go r.runOutputLoop(context.Background()) + go r.runOutputLoop(context.Background(), 0) 
reg.Register(&r.registration) // Publish a value with a timestamp beneath the registration's start diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9cbca03220b4..046faa8a5437 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -140,6 +140,14 @@ func (i iteratorWithCloser) Close() { // of rangefeeds using catchup iterators at the same time. func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) *roachpb.Error { + return r.rangeFeedWithRangeID(r.RangeID, args, stream) +} + +func (r *Replica) rangeFeedWithRangeID( + _forStacks roachpb.RangeID, + args *roachpb.RangeFeedRequest, + stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", @@ -353,6 +361,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(), + RangeID: r.RangeID, Span: desc.RSpan(), TxnPusher: &tp, PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval, diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 63ee1599d717..046846d5098e 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -46,7 +46,7 @@ func (r *Replica) Send( // // github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) 
func (r *Replica) sendWithRangeID( - ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest, + ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { var br *roachpb.BatchResponse if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { From df88282e3403d5ad92cb578ac636ac4d77aeb4d1 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 4 Aug 2021 18:16:01 -0400 Subject: [PATCH 2/2] kv/rangefeed: avoid race between new registration and processor shutdown This commit resolves a race between rangefeed registration connection and disconnection which could have resulted in a leaked goroutine. The call to `registration.runOutputLoop` is done in an async task, so it is not synchronized with the calls to `registration.disconnect` from the rangefeed processor (from `DisconnectWithErr`). Because these calls were not synchronized, it was possible for `disconnect` to see a nil `outputLoopCancelFn` and then shut down the stream. Then `runOutputLoop` could set `outputLoopCancelFn` and enter `registration.outputLoop`. I believe this could allow the output loop goroutine to leak, especially if it never sent anything on the stream. I don't think this relates to the resolved timestamp stall we're currently investigating because `registration.disconnect` would have still returned an error to the rangefeed stream during this race, so it would not have resulted in a rangefeed getting stalled indefinitely. Instead, only the outputLoop goroutine could have stalled, but we saw no indication of this in goroutine dumps. 
--- pkg/kv/kvserver/rangefeed/registry.go | 5 +++++ pkg/kv/kvserver/rangefeed/registry_test.go | 11 +++++++++++ 2 files changed, 16 insertions(+) diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index cff3d8ce6af5..db5c710f8c4e 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -266,6 +266,11 @@ func (r *registration) outputLoop(ctx context.Context) error { func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) { r.mu.Lock() + if r.mu.disconnected { + // The registration has already been disconnected. + r.mu.Unlock() + return + } ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx) r.mu.Unlock() err := r.outputLoop(ctx) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index ced4abaabdd9..f6db57ec6213 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -177,6 +177,17 @@ func TestRegistrationBasic(t *testing.T) { disconnectReg.disconnect(discErr) err := <-disconnectReg.errC require.Equal(t, discErr, err) + require.Equal(t, 2, len(disconnectReg.stream.Events())) + + // External Disconnect before output loop. + disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) + disconnectEarlyReg.publish(ev1) + disconnectEarlyReg.publish(ev2) + disconnectEarlyReg.disconnect(discErr) + go disconnectEarlyReg.runOutputLoop(context.Background(), 0) + err = <-disconnectEarlyReg.errC + require.Equal(t, discErr, err) + require.Equal(t, 0, len(disconnectEarlyReg.stream.Events())) // Overflow. overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)