Commit e079457

Merge pull request cockroachdb#68509 from nvanbenschoten/backport21.1-68442-68456

release-21.1: kv: include RangeID in rangefeed goroutine stacks
nvanbenschoten committed Aug 27, 2021
2 parents d1b0369 + df88282 commit e079457
Showing 6 changed files with 51 additions and 22 deletions.
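
Note: in the hunks below, lines prefixed with "-" were removed by this commit and lines prefixed with "+" were added. For context on the change itself: Go goroutine dumps print each stack frame's raw argument words, so threading the RangeID through as an explicit, otherwise-unused parameter makes it legible when debugging stuck rangefeeds. A minimal sketch of the idea follows, with hypothetical names (not from this commit); note that Go 1.17's register-based calling convention can make the printed argument values inexact.

package main

import (
	"fmt"
	"runtime"
)

// handle never reads _forStacks; the parameter exists only so the ID
// shows up in the frame, e.g. main.handle(0x2a, ...) for ID 42.
func handle(_forStacks int64, done chan<- struct{}) {
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // dump all goroutine stacks
	fmt.Printf("%s", buf[:n])
	done <- struct{}{}
}

func main() {
	done := make(chan struct{})
	go handle(42, done) // 42 stands in for a roachpb.RangeID
	<-done
}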
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -189,7 +189,7 @@ func (ds *DistSender) partialRangeFeed(
case roachpb.RangeFeedRetryError_REASON_RANGE_SPLIT,
roachpb.RangeFeedRetryError_REASON_RANGE_MERGED,
roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
- // Evict the decriptor from the cache.
+ // Evict the descriptor from the cache.
rangeInfo.token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
default:
@@ -204,7 +204,7 @@ func (ds *DistSender) partialRangeFeed(
}

// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
- // RPC call. Results will be send on the provided channel. Returns the timestamp
+ // RPC call. Results will be sent on the provided channel. Returns the timestamp
// of the maximum rangefeed checkpoint seen, which can be used to re-establish
// the rangefeed with a larger starting timestamp, reflecting the fact that all
// values up to the last checkpoint have already been observed. Returns the
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -50,8 +50,9 @@ func newErrBufferCapacityExceeded() *roachpb.Error {
// Config encompasses the configuration required to create a Processor.
type Config struct {
log.AmbientContext
- Clock *hlc.Clock
- Span roachpb.RSpan
+ Clock   *hlc.Clock
+ RangeID roachpb.RangeID
+ Span    roachpb.RSpan

TxnPusher TxnPusher
// PushTxnsInterval specifies the interval at which a Processor will push
@@ -193,7 +194,7 @@ type IteratorConstructor func() storage.SimpleMVCCIterator
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
- p.run(ctx, rtsIterFunc, stopper)
+ p.run(ctx, p.RangeID, rtsIterFunc, stopper)
}); err != nil {
pErr := roachpb.NewError(err)
p.reg.DisconnectWithErr(all, pErr)
@@ -203,7 +204,10 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor

// run is called from Start and runs the rangefeed.
func (p *Processor) run(
- ctx context.Context, rtsIterFunc IteratorConstructor, stopper *stop.Stopper,
+ ctx context.Context,
+ _forStacks roachpb.RangeID,
+ rtsIterFunc IteratorConstructor,
+ stopper *stop.Stopper,
) {
defer close(p.stoppedC)
ctx, cancelOutputLoops := context.WithCancel(ctx)
@@ -256,7 +260,7 @@ func (p *Processor) run(

// Run an output loop for the registry.
runOutputLoop := func(ctx context.Context) {
- r.runOutputLoop(ctx)
+ r.runOutputLoop(ctx, p.RangeID)
select {
case p.unregC <- &r:
case <-p.stoppedC:
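
A note on the run signature above: run could read p.RangeID from its receiver, but goroutine dumps print only each frame's argument words and never chase pointers, so a field behind p is invisible in a stack trace. The dedicated _forStacks parameter puts the ID into the frame itself. A hedged illustration of the difference, using assumed names rather than the CockroachDB types:

package main

import (
	"os"
	"runtime/pprof"
	"time"
)

// proc stands in for rangefeed.Processor.
type proc struct{ id int64 }

// run reads the ID through the receiver; a debug=2 dump shows only the
// receiver pointer, e.g. main.(*proc).run(0xc0000140a0).
func (p *proc) run() { time.Sleep(time.Hour) }

// runWithID carries the ID in its own frame; a dump shows
// main.(*proc).runWithID(0xc0000140a0, 0x2a), where 0x2a is 42.
func (p *proc) runWithID(_forStacks int64) { time.Sleep(time.Hour) }

func main() {
	p := &proc{id: 42}
	go p.run()
	go p.runWithID(p.id)
	time.Sleep(100 * time.Millisecond) // let both goroutines park
	pprof.Lookup("goroutine").WriteTo(os.Stdout, 2)
}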
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/rangefeed/registry.go
@@ -264,8 +264,13 @@ func (r *registration) outputLoop(ctx context.Context) error {
}
}

- func (r *registration) runOutputLoop(ctx context.Context) {
+ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.RangeID) {
r.mu.Lock()
+ if r.mu.disconnected {
+ // The registration has already been disconnected.
+ r.mu.Unlock()
+ return
+ }
ctx, r.mu.outputLoopCancelFn = context.WithCancel(ctx)
r.mu.Unlock()
err := r.outputLoop(ctx)
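
Beyond the signature change, the registry hunk closes a startup race: a registration that is disconnected before its output loop ever runs now returns immediately instead of installing a cancel function over an already-dead registration. The "External Disconnect before output loop" test added below exercises exactly this. A generic sketch of the guard, under assumed names:

package main

import (
	"context"
	"fmt"
	"sync"
)

// reg is a hypothetical reduction of rangefeed's registration type.
type reg struct {
	mu struct {
		sync.Mutex
		disconnected bool
		cancel       context.CancelFunc
	}
}

// disconnect marks the registration dead and cancels a running loop.
func (r *reg) disconnect() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.disconnected = true
	if r.mu.cancel != nil {
		r.mu.cancel()
	}
}

// startLoop mirrors the guard added to runOutputLoop: the disconnected
// check and the cancel-func installation happen under a single lock
// acquisition, so a concurrent disconnect cannot slip in between them.
func (r *reg) startLoop(ctx context.Context) {
	r.mu.Lock()
	if r.mu.disconnected {
		r.mu.Unlock()
		fmt.Println("already disconnected; loop never starts")
		return
	}
	ctx, r.mu.cancel = context.WithCancel(ctx)
	r.mu.Unlock()
	<-ctx.Done() // stand-in for the real event loop
}

func main() {
	r := new(reg)
	r.disconnect()                    // disconnect first...
	r.startLoop(context.Background()) // ...so this returns immediately
}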
37 changes: 24 additions & 13 deletions pkg/kv/kvserver/rangefeed/registry_test.go
@@ -142,7 +142,7 @@ func TestRegistrationBasic(t *testing.T) {
noCatchupReg.publish(ev1)
noCatchupReg.publish(ev2)
require.Equal(t, len(noCatchupReg.buf), 2)
- go noCatchupReg.runOutputLoop(context.Background())
+ go noCatchupReg.runOutputLoop(context.Background(), 0)
require.NoError(t, noCatchupReg.waitForCaughtUp())
require.Equal(t, []*roachpb.RangeFeedEvent{ev1, ev2}, noCatchupReg.stream.Events())
noCatchupReg.disconnect(nil)
@@ -158,7 +158,7 @@ func TestRegistrationBasic(t *testing.T) {
catchupReg.publish(ev1)
catchupReg.publish(ev2)
require.Equal(t, len(catchupReg.buf), 2)
- go catchupReg.runOutputLoop(context.Background())
+ go catchupReg.runOutputLoop(context.Background(), 0)
require.NoError(t, catchupReg.waitForCaughtUp())
events := catchupReg.stream.Events()
require.Equal(t, 6, len(events))
@@ -171,19 +171,30 @@ func TestRegistrationBasic(t *testing.T) {
disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
disconnectReg.publish(ev1)
disconnectReg.publish(ev2)
- go disconnectReg.runOutputLoop(context.Background())
+ go disconnectReg.runOutputLoop(context.Background(), 0)
require.NoError(t, disconnectReg.waitForCaughtUp())
discErr := roachpb.NewError(fmt.Errorf("disconnection error"))
disconnectReg.disconnect(discErr)
err := <-disconnectReg.errC
require.Equal(t, discErr, err)
require.Equal(t, 2, len(disconnectReg.stream.Events()))

+ // External Disconnect before output loop.
+ disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
+ disconnectEarlyReg.publish(ev1)
+ disconnectEarlyReg.publish(ev2)
+ disconnectEarlyReg.disconnect(discErr)
+ go disconnectEarlyReg.runOutputLoop(context.Background(), 0)
+ err = <-disconnectEarlyReg.errC
+ require.Equal(t, discErr, err)
+ require.Equal(t, 0, len(disconnectEarlyReg.stream.Events()))

// Overflow.
overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
for i := 0; i < cap(overflowReg.buf)+3; i++ {
overflowReg.publish(ev1)
}
- go overflowReg.runOutputLoop(context.Background())
+ go overflowReg.runOutputLoop(context.Background(), 0)
err = <-overflowReg.errC
require.Equal(t, newErrBufferCapacityExceeded(), err)
require.Equal(t, cap(overflowReg.buf), len(overflowReg.Events()))
@@ -192,15 +203,15 @@ func TestRegistrationBasic(t *testing.T) {
streamErrReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
streamErr := fmt.Errorf("stream error")
streamErrReg.stream.SetSendErr(streamErr)
- go streamErrReg.runOutputLoop(context.Background())
+ go streamErrReg.runOutputLoop(context.Background(), 0)
streamErrReg.publish(ev1)
err = <-streamErrReg.errC
require.Equal(t, streamErr.Error(), err.GoError().Error())

// Stream Context Canceled.
streamCancelReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false)
streamCancelReg.stream.Cancel()
- go streamCancelReg.runOutputLoop(context.Background())
+ go streamCancelReg.runOutputLoop(context.Background(), 0)
require.NoError(t, streamCancelReg.waitForCaughtUp())
err = <-streamCancelReg.errC
require.Equal(t, streamCancelReg.stream.Context().Err().Error(), err.GoError().Error())
@@ -337,10 +348,10 @@ func TestRegistryBasic(t *testing.T) {
rBC := newTestRegistration(spBC, hlc.Timestamp{}, nil, true /* withDiff */)
rCD := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
rAC := newTestRegistration(spAC, hlc.Timestamp{}, nil, false /* withDiff */)
- go rAB.runOutputLoop(context.Background())
- go rBC.runOutputLoop(context.Background())
- go rCD.runOutputLoop(context.Background())
- go rAC.runOutputLoop(context.Background())
+ go rAB.runOutputLoop(context.Background(), 0)
+ go rBC.runOutputLoop(context.Background(), 0)
+ go rCD.runOutputLoop(context.Background(), 0)
+ go rAC.runOutputLoop(context.Background(), 0)
defer rAB.disconnect(nil)
defer rBC.disconnect(nil)
defer rCD.disconnect(nil)
@@ -446,11 +457,11 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) {
reg := makeRegistry()

rNoDiff := newTestRegistration(spAB, hlc.Timestamp{}, nil, false /* withDiff */)
- go rNoDiff.runOutputLoop(context.Background())
+ go rNoDiff.runOutputLoop(context.Background(), 0)
reg.Register(&rNoDiff.registration)

rWithDiff := newTestRegistration(spCD, hlc.Timestamp{}, nil, true /* withDiff */)
- go rWithDiff.runOutputLoop(context.Background())
+ go rWithDiff.runOutputLoop(context.Background(), 0)
reg.Register(&rWithDiff.registration)

key := roachpb.Key("a")
@@ -498,7 +509,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) {
reg := makeRegistry()

r := newTestRegistration(spAB, hlc.Timestamp{WallTime: 10}, nil, false)
- go r.runOutputLoop(context.Background())
+ go r.runOutputLoop(context.Background(), 0)
reg.Register(&r.registration)

// Publish a value with a timestamp beneath the registration's start
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -140,6 +140,14 @@ func (i iteratorWithCloser) Close() {
// of rangefeeds using catchup iterators at the same time.
func (r *Replica) RangeFeed(
args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
+ return r.rangeFeedWithRangeID(r.RangeID, args, stream)
+ }
+
+ func (r *Replica) rangeFeedWithRangeID(
+ _forStacks roachpb.RangeID,
+ args *roachpb.RangeFeedRequest,
+ stream roachpb.Internal_RangeFeedServer,
+ ) *roachpb.Error {
if !r.isSystemRange() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
@@ -353,6 +361,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
cfg := rangefeed.Config{
AmbientContext: r.AmbientContext,
Clock: r.Clock(),
+ RangeID: r.RangeID,
Span: desc.RSpan(),
TxnPusher: &tp,
PushTxnsInterval: r.store.TestingKnobs().RangeFeedPushTxnsInterval,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
@@ -46,7 +46,7 @@ func (r *Replica) Send(
//
// github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...)
func (r *Replica) sendWithRangeID(
- ctx context.Context, rangeID roachpb.RangeID, ba *roachpb.BatchRequest,
+ ctx context.Context, _forStacks roachpb.RangeID, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
var br *roachpb.BatchResponse
if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 {
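
The RangeFeed change in replica_rangefeed.go mirrors the idiom this file already documents for Send: an exported entry point that immediately delegates to an unexported variant whose leading parameter exists purely for stack traces. A schematic reduction, with hypothetical signatures rather than the real Replica API:

package main

import "fmt"

// Replica is a stand-in for kvserver.Replica.
type Replica struct{ RangeID int64 }

// RangeFeed delegates immediately so that every frame beneath it
// carries the range ID as an argument word in goroutine dumps.
func (r *Replica) RangeFeed(req string) error {
	return r.rangeFeedWithRangeID(r.RangeID, req)
}

// _forStacks is deliberately unused; the underscore prefix signals that
// it exists only to be visible in stack traces.
func (r *Replica) rangeFeedWithRangeID(_forStacks int64, req string) error {
	fmt.Println("serving rangefeed for span:", req)
	return nil
}

func main() {
	r := &Replica{RangeID: 42}
	_ = r.RangeFeed("a-b")
}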
