Skip to content

Commit

Permalink
Add $SYS/brokers/{node}/message/dropped and send message_publish hook.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 12, 2023
1 parent de06a2a commit 7102f81
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions rmqtt-plugins/rmqtt-sys-topic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
extern crate serde;

use config::PluginConfig;
use rmqtt::bytes::Bytes;
use rmqtt::tokio::time::sleep;
use rmqtt::{
async_trait::async_trait,
base64,
bytes::Bytes,
chrono, log,
serde_json::{self, json},
tokio::spawn,
tokio::sync::RwLock,
ClientId, NodeId, Publish, QoS, TopicName, UserName,
tokio::time::sleep,
};
use rmqtt::{
broker::hook::{Handler, HookResult, Parameter, Register, ReturnType, Type},
broker::types::{From, Id, QoSEx},
plugin::{DynPlugin, DynPluginResult, Plugin},
PublishProperties, Result, Runtime,
ClientId, NodeId, Publish, PublishProperties, QoS, Result, Runtime, TopicName, UserName,
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -276,6 +276,27 @@ impl Handler for SystemTopicHandler {
Some((topic, body))
}

Parameter::MessageDropped(to, from, publish, reason) => {
let body = json!({
"dup": publish.dup(),
"retain": publish.retain(),
"qos": publish.qos().value(),
"topic": publish.topic(),
"packet_id": publish.packet_id(),
"payload": base64::encode(publish.payload()),
"reason": reason.to_string(),
"pts": publish.create_time(),
"ts": now.timestamp_millis(),
"time": now_time
});
let mut body = from.to_from_json(body);
if let Some(to) = to {
body = to.to_to_json(body);
}
let topic = format!("$SYS/brokers/{}/message/dropped", self.nodeid);
Some((topic, body))
}

_ => {
log::error!("unimplemented, {:?}", param);
None
Expand Down Expand Up @@ -312,6 +333,15 @@ async fn sys_publish(nodeid: NodeId, topic: String, publish_qos: QoS, payload: s
create_time: chrono::Local::now().timestamp_millis(),
};

//hook, message_publish
let p = Runtime::instance()
.extends
.hook_mgr()
.await
.message_publish(None, None, from.clone(), &p)
.await
.unwrap_or(p);

let replys = Runtime::instance().extends.shared().await.forwards(from, p).await;
if let Err(e) = replys {
log::warn!("send system message error, {:?}", e);
Expand Down

0 comments on commit 7102f81

Please sign in to comment.