Skip to content

Commit

Permalink
Merge pull request #468 from nats-io/fix_467
Browse files Browse the repository at this point in the history
[FIXED] Failed subscription appears in client's list of subscriptions
  • Loading branch information
kozlovic committed Feb 15, 2018
2 parents fafc961 + 884d225 commit 84a8e03
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
6 changes: 5 additions & 1 deletion server/server.go
Expand Up @@ -3702,7 +3702,11 @@ func (s *StanServer) addSubscription(ss *subStore, sub *subState) error {
return fmt.Errorf("can't find clientID: %v", sub.ClientID)
}
// Store this subscription in subStore
return ss.Store(sub)
if err := ss.Store(sub); err != nil {
s.clients.removeSub(sub.ClientID, sub)
return err
}
return nil
}

// updateDurable adds back `sub` to the client and updates the store.
Expand Down
38 changes: 29 additions & 9 deletions server/server_limits_test.go
Expand Up @@ -73,19 +73,39 @@ func TestTooManySubs(t *testing.T) {
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err != nil {
t.Fatalf("Unexpected error on subscribe: %v", err)
}
check := func() {
cs := channelsGet(t, s.channels, "foo")
ss := cs.ss
ss.RLock()
if ss.psubs == nil || len(ss.psubs) != 1 {
stackFatalf(t, "Expected only one subscription, got %v", len(ss.psubs))
}
ss.RUnlock()
subs := s.clients.getSubs(clientName)
if len(subs) != 1 {
stackFatalf(t, "Expected 1 subscription for client %q, got %+v", clientName, subs)
}
}
// We should get an error here
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}); err == nil {
t.Fatal("Expected error on subscribe, go none")
}
cs := channelsGet(t, s.channels, "foo")
ss := cs.ss
func() {
ss.RLock()
defer ss.RUnlock()
if ss.psubs == nil || len(ss.psubs) != 1 {
t.Fatalf("Expected only one subscription, got %v", len(ss.psubs))
}
}()
check()
// Try with a durable
if _, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatal("Expected error on subscribe, go none")
}
check()
// And a queue sub
if _, err := sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}); err == nil {
t.Fatal("Expected error on subscribe, go none")
}
check()
// Finally a durable queue sub
if _, err := sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur")); err == nil {
t.Fatal("Expected error on subscribe, go none")
}
check()
}

func TestMaxMsgs(t *testing.T) {
Expand Down

0 comments on commit 84a8e03

Please sign in to comment.