forked from nsqio/nsq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stats.go
125 lines (103 loc) · 3.34 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"sort"
)
// TopicStats is the JSON-serializable summary of a single topic together with
// the stats of its channels. NewTopicStats fills every field.
type TopicStats struct {
	// TopicName is the topic's name.
	TopicName string `json:"topic_name"`

	// Channels holds the per-channel stats handed to NewTopicStats.
	Channels []ChannelStats `json:"channels"`

	// Depth is the value reported by the topic's Depth method.
	Depth int64 `json:"depth"`

	// BackendDepth is the value reported by the Depth method of the topic's
	// backend.
	BackendDepth int64 `json:"backend_depth"`

	// MessageCount is a copy of the topic's message counter.
	MessageCount uint64 `json:"message_count"`
}
// NewTopicStats assembles a TopicStats value for t.
//
// The name, both depth readings and the message counter are taken from t at
// call time; channels is stored as given (the slice is not copied).
//
// NOTE(review): t's fields are read directly here without taking a lock.
// getStats calls this while holding t's read lock; whether that is sufficient
// for messageCount is not visible in this file — confirm.
func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats {
	var stats TopicStats

	stats.TopicName = t.name
	stats.Channels = channels
	stats.Depth = t.Depth()
	stats.BackendDepth = t.backend.Depth()
	stats.MessageCount = t.messageCount

	return stats
}
// ChannelStats is the JSON-serializable summary of a single channel together
// with the stats of its clients. NewChannelStats fills every field.
type ChannelStats struct {
	// ChannelName is the channel's name.
	ChannelName string `json:"channel_name"`

	// Depth is the value reported by the channel's Depth method.
	Depth int64 `json:"depth"`

	// BackendDepth is the value reported by the Depth method of the
	// channel's backend.
	BackendDepth int64 `json:"backend_depth"`

	// InFlightCount is the number of entries in the channel's
	// inFlightMessages collection.
	InFlightCount int `json:"in_flight_count"`

	// DeferredCount is the number of entries in the channel's
	// deferredMessages collection.
	DeferredCount int `json:"deferred_count"`

	// MessageCount is a copy of the channel's message counter.
	MessageCount uint64 `json:"message_count"`

	// RequeueCount is a copy of the channel's requeue counter.
	RequeueCount uint64 `json:"requeue_count"`

	// TimeoutCount is a copy of the channel's timeout counter.
	TimeoutCount uint64 `json:"timeout_count"`

	// Clients holds the per-client stats handed to NewChannelStats.
	Clients []ClientStats `json:"clients"`

	// Paused is the value reported by the channel's IsPaused method.
	Paused bool `json:"paused"`
}
// NewChannelStats assembles a ChannelStats value for c.
//
// The name, depth readings, in-flight/deferred sizes, counters and paused
// flag are taken from c at call time; clients is stored as given (the slice
// is not copied).
//
// NOTE(review): inFlightMessages, deferredMessages and the counters are read
// directly here without taking a lock. getStats calls this while holding c's
// read lock; whether that lock guards these fields is not visible in this
// file — confirm.
func NewChannelStats(c *Channel, clients []ClientStats) ChannelStats {
	var stats ChannelStats

	stats.ChannelName = c.name
	stats.Depth = c.Depth()
	stats.BackendDepth = c.backend.Depth()
	stats.InFlightCount = len(c.inFlightMessages)
	stats.DeferredCount = len(c.deferredMessages)
	stats.MessageCount = c.messageCount
	stats.RequeueCount = c.requeueCount
	stats.TimeoutCount = c.timeoutCount
	stats.Clients = clients
	stats.Paused = c.IsPaused()

	return stats
}
// ClientStats is the JSON-serializable summary of a single client. getStats
// collects one value per client of a channel by calling the client's Stats
// method.
//
// The code that fills these fields is not part of this file, so their exact
// meaning is not documented here.
// NOTE(review): ConnectTime is serialized as "connect_ts", presumably a Unix
// timestamp — confirm against the code that sets it.
type ClientStats struct {
	Version       string `json:"version"`
	RemoteAddress string `json:"remote_address"`
	Name          string `json:"name"`
	State         int32  `json:"state"`
	ReadyCount    int64  `json:"ready_count"`
	InFlightCount int64  `json:"in_flight_count"`
	MessageCount  uint64 `json:"message_count"`
	FinishCount   uint64 `json:"finish_count"`
	RequeueCount  uint64 `json:"requeue_count"`
	ConnectTime   int64  `json:"connect_ts"`
}
// Topics is a slice of topic pointers that supplies the Len and Swap parts of
// sort.Interface. It has no Less method of its own; a wrapper type that embeds
// it (such as TopicsByName) adds the ordering.
type Topics []*Topic

