Skip to content

Commit

Permalink
Adding statistics for publish, delivered, and acked from different so…
Browse files Browse the repository at this point in the history
…urces.
  • Loading branch information
rmqtt committed Aug 12, 2023
1 parent 847d26a commit 0dd6193
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
36 changes: 28 additions & 8 deletions rmqtt-plugins/rmqtt-counter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![deny(unsafe_code)]

use rmqtt::broker::hook::Priority;
use rmqtt::{async_trait::async_trait, log};
use rmqtt::{async_trait::async_trait, log, FromType};
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::metrics::Metrics,
Expand Down Expand Up @@ -211,23 +211,43 @@ impl Handler for CounterHandler {
Parameter::MessagePublishCheckAcl(_session, _client, _p) => {
self.metrics.client_publish_check_acl_inc();
}
Parameter::MessagePublish(_session, _client, _from, _p) => {
Parameter::MessagePublish(_session, _client, from, _p) => {
// self.metrics.messages_received_inc(); //@TODO ... elaboration
// match p.qos{
// QoS::AtMostOnce => self.metrics.messages_received_qos0_inc(),
// QoS::AtLeastOnce => self.metrics.messages_received_qos1_inc(),
// QoS::ExactlyOnce => self.metrics.messages_received_qos2_inc(),
// }
self.metrics.messages_publish_inc();
match from.typ() {
FromType::Custom => self.metrics.messages_publish_custom_inc(),
FromType::Admin => self.metrics.messages_publish_admin_inc(),
FromType::System => self.metrics.messages_publish_system_inc(),
FromType::LastWill => self.metrics.messages_publish_lastwill_inc(),
}
}
Parameter::MessageDelivered(_session, _client, from, _p) => {
if !from.is_system() {
self.metrics.messages_delivered_inc();
Parameter::MessageDelivered(_session, _client, from, p) => {
self.metrics.messages_delivered_inc();
if p.retain {
self.metrics.messages_delivered_retain_inc()
}
match from.typ() {
FromType::Custom => self.metrics.messages_delivered_custom_inc(),
FromType::Admin => self.metrics.messages_delivered_admin_inc(),
FromType::System => self.metrics.messages_delivered_system_inc(),
FromType::LastWill => self.metrics.messages_delivered_lastwill_inc(),
}
}
Parameter::MessageAcked(_session, _client, from, _p) => {
if !from.is_system() {
self.metrics.messages_acked_inc();
Parameter::MessageAcked(_session, _client, from, p) => {
self.metrics.messages_acked_inc();
if p.retain {
self.metrics.messages_acked_retain_inc()
}
match from.typ() {
FromType::Custom => self.metrics.messages_acked_custom_inc(),
FromType::Admin => self.metrics.messages_acked_admin_inc(),
FromType::System => self.metrics.messages_acked_system_inc(),
FromType::LastWill => self.metrics.messages_acked_lastwill_inc(),
}
}
Parameter::MessageDropped(_to, _from, _p, _r) => {
Expand Down
19 changes: 19 additions & 0 deletions rmqtt/src/broker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,23 @@ pub struct Metrics {
// messages_sent: AtomicUsize,
messages_acked: AtomicUsize,
messages_dropped: AtomicUsize,

messages_publish_custom: AtomicUsize,
messages_delivered_custom: AtomicUsize,
messages_acked_custom: AtomicUsize,

messages_publish_admin: AtomicUsize,
messages_delivered_admin: AtomicUsize,
messages_acked_admin: AtomicUsize,

messages_publish_lastwill: AtomicUsize,
messages_delivered_lastwill: AtomicUsize,
messages_acked_lastwill: AtomicUsize,

messages_publish_system: AtomicUsize,
messages_delivered_system: AtomicUsize,
messages_acked_system: AtomicUsize,

messages_delivered_retain: AtomicUsize,
messages_acked_retain: AtomicUsize,
}

0 comments on commit 0dd6193

Please sign in to comment.