Skip to content

Commit

Permalink
When publishing a message, if no subscription relationship is found, …
Browse files Browse the repository at this point in the history
…send a hook message and add 'nonsubscribed' statistical information
  • Loading branch information
rmqtt committed Aug 13, 2023
1 parent e627e89 commit 719f881
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 50 deletions.
12 changes: 7 additions & 5 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use rmqtt::broker::{Router, Shared};
use rmqtt::{async_trait::async_trait, log};
use rmqtt::{async_trait::async_trait, log, SubscriptionSize};
use rmqtt::{
broker::{
hook::{Handler, HookResult, Parameter, ReturnType},
Expand Down Expand Up @@ -39,8 +39,9 @@ impl Handler for HookHandler {
}
match msg {
Message::Forwards(from, publish) => {
let shared_subs = forwards(from.clone(), publish.clone()).await;
let new_acc = HookResult::GrpcMessageReply(Ok(MessageReply::Forwards(shared_subs)));
let (shared_subs, subs_size) = forwards(from.clone(), publish.clone()).await;
let new_acc =
HookResult::GrpcMessageReply(Ok(MessageReply::Forwards(shared_subs, subs_size)));
return (false, Some(new_acc));
}
Message::ForwardsTo(from, publish, sub_rels) => {
Expand Down Expand Up @@ -152,12 +153,13 @@ impl Handler for HookHandler {
}
}

async fn forwards(from: From, publish: Publish) -> SubRelationsMap {
async fn forwards(from: From, publish: Publish) -> (SubRelationsMap, SubscriptionSize) {
log::debug!("forwards, From: {:?}, publish: {:?}", from, publish);
match Runtime::instance().extends.shared().await.forwards_and_get_shareds(from, publish).await {
Err(droppeds) => {
let subs_size = droppeds.len();
hook_message_dropped(droppeds).await;
SubRelationsMap::default()
(SubRelationsMap::default(), subs_size)
}
Ok(relations_map) => relations_map,
}
Expand Down
36 changes: 25 additions & 11 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use rmqtt::{
session::{ClientInfo, Session, SessionOfflineInfo},
types::{
ClientId, From, Id, IsAdmin, IsOnline, NodeId, Publish, QoS, Reason, SessionStatus, SharedGroup,
SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn, To, TopicFilter, Tx, Unsubscribe,
SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn, SubscriptionSize, To,
TopicFilter, Tx, Unsubscribe,
},
Entry, Shared, SubRelations, SubRelationsMap,
},
Expand Down Expand Up @@ -223,15 +224,20 @@ impl Shared for &'static ClusterShared {
}

#[inline]
async fn forwards(&self, from: From, publish: Publish) -> Result<(), Vec<(To, From, Publish, Reason)>> {
async fn forwards(
&self,
from: From,
publish: Publish,
) -> Result<SubscriptionSize, Vec<(To, From, Publish, Reason)>> {
let this_node_id = Runtime::instance().node.id();
let topic = publish.topic();
log::debug!("forwards, from: {:?}, topic: {:?}", from, topic.to_string());

//Matching subscriptions
let (relations, shared_relations) =
let (relations, shared_relations, subs_size) =
match Runtime::instance().extends.router().await.matches(topic).await {
Ok(mut relations_map) => {
let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();
let mut relations = SubRelations::new();
let mut shared_relations = Vec::new();
for (node_id, rels) in relations_map.drain() {
Expand All @@ -244,11 +250,11 @@ impl Shared for &'static ClusterShared {
}
}
}
(relations, shared_relations)
(relations, shared_relations, subs_size)
}
Err(e) => {
log::warn!("forwards, from:{:?}, topic:{:?}, error: {:?}", from, topic, e);
(Vec::new(), Vec::new())
(Vec::new(), Vec::new(), 0)
}
};

Expand All @@ -262,6 +268,7 @@ impl Shared for &'static ClusterShared {
let grpc_clients = self.grpc_clients.clone();
let message_type = self.message_type;
let inner = self.inner;
let (subs_size_tx, subs_size_rx) = tokio::sync::oneshot::channel();
let broadcast_fut = async move {
//forwards to other node and get shared subscription relations
let replys = MessageBroadcaster::new(
Expand Down Expand Up @@ -301,12 +308,17 @@ impl Shared for &'static ClusterShared {
};

add_to_shared_sub_groups(&mut shared_sub_groups, shared_relations);

let mut all_subs_size = 0;
for (_, reply) in replys {
match reply {
Ok(reply) => {
if let MessageReply::Forwards(mut o_relations_map) = reply {
log::debug!("other noade relations: {:?}", o_relations_map);
if let MessageReply::Forwards(mut o_relations_map, subs_size) = reply {
log::debug!(
"other noade relations: {:?}, subs_size: {}",
o_relations_map,
subs_size
);
all_subs_size += subs_size;
for (node_id, rels) in o_relations_map.drain() {
for (topic_filter, client_id, qos, group) in rels {
if let Some(group) = group {
Expand All @@ -329,6 +341,8 @@ impl Shared for &'static ClusterShared {
}
}

let _ = subs_size_tx.send(all_subs_size);

//shared subscription choice
let mut node_shared_subs: HashMap<NodeId, SubRelations> = HashMap::default();
for (topic_filter, sub_groups) in shared_sub_groups.iter_mut() {
Expand Down Expand Up @@ -376,17 +390,17 @@ impl Shared for &'static ClusterShared {
};

tokio::spawn(broadcast_fut);

let subs_size = subs_size + subs_size_rx.await.unwrap_or_default();
local_res?;
Ok(())
Ok(subs_size)
}

#[inline]
async fn forwards_and_get_shareds(
&self,
from: From,
publish: Publish,
) -> Result<SubRelationsMap, Vec<(To, From, Publish, Reason)>> {
) -> Result<(SubRelationsMap, SubscriptionSize), Vec<(To, From, Publish, Reason)>> {
self.inner.forwards_and_get_shareds(from, publish).await
}

Expand Down
14 changes: 10 additions & 4 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use rmqtt::{
session::{ClientInfo, Session, SessionOfflineInfo},
types::{
From, Id, IsAdmin, NodeId, NodeName, Publish, Reason, SessionStatus, SubsSearchParams,
SubsSearchResult, Subscribe, SubscribeReturn, To, Tx, Unsubscribe,
SubsSearchResult, Subscribe, SubscribeReturn, SubscriptionSize, To, Tx, Unsubscribe,
},
Entry, Shared, SubRelations, SubRelationsMap,
},
Expand Down Expand Up @@ -311,7 +311,11 @@ impl Shared for &'static ClusterShared {
}

#[inline]
async fn forwards(&self, from: From, publish: Publish) -> Result<(), Vec<(To, From, Publish, Reason)>> {
async fn forwards(
&self,
from: From,
publish: Publish,
) -> Result<SubscriptionSize, Vec<(To, From, Publish, Reason)>> {
log::debug!("[forwards] from: {:?}, publish: {:?}", from, publish);

let topic = publish.topic();
Expand All @@ -324,6 +328,8 @@ impl Shared for &'static ClusterShared {
}
};

let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();

let mut errs = Vec::new();

let this_node_id = Runtime::instance().node.id();
Expand Down Expand Up @@ -378,7 +384,7 @@ impl Shared for &'static ClusterShared {
}

if errs.is_empty() {
Ok(())
Ok(subs_size)
} else {
Err(errs)
}
Expand All @@ -399,7 +405,7 @@ impl Shared for &'static ClusterShared {
&self,
from: From,
publish: Publish,
) -> Result<SubRelationsMap, Vec<(To, From, Publish, Reason)>> {
) -> Result<(SubRelationsMap, SubscriptionSize), Vec<(To, From, Publish, Reason)>> {
self.inner.forwards_and_get_shareds(from, publish).await
}

Expand Down
12 changes: 12 additions & 0 deletions rmqtt-plugins/rmqtt-counter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ impl Plugin for CounterPlugin {
self.register
.add_priority(Type::MessageDropped, Priority::MAX, Box::new(CounterHandler::new()))
.await;
self.register
.add_priority(Type::MessageNonsubscribed, Priority::MAX, Box::new(CounterHandler::new()))
.await;

Ok(())
}
Expand Down Expand Up @@ -253,6 +256,15 @@ impl Handler for CounterHandler {
Parameter::MessageDropped(_to, _from, _p, _r) => {
self.metrics.messages_dropped_inc(); //@TODO ... elaboration
}
Parameter::MessageNonsubscribed(from) => {
self.metrics.messages_nonsubscribed_inc();
match from.typ() {
FromType::Custom => self.metrics.messages_nonsubscribed_custom_inc(),
FromType::Admin => self.metrics.messages_nonsubscribed_admin_inc(),
FromType::System => self.metrics.messages_nonsubscribed_system_inc(),
FromType::LastWill => self.metrics.messages_nonsubscribed_lastwill_inc(),
}
}

_ => {
log::error!("parameter is: {:?}", param);
Expand Down
25 changes: 16 additions & 9 deletions rmqtt-plugins/rmqtt-http-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,15 +756,22 @@ async fn _publish(
.unwrap_or(p1);

let replys = Runtime::instance().extends.shared().await.forwards(from.clone(), p1).await;
if let Err(droppeds) = replys {
for (to, from, p, reason) in droppeds {
//Message dropped
Runtime::instance()
.extends
.hook_mgr()
.await
.message_dropped(Some(to), from, p, reason)
.await;
match replys {
Ok(0) => {
//hook, message_nonsubscribed
Runtime::instance().extends.hook_mgr().await.message_nonsubscribed(from.clone()).await;
}
Ok(_) => {}
Err(droppeds) => {
for (to, from, p, reason) in droppeds {
//Message dropped
Runtime::instance()
.extends
.hook_mgr()
.await
.message_dropped(Some(to), from, p, reason)
.await;
}
}
}
};
Expand Down
13 changes: 10 additions & 3 deletions rmqtt-plugins/rmqtt-sys-topic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,16 @@ async fn sys_publish(nodeid: NodeId, topic: String, publish_qos: QoS, payload: s
.await
.unwrap_or(p);

let replys = Runtime::instance().extends.shared().await.forwards(from, p).await;
if let Err(e) = replys {
log::warn!("send system message error, {:?}", e);
let replys = Runtime::instance().extends.shared().await.forwards(from.clone(), p).await;
match replys {
Ok(0) => {
//hook, message_nonsubscribed
Runtime::instance().extends.hook_mgr().await.message_nonsubscribed(from).await;
}
Ok(_) => {}
Err(e) => {
log::warn!("send system message error, {:?}", e);
}
}
}
Err(e) => {
Expand Down
21 changes: 17 additions & 4 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ impl Shared for &'static DefaultShared {
}

#[inline]
async fn forwards(&self, from: From, publish: Publish) -> Result<(), Vec<(To, From, Publish, Reason)>> {
async fn forwards(
&self,
from: From,
publish: Publish,
) -> Result<SubscriptionSize, Vec<(To, From, Publish, Reason)>> {
let topic = publish.topic();
let mut relations_map =
match Runtime::instance().extends.router().await.matches(publish.topic()).await {
Expand All @@ -382,22 +386,24 @@ impl Shared for &'static DefaultShared {
}
};

let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();

let this_node_id = Runtime::instance().node.id();
if let Some(relations) = relations_map.remove(&this_node_id) {
self.forwards_to(from, &publish, relations).await?;
}
if !relations_map.is_empty() {
log::warn!("forwards, relations_map:{:?}", relations_map);
}
Ok(())
Ok(subs_size)
}

#[inline]
async fn forwards_and_get_shareds(
&self,
from: From,
publish: Publish,
) -> Result<SubRelationsMap, Vec<(To, From, Publish, Reason)>> {
) -> Result<(SubRelationsMap, SubscriptionSize), Vec<(To, From, Publish, Reason)>> {
let topic = publish.topic();
log::debug!("forwards_and_get_shareds, from: {:?}, topic: {:?}", from, topic.to_string());
let relations_map = match Runtime::instance().extends.router().await.matches(topic).await {
Expand All @@ -408,6 +414,8 @@ impl Shared for &'static DefaultShared {
}
};

let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();

let mut relations = SubRelations::new();
let mut sub_relations_map = SubRelationsMap::default();
for (node_id, rels) in relations_map {
Expand All @@ -428,7 +436,7 @@ impl Shared for &'static DefaultShared {
if !relations.is_empty() {
self.forwards_to(from, &publish, relations).await?;
}
Ok(sub_relations_map)
Ok((sub_relations_map, subs_size))
}

#[inline]
Expand Down Expand Up @@ -1418,6 +1426,11 @@ impl HookManager for &'static DefaultHookManager {
let _ = self.exec(Type::MessageDropped, Parameter::MessageDropped(to, from, publish, reason)).await;
}

///Publish message nonsubscribed
async fn message_nonsubscribed(&self, from: From) {
let _ = self.exec(Type::MessageNonsubscribed, Parameter::MessageNonsubscribed(from)).await;
}

///grpc message received
async fn grpc_message_received(
&self,
Expand Down
7 changes: 7 additions & 0 deletions rmqtt/src/broker/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub trait HookManager: Sync + Send {
///Publish message Dropped
async fn message_dropped(&self, to: Option<To>, from: From, p: Publish, reason: Reason);

///Publish message nonsubscribed
async fn message_nonsubscribed(&self, from: From);

///grpc message received
async fn grpc_message_received(
&self,
Expand Down Expand Up @@ -138,6 +141,7 @@ pub enum Type {
MessageAcked,
MessageDropped,
MessageExpiryCheck,
MessageNonsubscribed,

GrpcMessageReceived,
}
Expand Down Expand Up @@ -167,6 +171,7 @@ impl std::convert::From<&str> for Type {
"message_acked" => Type::MessageAcked,
"message_dropped" => Type::MessageDropped,
"message_expiry_check" => Type::MessageExpiryCheck,
"message_nonsubscribed" => Type::MessageNonsubscribed,

"grpc_message_received" => Type::GrpcMessageReceived,

Expand Down Expand Up @@ -199,6 +204,7 @@ pub enum Parameter<'a> {
MessageAcked(&'a Session, &'a ClientInfo, From, &'a Publish),
MessageDropped(Option<To>, From, Publish, Reason),
MessageExpiryCheck(&'a Session, &'a ClientInfo, From, &'a Publish),
MessageNonsubscribed(From),

GrpcMessageReceived(grpc::MessageType, grpc::Message),
}
Expand Down Expand Up @@ -228,6 +234,7 @@ impl<'a> Parameter<'a> {
Parameter::MessageAcked(_, _, _, _) => Type::MessageAcked,
Parameter::MessageDropped(_, _, _, _) => Type::MessageDropped,
Parameter::MessageExpiryCheck(_, _, _, _) => Type::MessageExpiryCheck,
Parameter::MessageNonsubscribed(_) => Type::MessageNonsubscribed,

Parameter::GrpcMessageReceived(_, _) => Type::GrpcMessageReceived,
}
Expand Down
6 changes: 6 additions & 0 deletions rmqtt/src/broker/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,10 @@ pub struct Metrics {

messages_delivered_retain: AtomicUsize,
messages_acked_retain: AtomicUsize,

messages_nonsubscribed: AtomicUsize,
messages_nonsubscribed_custom: AtomicUsize,
messages_nonsubscribed_admin: AtomicUsize,
messages_nonsubscribed_lastwill: AtomicUsize,
messages_nonsubscribed_system: AtomicUsize,
}

0 comments on commit 719f881

Please sign in to comment.