diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 6215a2228b6e..c63d7aaaf5de 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed( case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT, roachpb.RangeFeedRetryError_REASON_RANGE_MERGED, roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: - // Evict the decriptor from the cache. + // Evict the descriptor from the cache. rangeInfo.token.Evict(ctx) return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh) default: @@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed( } // singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed -// RPC call. Results will be send on the provided channel. Returns the timestamp +// RPC call. Results will be sent on the provided channel. Returns the timestamp // of the maximum rangefeed checkpoint seen, which can be used to re-establish // the rangefeed with a larger starting timestamp, reflecting the fact that all // values up to the last checkpoint have already been observed. Returns the diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 0d56a46f0e63..bc1f835116ed 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error { // Config encompasses the configuration required to create a Processor. type Config struct { log.AmbientContext - Clock *hlc.Clock - Span roachpb.RSpan + Clock *hlc.Clock + RangeID roachpb.RangeID + Span roachpb.RSpan TxnPusher TxnPusher // PushTxnsInterval specifies the interval at which a Processor will push @@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { - p.run(ctx, rtsIterFunc, stopper) + p.run(ctx, p.RangeID, rtsIterFunc, stopper) }); err != nil { pErr := roachpb.NewError(err) p.reg.DisconnectWithErr(all, pErr) @@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor // run is called from Start and runs the rangefeed. func (p *Processor) run( - ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper, + ctx context.Context, + _forStacks roachpb.RangeID, + rtsIterFunc IteratorConstructor, + stopper *stop.Stopper, ) { defer close(p.stoppedC) ctx, cancelOutputLoops := context.WithCancel(ctx) @@ -256,7 +260,7 @@ func (p *Processor) run( // Run an output loop for the registry. runOutputLoop := func(ctx context.Context) { - r.runOutputLoop(ctx) + r.runOutputLoop(ctx, p.RangeID) select { case p.unregC <- &r: case <-p.stoppedC: diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index d6041ad62faf..db5c710f8c4e 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -264,8 +264,13 @@ func (r *registration) outputLoop(ctx context.Context) error { } } -func (r *registration) runOutputLoop(ctx context.Context) { +func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) { r.mu.Lock() + if r.mu.disconnected { + // The registration has already been disconnected. + r.mu.Unlock() + return + } ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx) r.mu.Unlock() err := r.outputLoop(ctx) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index c52983006bcf..f6db57ec6213 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) { noCatchupReg.publish(ev1) noCatchupReg.publish(ev2) require.Equal(t, len(noCatchupReg.buf), 2) - go noCatchupReg.runOutputLoop(context.Background()) + go noCatchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, noCatchupReg.waitForCaughtUp()) require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events()) noCatchupReg.disconnect(nil) @@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) { catchupReg.publish(ev1) catchupReg.publish(ev2) require.Equal(t, len(catchupReg.buf), 2) - go catchupReg.runOutputLoop(context.Background()) + go catchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, catchupReg.waitForCaughtUp()) events := catchupReg.stream.Events() require.Equal(t, 6, len(events)) @@ -171,19 +171,30 @@ func TestRegistrationBasic(t *testing.T) { disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) disconnectReg.publish(ev1) disconnectReg.publish(ev2) - go disconnectReg.runOutputLoop(context.Background()) + go disconnectReg.runOutputLoop(context.Background(), 0) require.NoError(t, disconnectReg.waitForCaughtUp()) discErr := roachpb.NewError(fmt.Errorf("disconnection error")) disconnectReg.disconnect(discErr) err := <-disconnectReg.errC require.Equal(t, discErr, err) + require.Equal(t, 2, len(disconnectReg.stream.Events())) + + // External Disconnect before output loop. + disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) + disconnectEarlyReg.publish(ev1) + disconnectEarlyReg.publish(ev2) + disconnectEarlyReg.disconnect(discErr) + go disconnectEarlyReg.runOutputLoop(context.Background(), 0) + err = <-disconnectEarlyReg.errC + require.Equal(t, discErr, err) + require.Equal(t, 0, len(disconnectEarlyReg.stream.Events())) // Overflow. overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) for i := 0; i < cap(overflowReg.buf)+3; i++ { overflowReg.publish(ev1) } - go overflowReg.runOutputLoop(context.Background()) + go overflowReg.runOutputLoop(context.Background(), 0) err = <-overflowReg.errC require.Equal(t, newErrBufferCapacityExceeded(), err) require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events())) @@ -192,7 +203,7 @@ func TestRegistrationBasic(t *testing.T) { streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamErr := fmt.Errorf("stream error") streamErrReg.stream.SetSendErr(streamErr) - go streamErrReg.runOutputLoop(context.Background()) + go streamErrReg.runOutputLoop(context.Background(), 0) streamErrReg.publish(ev1) err = <-streamErrReg.errC require.Equal(t, streamErr.Error(), err.GoError().Error()) @@ -200,7 +211,7 @@ func TestRegistrationBasic(t *testing.T) { // Stream Context Canceled. streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) streamCancelReg.stream.Cancel() - go streamCancelReg.runOutputLoop(context.Background()) + go streamCancelReg.runOutputLoop(context.Background(), 0) require.NoError(t, streamCancelReg.waitForCaughtUp()) err = <-streamCancelReg.errC require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error()) @@ -337,10 +348,10 @@ func TestRegistryBasic(t *testing.T) { rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */) rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */) - go rAB.runOutputLoop(context.Background()) - go rBC.runOutputLoop(context.Background()) - go rCD.runOutputLoop(context.Background()) - go rAC.runOutputLoop(context.Background()) + go rAB.runOutputLoop(context.Background(), 0) + go rBC.runOutputLoop(context.Background(), 0) + go rCD.runOutputLoop(context.Background(), 0) + go rAC.runOutputLoop(context.Background(), 0) defer rAB.disconnect(nil) defer rBC.disconnect(nil) defer rCD.disconnect(nil) @@ -446,11 +457,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { reg := makeRegistry() rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */) - go rNoDiff.runOutputLoop(context.Background()) + go rNoDiff.runOutputLoop(context.Background(), 0) reg.Register(&rNoDiff.registration) rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */) - go rWithDiff.runOutputLoop(context.Background()) + go rWithDiff.runOutputLoop(context.Background(), 0) reg.Register(&rWithDiff.registration) key := roachpb.Key("a") @@ -498,7 +509,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { reg := makeRegistry() r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false) - go r.runOutputLoop(context.Background()) + go r.runOutputLoop(context.Background(), 0) reg.Register(&r.registration) // Publish a value with a timestamp beneath the registration's start diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9cbca03220b4..046faa8a5437 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -140,6 +140,14 @@ func (i iteratorWithCloser) Close() { // of rangefeeds using catchup iterators at the same time. func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) *roachpb.Error { + return r.rangeFeedWithRangeID(r.RangeID, args, stream) +} + +func (r *Replica) rangeFeedWithRangeID( + _forStacks roachpb.RangeID, + args *roachpb.RangeFeedRequest, + stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", @@ -353,6 +361,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( cfg := rangefeed.Config{ AmbientContext: r.AmbientContext, Clock: r.Clock(), + RangeID: r.RangeID, Span: desc.RSpan(), TxnPusher: &tp, PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval, diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 63ee1599d717..046846d5098e 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -46,7 +46,7 @@ func (r *Replica) Send( // // github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) func (r *Replica) sendWithRangeID( - ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest, + ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { var br *roachpb.BatchResponse if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {