Skip to content

Commit

Permalink
Merge pull request #185 from nats-io/fix_pending_limits
Browse files Browse the repository at this point in the history
Fix SetPendingLimits()
  • Loading branch information
derekcollison committed May 13, 2016
2 parents 2654b74 + 5729e49 commit fc85f44
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 1 deletion.
10 changes: 9 additions & 1 deletion nats.go
Expand Up @@ -68,6 +68,7 @@ var (
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
)

Expand Down Expand Up @@ -1409,7 +1410,8 @@ func (nc *Conn) processMsg(data []byte) {
}

// Check for a Slow Consumer
if sub.pMsgs > sub.pMsgsLimit || sub.pBytes > sub.pBytesLimit {
if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
goto slowConsumer
}

Expand Down Expand Up @@ -2084,6 +2086,8 @@ const (
)

// PendingLimits returns the current limits for this subscription.
// If no error is returned, a negative value indicates that the
// given metric is not limited.
func (s *Subscription) PendingLimits() (int, int, error) {
if s == nil {
return -1, -1, ErrBadSubscription
Expand All @@ -2100,6 +2104,7 @@ func (s *Subscription) PendingLimits() (int, int, error) {
}

// SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
// Zero is not allowed. Any negative value means that the given metric is not limited.
func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
if s == nil {
return ErrBadSubscription
Expand All @@ -2112,6 +2117,9 @@ func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
if s.typ == ChanSubscription {
return ErrTypeSubscription
}
if msgLimit == 0 || bytesLimit == 0 {
return ErrInvalidArg
}
s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
return nil
}
Expand Down
168 changes: 168 additions & 0 deletions test/sub_test.go
@@ -1,6 +1,7 @@
package test

import (
"fmt"
"runtime"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1163,6 +1164,173 @@ func TestSyncSubscriptionPending(t *testing.T) {
}
}

func TestSetPendingLimits(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc := NewDefaultConnection(t)
defer nc.Close()

payload := []byte("hello")
payloadLen := len(payload)
toSend := 100

var sub *nats.Subscription

// Check for invalid values
invalid := func() error {
if err := sub.SetPendingLimits(0, 1); err == nil {
return fmt.Errorf("Setting limit with 0 should fail")
}
if err := sub.SetPendingLimits(1, 0); err == nil {
return fmt.Errorf("Setting limit with 0 should fail")
}
return nil
}
// function to send messages
send := func(subject string, count int) {
for i := 0; i < count; i++ {
if err := nc.Publish(subject, payload); err != nil {
t.Fatalf("Unexpected error on publish: %v", err)
}
}
nc.Flush()
}

// Check pending vs expected values
var limitCount, limitBytes int
var expectedCount, expectedBytes int
checkPending := func() error {
lc, lb, err := sub.PendingLimits()
if err != nil {
return err
}
if lc != limitCount || lb != limitBytes {
return fmt.Errorf("Unexpected limits, expected %v msgs %v bytes, got %v msgs %v bytes",
limitCount, limitBytes, lc, lb)
}
msgs, bytes, err := sub.Pending()
if err != nil {
return fmt.Errorf("Unexpected error getting pending counts: %v", err)
}
if (msgs != expectedCount && msgs != expectedCount-1) ||
(bytes != expectedBytes && bytes != expectedBytes-payloadLen) {
return fmt.Errorf("Unexpected counts, expected %v msgs %v bytes, got %v msgs %v bytes",
expectedCount, expectedBytes, msgs, bytes)
}
return nil
}

recv := make(chan bool)
block := make(chan bool)
cb := func(m *nats.Msg) {
recv <- true
<-block
m.Sub.Unsubscribe()
}
subj := "foo"
sub, err := nc.Subscribe(subj, cb)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
if err := invalid(); err != nil {
t.Fatalf("%v", err)
}
// Check we apply limit only for size
limitCount = -1
limitBytes = (toSend / 2) * payloadLen
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
// Wait for message to be received
if err := Wait(recv); err != nil {
t.Fatal("Did not get our message")
}
expectedBytes = limitBytes
expectedCount = limitBytes / payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
// Release callback
block <- true

subj = "bar"
sub, err = nc.Subscribe(subj, cb)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
// Check we apply limit only for count
limitCount = toSend / 4
limitBytes = -1
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
// Wait for message to be received
if err := Wait(recv); err != nil {
t.Fatal("Did not get our message")
}
expectedCount = limitCount
expectedBytes = limitCount * payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
// Release callback
block <- true

subj = "baz"
sub, err = nc.SubscribeSync(subj)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
if err := invalid(); err != nil {
t.Fatalf("%v", err)
}
// Check we apply limit only for size
limitCount = -1
limitBytes = (toSend / 2) * payloadLen
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
expectedBytes = limitBytes
expectedCount = limitBytes / payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
sub.Unsubscribe()
nc.Flush()

subj = "boz"
sub, err = nc.SubscribeSync(subj)
if err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
defer sub.Unsubscribe()
// Check we apply limit only for count
limitCount = toSend / 4
limitBytes = -1
if err := sub.SetPendingLimits(limitCount, limitBytes); err != nil {
t.Fatalf("Unexpected error setting limits: %v", err)
}
// Send messages
send(subj, toSend)
expectedCount = limitCount
expectedBytes = limitCount * payloadLen
if err := checkPending(); err != nil {
t.Fatalf("%v", err)
}
sub.Unsubscribe()
nc.Flush()
}

func TestSubscriptionTypes(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
Expand Down

0 comments on commit fc85f44

Please sign in to comment.