diff --git a/server/monitor_test.go b/server/monitor_test.go
index 16520d96..eebb5648 100644
--- a/server/monitor_test.go
+++ b/server/monitor_test.go
@@ -1216,3 +1216,113 @@ func TestMonitorClusterRole(t *testing.T) {
 		})
 	}
 }
+
+func TestMonitorNumSubs(t *testing.T) {
+	resetPreviousHTTPConnections()
+	s := runMonitorServer(t, GetDefaultOptions())
+	defer s.Shutdown()
+
+	sc := NewDefaultConnection(t)
+	defer sc.Close()
+
+	checkNumSubs := func(t *testing.T, expected int) {
+		waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
+			resp, body := getBody(t, ServerPath, expectedJSON)
+			resp.Body.Close()
+			sz := Serverz{}
+			if err := json.Unmarshal(body, &sz); err != nil {
+				t.Fatalf("Got an error unmarshalling the body: %v", err)
+			}
+			if sz.Subscriptions != expected {
+				return fmt.Errorf("Expected %v subscriptions, got %v", expected, sz.Subscriptions)
+			}
+			return nil
+		})
+	}
+
+	cb := func(_ *stan.Msg) {}
+
+	dur, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 1)
+
+	qsub1, err := sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 2)
+
+	qsub2, err := sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 3)
+
+	// Closing one of the durable queue members brings the count down.
+	qsub1.Close()
+	checkNumSubs(t, 2)
+
+	// But closing the last one keeps the count since the durable interest remains.
+	qsub2.Close()
+	checkNumSubs(t, 2)
+
+	// Same for closing the durable.
+	dur.Close()
+	checkNumSubs(t, 2)
+
+	// Create a non-durable: the count should increase, then close it and the count should go down.
+	sub, err := sc.Subscribe("foo", cb)
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 3)
+	sub.Close()
+	checkNumSubs(t, 2)
+
+	// Try with a non-durable queue group.
+	qs1, err := sc.QueueSubscribe("foo", "ndqueue", cb)
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 3)
+
+	qs2, err := sc.QueueSubscribe("foo", "ndqueue", cb)
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 4)
+
+	qs1.Close()
+	checkNumSubs(t, 3)
+	qs2.Close()
+	checkNumSubs(t, 2)
+
+	// Now resume the durable: the count should remain the same.
+	dur, err = sc.Subscribe("foo", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 2)
+
+	// Restart a queue member, same story.
+	qsub1, err = sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 2)
+
+	// Now add a second member and the count should go up.
+	qsub2, err = sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
+	if err != nil {
+		t.Fatalf("Error on subscribe: %v", err)
+	}
+	checkNumSubs(t, 3)
+
+	// Unsubscribe them all and the count should go to 0.
+	qsub2.Unsubscribe()
+	qsub1.Unsubscribe()
+	dur.Unsubscribe()
+	checkNumSubs(t, 0)
+}
diff --git a/server/server.go b/server/server.go
index 792cfa88..28ab86ee 100644
--- a/server/server.go
+++ b/server/server.go
@@ -982,6 +982,10 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 
 	var qsubs map[uint64]*subState
 
+	// Assume we are removing a subscription for good.
+	// This will be set to false in some cases, like durables, etc.
+	decrementNumSubs := true
+
 	// Delete ourselves from the list
 	if qs != nil {
 		storageUpdate := false
@@ -1017,6 +1021,8 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 				// Will need to update the LastSent and clear the ClientID
 				// with a storage update.
 				storageUpdate = true
+				// Don't decrement the count.
+				decrementNumSubs = false
 			}
 		} else {
 			if sub.stalled && qs.stalledSubCount > 0 {
@@ -1115,10 +1121,17 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
 			// After storage, clear the ClientID.
 			sub.ClientID = ""
 			sub.Unlock()
+			decrementNumSubs = false
 		}
 	}
 	ss.Unlock()
 
+	if decrementNumSubs {
+		ss.stan.monMu.Lock()
+		ss.stan.numSubs--
+		ss.stan.monMu.Unlock()
+	}
+
 	if !ss.stan.isClustered || ss.stan.isLeader() {
 		// Calling this will sort current pending messages and ensure
 		// that the ackTimer is properly set. It does not necessarily
@@ -4254,9 +4267,6 @@ func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *su
 		sub.RUnlock()
 		err = ss.Flush()
 	}
-	s.monMu.Lock()
-	s.numSubs--
-	s.monMu.Unlock()
 	return err
 }
 
@@ -4645,9 +4655,11 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
 		traceSubState(s.log, sub, &traceCtx)
 	}
 
-	s.monMu.Lock()
-	s.numSubs++
-	s.monMu.Unlock()
+	if subIsNew {
+		s.monMu.Lock()
+		s.numSubs++
+		s.monMu.Unlock()
+	}
 
 	return sub, nil
 }
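For reference, a minimal standalone sketch (not part of the patch) of how a client could poll the streaming server's serverz monitoring endpoint and read the subscriptions counter that this change keeps accurate. The URL path, the default monitoring port 8222, and the "subscriptions" JSON field name are assumptions inferred from the test above, which goes through the ServerPath constant and the getBody helper instead.

	// Sketch: poll the serverz endpoint and print the subscriptions count.
	// Endpoint URL and JSON field name are assumptions based on the test above.
	package main

	import (
		"encoding/json"
		"fmt"
		"io/ioutil"
		"net/http"
	)

	// serverz keeps only the field this sketch needs from the Serverz response.
	type serverz struct {
		Subscriptions int `json:"subscriptions"`
	}

	func main() {
		// 8222 is the default NATS monitoring port (an assumption here).
		resp, err := http.Get("http://localhost:8222/streaming/serverz")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			panic(err)
		}
		var sz serverz
		if err := json.Unmarshal(body, &sz); err != nil {
			panic(err)
		}
		// With the change above, this count only moves when subscription
		// interest is truly created or removed (closed durables keep it).
		fmt.Printf("subscriptions: %d\n", sz.Subscriptions)
	}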