Skip to content

Commit

Permalink
Add 'expiry_interval'
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Mar 29, 2024
1 parent 330fed9 commit 5439cc6
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 30 deletions.
9 changes: 4 additions & 5 deletions rmqtt-plugins/rmqtt-retainer/src/ram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rmqtt::{
};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

pub(crate) struct RamRetainer {
pub(crate) inner: &'static DefaultRetainStorage,
Expand All @@ -37,17 +38,15 @@ impl RamRetainer {
#[async_trait]
impl RetainStorage for &'static RamRetainer {
///topic - concrete topic
async fn set(&self, topic: &TopicName, retain: Retain) -> Result<()> {
async fn set(&self, topic: &TopicName, retain: Retain, expiry_interval: Option<Duration>) -> Result<()> {
if !self.retain_enable.load(Ordering::SeqCst) {
log::error!("{}", ERR_NOT_SUPPORTED);
return Ok(());
}

let (max_retained_messages, max_payload_size, expiry_interval) = {
let (max_retained_messages, max_payload_size) = {
let cfg = self.cfg.read().await;
let expiry_interval =
if cfg.expiry_interval.is_zero() { None } else { Some(cfg.expiry_interval) };
(cfg.max_retained_messages, *cfg.max_payload_size, expiry_interval)
(cfg.max_retained_messages, *cfg.max_payload_size)
};

if retain.publish.payload.len() > max_payload_size {
Expand Down
21 changes: 7 additions & 14 deletions rmqtt-plugins/rmqtt-retainer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,9 @@ pub struct RetainerInner {
impl RetainerInner {
#[inline]
async fn _batch_store(&self, msgs: Vec<Msg>) -> Result<()> {
let (max_retained_messages, max_payload_size, cfg_expiry_interval) = {
let (max_retained_messages, max_payload_size) = {
let cfg = self.cfg.read().await;
let expiry_interval = if cfg.expiry_interval.is_zero() {
None
} else {
Some(cfg.expiry_interval.as_millis() as i64)
};
(cfg.max_retained_messages as usize, *cfg.max_payload_size, expiry_interval)
(cfg.max_retained_messages as usize, *cfg.max_payload_size)
};

let mut count = 0;
Expand Down Expand Up @@ -205,11 +200,8 @@ impl RetainerInner {
}

//add retain messagge
let expiry_interval_millis = if let Some(expiry_interval) = expiry_interval {
Some(expiry_interval.as_millis() as i64)
} else {
cfg_expiry_interval
};
let expiry_interval_millis =
expiry_interval.map(|expiry_interval| expiry_interval.as_millis() as i64);

let expiry_time_at = expiry_interval_millis
.map(|expiry_interval_millis| timestamp_millis() + expiry_interval_millis);
Expand Down Expand Up @@ -378,15 +370,16 @@ impl RetainerInner {
#[async_trait]
impl RetainStorage for &'static Retainer {
///topic - concrete topic
async fn set(&self, topic: &TopicName, retain: Retain) -> Result<()> {
async fn set(&self, topic: &TopicName, retain: Retain, expiry_interval: Option<Duration>) -> Result<()> {
if !self.retain_enable.load(Ordering::SeqCst) {
log::error!("{}", ERR_NOT_SUPPORTED);
return Ok(());
}

let res = self
.msg_tx
.clone()
.send((topic.clone(), retain, None))
.send((topic.clone(), retain, expiry_interval))
.timeout(futures_time::time::Duration::from_millis(3500))
.await
.map_err(|e| anyhow!(e));
Expand Down
7 changes: 6 additions & 1 deletion rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,12 @@ impl DefaultRetainStorage {
#[async_trait]
impl RetainStorage for &'static DefaultRetainStorage {
#[inline]
async fn set(&self, _topic: &TopicName, _retain: Retain) -> Result<()> {
async fn set(
&self,
_topic: &TopicName,
_retain: Retain,
_expiry_interval: Option<Duration>,
) -> Result<()> {
log::warn!("Please use the \"rmqtt-retainer\" plugin as the main program no longer supports retain messages.");
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub trait RetainStorage: Sync + Send {
}

///topic - concrete topic
async fn set(&self, topic: &TopicName, retain: Retain) -> Result<()>;
async fn set(&self, topic: &TopicName, retain: Retain, expiry_interval: Option<Duration>) -> Result<()>;

///topic_filter - Topic filter
async fn get(&self, topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>>;
Expand Down
28 changes: 19 additions & 9 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,14 @@ impl SessionState {
log::debug!("process_last_will, publish: {:?}", p);

let listen_cfg = self.listen_cfg();
let (message_storage_available, message_expiry_interval) =
if Runtime::instance().extends.message_mgr().await.enable() {
(true, Some(self.fitter.message_expiry_interval(&p)))

let message_storage_available = Runtime::instance().extends.message_mgr().await.enable();

let message_expiry_interval =
if message_storage_available || (listen_cfg.retain_available && p.retain()) {
Some(self.fitter.message_expiry_interval(&p))
} else {
(false, None)
None
};

Self::forwards(
Expand Down Expand Up @@ -1026,11 +1029,14 @@ impl SessionState {
}

let listen_cfg = self.listen_cfg();
let (message_storage_available, message_expiry_interval) =
if Runtime::instance().extends.message_mgr().await.enable() {
(true, Some(self.fitter.message_expiry_interval(&publish)))

let message_storage_available = Runtime::instance().extends.message_mgr().await.enable();

let message_expiry_interval =
if message_storage_available || (listen_cfg.retain_available && publish.retain()) {
Some(self.fitter.message_expiry_interval(&publish))
} else {
(false, None)
None
};

Self::forwards(
Expand Down Expand Up @@ -1065,7 +1071,11 @@ impl SessionState {
.extends
.retain()
.await
.set(publish.topic(), Retain { msg_id, from: from.clone(), publish: publish.clone() })
.set(
publish.topic(),
Retain { msg_id, from: from.clone(), publish: publish.clone() },
message_expiry_interval,
)
.await?;
}

Expand Down

0 comments on commit 5439cc6

Please sign in to comment.