Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Number of subscriptions in /serverz endpoint is incorrect #820

Merged
merged 1 commit into from May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 110 additions & 0 deletions server/monitor_test.go
Expand Up @@ -1216,3 +1216,113 @@ func TestMonitorClusterRole(t *testing.T) {
})
}
}

func TestMonitorNumSubs(t *testing.T) {
resetPreviousHTTPConnections()
s := runMonitorServer(t, GetDefaultOptions())
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

checkNumSubs := func(t *testing.T, expected int) {
waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
resp, body := getBody(t, ServerPath, expectedJSON)
resp.Body.Close()
sz := Serverz{}
if err := json.Unmarshal(body, &sz); err != nil {
t.Fatalf("Got an error unmarshalling the body: %v", err)
}
if sz.Subscriptions != expected {
return fmt.Errorf("Expected %v subscriptions, got %v", expected, sz.Subscriptions)
}
return nil
})
}

cb := func(_ *stan.Msg) {}

dur, err := sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 1)

qsub1, err := sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 2)

qsub2, err := sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 3)

// Closing one of the durable queue member will get the count down.
qsub1.Close()
checkNumSubs(t, 2)

// But the last one should keep the count since the durable interest stays.
qsub2.Close()
checkNumSubs(t, 2)

// Same for closing the durable
dur.Close()
checkNumSubs(t, 2)

// Create a non-durable, count should increase, then close, count should go down.
sub, err := sc.Subscribe("foo", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 3)
sub.Close()
checkNumSubs(t, 2)

// Try with non durable queue group
qs1, err := sc.QueueSubscribe("foo", "ndqueue", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 3)

qs2, err := sc.QueueSubscribe("foo", "ndqueue", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 4)

qs1.Close()
checkNumSubs(t, 3)
qs2.Close()
checkNumSubs(t, 2)

// Now resume the durable, count should remain same.
dur, err = sc.Subscribe("foo", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 2)

// Restart a queue member, same story
qsub1, err = sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 2)

// Now a second and then count should go up.
qsub2, err = sc.QueueSubscribe("foo", "queue", cb, stan.DurableName("dur"))
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
checkNumSubs(t, 3)

// Unsubscribe them all and count should go to 0.
qsub2.Unsubscribe()
qsub1.Unsubscribe()
dur.Unsubscribe()
checkNumSubs(t, 0)
}
24 changes: 18 additions & 6 deletions server/server.go
Expand Up @@ -982,6 +982,10 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {

var qsubs map[uint64]*subState

// Assume we are removing a subscription for good.
// This will be set to false in some cases like durables, etc..
decrementNumSubs := true

// Delete ourselves from the list
if qs != nil {
storageUpdate := false
Expand Down Expand Up @@ -1017,6 +1021,8 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
// Will need to update the LastSent and clear the ClientID
// with a storage update.
storageUpdate = true
// Don't decrement count
decrementNumSubs = false
}
} else {
if sub.stalled && qs.stalledSubCount > 0 {
Expand Down Expand Up @@ -1115,10 +1121,17 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
// After storage, clear the ClientID.
sub.ClientID = ""
sub.Unlock()
decrementNumSubs = false
}
}
ss.Unlock()

if decrementNumSubs {
ss.stan.monMu.Lock()
ss.stan.numSubs--
ss.stan.monMu.Unlock()
}

if !ss.stan.isClustered || ss.stan.isLeader() {
// Calling this will sort current pending messages and ensure
// that the ackTimer is properly set. It does not necessarily
Expand Down Expand Up @@ -4254,9 +4267,6 @@ func (s *StanServer) unsubscribeSub(c *channel, clientID, action string, sub *su
sub.RUnlock()
err = ss.Flush()
}
s.monMu.Lock()
s.numSubs--
s.monMu.Unlock()
return err
}

Expand Down Expand Up @@ -4645,9 +4655,11 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
traceSubState(s.log, sub, &traceCtx)
}

s.monMu.Lock()
s.numSubs++
s.monMu.Unlock()
if subIsNew {
s.monMu.Lock()
s.numSubs++
s.monMu.Unlock()
}

return sub, nil
}
Expand Down