Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to default pending and default error handler. #607

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io

[![License Apache 2](https://img.shields.io/badge/License-Apache2-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats.svg?type=shield)](https://app.fossa.io/projects/git%2Bgithub.com%2Fnats-io%2Fgo-nats?ref=badge_shield)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.org/nats-io/nats.go.svg?branch=master)](http://travis-ci.org/nats-io/nats.go) [![GoDoc](https://img.shields.io/badge/GoDoc-reference-007d9c)](https://pkg.go.dev/github.com/nats-io/nats.go)
[![Go Report Card](https://goreportcard.com/badge/github.com/nats-io/nats.go)](https://goreportcard.com/report/github.com/nats-io/nats.go) [![Build Status](https://travis-ci.com/nats-io/nats.go.svg?branch=master)](http://travis-ci.com/nats-io/nats.go) [![GoDoc](https://img.shields.io/badge/GoDoc-reference-007d9c)](https://pkg.go.dev/github.com/nats-io/nats.go)
[![Coverage Status](https://coveralls.io/repos/nats-io/nats.go/badge.svg?branch=master)](https://coveralls.io/r/nats-io/nats.go?branch=master)

## Installation
Expand Down
6 changes: 5 additions & 1 deletion enc_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -45,6 +45,10 @@ func TestPublishErrorAfterSubscribeDecodeError(t *testing.T) {
opts := options
nc, _ := opts.Connect()
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

c, _ := NewEncodedConn(nc, JSON_ENCODER)

//Test message type
Expand Down
53 changes: 41 additions & 12 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
DefaultJetStreamTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultMaxChanLen = 8 * 1024 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
Expand Down Expand Up @@ -1164,6 +1164,11 @@ func (o Options) Connect() (*Conn, error) {
nc.ach = &asyncCallbacksHandler{}
nc.ach.cond = sync.NewCond(&nc.ach.mu)

// Set a default error handler that will print to stderr.
if nc.Opts.AsyncErrorCB == nil {
nc.Opts.AsyncErrorCB = defaultErrHandler
}

if err := nc.connect(); err != nil {
return nil, err
}
Expand All @@ -1174,6 +1179,22 @@ func (o Options) Connect() (*Conn, error) {
return nc, nil
}

func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
var cid uint64
if nc != nil {
nc.mu.RLock()
cid = nc.info.CID
nc.mu.RUnlock()
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
os.Stderr.WriteString(errStr)
}

const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
Expand Down Expand Up @@ -2393,18 +2414,18 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
// or the pending queue is over the pending limits, the connection is
// considered a slow consumer.
func (nc *Conn) processMsg(data []byte) {
// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()

// Stats
atomic.AddUint64(&nc.InMsgs, 1)
atomic.AddUint64(&nc.InBytes, uint64(len(data)))

// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()
sub := nc.subs[nc.ps.ma.sid]
nc.subsMu.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was protected by the subsMu so that remove could not happen while processing a message. I believe this is the reason we now can get the panic:

=== RUN   TestCloseChanOnSubscriber
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x4bcb30]
goroutine 6453 [running]:
sync.(*Cond).Signal(0x0)
	/home/travis/.gimme/versions/go1.15.5.linux.amd64/src/sync/cond.go:65 +0x30
github.com/nats-io/nats%2ego.(*Conn).processMsg(0xc000702600, 0xc0005a000d, 0x5, 0x7ff3)
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:2498 +0xc7a
github.com/nats-io/nats%2ego.(*Conn).parse(0xc000702600, 0xc0005a0000, 0x398, 0x8000, 0x398, 0x0)
	/home/travis/gopath/src/github.com/nats-io/nats.go/parser.go:192 +0x26e5
github.com/nats-io/nats%2ego.(*Conn).readLoop(0xc000702600)
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:2323 +0x279
created by github.com/nats-io/nats%2ego.(*Conn).processConnectInit
	/home/travis/gopath/src/github.com/nats-io/nats.go/nats.go:1639 +0x32b
FAIL	github.com/nats-io/nats.go/test	42.511s

You need to check for sub.closed line 2466, after lock the sub. If closed, return.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, for this test: TestUnsubscribeChanOnSubscriber, I also see the slow consumer error reported in some cases, so may have to set dummy error handler there too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok will take a deeper look.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok fixed.


if sub == nil {
nc.subsMu.RUnlock()
return
}

Expand Down Expand Up @@ -2443,6 +2464,12 @@ func (nc *Conn) processMsg(data []byte) {

sub.mu.Lock()

// Check if closed.
if sub.closed {
sub.mu.Unlock()
return
}

// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
sub.pMsgs++
Expand Down Expand Up @@ -2474,7 +2501,9 @@ func (nc *Conn) processMsg(data []byte) {
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
sub.pCond.Signal()
if sub.pCond != nil {
sub.pCond.Signal()
}
} else {
sub.pTail.next = m
sub.pTail = m
Expand All @@ -2485,7 +2514,6 @@ func (nc *Conn) processMsg(data []byte) {
sub.sc = false

sub.mu.Unlock()
nc.subsMu.RUnlock()
return

slowConsumer:
Expand All @@ -2498,7 +2526,6 @@ slowConsumer:
sub.pBytes -= len(m.Data)
}
sub.mu.Unlock()
nc.subsMu.RUnlock()
if sc {
// Now we need connection's lock and we may end-up in the situation
// that we were trying to avoid, except that in this case, the client
Expand Down Expand Up @@ -3793,8 +3820,10 @@ func (s *Subscription) ClearMaxPending() error {

// Pending Limits
const (
DefaultSubPendingMsgsLimit = 65536
DefaultSubPendingBytesLimit = 65536 * 1024
// DefaultSubPendingMsgsLimit will be 512k msgs.
DefaultSubPendingMsgsLimit = 512 * 1024
// DefaultSubPendingBytesLimit is 64MB
DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)

// PendingLimits returns the current limits for this subscription.
Expand Down
1 change: 1 addition & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
ReconnectJitter(0, 0),
MaxReconnects(-1),
DontRandomize(),
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
DisconnectErrHandler(func(_ *Conn, e error) {
dch <- true
}),
Expand Down
79 changes: 78 additions & 1 deletion norace_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The NATS Authors
// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -16,8 +16,11 @@
package nats

import (
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
)

func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
Expand Down Expand Up @@ -97,3 +100,77 @@ func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
}
nc.Close()
}

func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

if err := s.EnableJetStream(nil); err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer os.RemoveAll(s.JetStreamConfig().StoreDir)

str, err := s.GlobalAccount().AddStream(&server.StreamConfig{
Name: "PENDING_TEST",
Subjects: []string{"js.p"},
Storage: server.MemoryStorage,
})
if err != nil {
t.Fatalf("stream create failed: %v", err)
}

nc, _ := Connect(s.ClientURL())
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

// Queue up 1M small messages.
toSend := uint64(1_000_000)
for i := uint64(0); i < toSend; i++ {
nc.Publish("js.p", []byte("ok"))
}
nc.Flush()

if nm := str.State().Msgs; nm != toSend {
t.Fatalf("Expected to have stored all %d msgs, got only %d", toSend, nm)
}

var received uint64
done := make(chan bool, 1)

nc.Subscribe("d", func(m *Msg) {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// TODO(dlc) - If I put an ack in here this will fail again
// so need to look harder at this issues.
// m.Respond(nil) // Ack

received++
if received >= toSend {
done <- true
}
meta, err := m.JetStreamMetaData()
if err != nil {
t.Fatalf("could not get message metadata: %s", err)
}
if meta.StreamSeq != int(received) {
t.Errorf("Missed a sequence, was expecting %d but got %d, last error: '%v'", received, meta.StreamSeq, nc.LastError())
nc.Close()
}
})

o, err := str.AddConsumer(&server.ConsumerConfig{
Durable: "d",
DeliverSubject: "d",
AckPolicy: server.AckNone,
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
defer o.Stop()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Failed to get all %d messages, only got %d", toSend, received)
case <-done:
}
}
5 changes: 4 additions & 1 deletion test/auth_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -130,6 +130,9 @@ func TestAuthFailAllowReconnect(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

// Stop the server
ts.Shutdown()

Expand Down
5 changes: 4 additions & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -213,6 +213,9 @@ func TestPublishDoesNotFailOnSlowConsumer(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unable to create subscription: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion test/drain_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2019 The NATS Authors
// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -413,6 +413,9 @@ func TestDrainConnLastError(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

wg := sync.WaitGroup{}
wg.Add(1)
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
Expand Down
29 changes: 27 additions & 2 deletions test/sub_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 The NATS Authors
// Copyright 2013-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -417,6 +417,9 @@ func TestSlowSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, _ := nc.SubscribeSync("foo")
sub.SetPendingLimits(100, 1024)

Expand Down Expand Up @@ -444,6 +447,9 @@ func TestSlowChanSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

ch := make(chan *nats.Msg, 64)
sub, _ := nc.ChanSubscribe("foo", ch)
sub.SetPendingLimits(100, 1024)
Expand All @@ -467,6 +473,9 @@ func TestSlowAsyncSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

bch := make(chan bool)

sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
Expand Down Expand Up @@ -895,7 +904,8 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
// There was a defect that prevented to receive more than
// the default pending message limit. Trying to send more
// than this limit.
total := nats.DefaultSubPendingMsgsLimit + 100
pending := 1000
total := pending + 100

for typeSubs := 0; typeSubs < 3; typeSubs++ {

Expand All @@ -908,10 +918,19 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
switch typeSubs {
case 0:
sub, err = nc.ChanSubscribe("foo", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought limits did not apply to Channel subscribe, because the limit is really whatever the user sets for the provided channel?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test expects an error since its checking for err == nil but the t.Fatalf descriptions are wrong, will fix.

t.Fatalf("Expected an error setting pending limits")
}
case 1:
sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Expected an error setting pending limits")
}
case 2:
sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Expected an error setting pending limits")
}
}
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
Expand Down Expand Up @@ -1020,6 +1039,9 @@ func TestUnsubscribeChanOnSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

// Create our own channel.
ch := make(chan *nats.Msg, 8)
sub, _ := nc.ChanSubscribe("foo", ch)
Expand Down Expand Up @@ -1285,6 +1307,9 @@ func TestSetPendingLimits(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

payload := []byte("hello")
payloadLen := len(payload)
toSend := 100
Expand Down