Skip to content

Commit

Permalink
Improve MQTT V5 protocol: Subscription Options
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 20, 2023
1 parent 23132f4 commit bb710db
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 225 deletions.
3 changes: 2 additions & 1 deletion rmqtt-plugins/rmqtt-acl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ impl Handler for AclHandler {
(
false,
Some(HookResult::SubscribeAclResult(SubscribeAclResult::new_success(
subscribe.qos,
subscribe.opts.qos(),
None,
))),
)
} else {
Expand Down
5 changes: 4 additions & 1 deletion rmqtt-plugins/rmqtt-auth-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,10 @@ impl Handler for AuthHandler {
return match acl_res {
ResponseResult::Allow(_) => (
false,
Some(HookResult::SubscribeAclResult(SubscribeAclResult::new_success(subscribe.qos))),
Some(HookResult::SubscribeAclResult(SubscribeAclResult::new_success(
subscribe.opts.qos(),
None,
))),
),
ResponseResult::Deny => (
false,
Expand Down
18 changes: 6 additions & 12 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use itertools::Itertools;
use once_cell::sync::OnceCell;

use rmqtt::stats::Counter;
use rmqtt::{async_trait::async_trait, itertools, log, once_cell, serde_json};
use rmqtt::{
broker::{
default::DefaultRouter,
types::{Id, NodeId, QoS, Route, SharedGroup, TopicName},
types::{Id, NodeId, Route, SubscriptionOptions, TopicName},
Router, SubRelationsMap,
},
grpc::{GrpcClients, Message, MessageBroadcaster, MessageReply, MessageSender, MessageType},
stats::Counter,
HashMap, Result, TopicFilter,
};

Expand All @@ -35,14 +35,8 @@ impl ClusterRouter {
#[async_trait]
impl Router for &'static ClusterRouter {
#[inline]
async fn add(
&self,
topic_filter: &str,
id: Id,
qos: QoS,
shared_group: Option<SharedGroup>,
) -> Result<()> {
self.inner.add(topic_filter, id, qos, shared_group).await
async fn add(&self, topic_filter: &str, id: Id, opts: SubscriptionOptions) -> Result<()> {
self.inner.add(topic_filter, id, opts).await
}

#[inline]
Expand All @@ -51,8 +45,8 @@ impl Router for &'static ClusterRouter {
}

#[inline]
async fn matches(&self, topic: &TopicName) -> Result<SubRelationsMap> {
self.inner.matches(topic).await
async fn matches(&self, id: Id, topic: &TopicName) -> Result<SubRelationsMap> {
self.inner.matches(id, topic).await
}

///Check online or offline
Expand Down
21 changes: 11 additions & 10 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::convert::From as _f;
use once_cell::sync::OnceCell;

use rmqtt::grpc::MessageSender;
use rmqtt::{ahash, async_trait::async_trait, futures, log, once_cell, tokio};
use rmqtt::{ahash, async_trait::async_trait, futures, log, once_cell, tokio, SubscriptionOptions};
use rmqtt::{
broker::{
default::DefaultShared,
session::{ClientInfo, Session, SessionOfflineInfo},
types::{
ClientId, From, Id, IsAdmin, IsOnline, NodeId, Publish, QoS, Reason, SessionStatus, SharedGroup,
ClientId, From, Id, IsAdmin, IsOnline, NodeId, Publish, Reason, SessionStatus, SharedGroup,
SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn, SubscriptionSize, To,
TopicFilter, Tx, Unsubscribe,
},
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Shared for &'static ClusterShared {

//Matching subscriptions
let (relations, shared_relations, subs_size) =
match Runtime::instance().extends.router().await.matches(topic).await {
match Runtime::instance().extends.router().await.matches(from.id.clone(), topic).await {
Ok(mut relations_map) => {
let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();
let mut relations = SubRelations::new();
Expand Down Expand Up @@ -281,21 +281,22 @@ impl Shared for &'static ClusterShared {

type SharedSubGroups = HashMap<
TopicFilter, //key is TopicFilter
HashMap<SharedGroup, Vec<(NodeId, ClientId, QoS, Option<IsOnline>)>>,
HashMap<SharedGroup, Vec<(NodeId, ClientId, SubscriptionOptions, Option<IsOnline>)>>,
>;
type SharedRelation = (TopicFilter, NodeId, ClientId, QoS, (SharedGroup, IsOnline));
type SharedRelation =
(TopicFilter, NodeId, ClientId, SubscriptionOptions, (SharedGroup, IsOnline));

#[allow(clippy::mutable_key_type)]
let mut shared_sub_groups: SharedSubGroups = HashMap::default();

let add_one_to_shared_sub_groups =
|shared_groups: &mut SharedSubGroups, shared_rel: SharedRelation| {
let (topic_filter, node_id, client_id, qos, (group, is_online)) = shared_rel;
let (topic_filter, node_id, client_id, opts, (group, is_online)) = shared_rel;
if let Some(groups) = shared_groups.get_mut(&topic_filter) {
groups.entry(group).or_default().push((node_id, client_id, qos, Some(is_online)));
groups.entry(group).or_default().push((node_id, client_id, opts, Some(is_online)));
} else {
let mut groups = HashMap::default();
groups.insert(group, vec![(node_id, client_id, qos, Some(is_online))]);
groups.insert(group, vec![(node_id, client_id, opts, Some(is_online))]);
shared_groups.insert(topic_filter, groups);
}
};
Expand All @@ -320,11 +321,11 @@ impl Shared for &'static ClusterShared {
);
all_subs_size += subs_size;
for (node_id, rels) in o_relations_map.drain() {
for (topic_filter, client_id, qos, group) in rels {
for (topic_filter, client_id, opts, group) in rels {
if let Some(group) = group {
add_one_to_shared_sub_groups(
&mut shared_sub_groups,
(topic_filter, node_id, client_id, qos, group),
(topic_filter, node_id, client_id, opts, group),
);
}
}
Expand Down
6 changes: 3 additions & 3 deletions rmqtt-plugins/rmqtt-cluster-raft/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use rmqtt_raft::Status;

use rmqtt::broker::types::{Id, NodeId, QoS, SharedGroup};
use rmqtt::Result;
use rmqtt::broker::types::{Id, NodeId};
use rmqtt::{anyhow, bincode};
use rmqtt::{Result, SubscriptionOptions};

use super::Mailbox;

Expand All @@ -12,7 +12,7 @@ pub enum Message<'a> {
Connected { id: Id },
Disconnected { id: Id },
SessionTerminated { id: Id },
Add { topic_filter: &'a str, id: Id, qos: QoS, shared_group: Option<SharedGroup> },
Add { topic_filter: &'a str, id: Id, opts: SubscriptionOptions },
Remove { topic_filter: &'a str, id: Id },
//get client node id
GetClientNodeId { client_id: &'a str },
Expand Down
49 changes: 14 additions & 35 deletions rmqtt-plugins/rmqtt-cluster-raft/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ use rmqtt_raft::{Error, Mailbox, Result as RaftResult, Store};
use tokio::sync::RwLock;

use rmqtt::rust_box::task_exec_queue::SpawnExt;
use rmqtt::stats::Counter;
use rmqtt::{
ahash, anyhow, async_trait::async_trait, bincode, chrono, dashmap, log, once_cell, serde_json, tokio,
MqttError,
};
use rmqtt::{
broker::{
default::DefaultRouter,
topic::TopicTree,
types::{
ClientId, Id, IsOnline, NodeId, QoS, Route, SharedGroup, TimestampMillis, TopicFilter, TopicName,
ClientId, Id, IsOnline, NodeId, Route, SubscriptionOptions, TimestampMillis, TopicFilter,
TopicName,
},
Router, SubRelationsMap,
},
Result,
stats::Counter,
MqttError, Result,
};

use crate::task_exec_queue;
Expand Down Expand Up @@ -114,22 +114,10 @@ impl ClusterRouter {
#[async_trait]
impl Router for &'static ClusterRouter {
#[inline]
async fn add(
&self,
topic_filter: &str,
id: Id,
qos: QoS,
shared_group: Option<SharedGroup>,
) -> Result<()> {
log::debug!(
"[Router.add] topic_filter: {:?}, id: {:?}, qos: {:?}, shared_group: {:?}",
topic_filter,
id,
qos,
shared_group
);

let msg = Message::Add { topic_filter, id, qos, shared_group }.encode()?;
async fn add(&self, topic_filter: &str, id: Id, opts: SubscriptionOptions) -> Result<()> {
log::debug!("[Router.add] topic_filter: {:?}, id: {:?}, opts: {:?}", topic_filter, id, opts);

let msg = Message::Add { topic_filter, id, opts }.encode()?;
let mailbox = self.raft_mailbox().await;
let _ = async move { mailbox.send(msg).await.map_err(anyhow::Error::new) }
.spawn(task_exec_queue())
Expand Down Expand Up @@ -165,8 +153,8 @@ impl Router for &'static ClusterRouter {
}

#[inline]
async fn matches(&self, topic: &TopicName) -> Result<SubRelationsMap> {
self.inner.matches(topic).await
async fn matches(&self, id: Id, topic: &TopicName) -> Result<SubRelationsMap> {
self.inner.matches(id, topic).await
}

///Check online or offline
Expand Down Expand Up @@ -309,18 +297,9 @@ impl Store for &'static ClusterRouter {
}
});
}
Message::Add { topic_filter, id, qos, shared_group } => {
log::debug!(
"[Router.add] topic_filter: {:?}, id: {:?}, qos: {:?}, shared_group: {:?}",
topic_filter,
id,
qos,
shared_group
);
self.inner
.add(topic_filter, id, qos, shared_group)
.await
.map_err(|e| Error::Other(Box::new(e)))?;
Message::Add { topic_filter, id, opts } => {
log::debug!("[Router.add] topic_filter: {:?}, id: {:?}, opts: {:?}", topic_filter, id, opts);
self.inner.add(topic_filter, id, opts).await.map_err(|e| Error::Other(Box::new(e)))?;
}
Message::Remove { topic_filter, id } => {
log::debug!("[Router.remove] topic_filter: {:?}, id: {:?}", topic_filter, id,);
Expand Down Expand Up @@ -386,7 +365,7 @@ impl Store for &'static ClusterRouter {

let (topics, relations, client_states, topics_count, relations_count): (
TopicTree<()>,
Vec<(TopicFilter, HashMap<ClientId, (Id, QoS, Option<SharedGroup>)>)>,
Vec<(TopicFilter, HashMap<ClientId, (Id, SubscriptionOptions)>)>,
Vec<(ClientId, ClientStatus)>,
Counter,
Counter,
Expand Down
21 changes: 13 additions & 8 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,19 @@ impl Shared for &'static ClusterShared {
log::debug!("[forwards] from: {:?}, publish: {:?}", from, publish);

let topic = publish.topic();
let mut relations_map =
match Runtime::instance().extends.router().await.matches(publish.topic()).await {
Ok(relations_map) => relations_map,
Err(e) => {
log::warn!("forwards, from:{:?}, topic:{:?}, error: {:?}", from, topic, e);
SubRelationsMap::default()
}
};
let mut relations_map = match Runtime::instance()
.extends
.router()
.await
.matches(from.id.clone(), publish.topic())
.await
{
Ok(relations_map) => relations_map,
Err(e) => {
log::warn!("forwards, from:{:?}, topic:{:?}, error: {:?}", from, topic, e);
SubRelationsMap::default()
}
};

let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();

Expand Down
4 changes: 1 addition & 3 deletions rmqtt-plugins/rmqtt-sys-topic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,7 @@ impl Handler for SystemTopicHandler {
"clientid": client.id.client_id,
"username": client.id.username_ref(),
"topic": subscribe.topic_filter,
"opts": json!({
"qos": subscribe.qos.value()
}),
"opts": subscribe.opts.to_json(),
"time": now_time
});
let topic =
Expand Down
8 changes: 2 additions & 6 deletions rmqtt-plugins/rmqtt-web-hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username_ref(),
"topic": subscribe.topic_filter,
"opts": json!({
"qos": subscribe.qos.value()
}),
"opts": subscribe.opts.to_json(),
"time": now_time
});
Some((Some(subscribe.topic_filter.clone()), body))
Expand All @@ -539,9 +537,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username_ref(),
"topic": subscribe.topic_filter,
"opts": json!({
"qos": subscribe.qos.value()
}),
"opts": subscribe.opts.to_json(),
"time": now_time
});
Some((Some(subscribe.topic_filter.clone()), body))
Expand Down
Loading

0 comments on commit bb710db

Please sign in to comment.