Skip to content

Commit

Permalink
Improve MQTT V5 protocol: Message Expiry Interval
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 19, 2023
1 parent 59f2766 commit 23132f4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 42 deletions.
54 changes: 29 additions & 25 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,27 +1164,22 @@ impl Fitter for DefaultFitter {
} else {
return Err(MqttError::from("Keepalive must be greater than 0"));
}
} else {
if *keep_alive < self.listen_cfg.min_keepalive {
*keep_alive = self.listen_cfg.min_keepalive;
}
} else if *keep_alive < self.listen_cfg.min_keepalive {
*keep_alive = self.listen_cfg.min_keepalive;
}
} else {
if *keep_alive == 0 {
return if self.listen_cfg.allow_zero_keepalive {
Ok(0)
} else {
Err(MqttError::from("Keepalive must be greater than 0"))
};
} else if *keep_alive == 0 {
return if self.listen_cfg.allow_zero_keepalive {
Ok(0)
} else {
if *keep_alive < self.listen_cfg.min_keepalive {
return Err(MqttError::from(format!(
"Keepalive is too small, cannot be less than {}",
self.listen_cfg.min_keepalive
)));
}
}
Err(MqttError::from("Keepalive must be greater than 0"))
};
} else if *keep_alive < self.listen_cfg.min_keepalive {
return Err(MqttError::from(format!(
"Keepalive is too small, cannot be less than {}",
self.listen_cfg.min_keepalive
)));
}

