Skip to content

Commit

Permalink
Perfect plugin: rmqtt-sys-topic, add $SYS/brokers/${node}/stats and $…
Browse files Browse the repository at this point in the history
…SYS/brokers/${node}/metrics
  • Loading branch information
rmqtt committed Aug 9, 2023
1 parent 0caf0e6 commit e3650ca
Showing 1 changed file with 61 additions and 39 deletions.
100 changes: 61 additions & 39 deletions rmqtt-plugins/rmqtt-sys-topic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rmqtt::{
serde_json::{self, json},
tokio::spawn,
tokio::sync::RwLock,
ClientId, NodeId, Publish, TopicName, UserName,
ClientId, NodeId, Publish, QoS, TopicName, UserName,
};
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
Expand Down Expand Up @@ -69,20 +69,41 @@ impl SystemTopicPlugin {
Ok(Self { runtime, name, descr: descr.into(), register, cfg, running })
}

fn start(_runtime: &'static Runtime, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
fn start(runtime: &'static Runtime, cfg: Arc<RwLock<PluginConfig>>, running: Arc<AtomicBool>) {
spawn(async move {
let min = Duration::from_secs(1);
loop {
let publish_interval = cfg.read().await.publish_interval;
let (publish_interval, publish_qos) = {
let cfg = cfg.read().await;
(cfg.publish_interval, cfg.publish_qos)
};
let publish_interval = if publish_interval < min { min } else { publish_interval };
sleep(publish_interval).await;
if running.load(Ordering::SeqCst) {
//@TODO ...
log::info!("send ... ");
Self::send_stats(runtime, publish_qos).await;
Self::send_metrics(runtime, publish_qos).await;
}
}
});
}

//Statistics
//$SYS/brokers/${node}/stats
async fn send_stats(runtime: &'static Runtime, publish_qos: QoS) {
let payload = runtime.stats.clone().await.to_json().await;
let nodeid = runtime.node.id();
let topic = format!("$SYS/brokers/{}/stats", nodeid);
sys_publish(nodeid, topic, publish_qos, payload).await;
}

//Metrics
//$SYS/brokers/${node}/metrics
async fn send_metrics(runtime: &'static Runtime, publish_qos: QoS) {
let payload = Runtime::instance().metrics.to_json();
let nodeid = runtime.node.id();
let topic = format!("$SYS/brokers/{}/metrics", nodeid);
sys_publish(nodeid, topic, publish_qos, payload).await;
}
}

#[async_trait]
Expand Down Expand Up @@ -260,43 +281,44 @@ impl Handler for SystemTopicHandler {
None
}
} {
match serde_json::to_string(&payload) {
Ok(payload) => {
//Runtime::instance().metrics.messages_publish_inc();
let cfg = self.cfg.clone();
let nodeid = self.nodeid;
let fut = async move {
let from = From::from_system(Id::new(
nodeid,
None,
None,
ClientId::from_static("system"),
Some(UserName::from("system")),
));
let nodeid = self.nodeid;
let publish_qos = self.cfg.read().await.publish_qos;
spawn(sys_publish(nodeid, topic, publish_qos, payload));
}
(true, acc)
}
}

let p = Publish {
dup: false,
retain: false,
qos: cfg.read().await.publish_qos,
topic: TopicName::from(topic),
packet_id: None,
payload: Bytes::from(payload),
properties: PublishProperties::default(),
create_time: chrono::Local::now().timestamp_millis(),
};
#[inline]
async fn sys_publish(nodeid: NodeId, topic: String, publish_qos: QoS, payload: serde_json::Value) {
match serde_json::to_string(&payload) {
Ok(payload) => {
let from = From::from_system(Id::new(
nodeid,
None,
None,
ClientId::from_static("system"),
Some(UserName::from("system")),
));

let replys = Runtime::instance().extends.shared().await.forwards(from, p).await;
if let Err(e) = replys {
log::warn!("send system message error, {:?}", e);
}
};
spawn(fut);
}
Err(e) => {
log::error!("{:?}", e);
}
let p = Publish {
dup: false,
retain: false,
qos: publish_qos,
topic: TopicName::from(topic),
packet_id: None,
payload: Bytes::from(payload),
properties: PublishProperties::default(),
create_time: chrono::Local::now().timestamp_millis(),
};

let replys = Runtime::instance().extends.shared().await.forwards(from, p).await;
if let Err(e) = replys {
log::warn!("send system message error, {:?}", e);
}
}
Err(e) => {
log::error!("{:?}", e);
}
(true, acc)
}
}

0 comments on commit e3650ca

Please sign in to comment.