From 0dd6193b16c51870d99801e09a7f934f85ed8666 Mon Sep 17 00:00:00 2001 From: rmqtt Date: Sat, 12 Aug 2023 20:20:48 +0800 Subject: [PATCH] Adding statistics for publish, delivered, and acked from different sources. --- rmqtt-plugins/rmqtt-counter/src/lib.rs | 36 ++++++++++++++++++++------ rmqtt/src/broker/metrics.rs | 19 ++++++++++++++ 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/rmqtt-plugins/rmqtt-counter/src/lib.rs b/rmqtt-plugins/rmqtt-counter/src/lib.rs index e8df6d35..3132160d 100644 --- a/rmqtt-plugins/rmqtt-counter/src/lib.rs +++ b/rmqtt-plugins/rmqtt-counter/src/lib.rs @@ -1,7 +1,7 @@ #![deny(unsafe_code)] use rmqtt::broker::hook::Priority; -use rmqtt::{async_trait::async_trait, log}; +use rmqtt::{async_trait::async_trait, log, FromType}; use rmqtt::{ broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type}, broker::metrics::Metrics, @@ -211,7 +211,7 @@ impl Handler for CounterHandler { Parameter::MessagePublishCheckAcl(_session, _client, _p) => { self.metrics.client_publish_check_acl_inc(); } - Parameter::MessagePublish(_session, _client, _from, _p) => { + Parameter::MessagePublish(_session, _client, from, _p) => { // self.metrics.messages_received_inc(); //@TODO ... elaboration // match p.qos{ // QoS::AtMostOnce => self.metrics.messages_received_qos0_inc(), @@ -219,15 +219,35 @@ impl Handler for CounterHandler { // QoS::ExactlyOnce => self.metrics.messages_received_qos2_inc(), // } self.metrics.messages_publish_inc(); + match from.typ() { + FromType::Custom => self.metrics.messages_publish_custom_inc(), + FromType::Admin => self.metrics.messages_publish_admin_inc(), + FromType::System => self.metrics.messages_publish_system_inc(), + FromType::LastWill => self.metrics.messages_publish_lastwill_inc(), + } } - Parameter::MessageDelivered(_session, _client, from, _p) => { - if !from.is_system() { - self.metrics.messages_delivered_inc(); + Parameter::MessageDelivered(_session, _client, from, p) => { + self.metrics.messages_delivered_inc(); + if p.retain { + self.metrics.messages_delivered_retain_inc() + } + match from.typ() { + FromType::Custom => self.metrics.messages_delivered_custom_inc(), + FromType::Admin => self.metrics.messages_delivered_admin_inc(), + FromType::System => self.metrics.messages_delivered_system_inc(), + FromType::LastWill => self.metrics.messages_delivered_lastwill_inc(), } } - Parameter::MessageAcked(_session, _client, from, _p) => { - if !from.is_system() { - self.metrics.messages_acked_inc(); + Parameter::MessageAcked(_session, _client, from, p) => { + self.metrics.messages_acked_inc(); + if p.retain { + self.metrics.messages_acked_retain_inc() + } + match from.typ() { + FromType::Custom => self.metrics.messages_acked_custom_inc(), + FromType::Admin => self.metrics.messages_acked_admin_inc(), + FromType::System => self.metrics.messages_acked_system_inc(), + FromType::LastWill => self.metrics.messages_acked_lastwill_inc(), } } Parameter::MessageDropped(_to, _from, _p, _r) => { diff --git a/rmqtt/src/broker/metrics.rs b/rmqtt/src/broker/metrics.rs index 05e166bc..d72403f9 100644 --- a/rmqtt/src/broker/metrics.rs +++ b/rmqtt/src/broker/metrics.rs @@ -39,4 +39,23 @@ pub struct Metrics { // messages_sent: AtomicUsize, messages_acked: AtomicUsize, messages_dropped: AtomicUsize, + + messages_publish_custom: AtomicUsize, + messages_delivered_custom: AtomicUsize, + messages_acked_custom: AtomicUsize, + + messages_publish_admin: AtomicUsize, + messages_delivered_admin: AtomicUsize, + messages_acked_admin: AtomicUsize, + + messages_publish_lastwill: AtomicUsize, + messages_delivered_lastwill: AtomicUsize, + messages_acked_lastwill: AtomicUsize, + + messages_publish_system: AtomicUsize, + messages_delivered_system: AtomicUsize, + messages_acked_system: AtomicUsize, + + messages_delivered_retain: AtomicUsize, + messages_acked_retain: AtomicUsize, }