if *keep_alive < 6 {
Ok(*keep_alive + 3)
} else {
Expand Down Expand Up @@ -1667,23 +1662,32 @@ impl Hook for DefaultHook {
}

#[inline]
async fn message_expiry_check(&self, from: From, publish: &Publish) -> MessageExpiry {
async fn message_expiry_check(&self, from: From, publish: &Publish) -> MessageExpiryCheckResult {
log::debug!("{:?} publish: {:?}", self.s.id, publish);
let result = self
.manager
.exec(Type::MessageExpiryCheck, Parameter::MessageExpiryCheck(&self.s, &self.c, from, publish))
.await;
log::debug!("{:?} result: {:?}", self.s.id, result);
if let Some(HookResult::MessageExpiry) = result {
return true;
return MessageExpiryCheckResult::Expiry;
}

let expiry_interval = self.s.listen_cfg.message_expiry_interval.as_millis() as i64;
let expiry_interval = publish
.properties
.message_expiry_interval
.map(|i| (i.get() * 1000) as i64)
.unwrap_or_else(|| self.s.listen_cfg.message_expiry_interval.as_millis() as i64);
log::debug!("{:?} expiry_interval: {:?}", self.s.id, expiry_interval);
if expiry_interval == 0 {
return false;
return MessageExpiryCheckResult::Remaining(None);
}
if (chrono::Local::now().timestamp_millis() - publish.create_time()) < expiry_interval {
return false;
let remaining = chrono::Local::now().timestamp_millis() - publish.create_time();
if remaining < expiry_interval {
return MessageExpiryCheckResult::Remaining(NonZeroU32::new(
((expiry_interval - remaining) / 1000) as u32,
));
}
true
MessageExpiryCheckResult::Expiry
}
}
2 changes: 1 addition & 1 deletion rmqtt/src/broker/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub trait Hook: Sync + Send {
async fn message_acked(&self, from: From, publish: &Publish);

///message expiry check
async fn message_expiry_check(&self, from: From, publish: &Publish) -> MessageExpiry;
async fn message_expiry_check(&self, from: From, publish: &Publish) -> MessageExpiryCheckResult;
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Deserialize, Serialize)]
Expand Down
12 changes: 6 additions & 6 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,8 @@ impl SessionState {
#[inline]
pub async fn deliver(&self, from: From, mut publish: Publish) -> Result<()> {
//hook, message_expiry_check
let expiry = self.hook.message_expiry_check(from.clone(), &publish).await;

if expiry {
let expiry_check_res = self.hook.message_expiry_check(from.clone(), &publish).await;
if expiry_check_res.is_expiry() {
Runtime::instance()
.extends
.hook_mgr()
Expand All @@ -470,7 +469,7 @@ impl SessionState {
let publish = self.hook.message_delivered(from.clone(), &publish).await.unwrap_or(publish);

//send message
self.sink.publish(publish.clone())?; //@TODO ... at exception, send hook and or store message
self.sink.publish(&publish, expiry_check_res.message_expiry_interval())?; //@TODO ... at exception, send hook and or store message

//cache messages to inflight window
let moment_status = match publish.qos() {
Expand All @@ -497,9 +496,10 @@ impl SessionState {
self.forward(iflt_msg.from, iflt_msg.publish).await;
}
MomentStatus::UnComplete => {
let expiry = self.hook.message_expiry_check(iflt_msg.from.clone(), &iflt_msg.publish).await;
let expiry_check_res =
self.hook.message_expiry_check(iflt_msg.from.clone(), &iflt_msg.publish).await;

if expiry {
if expiry_check_res.is_expiry() {
log::warn!(
"{:?} MQTT::PublishComplete is not received, from: {:?}, message: {:?}",
self.id,
Expand Down
45 changes: 35 additions & 10 deletions rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,28 @@ pub enum AuthResult {
NotAuthorized,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MessageExpiryCheckResult {
Expiry,
Remaining(Option<NonZeroU32>),
}

impl MessageExpiryCheckResult {
#[inline]
pub fn is_expiry(&self) -> bool {
matches!(self, Self::Expiry)
}

#[inline]
pub fn message_expiry_interval(&self) -> Option<NonZeroU32> {
match self {
Self::Expiry => None,
Self::Remaining(i) => *i,
}
}
}

#[inline]
pub fn parse_topic_filter(
topic_filter: &ByteString,
shared_subscription_supported: bool,
Expand Down Expand Up @@ -321,11 +343,13 @@ pub struct Subscribe {
}

impl Subscribe {
#[inline]
pub fn from_v3(topic_filter: &ByteString, qos: QoS, shared_subscription_supported: bool) -> Result<Self> {
let (topic_filter, shared_group) = parse_topic_filter(topic_filter, shared_subscription_supported)?;
Ok(Subscribe { topic_filter, qos, shared_group })
}

#[inline]
pub fn from_v5(
topic_filter: &ByteString,
opt: &SubscriptionOptions,
Expand Down Expand Up @@ -616,10 +640,10 @@ impl Sink {
}

#[inline]
pub(crate) fn publish(&self, p: Publish) -> Result<()> {
pub(crate) fn publish(&self, p: &Publish, message_expiry_interval: Option<NonZeroU32>) -> Result<()> {
let pkt = match self {
Sink::V3(_) => p.into_v3(),
Sink::V5(_) => p.into_v5(),
Sink::V5(_) => p.into_v5(message_expiry_interval),
};
self.send(pkt)
}
Expand Down Expand Up @@ -821,29 +845,30 @@ impl std::convert::TryFrom<&v5::Publish> for Publish {

impl Publish {
#[inline]
pub fn into_v3(self) -> Packet {
pub fn into_v3(&self) -> Packet {
let p = v3::codec::Publish {
dup: self.dup,
retain: self.retain,
qos: self.qos,
topic: self.topic,
topic: self.topic.clone(),
packet_id: self.packet_id,
payload: self.payload,
payload: self.payload.clone(),
};
Packet::V3(v3::codec::Packet::Publish(p))
}

#[inline]
pub fn into_v5(self) -> Packet {
let p = v5::codec::Publish {
pub fn into_v5(&self, message_expiry_interval: Option<NonZeroU32>) -> Packet {
let mut p = v5::codec::Publish {
dup: self.dup,
retain: self.retain,
qos: self.qos,
topic: self.topic,
topic: self.topic.clone(),
packet_id: self.packet_id,
payload: self.payload,
properties: self.properties.into(),
payload: self.payload.clone(),
properties: self.properties.clone().into(),
};
p.properties.message_expiry_interval = message_expiry_interval;
Packet::V5(v5::codec::Packet::Publish(p))
}

Expand Down

0 comments on commit 23132f4

Please sign in to comment.