Skip to content

Commit

Permalink
Improve MQTT V5 protocol: Subscription Identifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 21, 2023
1 parent 5c1a72f commit 27b76db
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 80 deletions.
3 changes: 1 addition & 2 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use rmqtt::{async_trait::async_trait, log, SubscriptionSize};
use rmqtt::{
broker::{
hook::{Handler, HookResult, Parameter, ReturnType},
types::{From, Publish},
SubRelationsMap,
types::{From, Publish, SubRelationsMap},
},
grpc::{Message, MessageReply},
Id, Runtime,
Expand Down
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use rmqtt::{async_trait::async_trait, itertools, log, once_cell, serde_json};
use rmqtt::{
broker::{
default::DefaultRouter,
types::{Id, NodeId, Route, SubscriptionOptions, TopicName},
Router, SubRelationsMap,
types::{Id, NodeId, Route, SubRelationsMap, SubscriptionOptions, TopicName},
Router,
},
grpc::{GrpcClients, Message, MessageBroadcaster, MessageReply, MessageSender, MessageType},
stats::Counter,
Expand Down
93 changes: 60 additions & 33 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use rmqtt::{
session::{ClientInfo, Session, SessionOfflineInfo},
types::{
ClientId, From, Id, IsAdmin, IsOnline, NodeId, Publish, Reason, SessionStatus, SharedGroup,
SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn, SubscriptionSize, To,
TopicFilter, Tx, Unsubscribe,
SubRelations, SubRelationsMap, SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn,
SubscriptionIdentifier, SubscriptionSize, To, TopicFilter, Tx, Unsubscribe,
},
Entry, Shared, SubRelations, SubRelationsMap,
Entry, Shared,
},
grpc::{GrpcClients, Message, MessageBroadcaster, MessageReply, MessageType},
MqttError, Result, Runtime,
Expand Down Expand Up @@ -234,29 +234,34 @@ impl Shared for &'static ClusterShared {
log::debug!("forwards, from: {:?}, topic: {:?}", from, topic.to_string());

//Matching subscriptions
let (relations, shared_relations, subs_size) =
match Runtime::instance().extends.router().await.matches(from.id.clone(), topic).await {
Ok(mut relations_map) => {
let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();
let mut relations = SubRelations::new();
let mut shared_relations = Vec::new();
for (node_id, rels) in relations_map.drain() {
for (topic_filter, client_id, qos, group) in rels {
if let Some(group) = group {
//pub type SharedSubRelations = HashMap<TopicFilterString, Vec<(SharedGroup, NodeId, ClientId, QoS, IsOnline)>>;
shared_relations.push((topic_filter, node_id, client_id, qos, group));
} else {
relations.push((topic_filter, client_id, qos, None));
}
let (relations, shared_relations, subs_size) = match Runtime::instance()
.extends
.router()
.await
.matches(from.id.clone(), topic)
.await
{
Ok(mut relations_map) => {
let subs_size: SubscriptionSize = relations_map.iter().map(|(_, subs)| subs.len()).sum();
let mut relations = SubRelations::new();
let mut shared_relations = Vec::new();
for (node_id, rels) in relations_map.drain() {
for (topic_filter, client_id, opts, sub_ids, group) in rels {
if let Some(group) = group {
//pub type SharedSubRelations = HashMap<TopicFilterString, Vec<(SharedGroup, NodeId, ClientId, QoS, IsOnline)>>;
shared_relations.push((topic_filter, node_id, client_id, opts, sub_ids, group));
} else {
relations.push((topic_filter, client_id, opts, sub_ids, None));
}
}
(relations, shared_relations, subs_size)
}
Err(e) => {
log::warn!("forwards, from:{:?}, topic:{:?}, error: {:?}", from, topic, e);
(Vec::new(), Vec::new(), 0)
}
};
(relations, shared_relations, subs_size)
}
Err(e) => {
log::warn!("forwards, from:{:?}, topic:{:?}, error: {:?}", from, topic, e);
(Vec::new(), Vec::new(), 0)
}
};

log::debug!("relations: {}, shared_relations:{}", relations.len(), shared_relations.len());

Expand All @@ -281,22 +286,43 @@ impl Shared for &'static ClusterShared {

type SharedSubGroups = HashMap<
TopicFilter, //key is TopicFilter
HashMap<SharedGroup, Vec<(NodeId, ClientId, SubscriptionOptions, Option<IsOnline>)>>,
HashMap<
SharedGroup,
Vec<(
NodeId,
ClientId,
SubscriptionOptions,
Option<Vec<SubscriptionIdentifier>>,
Option<IsOnline>,
)>,
>,
>;
type SharedRelation =
(TopicFilter, NodeId, ClientId, SubscriptionOptions, (SharedGroup, IsOnline));
type SharedRelation = (
TopicFilter,
NodeId,
ClientId,
SubscriptionOptions,
Option<Vec<SubscriptionIdentifier>>,
(SharedGroup, IsOnline),
);

#[allow(clippy::mutable_key_type)]
let mut shared_sub_groups: SharedSubGroups = HashMap::default();

let add_one_to_shared_sub_groups =
|shared_groups: &mut SharedSubGroups, shared_rel: SharedRelation| {
let (topic_filter, node_id, client_id, opts, (group, is_online)) = shared_rel;
let (topic_filter, node_id, client_id, opts, sub_ids, (group, is_online)) = shared_rel;
if let Some(groups) = shared_groups.get_mut(&topic_filter) {
groups.entry(group).or_default().push((node_id, client_id, opts, Some(is_online)));
groups.entry(group).or_default().push((
node_id,
client_id,
opts,
sub_ids,
Some(is_online),
));
} else {
let mut groups = HashMap::default();
groups.insert(group, vec![(node_id, client_id, opts, Some(is_online))]);
groups.insert(group, vec![(node_id, client_id, opts, sub_ids, Some(is_online))]);
shared_groups.insert(topic_filter, groups);
}
};
Expand All @@ -321,11 +347,11 @@ impl Shared for &'static ClusterShared {
);
all_subs_size += subs_size;
for (node_id, rels) in o_relations_map.drain() {
for (topic_filter, client_id, opts, group) in rels {
for (topic_filter, client_id, opts, sub_ids, group) in rels {
if let Some(group) = group {
add_one_to_shared_sub_groups(
&mut shared_sub_groups,
(topic_filter, node_id, client_id, opts, group),
(topic_filter, node_id, client_id, opts, sub_ids, group),
);
}
}
Expand All @@ -351,11 +377,12 @@ impl Shared for &'static ClusterShared {
if let Some((idx, _is_online)) =
Runtime::instance().extends.shared_subscription().await.choice(subs).await
{
let (node_id, client_id, qos, _is_online) = subs.remove(idx);
let (node_id, client_id, opts, sub_ids, _is_online) = subs.remove(idx);
node_shared_subs.entry(node_id).or_default().push((
topic_filter.clone(),
client_id,
qos,
opts,
sub_ids,
None,
));
}
Expand Down
6 changes: 3 additions & 3 deletions rmqtt-plugins/rmqtt-cluster-raft/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use rmqtt::{
default::DefaultRouter,
topic::TopicTree,
types::{
ClientId, Id, IsOnline, NodeId, Route, SubscriptionOptions, TimestampMillis, TopicFilter,
TopicName,
ClientId, Id, IsOnline, NodeId, Route, SubRelationsMap, SubscriptionOptions, TimestampMillis,
TopicFilter, TopicName,
},
Router, SubRelationsMap,
Router,
},
stats::Counter,
MqttError, Result,
Expand Down
7 changes: 4 additions & 3 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use rmqtt::{
default::DefaultShared,
session::{ClientInfo, Session, SessionOfflineInfo},
types::{
From, Id, IsAdmin, NodeId, NodeName, Publish, Reason, SessionStatus, SubsSearchParams,
SubsSearchResult, Subscribe, SubscribeReturn, SubscriptionSize, To, Tx, Unsubscribe,
From, Id, IsAdmin, NodeId, NodeName, Publish, Reason, SessionStatus, SubRelations,
SubRelationsMap, SubsSearchParams, SubsSearchResult, Subscribe, SubscribeReturn,
SubscriptionSize, To, Tx, Unsubscribe,
},
Entry, Shared, SubRelations, SubRelationsMap,
Entry, Shared,
},
grpc::{Message, MessageReply, MessageType},
MqttError, Result, Runtime,
Expand Down
47 changes: 31 additions & 16 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,16 +428,17 @@ impl Shared for &'static DefaultShared {
let mut relations = SubRelations::new();
let mut sub_relations_map = SubRelationsMap::default();
for (node_id, rels) in relations_map {
for (topic_filter, client_id, qos, group) in rels {
for (topic_filter, client_id, opts, sub_ids, group) in rels {
if let Some(group) = group {
sub_relations_map.entry(node_id).or_default().push((
topic_filter,
client_id,
qos,
opts,
sub_ids,
Some(group),
));
} else {
relations.push((topic_filter, client_id, qos, None));
relations.push((topic_filter, client_id, opts, sub_ids, None));
}
}
}
Expand All @@ -457,7 +458,7 @@ impl Shared for &'static DefaultShared {
) -> Result<(), Vec<(To, From, Publish, Reason)>> {
let mut errs = Vec::new();

for (topic_filter, client_id, opts, _) in relations.drain(..) {
for (topic_filter, client_id, opts, sub_ids, _) in relations.drain(..) {
let retain = if let Some(retain_as_published) = opts.retain_as_published() {
//MQTT V5: Retain As Publish
if retain_as_published {
Expand All @@ -468,11 +469,13 @@ impl Shared for &'static DefaultShared {
} else {
false
};

let mut p = publish.clone();
p.dup = false;
p.retain = retain;
p.qos = p.qos.less_value(opts.qos());
p.packet_id = None;
p.properties.subscription_ids = sub_ids;
let (tx, to) = if let Some((tx, to)) = self.tx(&client_id) {
(tx, to)
} else {
Expand Down Expand Up @@ -615,14 +618,20 @@ impl DefaultRouter {
#[allow(clippy::type_complexity)]
#[inline]
pub async fn _matches(&self, this_id: Id, topic_name: &TopicName) -> Result<SubRelationsMap> {
let mut subs: SubRelationsMap = HashMap::default();
let mut collector_map: SubscriptioRelationsCollectorMap = HashMap::default();
let topic = Topic::from_str(topic_name)?;
for (topic_filter, _node_ids) in self.topics.read().await.matches(&topic).iter() {
let topic_filter = topic_filter.to_topic_filter();

let mut groups: HashMap<
SharedGroup,
Vec<(NodeId, ClientId, SubscriptionOptions, Option<IsOnline>)>,
Vec<(
NodeId,
ClientId,
SubscriptionOptions,
Option<Vec<SubscriptionIdentifier>>,
Option<IsOnline>,
)>,
> = HashMap::default();

if let Some(rels) = self.relations.get(&topic_filter) {
Expand All @@ -639,15 +648,16 @@ impl DefaultRouter {
id.node_id,
client_id.clone(),
opts.clone(),
None,
Some(router.is_online(id.node_id, client_id).await),
));
} else {
subs.entry(id.node_id).or_default().push((
topic_filter.clone(),
collector_map.entry(id.node_id).or_default().add(
&topic_filter,
client_id.clone(),
opts.clone(),
None,
))
);
}
}
}
Expand All @@ -658,19 +668,24 @@ impl DefaultRouter {
if let Some((idx, is_online)) =
Runtime::instance().extends.shared_subscription().await.choice(&s_subs).await
{
let (node_id, client_id, opts, _) = s_subs.remove(idx);
subs.entry(node_id).or_default().push((
topic_filter.clone(),
client_id.clone(),
let (node_id, client_id, opts, _, _) = s_subs.remove(idx);
collector_map.entry(node_id).or_default().add(
&topic_filter,
client_id,
opts,
Some((group, is_online)),
))
);
}
}
}

log::debug!("{:?} this_subs: {:?}", topic_name, subs);
Ok(subs)
let mut rels_map: SubRelationsMap = HashMap::default();
for (node_id, collector) in collector_map {
rels_map.insert(node_id, collector.into());
}

log::debug!("{:?} this_subs: {:?}", topic_name, rels_map);
Ok(rels_map)
}

#[inline]
Expand Down
21 changes: 9 additions & 12 deletions rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::broker::types::*;
use crate::grpc::GrpcClients;
use crate::settings::listener::Listener;
use crate::stats::Counter;
use crate::{ClientId, Id, NodeId, QoS, Result, Runtime, TopicFilter};
use crate::{ClientId, Id, NodeId, Result, Runtime, TopicFilter};

type HashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;

Expand Down Expand Up @@ -118,15 +118,6 @@ pub trait Shared: Sync + Send {
}
}

//key is TopicFilter
pub type SharedSubRelations = HashMap<TopicFilter, Vec<(SharedGroup, NodeId, ClientId, QoS, IsOnline)>>;
//In other nodes
pub type OtherSubRelations = HashMap<NodeId, Vec<TopicFilter>>;

pub type SubRelations = Vec<(TopicFilter, ClientId, SubscriptionOptions, Option<(SharedGroup, IsOnline)>)>;
pub type SubRelationsMap = HashMap<NodeId, SubRelations>;
pub type ClearSubscriptions = bool;

#[async_trait]
pub trait Router: Sync + Send {
///
Expand Down Expand Up @@ -189,7 +180,13 @@ pub trait SharedSubscription: Sync + Send {
#[inline]
async fn choice(
&self,
ncs: &[(NodeId, ClientId, SubscriptionOptions, Option<IsOnline>)],
ncs: &[(
NodeId,
ClientId,
SubscriptionOptions,
Option<Vec<SubscriptionIdentifier>>,
Option<IsOnline>,
)],
) -> Option<(usize, IsOnline)> {
if ncs.is_empty() {
return None;
Expand All @@ -198,7 +195,7 @@ pub trait SharedSubscription: Sync + Send {
let mut tmp_ncs = ncs
.iter()
.enumerate()
.map(|(idx, (node_id, client_id, _, is_online))| (idx, node_id, client_id, is_online))
.map(|(idx, (node_id, client_id, _, _, is_online))| (idx, node_id, client_id, is_online))
.collect::<Vec<_>>();

while !tmp_ncs.is_empty() {
Expand Down

0 comments on commit 27b76db

Please sign in to comment.