// Len returns the number of topics in the slice.
func (t Topics) Len() int {
	return len(t)
}

// Swap exchanges the topics at indexes i and j.
func (t Topics) Swap(i, j int) {
	t[i], t[j] = t[j], t[i]
}
// TopicsByName wraps Topics and adds a Less method that compares topic names,
// so the combination can be passed to sort.Sort.
type TopicsByName struct {
	Topics
}

// Less reports whether the name of the topic at index i is lexically smaller
// than the name of the topic at index j.
func (t TopicsByName) Less(i, j int) bool {
	leftName := t.Topics[i].name
	rightName := t.Topics[j].name
	return leftName < rightName
}
// Channels is a slice of channel pointers that supplies the Len and Swap parts
// of sort.Interface. It has no Less method of its own; a wrapper type that
// embeds it (such as ChannelsByName) adds the ordering.
type Channels []*Channel

// Len returns the number of channels in the slice.
func (c Channels) Len() int {
	return len(c)
}

// Swap exchanges the channels at indexes i and j.
func (c Channels) Swap(i, j int) {
	c[i], c[j] = c[j], c[i]
}
// ChannelsByName wraps Channels and adds a Less method that compares channel
// names, so the combination can be passed to sort.Sort.
type ChannelsByName struct {
	Channels
}

// Less reports whether the name of the channel at index i is lexically smaller
// than the name of the channel at index j.
func (c ChannelsByName) Less(i, j int) bool {
	leftName := c.Channels[i].name
	rightName := c.Channels[j].name
	return leftName < rightName
}
// getStats returns one TopicStats entry for every topic in n.topicMap, sorted
// by topic name. Within each entry the channels are sorted by channel name,
// and each channel carries the Stats() of every client in its clients
// collection.
//
// Locking: n.RLock is held for the whole call. Each topic's RLock is held
// while its channels are collected and its entry is built, and each channel's
// RLock is held while its clients are collected and its entry is built.
//
// The returned slice is never nil, even when there are no topics.
func (n *NSQd) getStats() []TopicStats {
	n.RLock()
	defer n.RUnlock()

	// Collect the topics and sort them by name so the output order does not
	// depend on topicMap's iteration order.
	sortedTopics := make(Topics, 0, len(n.topicMap))
	for _, topic := range n.topicMap {
		sortedTopics = append(sortedTopics, topic)
	}
	sort.Sort(TopicsByName{Topics: sortedTopics})

	result := make([]TopicStats, 0, len(sortedTopics))
	for _, topic := range sortedTopics {
		topic.RLock()

		// Same approach one level down: collect, then sort by name.
		sortedChannels := make(Channels, 0, len(topic.channelMap))
		for _, channel := range topic.channelMap {
			sortedChannels = append(sortedChannels, channel)
		}
		sort.Sort(ChannelsByName{Channels: sortedChannels})

		channelStats := make([]ChannelStats, 0, len(sortedChannels))
		for _, channel := range sortedChannels {
			channel.RLock()

			clientStats := make([]ClientStats, 0, len(channel.clients))
			for _, client := range channel.clients {
				clientStats = append(clientStats, client.Stats())
			}
			channelStats = append(channelStats, NewChannelStats(channel, clientStats))

			channel.RUnlock()
		}

		result = append(result, NewTopicStats(topic, channelStats))
		topic.RUnlock()
	}

	return result
}