Skip to content

Commit

Permalink
[ADDED] Subscriptions count in some monitoring endpoint
Browse files Browse the repository at this point in the history
The detail of a client and channel will now have a `subs_count`
field that is the count of subscriptions, regardless if the
option to ask for subscriptions details is requested.

Even when the subscriptions details is requested, the count field
allows to quickly see how many subscriptions are present in the
array.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 13, 2021
1 parent ca5a8b2 commit 99fe41e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
38 changes: 35 additions & 3 deletions server/monitor.go
Expand Up @@ -92,6 +92,7 @@ type Clientsz struct {
type Clientz struct {
ID string `json:"id"`
HBInbox string `json:"hb_inbox"`
SubsCount int `json:"subs_count"`
Subscriptions map[string][]*Subscriptionz `json:"subscriptions,omitempty"`
}

Expand All @@ -115,6 +116,7 @@ type Channelz struct {
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
LastSeq uint64 `json:"last_seq"`
SubsCount int `json:"subs_count"`
Subscriptions []*Subscriptionz `json:"subscriptions,omitempty"`
}

Expand Down Expand Up @@ -331,13 +333,14 @@ func (s *StanServer) handleClientsz(w http.ResponseWriter, r *http.Request) {

// Since clients may be unregistered between the time we get the client IDs
// and the time we build carr array, lets count the number of elements
// actually intserted.
// actually inserted.
carrSize := 0
for _, c := range carr {
client := s.clients.lookup(c.ID)
if client != nil {
client.RLock()
c.HBInbox = client.info.HbInbox
c.SubsCount = len(client.subs)
if subsOption == 1 {
c.Subscriptions = getMonitorClientSubs(client)
}
Expand Down Expand Up @@ -368,8 +371,9 @@ func getMonitorClient(s *StanServer, clientID string, subsOption int) *Clientz {
cli.RLock()
defer cli.RUnlock()
cz := &Clientz{
HBInbox: cli.info.HbInbox,
ID: cli.info.ID,
HBInbox: cli.info.HbInbox,
ID: cli.info.ID,
SubsCount: len(cli.subs),
}
if subsOption == 1 {
cz.Subscriptions = getMonitorClientSubs(cli)
Expand Down Expand Up @@ -421,6 +425,29 @@ func getMonitorChannelSubs(ss *subStore) []*Subscriptionz {
return subsz
}

func getMonitorChannelSubsCount(ss *subStore) int {
ss.RLock()
defer ss.RUnlock()
count := len(ss.psubs)
// Get only offline durables (the online also appear in ss.psubs)
for _, sub := range ss.durables {
if sub.ClientID == "" {
count++
}
}
for _, qsub := range ss.qsubs {
qsub.RLock()
count += len(qsub.subs)
// If this is a durable queue subscription and all members
// are offline, qsub.shadow will be not nil. Report this one.
if qsub.shadow != nil {
count++
}
qsub.RUnlock()
}
return count
}

func createSubscriptionz(sub *subState) *Subscriptionz {
sub.RLock()
subz := &Subscriptionz{
Expand Down Expand Up @@ -534,9 +561,14 @@ func (s *StanServer) updateChannelz(cz *Channelz, c *channel, subsOption int) er
cz.Bytes = bytes
cz.FirstSeq = fseq
cz.LastSeq = lseq
var subsCount int
if subsOption == 1 {
cz.Subscriptions = getMonitorChannelSubs(c.ss)
subsCount = len(cz.Subscriptions)
} else {
subsCount = getMonitorChannelSubsCount(c.ss)
}
cz.SubsCount = subsCount
return nil
}

Expand Down
27 changes: 16 additions & 11 deletions server/monitor_test.go
Expand Up @@ -634,8 +634,9 @@ func TestMonitorClientsz(t *testing.T) {
cli := s.clients.lookup(cid)
cli.RLock()
cz := &Clientz{
ID: cid,
HBInbox: cli.info.HbInbox,
ID: cid,
HBInbox: cli.info.HbInbox,
SubsCount: len(cli.subs),
}
if expectSubs {
cz.Subscriptions = getCliSubs(cli.subs)
Expand Down Expand Up @@ -761,8 +762,9 @@ func TestMonitorClientz(t *testing.T) {
}
cli.RLock()
cz := &Clientz{
ID: cid,
HBInbox: cli.info.HbInbox,
ID: cid,
HBInbox: cli.info.HbInbox,
SubsCount: len(cli.subs),
}
if expectSubs {
cz.Subscriptions = getCliSubs(cli.subs)
Expand Down Expand Up @@ -985,6 +987,7 @@ func TestMonitorChannelsWithSubsz(t *testing.T) {
qsub.RUnlock()
}
ss.RUnlock()
channelz.SubsCount = len(subscriptions)
channelz.Subscriptions = subscriptions
channelsz.Channels = append(channelsz.Channels, channelz)
}
Expand Down Expand Up @@ -1075,16 +1078,18 @@ func TestMonitorChannelz(t *testing.T) {
}
msgs, bytes := msgStoreState(t, cs.store.Msgs)
firstSeq, lastSeq := msgStoreFirstAndLastSequence(t, cs.store.Msgs)
ss := cs.ss
subs := getChannelSubs(ss.psubs)
channelz := &Channelz{
Name: name,
FirstSeq: firstSeq,
LastSeq: lastSeq,
Msgs: msgs,
Bytes: bytes,
Name: name,
FirstSeq: firstSeq,
LastSeq: lastSeq,
Msgs: msgs,
Bytes: bytes,
SubsCount: len(subs),
}
if expectedSubs {
ss := cs.ss
channelz.Subscriptions = getChannelSubs(ss.psubs)
channelz.Subscriptions = subs
}
return channelz
}
Expand Down

0 comments on commit 99fe41e

Please sign in to comment.