Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to default pending and default error handler. #607

Merged
merged 3 commits into from Nov 16, 2020

Conversation

derekcollison
Copy link
Member

Slow consumer state still seems off, this is a first step trying to improve.
Increased the default number of pending messages limit but kept bytes the same.

Also introduced a default ErrHandler that will print out to stderr in case none has been set.

Someone needs to look deeper into why we are so imbalanced when receiving messages like queued up JetStream messages for a consumer.

Signed-off-by: Derek Collison derek@nats.io

Slow consumer state still seems off, this is a first step trying to improve.
Increased the default number of pending messages limit but kept bytes the same.

Also introduced a default ErrHandler that will print out to stderr in case none has been set.

Signed-off-by: Derek Collison <derek@nats.io>
nats.go Outdated
if nc != nil {
nc.mu.RLock()
cid = nc.info.CID
defer nc.mu.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that you keep the connection's lock until the end of this function? Error handlers are already serialized, if that was the concern.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, will fix.

sub := nc.subs[nc.ps.ma.sid]
nc.subsMu.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was protected by the subsMu so that remove could not happen while processing a message. I believe this is the reason we now can get the panic:

=== RUN   TestCloseChanOnSubscriber
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x4bcb30]
goroutine 6453 [running]:
sync.(*Cond).Signal(0x0)
	/home/travis/.gimme/versions/go1.15.5.linux.amd64/src/sync/cond.go:65 +0x30
github.com/nats-io/nats%2ego.(*Conn).processMsg(0xc000702600, 0xc0005a000d, 0x5, 0x7ff3)
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:2498 +0xc7a
github.com/nats-io/nats%2ego.(*Conn).parse(0xc000702600, 0xc0005a0000, 0x398, 0x8000, 0x398, 0x0)
	/home/travis/gopath/src/github.com/nats-io/nats.go/parser.go:192 +0x26e5
github.com/nats-io/nats%2ego.(*Conn).readLoop(0xc000702600)
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:2323 +0x279
created by github.com/nats-io/nats%2ego.(*Conn).processConnectInit
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:1639 +0x32b
FAIL	github.com/nats-io/nats.go/test	42.511s

You need to check for sub.closed line 2466, after lock the sub. If closed, return.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, for this test: TestUnsubscribeChanOnSubscriber, I also see the slow consumer error reported in some cases, so may have to set dummy error handler there too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will take a deeper look.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fixed.

norace_test.go Show resolved Hide resolved
@@ -908,10 +918,19 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
switch typeSubs {
case 0:
sub, err = nc.ChanSubscribe("foo", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought limits did not apply to Channel subscribe, because the limit is really whatever the user sets for the provided channel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test expects an error since its checking for err == nil but the t.Fatalf descriptions are wrong, will fix.

Signed-off-by: Derek Collison <derek@nats.io>
@coveralls
Copy link

coveralls commented Nov 16, 2020

Coverage Status

Coverage decreased (-0.3%) to 88.857% when pulling ae67e0e on slowconsumer into 0482788 on master.

Signed-off-by: Derek Collison <derek@nats.io>
Copy link
Member

@kozlovic kozlovic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@derekcollison derekcollison merged commit a35836e into master Nov 16, 2020
@wallyqs wallyqs deleted the slowconsumer branch November 20, 2020 06:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants