Skip to content

Commit

Permalink
Improve MQTT V5 protocol: Topic Alias
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 22, 2023
1 parent 8402f1a commit 250b87e
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 30 deletions.
2 changes: 2 additions & 0 deletions rmqtt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ listener.tcp.external.await_rel_timeout = "5m"
listener.tcp.external.max_subscriptions = 0
#Shared subscription switch, default value: true
listener.tcp.external.shared_subscription = true
#topic alias maximum, default value: 0, topic aliases not enabled.
listener.tcp.external.max_topic_aliases = 16

##--------------------------------------------------------------------
## Internal TCP Listener for MQTT Protocol
Expand Down
25 changes: 21 additions & 4 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1186,11 +1186,12 @@ impl Fitter for DefaultFitter {
fn keep_alive(&self, keep_alive: &mut u16) -> Result<u16> {
if self.client.protocol() == MQTT_LEVEL_5 {
if *keep_alive == 0 {
if self.listen_cfg.allow_zero_keepalive {
*keep_alive = 10;
return if self.listen_cfg.allow_zero_keepalive {
//*keep_alive = 10;
Ok(0)
} else {
return Err(MqttError::from("Keepalive must be greater than 0"));
}
Err(MqttError::from("Keepalive must be greater than 0"))
};
} else if *keep_alive < self.listen_cfg.min_keepalive {
*keep_alive = self.listen_cfg.min_keepalive;
}
Expand Down Expand Up @@ -1280,6 +1281,22 @@ impl Fitter for DefaultFitter {
self.listen_cfg.max_packet_size.as_u32()
}
}

fn max_client_topic_aliases(&self) -> u16 {
if let ConnectInfo::V5(_, _connect) = &self.client.connect_info {
self.listen_cfg.max_topic_aliases
} else {
0
}
}

fn max_server_topic_aliases(&self) -> u16 {
if let ConnectInfo::V5(_, connect) = &self.client.connect_info {
connect.topic_alias_max.min(self.listen_cfg.max_topic_aliases)
} else {
0
}
}
}

struct HookEntry {
Expand Down
6 changes: 6 additions & 0 deletions rmqtt/src/broker/fitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,10 @@ pub trait Fitter: Sync + Send {

///max packet size
fn max_packet_size(&self) -> u32;

///client topic alias maximum
fn max_client_topic_aliases(&self) -> u16;

///server topic alias maximum
fn max_server_topic_aliases(&self) -> u16;
}
52 changes: 47 additions & 5 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub struct SessionState {
pub sink: Sink,
pub hook: Rc<dyn Hook>,
pub deliver_queue_tx: Option<MessageSender>,
pub server_topic_aliases: Option<Rc<ServerTopicAliases>>,
pub client_topic_aliases: Option<Rc<ClientTopicAliases>>,
}

impl fmt::Debug for SessionState {
Expand All @@ -51,8 +53,36 @@ impl fmt::Debug for SessionState {

impl SessionState {
#[inline]
pub(crate) fn new(session: Session, client: ClientInfo, sink: Sink, hook: Rc<dyn Hook>) -> Self {
Self { tx: None, session, client, sink, hook, deliver_queue_tx: None }
pub(crate) fn new(
session: Session,
client: ClientInfo,
sink: Sink,
hook: Rc<dyn Hook>,
server_topic_alias_max: u16,
client_topic_alias_max: u16,
) -> Self {
let server_topic_aliases = if server_topic_alias_max > 0 {
Some(Rc::new(ServerTopicAliases::new(server_topic_alias_max as usize)))
} else {
None
};
let client_topic_aliases = if client_topic_alias_max > 0 {
Some(Rc::new(ClientTopicAliases::new(client_topic_alias_max as usize)))
} else {
None
};
log::debug!("server_topic_aliases: {:?}", server_topic_aliases);
log::debug!("client_topic_aliases: {:?}", client_topic_aliases);
Self {
tx: None,
session,
client,
sink,
hook,
deliver_queue_tx: None,
server_topic_aliases,
client_topic_aliases,
}
}

#[inline]
Expand Down Expand Up @@ -470,7 +500,9 @@ impl SessionState {
let publish = self.hook.message_delivered(from.clone(), &publish).await.unwrap_or(publish);

//send message
self.sink.publish(&publish, expiry_check_res.message_expiry_interval())?; //@TODO ... at exception, send hook and or store message
self.sink
.publish(&publish, expiry_check_res.message_expiry_interval(), self.server_topic_aliases.as_ref())
.await?; //@TODO ... at exception, send hook and or store message

//cache messages to inflight window
let moment_status = match publish.qos() {
Expand Down Expand Up @@ -633,7 +665,7 @@ impl SessionState {

#[inline]
pub async fn publish_v3(&self, publish: &v3::Publish) -> Result<bool> {
match self.publish(Publish::try_from(publish)?).await {
match self.publish(Publish::from(publish)).await {
Err(e) => {
Metrics::instance().client_publish_error_inc();
self.client
Expand All @@ -651,7 +683,7 @@ impl SessionState {

#[inline]
pub async fn publish_v5(&self, publish: &v5::Publish) -> Result<bool> {
match self.publish(Publish::try_from(publish)?).await {
match self._publish_v5(publish).await {
Err(e) => {
Metrics::instance().client_publish_error_inc();
self.client
Expand All @@ -667,6 +699,16 @@ impl SessionState {
}
}

#[inline]
async fn _publish_v5(&self, publish: &v5::Publish) -> Result<bool> {
log::debug!("{:?} publish: {:?}", self.id, publish);
let mut p = Publish::from(publish);
if let Some(client_topic_aliases) = &self.client_topic_aliases {
p.topic = client_topic_aliases.set_and_get(p.properties.topic_alias, p.topic)?;
}
self.publish(p).await
}

#[inline]
async fn publish(&self, publish: Publish) -> Result<bool> {
let from = From::from_custom(self.id.clone());
Expand Down
139 changes: 121 additions & 18 deletions rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::fmt::Display;
use std::net::SocketAddr;
use std::num::{NonZeroU16, NonZeroU32};
use std::ops::Deref;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand All @@ -21,7 +22,7 @@ pub use ntex_mqtt::v3::{
MqttSink as MqttSinkV3,
};

use ntex_mqtt::v5::codec::RetainHandling;
use ntex_mqtt::v5::codec::{PublishAckReason, RetainHandling};
pub use ntex_mqtt::v5::{
self, codec::Connect as ConnectV5, codec::ConnectAckReason as ConnectAckReasonV5,
codec::Disconnect as DisconnectV5, codec::DisconnectReasonCode, codec::LastWill as LastWillV5,
Expand Down Expand Up @@ -962,10 +963,15 @@ impl Sink {
}

#[inline]
pub(crate) fn publish(&self, p: &Publish, message_expiry_interval: Option<NonZeroU32>) -> Result<()> {
pub(crate) async fn publish(
&self,
p: &Publish,
message_expiry_interval: Option<NonZeroU32>,
server_topic_aliases: Option<&Rc<ServerTopicAliases>>,
) -> Result<()> {
let pkt = match self {
Sink::V3(_) => p.into_v3(),
Sink::V5(_) => p.into_v5(message_expiry_interval),
Sink::V5(_) => p.into_v5(message_expiry_interval, server_topic_aliases).await,
};
self.send(pkt)
}
Expand Down Expand Up @@ -1116,11 +1122,9 @@ impl<'a> std::convert::TryFrom<LastWill<'a>> for Publish {
}
}

impl std::convert::TryFrom<&v3::Publish> for Publish {
type Error = MqttError;

impl std::convert::From<&v3::Publish> for Publish {
#[inline]
fn try_from(p: &v3::Publish) -> std::result::Result<Self, Self::Error> {
fn from(p: &v3::Publish) -> Self {
let query = p.query();
let p_props = if !query.is_empty() {
let user_props = url::form_urlencoded::parse(query.as_bytes())
Expand All @@ -1132,7 +1136,7 @@ impl std::convert::TryFrom<&v3::Publish> for Publish {
PublishProperties::default()
};

Ok(Self {
Self {
dup: p.dup(),
retain: p.retain(),
qos: p.qos(),
Expand All @@ -1142,16 +1146,14 @@ impl std::convert::TryFrom<&v3::Publish> for Publish {

properties: p_props,
create_time: chrono::Local::now().timestamp_millis(),
})
}
}
}

impl std::convert::TryFrom<&v5::Publish> for Publish {
type Error = MqttError;

impl std::convert::From<&v5::Publish> for Publish {
#[inline]
fn try_from(p: &v5::Publish) -> std::result::Result<Self, Self::Error> {
Ok(Self {
fn from(p: &v5::Publish) -> Self {
Self {
dup: p.dup(),
retain: p.retain(),
qos: p.qos(),
Expand All @@ -1161,7 +1163,7 @@ impl std::convert::TryFrom<&v5::Publish> for Publish {

properties: PublishProperties::from(p.packet().properties.clone()),
create_time: chrono::Local::now().timestamp_millis(),
})
}
}
}

Expand All @@ -1180,18 +1182,31 @@ impl Publish {
}

#[inline]
pub fn into_v5(&self, message_expiry_interval: Option<NonZeroU32>) -> Packet {
pub async fn into_v5(
&self,
message_expiry_interval: Option<NonZeroU32>,
server_topic_aliases: Option<&Rc<ServerTopicAliases>>,
) -> Packet {
let (topic, alias) = {
if let Some(server_topic_aliases) = server_topic_aliases {
server_topic_aliases.get(self.topic.clone()).await
} else {
(Some(self.topic.clone()), None)
}
};
log::debug!("topic: {:?}, alias: {:?}", topic, alias);
let mut p = v5::codec::Publish {
dup: self.dup,
retain: self.retain,
qos: self.qos,
topic: self.topic.clone(),
topic: topic.unwrap_or_default(),
packet_id: self.packet_id,
payload: self.payload.clone(),
properties: self.properties.clone().into(),
};
p.properties.message_expiry_interval = message_expiry_interval;
log::debug!("p.properties.subscription_ids: {:?}", p.properties.subscription_ids);
p.properties.topic_alias = alias;
log::debug!("p.properties: {:?}", p.properties);
Packet::V5(v5::codec::Packet::Publish(p))
}

Expand Down Expand Up @@ -1946,6 +1961,94 @@ impl Display for Reason {
}
}

#[derive(Debug)]
pub struct ServerTopicAliases {
max_topic_aliases: usize,
aliases: tokio::sync::Mutex<HashMap<TopicName, NonZeroU16>>,
}

impl ServerTopicAliases {
#[inline]
pub fn new(max_topic_aliases: usize) -> Self {
ServerTopicAliases { max_topic_aliases, aliases: tokio::sync::Mutex::new(HashMap::default()) }
}

#[inline]
pub async fn get(&self, topic: TopicName) -> (Option<TopicName>, Option<NonZeroU16>) {
if self.max_topic_aliases == 0 {
return (Some(topic), None);
}
let mut aliases = self.aliases.lock().await;
if let Some(alias) = aliases.get(&topic) {
return (None, Some(*alias));
}
let len = aliases.len();
if len >= self.max_topic_aliases {
return (Some(topic), None);
}
let alias = match NonZeroU16::try_from((len + 1) as u16) {
Ok(alias) => alias,
Err(_) => {
unreachable!()
}
};
aliases.insert(topic.clone(), alias);
(Some(topic), Some(alias))
}
}

#[derive(Debug)]
pub struct ClientTopicAliases {
max_topic_aliases: usize,
aliases: DashMap<NonZeroU16, TopicName>,
}

impl ClientTopicAliases {
#[inline]
pub fn new(max_topic_aliases: usize) -> Self {
ClientTopicAliases { max_topic_aliases, aliases: DashMap::default() }
}

#[inline]
pub fn set_and_get(&self, alias: Option<NonZeroU16>, topic: TopicName) -> Result<TopicName> {
match (alias, topic.len()) {
(Some(alias), 0) => {
self.aliases.get(&alias).ok_or_else(|| {
MqttError::PublishAckReason(
PublishAckReason::ImplementationSpecificError,
ByteString::from(
"implementation specific error, the ‘topic‘ associated with the ‘alias‘ was not found",
),
)
}).map(|topic|topic.value().clone())
}
(Some(alias), _) => {
let len = self.aliases.len();
self.aliases.entry(alias).and_modify(|topic_mut| {
*topic_mut = topic.clone()
}).or_try_insert_with(|| {
if len >= self.max_topic_aliases {
Err(MqttError::PublishAckReason(
PublishAckReason::ImplementationSpecificError,
ByteString::from(
format!("implementation specific error, the number of topic aliases exceeds the limit ({})", self.max_topic_aliases),
),
))
} else {
Ok(topic.clone())
}
})?;
Ok(topic)
}
(None, 0) => Err(MqttError::PublishAckReason(
PublishAckReason::ImplementationSpecificError,
ByteString::from("implementation specific error, ‘alias’ and ‘topic’ are both empty"),
)),
(None, _) => Ok(topic),
}
}
}

#[test]
fn test_reason() {
assert_eq!(Reason::ConnectKicked(false).is_kicked(false), true);
Expand Down
2 changes: 1 addition & 1 deletion rmqtt/src/broker/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ async fn _handshake<Io: 'static>(
hook.session_created().await;
}

let (state, tx) = SessionState::new(session, client, Sink::V3(sink), hook).start(keep_alive).await;
let (state, tx) = SessionState::new(session, client, Sink::V3(sink), hook, 0, 0).start(keep_alive).await;
if let Err(e) = entry.set(state.session.clone(), tx, state.client.clone()).await {
return Ok(refused_ack(
handshake,
Expand Down

0 comments on commit 250b87e

Please sign in to comment.