[FIXED] Incorrect number of subscriptions in /streaming/serverz
Replaces the use of a variable that was supposed to be updated
on subscription create/remove; that approach was proving brittle.

Instead, provide a function that computes the number of subscriptions
(including offline durables and offline durable queue subscriptions)
on demand. This is a bit more costly than using the variable, but is
less prone to issues.

Resolves #819

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed May 6, 2019
1 parent 5f943bc commit ec54483
Showing 4 changed files with 101 additions and 35 deletions.
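At its heart, the change trades a maintained counter for an on-demand computation. Here is a minimal sketch of the two approaches, using hypothetical types rather than the server's actual ones:

	package main

	import (
		"fmt"
		"sync"
	)

	// Counter approach (what this commit removes): every subscribe and
	// unsubscribe path must remember to adjust the count under a lock,
	// which proved brittle for durables and queue subscriptions.
	type countingServer struct {
		monMu   sync.RWMutex
		numSubs int
	}

	// Computed approach (what this commit adds): derive the count from
	// live state each time it is requested. Costlier per call, but it
	// cannot drift out of sync.
	func countSubs(subsPerChannel map[string]int) int {
		total := 0
		for _, n := range subsPerChannel {
			total += n
		}
		return total
	}

	func main() {
		fmt.Println(countSubs(map[string]int{"foo": 2, "bar": 1})) // prints 3
	}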
69 changes: 69 additions & 0 deletions server/clustering_test.go
@@ -5322,3 +5322,72 @@ func TestClusteringGapsAfterSnapshotAndNoFlush(t *testing.T) {
}
}
}

func TestClusteringNumSubs(t *testing.T) {
cleanupDatastore(t)
defer cleanupDatastore(t)
cleanupRaftLog(t)
defer cleanupRaftLog(t)

// For this test, use a central NATS server.
ns := natsdTest.RunDefaultServer()
defer ns.Shutdown()

// Configure first server
s1sOpts := getTestDefaultOptsForClustering("a", true)
s1sOpts.Clustering.TrailingLogs = 3
s1 := runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

// Configure second server.
s2sOpts := getTestDefaultOptsForClustering("b", false)
s2 := runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

getLeader(t, 10*time.Second, s1, s2)

sc := NewDefaultConnection(t)
defer sc.Close()

var subs []stan.Subscription
for i := 0; i < 10; i++ {
sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {})
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
subs = append(subs, sub)
}

	// Close eight of the ten subscriptions.
	for i := 0; i < 8; i++ {
		if err := subs[i].Close(); err != nil {
			t.Fatalf("Error on sub close: %v", err)
		}
	}
	// Snapshot while two subscriptions are still open, so that after
	// the restart below part of the removals must come from the
	// replayed Raft log rather than from the snapshot.
	if err := s1.raft.Snapshot().Error(); err != nil {
		t.Fatalf("Error during snapshot: %v", err)
	}
	// Close the remaining two subscriptions.
	for i := 8; i < 10; i++ {
		if err := subs[i].Close(); err != nil {
			t.Fatalf("Error on sub close: %v", err)
		}
	}

checkNumSubs(t, s1, 0)
checkNumSubs(t, s2, 0)

	// Restart both servers: they restore from the snapshot, then
	// replay the remaining log entries.
	s1.Shutdown()
	s2.Shutdown()

s1 = runServerWithOpts(t, s1sOpts, nil)
defer s1.Shutdown()

s2 = runServerWithOpts(t, s2sOpts, nil)
defer s2.Shutdown()

leader := getLeader(t, 10*time.Second, s1, s2)

waitForNumClients(t, leader, 1)
checkNumSubs(t, leader, 0)
sc.Close()
leader.Shutdown()
}
5 changes: 2 additions & 3 deletions server/monitor.go
@@ -192,9 +192,8 @@ func (s *StanServer) handleServerz(w http.ResponseWriter, r *http.Request) {
role = s.raft.State().String()
}
s.mu.RUnlock()
-	s.monMu.RLock()
-	numSubs := s.numSubs
-	s.monMu.RUnlock()
+
+	numSubs := s.numSubs()
now := time.Now()

fds := 0
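For reference, a minimal way to query the endpoint this handler serves, assuming the server was started with monitoring enabled on port 8222 (the usual default); the response is JSON that includes the subscriptions count:

	package main

	import (
		"fmt"
		"io"
		"net/http"
	)

	func main() {
		// Hit the streaming monitoring endpoint fixed by this commit.
		resp, err := http.Get("http://localhost:8222/streaming/serverz")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(body)) // JSON, including the subscriptions total
	}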
58 changes: 29 additions & 29 deletions server/server.go
@@ -605,10 +605,6 @@ type StanServer struct {
// Store
store stores.Store

-	// Monitoring
-	monMu sync.RWMutex
-	numSubs int

// IO Channel
ioChannel chan *ioPendingMsg
ioChannelQuit chan struct{}
@@ -742,6 +738,35 @@ type subSentAndAck struct {
applying bool
}

// Returns the total number of subscriptions, including offline
// durables and offline durable queue subscriptions. This is used
// primarily by the /streaming/serverz monitoring endpoint.
func (s *StanServer) numSubs() int {
total := 0
channels := s.channels.getAll()
for _, c := range channels {
c.ss.RLock()
total += len(c.ss.psubs)
// Need to add offline durables
for _, sub := range c.ss.durables {
if sub.ClientID == "" {
total++
}
}
for _, qsub := range c.ss.qsubs {
qsub.RLock()
total += len(qsub.subs)
		// If this is a durable queue subscription and all members
		// are offline, qsub.shadow will not be nil. Report this one.
if qsub.shadow != nil {
total++
}
qsub.RUnlock()
}
c.ss.RUnlock()
}
return total
}

// Looks up, or creates a new channel if it does not exist
func (s *StanServer) lookupOrCreateChannel(name string) (*channel, error) {
cs := s.channels
@@ -984,10 +1009,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {

var qsubs map[uint64]*subState

-	// Assume we are removing a subscription for good.
-	// This will be set to false in some cases like durables, etc..
-	decrementNumSubs := true

// Delete ourselves from the list
if qs != nil {
storageUpdate := false
@@ -1023,8 +1044,6 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
// Will need to update the LastSent and clear the ClientID
// with a storage update.
storageUpdate = true
-	// Don't decrement count
-	decrementNumSubs = false
}
} else {
if sub.stalled && qs.stalledSubCount > 0 {
@@ -1123,17 +1142,10 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
// After storage, clear the ClientID.
sub.ClientID = ""
sub.Unlock()
-	decrementNumSubs = false
}
}
ss.Unlock()

-	if decrementNumSubs {
-		ss.stan.monMu.Lock()
-		ss.stan.numSubs--
-		ss.stan.monMu.Unlock()
-	}

if !ss.stan.isClustered || ss.stan.isLeader() {
// Calling this will sort current pending messages and ensure
// that the ackTimer is properly set. It does not necessarily
@@ -2294,9 +2306,6 @@ func (s *StanServer) recoverOneSub(c *channel, recSub *spb.SubState, pendingAcks
// durable would not be able to be restarted.
sub.savedClientID = sub.ClientID
sub.ClientID = ""
-	s.monMu.Lock()
-	s.numSubs++
-	s.monMu.Unlock()
}

// Create a subState
@@ -2366,9 +2375,6 @@ func (s *StanServer) recoverOneSub(c *channel, recSub *spb.SubState, pendingAcks
// Add to the array, unless this is the shadow durable queue sub that
// was left in the store in order to maintain the group's state.
if !sub.isShadowQueueDurable() {
-	s.monMu.Lock()
-	s.numSubs++
-	s.monMu.Unlock()
return sub
}
}
@@ -4647,12 +4653,6 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
s.nca.Flush()
}
}
-	// Do this before so that if we have to remove, count will be ok.
-	if subIsNew {
-		s.monMu.Lock()
-		s.numSubs++
-		s.monMu.Unlock()
-	}
if err != nil {
// Try to undo what has been done.
s.clients.removeSub(sr.ClientID, sub)
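To make the traversal in numSubs() easier to follow, here is a simplified sketch of the per-channel subscription store it walks. Field names match those used in the diff above; everything else is pared down, so treat it as an illustration rather than the server's real structs:

	package main

	import (
		"fmt"
		"sync"
	)

	// subState is reduced to the one field numSubs() inspects.
	type subState struct {
		ClientID string // empty for an offline durable
	}

	// queueState holds the members of one queue group.
	type queueState struct {
		sync.RWMutex
		subs   []*subState // online queue members
		shadow *subState   // non-nil when a durable queue group has no online member
	}

	// subStore is the per-channel store of plain subs, durables and queue groups.
	type subStore struct {
		sync.RWMutex
		psubs    []*subState
		durables map[string]*subState
		qsubs    map[string]*queueState
	}

	func main() {
		ss := &subStore{
			psubs:    []*subState{{ClientID: "c1"}},   // one plain subscription
			durables: map[string]*subState{"dur": {}}, // one offline durable
			qsubs: map[string]*queueState{
				"grp": {shadow: &subState{}}, // durable queue group, all members offline
			},
		}
		// Apply the same counting rules as numSubs().
		total := len(ss.psubs)
		for _, d := range ss.durables {
			if d.ClientID == "" {
				total++
			}
		}
		for _, qs := range ss.qsubs {
			total += len(qs.subs)
			if qs.shadow != nil {
				total++
			}
		}
		fmt.Println(total) // prints 3
	}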
4 changes: 1 addition & 3 deletions server/server_sub_test.go
@@ -1148,9 +1148,7 @@ func TestSubAckInboxFromOlderStore(t *testing.T) {
func checkNumSubs(t *testing.T, s *StanServer, expected int) {
t.Helper()
waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
-		s.monMu.Lock()
-		count := s.numSubs
-		s.monMu.Unlock()
+		count := s.numSubs()
if count != expected {
return fmt.Errorf("Expected %v subscriptions, got %v", expected, count)
}
