From e3650cad6072c8af6b3569390b5d9a811e5804a1 Mon Sep 17 00:00:00 2001 From: rmqtt Date: Wed, 9 Aug 2023 23:04:04 +0800 Subject: [PATCH] Perfect plugin: rmqtt-sys-topic, add $SYS/brokers/${node}/stats and $SYS/brokers/${node}/metrics --- rmqtt-plugins/rmqtt-sys-topic/src/lib.rs | 100 ++++++++++++++--------- 1 file changed, 61 insertions(+), 39 deletions(-) diff --git a/rmqtt-plugins/rmqtt-sys-topic/src/lib.rs b/rmqtt-plugins/rmqtt-sys-topic/src/lib.rs index 7f0597d9..68550d56 100644 --- a/rmqtt-plugins/rmqtt-sys-topic/src/lib.rs +++ b/rmqtt-plugins/rmqtt-sys-topic/src/lib.rs @@ -11,7 +11,7 @@ use rmqtt::{ serde_json::{self, json}, tokio::spawn, tokio::sync::RwLock, - ClientId, NodeId, Publish, TopicName, UserName, + ClientId, NodeId, Publish, QoS, TopicName, UserName, }; use rmqtt::{ broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type}, @@ -69,20 +69,41 @@ impl SystemTopicPlugin { Ok(Self { runtime, name, descr: descr.into(), register, cfg, running }) } - fn start(_runtime: &'static Runtime, cfg: Arc>, running: Arc) { + fn start(runtime: &'static Runtime, cfg: Arc>, running: Arc) { spawn(async move { let min = Duration::from_secs(1); loop { - let publish_interval = cfg.read().await.publish_interval; + let (publish_interval, publish_qos) = { + let cfg = cfg.read().await; + (cfg.publish_interval, cfg.publish_qos) + }; let publish_interval = if publish_interval < min { min } else { publish_interval }; sleep(publish_interval).await; if running.load(Ordering::SeqCst) { - //@TODO ... - log::info!("send ... "); + Self::send_stats(runtime, publish_qos).await; + Self::send_metrics(runtime, publish_qos).await; } } }); } + + //Statistics + //$SYS/brokers/${node}/stats + async fn send_stats(runtime: &'static Runtime, publish_qos: QoS) { + let payload = runtime.stats.clone().await.to_json().await; + let nodeid = runtime.node.id(); + let topic = format!("$SYS/brokers/{}/stats", nodeid); + sys_publish(nodeid, topic, publish_qos, payload).await; + } + + //Metrics + //$SYS/brokers/${node}/metrics + async fn send_metrics(runtime: &'static Runtime, publish_qos: QoS) { + let payload = Runtime::instance().metrics.to_json(); + let nodeid = runtime.node.id(); + let topic = format!("$SYS/brokers/{}/metrics", nodeid); + sys_publish(nodeid, topic, publish_qos, payload).await; + } } #[async_trait] @@ -260,43 +281,44 @@ impl Handler for SystemTopicHandler { None } } { - match serde_json::to_string(&payload) { - Ok(payload) => { - //Runtime::instance().metrics.messages_publish_inc(); - let cfg = self.cfg.clone(); - let nodeid = self.nodeid; - let fut = async move { - let from = From::from_system(Id::new( - nodeid, - None, - None, - ClientId::from_static("system"), - Some(UserName::from("system")), - )); + let nodeid = self.nodeid; + let publish_qos = self.cfg.read().await.publish_qos; + spawn(sys_publish(nodeid, topic, publish_qos, payload)); + } + (true, acc) + } +} - let p = Publish { - dup: false, - retain: false, - qos: cfg.read().await.publish_qos, - topic: TopicName::from(topic), - packet_id: None, - payload: Bytes::from(payload), - properties: PublishProperties::default(), - create_time: chrono::Local::now().timestamp_millis(), - }; +#[inline] +async fn sys_publish(nodeid: NodeId, topic: String, publish_qos: QoS, payload: serde_json::Value) { + match serde_json::to_string(&payload) { + Ok(payload) => { + let from = From::from_system(Id::new( + nodeid, + None, + None, + ClientId::from_static("system"), + Some(UserName::from("system")), + )); - let replys = Runtime::instance().extends.shared().await.forwards(from, p).await; - if let Err(e) = replys { - log::warn!("send system message error, {:?}", e); - } - }; - spawn(fut); - } - Err(e) => { - log::error!("{:?}", e); - } + let p = Publish { + dup: false, + retain: false, + qos: publish_qos, + topic: TopicName::from(topic), + packet_id: None, + payload: Bytes::from(payload), + properties: PublishProperties::default(), + create_time: chrono::Local::now().timestamp_millis(), }; + + let replys = Runtime::instance().extends.shared().await.forwards(from, p).await; + if let Err(e) = replys { + log::warn!("send system message error, {:?}", e); + } + } + Err(e) => { + log::error!("{:?}", e); } - (true, acc) } }