forked from VolantMQ/volantmq
/
metrics.go
155 lines (138 loc) · 4.41 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package systree
import (
"sync/atomic"
"github.com/VolantMQ/vlapi/mqttp"
"github.com/VolantMQ/volantmq/types"
)
// metricEntry pairs the two counters kept for a single metric: one for the
// sent direction and one for the received direction.
type metricEntry struct {
	sent, recv *dynamicValueInteger
}
// packetsMetric keeps one sent/received counter pair per packet type, plus a
// total that is bumped for every packet regardless of its type.
type packetsMetric struct {
	// total is incremented on every call to Sent or Received.
	total *metricEntry

	// Per-packet-type counters, selected by the mqttp.Type passed to
	// Sent and Received.
	connect, connAck      *metricEntry
	publish               *metricEntry
	subscribe, suback     *metricEntry
	unsubscribe, unSubAck *metricEntry
	pingReq, pingResp     *metricEntry
	disconnect            *metricEntry
	auth                  *metricEntry
}
// bytesMetric holds the sent/received byte counters. It embeds metricEntry,
// so the sent and recv counters are reachable directly on a bytesMetric.
type bytesMetric struct {
metricEntry
}
// metric bundles the packet counters and the byte counters built by newMetric.
type metric struct {
// packets is the value handed out by Packets.
packets *packetsMetric
// bytes is the value handed out by Bytes.
bytes *bytesMetric
}
// newMetricEntry builds a sent/received counter pair. The counters are created
// with topicPrefix+"/sent" and topicPrefix+"/received" respectively, and both
// are appended (sent first) to the slice retained points to.
//
// retained is dereferenced unconditionally, so it must not be nil.
func newMetricEntry(topicPrefix string, retained *[]types.RetainObject) *metricEntry {
	entry := new(metricEntry)
	entry.sent = newDynamicValueInteger(topicPrefix + "/sent")
	entry.recv = newDynamicValueInteger(topicPrefix + "/received")

	*retained = append(*retained, entry.sent, entry.recv)

	return entry
}
// newBytesMetric builds the byte counters under topicPrefix+"/bytes". The
// counter pair is created by newMetricEntry, which also appends both counters
// to *retained, and is then copied into the embedded metricEntry.
func newBytesMetric(topicPrefix string, retained *[]types.RetainObject) *bytesMetric {
	entry := newMetricEntry(topicPrefix+"/bytes", retained)

	return &bytesMetric{metricEntry: *entry}
}
// newMetric builds the packet and byte counters, both rooted at
// topicPrefix+"/metrics". Packet counters are created before byte counters,
// which determines the order in which they are appended to *retained.
func newMetric(topicPrefix string, retained *[]types.RetainObject) metric {
	prefix := topicPrefix + "/metrics"

	var m metric
	m.packets = newPacketsMetric(prefix, retained)
	m.bytes = newBytesMetric(prefix, retained)

	return m
}
// newPacketsMetric builds one counter pair per packet type, plus the total,
// each under topicPrefix+"/packets/<name>". Every pair is created through
// newMetricEntry, so all counters are appended to *retained in the order the
// entries are created below.
func newPacketsMetric(topicPrefix string, retained *[]types.RetainObject) *packetsMetric {
	// entry creates the counter pair for one sub-topic of topicPrefix.
	entry := func(suffix string) *metricEntry {
		return newMetricEntry(topicPrefix+suffix, retained)
	}

	pm := new(packetsMetric)
	pm.total = entry("/packets/total")
	pm.connect = entry("/packets/connect")
	pm.connAck = entry("/packets/connack")
	pm.publish = entry("/packets/publish")
	pm.subscribe = entry("/packets/subscribe")
	pm.suback = entry("/packets/suback")
	pm.unsubscribe = entry("/packets/unsubscribe")
	pm.unSubAck = entry("/packets/unsuback")
	pm.pingReq = entry("/packets/pingreq")
	pm.pingResp = entry("/packets/pingresp")
	pm.disconnect = entry("/packets/disconnect")
	pm.auth = entry("/packets/auth")

	return pm
}
// Sent records one sent packet of type mt. The total sent counter is always
// incremented; the per-type sent counter is incremented only when mt is one of
// the types handled below. Increments are performed atomically.
func (t *packetsMetric) Sent(mt mqttp.Type) {
	atomic.AddUint64(&t.total.sent.val, 1)

	var entry *metricEntry

	switch mt {
	case mqttp.CONNECT:
		entry = t.connect
	case mqttp.CONNACK:
		entry = t.connAck
	case mqttp.PUBLISH:
		entry = t.publish
	case mqttp.SUBSCRIBE:
		entry = t.subscribe
	case mqttp.SUBACK:
		entry = t.suback
	case mqttp.UNSUBSCRIBE:
		entry = t.unsubscribe
	case mqttp.UNSUBACK:
		entry = t.unSubAck
	case mqttp.PINGREQ:
		entry = t.pingReq
	case mqttp.PINGRESP:
		entry = t.pingResp
	case mqttp.DISCONNECT:
		entry = t.disconnect
	case mqttp.AUTH:
		entry = t.auth
	default:
		// No dedicated counter for this type: it counts toward the total only.
		return
	}

	atomic.AddUint64(&entry.sent.val, 1)
}
// Received records one received packet of type mt. The total received counter
// is always incremented; the per-type received counter is incremented only
// when mt is one of the types handled below. Increments are performed
// atomically.
func (t *packetsMetric) Received(mt mqttp.Type) {
	atomic.AddUint64(&t.total.recv.val, 1)

	var entry *metricEntry

	switch mt {
	case mqttp.CONNECT:
		entry = t.connect
	case mqttp.CONNACK:
		entry = t.connAck
	case mqttp.PUBLISH:
		entry = t.publish
	case mqttp.SUBSCRIBE:
		entry = t.subscribe
	case mqttp.SUBACK:
		entry = t.suback
	case mqttp.UNSUBSCRIBE:
		entry = t.unsubscribe
	case mqttp.UNSUBACK:
		entry = t.unSubAck
	case mqttp.PINGREQ:
		entry = t.pingReq
	case mqttp.PINGRESP:
		entry = t.pingResp
	case mqttp.DISCONNECT:
		entry = t.disconnect
	case mqttp.AUTH:
		entry = t.auth
	default:
		// No dedicated counter for this type: it counts toward the total only.
		return
	}

	atomic.AddUint64(&entry.recv.val, 1)
}
// Bytes returns the byte counters of this metric as a BytesMetric.
func (t *metric) Bytes() BytesMetric {
return t.bytes
}
// Packets returns the packet counters of this metric as a PacketsMetric.
func (t *metric) Packets() PacketsMetric {
return t.packets
}
// Sent atomically adds bytes to the sent-bytes counter.
func (t *bytesMetric) Sent(bytes uint64) {
	counter := &t.metricEntry.sent.val
	atomic.AddUint64(counter, bytes)
}
// Received atomically adds bytes to the received-bytes counter.
func (t *bytesMetric) Received(bytes uint64) {
	counter := &t.metricEntry.recv.val
	atomic.AddUint64(counter, bytes)
}