Skip to content

Commit

Permalink
add nsqd server side messageCount topology stats
Browse files Browse the repository at this point in the history
  • Loading branch information
zoemccormick committed Apr 5, 2024
1 parent 178da24 commit b84cfe2
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 135 deletions.
5 changes: 4 additions & 1 deletion internal/clusterinfo/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,11 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
if selectedTopic != "" && topic.TopicName != selectedTopic {
continue
}
topicStatsList = append(topicStatsList, topic)

for _, channel := range topic.Channels {
topic.ZoneLocalMsgCount += channel.ZoneLocalMsgCount
topic.RegionLocalMsgCount += channel.RegionLocalMsgCount
topic.GlobalMsgCount += channel.GlobalMsgCount
channel.Node = addr
channel.Hostname = p.Hostname
channel.TopicName = topic.TopicName
Expand All @@ -620,6 +622,7 @@ func (c *ClusterInfo) GetNSQDStats(producers Producers,
}
channelStats.Add(channel)
}
topicStatsList = append(topicStatsList, topic)
}
}(p)
}
Expand Down
66 changes: 39 additions & 27 deletions internal/clusterinfo/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,19 @@ func (p *Producer) IsInconsistent(numLookupd int) bool {
}

type TopicStats struct {
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount int64 `json:"message_count"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
Paused bool `json:"paused"`
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount int64 `json:"message_count"`
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
NodeStats []*TopicStats `json:"nodes"`
Channels []*ChannelStats `json:"channels"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}
Expand All @@ -118,6 +121,9 @@ func (t *TopicStats) Add(a *TopicStats) {
t.MemoryDepth += a.MemoryDepth
t.BackendDepth += a.BackendDepth
t.MessageCount += a.MessageCount
t.ZoneLocalMsgCount += a.ZoneLocalMsgCount
t.RegionLocalMsgCount += a.RegionLocalMsgCount
t.GlobalMsgCount += a.GlobalMsgCount
if a.Paused {
t.Paused = a.Paused
}
Expand Down Expand Up @@ -145,23 +151,26 @@ func (t *TopicStats) Add(a *TopicStats) {
}

type ChannelStats struct {
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int64 `json:"in_flight_count"`
DeferredCount int64 `json:"deferred_count"`
RequeueCount int64 `json:"requeue_count"`
TimeoutCount int64 `json:"timeout_count"`
MessageCount int64 `json:"message_count"`
ClientCount int `json:"client_count"`
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
Paused bool `json:"paused"`
Node string `json:"node"`
Hostname string `json:"hostname"`
TopicName string `json:"topic_name"`
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"`
MemoryDepth int64 `json:"memory_depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int64 `json:"in_flight_count"`
DeferredCount int64 `json:"deferred_count"`
RequeueCount int64 `json:"requeue_count"`
TimeoutCount int64 `json:"timeout_count"`
MessageCount int64 `json:"message_count"`
ZoneLocalMsgCount int64 `json:"zone_local_msg_count,omitempty"`
RegionLocalMsgCount int64 `json:"region_local_msg_count,omitempty"`
GlobalMsgCount int64 `json:"global_msg_count,omitempty"`
ClientCount int `json:"client_count"`
Selected bool `json:"-"`
NodeStats []*ChannelStats `json:"nodes"`
Clients []*ClientStats `json:"clients"`
Paused bool `json:"paused"`

E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
}
Expand All @@ -176,6 +185,9 @@ func (c *ChannelStats) Add(a *ChannelStats) {
c.RequeueCount += a.RequeueCount
c.TimeoutCount += a.TimeoutCount
c.MessageCount += a.MessageCount
c.ZoneLocalMsgCount += a.ZoneLocalMsgCount
c.RegionLocalMsgCount += a.RegionLocalMsgCount
c.GlobalMsgCount += a.GlobalMsgCount
c.ClientCount += a.ClientCount
if a.Paused {
c.Paused = a.Paused
Expand Down
2 changes: 1 addition & 1 deletion nsqadmin/static/build/main.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nsqadmin/static/build/main.js.map

Large diffs are not rendered by default.

119 changes: 86 additions & 33 deletions nsqadmin/static/js/views/channel.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,22 @@

<div class="row">
<div class="col-md-12">
<h4>Channel</h4>
<h3>Channel</h3>
<h4>Message Queues</h4>
<table class="table table-bordered table-condensed">
{{#if e2e_processing_latency.percentiles.length}}
<tr>
<th>&nbsp;</th>
<th colspan="4" class="text-center">Message Queues</th>
<th colspan="{{#if graph_active}}5{{else}}4{{/if}}" class="text-center">Statistics</th>
{{#if e2e_processing_latency.percentiles.length}}
<th colspan="{{e2e_processing_latency.percentiles.length}}">E2E Processing Latency</th>
{{/if}}
</tr>
{{/if}}
<tr>
<th>NSQd Host</th>
<th>Depth</th>
<th>Memory + Disk</th>
<th>In-Flight</th>
<th>Deferred</th>
<th>Requeued</th>
<th>Timed Out</th>
<th>Messages</th>
{{#if graph_active}}<th>Rate</th>{{/if}}
<th>Connections</th>
{{#each e2e_processing_latency.percentiles}}
<th>{{floatToPercent quantile}}<sup>{{percSuffix quantile}}</sup></th>
{{/each}}
Expand All @@ -85,13 +80,6 @@
<td>{{commafy memory_depth}} + {{commafy backend_depth}}</td>
<td>{{commafy in_flight_count}}</td>
<td>{{commafy deferred_count}}</td>
<td>{{commafy requeue_count}}</td>
<td>{{commafy timeout_count}}</td>
<td>{{commafy message_count}}</td>
{{#if ../graph_active}}
<td class="bold rate" target="{{rate "topic" node topic_name ""}}"></td>
{{/if}}
<td>{{commafy client_count}}</td>
{{#if e2e_processing_latency.percentiles.length}}
{{#each e2e_processing_latency.percentiles}}
<td>
Expand All @@ -107,11 +95,6 @@
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "in_flight_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "in_flight_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "deferred_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "deferred_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "requeue_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "requeue_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "timeout_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "timeout_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "message_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "message_count"}}"></a></td>
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "clients"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "clients"}}"></a></td>
{{#if e2e_processing_latency.percentiles.length}}
<td colspan="{{e2e_processing_latency.percentiles.length}}">
<a href="{{large_graph "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"><img width="120" height="20" src="{{sparkline "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"></a>
Expand All @@ -126,13 +109,6 @@
<td>{{commafy memory_depth}} + {{commafy backend_depth}}</td>
<td>{{commafy in_flight_count}}</td>
<td>{{commafy deferred_count}}</td>
<td>{{commafy requeue_count}}</td>
<td>{{commafy timeout_count}}</td>
<td>{{commafy message_count}}</td>
{{#if graph_active}}
<td class="bold rate" target="{{rate "topic" node topic_name ""}}"></td>
{{/if}}
<td>{{commafy client_count}}</td>
{{#if e2e_processing_latency.percentiles.length}}
{{#each e2e_processing_latency.percentiles}}
<td>
Expand All @@ -148,24 +124,101 @@
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "in_flight_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "in_flight_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "deferred_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "deferred_count"}}"></a></td>
{{#if e2e_processing_latency.percentiles.length}}
<td colspan="{{e2e_processing_latency.percentiles.length}}">
<a href="{{large_graph "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"><img width="120" height="20" src="{{sparkline "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"></a>
</td>
{{/if}}
</tr>
{{/if}}
</table>
</div>
</div>

<div class="row">
<div class="col-md-12">
<h4>Statistics</h4>
<table class="table table-bordered table-condensed">
<tr>
<th>NSQd Host</th>
<th>Requeued</th>
<th>Timed Out</th>
<th>Global Messages</th>
<th>RegionLocal Messages</th>
<th>ZoneLocal Messages</th>
<th>Messages</th>
{{#if graph_active}}<th>Rate</th>{{/if}}
<th>Connections</th>
</tr>
{{#each nodes}}
<tr>
<td>
{{#if show_broadcast_address}}
{{hostname_port}} (<a class="link" href="{{basePath "/nodes"}}/{{node}}">{{node}}</a>)
{{else}}
<a class="link" href="{{basePath "/nodes"}}/{{node}}">{{hostname_port}}</a>
{{/if}}
{{#if paused}} <span class="label label-primary">paused</span>{{/if}}
</td>
<td>{{commafy requeue_count}}</td>
<td>{{commafy timeout_count}}</td>
<td>{{commafy global_msg_count}}</td>
<td>{{commafy region_local_msg_count}}</td>
<td>{{commafy zone_local_msg_count}}</td>
<td>{{commafy message_count}}</td>
{{#if ../graph_active}}
<td class="bold rate" target="{{rate "topic" node topic_name ""}}"></td>
{{/if}}
<td>{{commafy client_count}}</td>
</tr>
{{#if ../graph_active}}
<tr class="graph-row">
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "requeue_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "requeue_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "timeout_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "timeout_count"}}"></a></td>
<td></td>
<td></td>
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "message_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "message_count"}}"></a></td>
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "clients"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "clients"}}"></a></td>
{{#if e2e_processing_latency.percentiles.length}}
<td colspan="{{e2e_processing_latency.percentiles.length}}">
<a href="{{large_graph "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"><img width="120" height="20" src="{{sparkline "e2e" node e2e_processing_latency "" "e2e_processing_latency"}}"></a>
</td>
</tr>
{{/if}}
{{/each}}
<tr class="info">
<td>Total:</td>
<td>{{commafy requeue_count}}</td>
<td>{{commafy timeout_count}}</td>
<td>{{commafy global_msg_count}}</td>
<td>{{commafy region_local_msg_count}}</td>
<td>{{commafy zone_local_msg_count}}</td>
<td>{{commafy message_count}}</td>
{{#if graph_active}}
<td class="bold rate" target="{{rate "topic" node topic_name ""}}"></td>
{{/if}}
<td>{{commafy client_count}}</td>
</tr>
{{#if graph_active}}
<tr class="graph-row">
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "requeue_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "requeue_count"}}"></a></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "timeout_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "timeout_count"}}"></a></td>
<td></td>
<td></td>
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "message_count"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "message_count"}}"></a></td>
<td></td>
<td><a href="{{large_graph "channel" node topic_name channel_name "clients"}}"><img width="120" height="20" src="{{sparkline "channel" node topic_name channel_name "clients"}}"></a></td>
</tr>
{{/if}}
</table>
</div>
</div>


{{/unless}}

<h4>Client Connections</h4>
<h3>Client Connections</h3>

<div class="row">
<div class="col-md-12">
Expand Down
16 changes: 13 additions & 3 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ type Consumer interface {
// messages, timeouts, requeuing, etc.
type Channel struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
requeueCount uint64
messageCount uint64
timeoutCount uint64
requeueCount uint64
messageCount uint64
zoneLocalMsgCount uint64
regionLocalMsgCount uint64
globalMsgCount uint64
timeoutCount uint64

sync.RWMutex

Expand Down Expand Up @@ -324,23 +327,29 @@ func (c *Channel) put(m *Message) error {
// attempt a higher priority channel can still win
select {
case c.zoneLocalMsgChan <- m:
atomic.AddUint64(&c.zoneLocalMsgCount, 1)
return nil
default:
}
select {
case c.zoneLocalMsgChan <- m:
atomic.AddUint64(&c.zoneLocalMsgCount, 1)
return nil
case c.regionLocalMsgChan <- m:
atomic.AddUint64(&c.regionLocalMsgCount, 1)
return nil
default:
}

select {
case c.zoneLocalMsgChan <- m:
atomic.AddUint64(&c.zoneLocalMsgCount, 1)
return nil
case c.regionLocalMsgChan <- m:
atomic.AddUint64(&c.regionLocalMsgCount, 1)
return nil
case c.memoryMsgChan <- m:
atomic.AddUint64(&c.globalMsgCount, 1)
return nil
default:
}
Expand All @@ -349,6 +358,7 @@ func (c *Channel) put(m *Message) error {

select {
case c.memoryMsgChan <- m:
atomic.AddUint64(&c.globalMsgCount, 1)
return nil
default:
}
Expand Down

0 comments on commit b84cfe2

Please sign in to comment.