Skip to content

Commit

Permalink
Merge 246ccf9 into 0482788
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Nov 16, 2020
2 parents 0482788 + 246ccf9 commit 84e9d25
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 19 deletions.
6 changes: 5 additions & 1 deletion enc_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -45,6 +45,10 @@ func TestPublishErrorAfterSubscribeDecodeError(t *testing.T) {
opts := options
nc, _ := opts.Connect()
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

c, _ := NewEncodedConn(nc, JSON_ENCODER)

//Test message type
Expand Down
53 changes: 41 additions & 12 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
DefaultJetStreamTimeout = 2 * time.Second
DefaultPingInterval = 2 * time.Minute
DefaultMaxPingOut = 2
DefaultMaxChanLen = 8192 // 8k
DefaultMaxChanLen = 8 * 1024 // 8k
DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
RequestChanLen = 8
DefaultDrainTimeout = 30 * time.Second
Expand Down Expand Up @@ -1164,6 +1164,11 @@ func (o Options) Connect() (*Conn, error) {
nc.ach = &asyncCallbacksHandler{}
nc.ach.cond = sync.NewCond(&nc.ach.mu)

// Set a default error handler that will print to stderr.
if nc.Opts.AsyncErrorCB == nil {
nc.Opts.AsyncErrorCB = defaultErrHandler
}

if err := nc.connect(); err != nil {
return nil, err
}
Expand All @@ -1174,6 +1179,22 @@ func (o Options) Connect() (*Conn, error) {
return nc, nil
}

func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
var cid uint64
if nc != nil {
nc.mu.RLock()
cid = nc.info.CID
nc.mu.RUnlock()
}
var errStr string
if sub != nil {
errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
} else {
errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
}
os.Stderr.WriteString(errStr)
}

const (
_CRLF_ = "\r\n"
_EMPTY_ = ""
Expand Down Expand Up @@ -2393,18 +2414,18 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
// or the pending queue is over the pending limits, the connection is
// considered a slow consumer.
func (nc *Conn) processMsg(data []byte) {
// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()

// Stats
atomic.AddUint64(&nc.InMsgs, 1)
atomic.AddUint64(&nc.InBytes, uint64(len(data)))

// Don't lock the connection to avoid server cutting us off if the
// flusher is holding the connection lock, trying to send to the server
// that is itself trying to send data to us.
nc.subsMu.RLock()
sub := nc.subs[nc.ps.ma.sid]
nc.subsMu.RUnlock()

if sub == nil {
nc.subsMu.RUnlock()
return
}

Expand Down Expand Up @@ -2443,6 +2464,12 @@ func (nc *Conn) processMsg(data []byte) {

sub.mu.Lock()

// Check if closed.
if sub.closed {
sub.mu.Unlock()
return
}

// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
sub.pMsgs++
Expand Down Expand Up @@ -2474,7 +2501,9 @@ func (nc *Conn) processMsg(data []byte) {
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
sub.pCond.Signal()
if sub.pCond != nil {
sub.pCond.Signal()
}
} else {
sub.pTail.next = m
sub.pTail = m
Expand All @@ -2485,7 +2514,6 @@ func (nc *Conn) processMsg(data []byte) {
sub.sc = false

sub.mu.Unlock()
nc.subsMu.RUnlock()
return

slowConsumer:
Expand All @@ -2498,7 +2526,6 @@ slowConsumer:
sub.pBytes -= len(m.Data)
}
sub.mu.Unlock()
nc.subsMu.RUnlock()
if sc {
// Now we need connection's lock and we may end-up in the situation
// that we were trying to avoid, except that in this case, the client
Expand Down Expand Up @@ -3793,8 +3820,10 @@ func (s *Subscription) ClearMaxPending() error {

// Pending Limits
const (
DefaultSubPendingMsgsLimit = 65536
DefaultSubPendingBytesLimit = 65536 * 1024
// DefaultSubPendingMsgsLimit will be 512k msgs.
DefaultSubPendingMsgsLimit = 512 * 1024
// DefaultSubPendingBytesLimit is 64MB
DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)

// PendingLimits returns the current limits for this subscription.
Expand Down
1 change: 1 addition & 0 deletions nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,7 @@ func TestAuthErrorOnReconnect(t *testing.T) {
ReconnectJitter(0, 0),
MaxReconnects(-1),
DontRandomize(),
ErrorHandler(func(_ *Conn, _ *Subscription, _ error) {}),
DisconnectErrHandler(func(_ *Conn, e error) {
dch <- true
}),
Expand Down
79 changes: 78 additions & 1 deletion norace_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The NATS Authors
// Copyright 2019-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -16,8 +16,11 @@
package nats

import (
"os"
"testing"
"time"

"github.com/nats-io/nats-server/v2/server"
)

func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
Expand Down Expand Up @@ -97,3 +100,77 @@ func TestNoRaceParseStateReconnectFunctionality(t *testing.T) {
}
nc.Close()
}

func TestNoRaceJetStreamConsumerSlowConsumer(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

if err := s.EnableJetStream(nil); err != nil {
t.Fatalf("Expected no error, got %v", err)
}
defer os.RemoveAll(s.JetStreamConfig().StoreDir)

str, err := s.GlobalAccount().AddStream(&server.StreamConfig{
Name: "PENDING_TEST",
Subjects: []string{"js.p"},
Storage: server.MemoryStorage,
})
if err != nil {
t.Fatalf("stream create failed: %v", err)
}

nc, _ := Connect(s.ClientURL())
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *Conn, _ *Subscription, _ error) {})

// Queue up 1M small messages.
toSend := uint64(1_000_000)
for i := uint64(0); i < toSend; i++ {
nc.Publish("js.p", []byte("ok"))
}
nc.Flush()

if nm := str.State().Msgs; nm != toSend {
t.Fatalf("Expected to have stored all %d msgs, got only %d", toSend, nm)
}

var received uint64
done := make(chan bool, 1)

nc.Subscribe("d", func(m *Msg) {
// TODO(dlc) - If I put an ack in here this will fail again
// so need to look harder at this issues.
// m.Respond(nil) // Ack

received++
if received >= toSend {
done <- true
}
meta, err := m.JetStreamMetaData()
if err != nil {
t.Fatalf("could not get message metadata: %s", err)
}
if meta.StreamSeq != int(received) {
t.Errorf("Missed a sequence, was expecting %d but got %d, last error: '%v'", received, meta.StreamSeq, nc.LastError())
nc.Close()
}
})

o, err := str.AddConsumer(&server.ConsumerConfig{
Durable: "d",
DeliverSubject: "d",
AckPolicy: server.AckNone,
})
if err != nil {
t.Fatalf("Error creating consumer: %v", err)
}
defer o.Stop()

select {
case <-time.After(5 * time.Second):
t.Fatalf("Failed to get all %d messages, only got %d", toSend, received)
case <-done:
}
}
5 changes: 4 additions & 1 deletion test/auth_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -130,6 +130,9 @@ func TestAuthFailAllowReconnect(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

// Stop the server
ts.Shutdown()

Expand Down
5 changes: 4 additions & 1 deletion test/basic_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -213,6 +213,9 @@ func TestPublishDoesNotFailOnSlowConsumer(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unable to create subscription: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion test/drain_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2019 The NATS Authors
// Copyright 2018-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -413,6 +413,9 @@ func TestDrainConnLastError(t *testing.T) {
}
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

wg := sync.WaitGroup{}
wg.Add(1)
if _, err := nc.Subscribe("foo", func(_ *nats.Msg) {
Expand Down
29 changes: 27 additions & 2 deletions test/sub_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2019 The NATS Authors
// Copyright 2013-2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -417,6 +417,9 @@ func TestSlowSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

sub, _ := nc.SubscribeSync("foo")
sub.SetPendingLimits(100, 1024)

Expand Down Expand Up @@ -444,6 +447,9 @@ func TestSlowChanSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

ch := make(chan *nats.Msg, 64)
sub, _ := nc.ChanSubscribe("foo", ch)
sub.SetPendingLimits(100, 1024)
Expand All @@ -467,6 +473,9 @@ func TestSlowAsyncSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

bch := make(chan bool)

sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
Expand Down Expand Up @@ -895,7 +904,8 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
// There was a defect that prevented to receive more than
// the default pending message limit. Trying to send more
// than this limit.
total := nats.DefaultSubPendingMsgsLimit + 100
pending := 1000
total := pending + 100

for typeSubs := 0; typeSubs < 3; typeSubs++ {

Expand All @@ -908,10 +918,19 @@ func TestChanSubscriberPendingLimits(t *testing.T) {
switch typeSubs {
case 0:
sub, err = nc.ChanSubscribe("foo", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Expected an error setting pending limits")
}
case 1:
sub, err = nc.ChanQueueSubscribe("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Expected an error setting pending limits")
}
case 2:
sub, err = nc.QueueSubscribeSyncWithChan("foo", "bar", ch)
if err := sub.SetPendingLimits(pending, -1); err == nil {
t.Fatalf("Expected an error setting pending limits")
}
}
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
Expand Down Expand Up @@ -1020,6 +1039,9 @@ func TestUnsubscribeChanOnSubscriber(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

// Create our own channel.
ch := make(chan *nats.Msg, 8)
sub, _ := nc.ChanSubscribe("foo", ch)
Expand Down Expand Up @@ -1285,6 +1307,9 @@ func TestSetPendingLimits(t *testing.T) {
nc := NewDefaultConnection(t)
defer nc.Close()

// Override default handler for test.
nc.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, _ error) {})

payload := []byte("hello")
payloadLen := len(payload)
toSend := 100
Expand Down

0 comments on commit 84e9d25

Please sign in to comment.