Skip to content

Commit

Permalink
"retain message" is only supported through the "rmqtt-retainer" plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Feb 23, 2024
1 parent a0703b3 commit c3418eb
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-retainer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl Handler for RetainHandler {
}
match msg {
Message::GetRetains(topic_filter) => {
let new_acc = match self.retainer.inner().get(topic_filter).await {
let new_acc = match self.retainer.inner().get_message(topic_filter).await {
Ok(retains) => {
HookResult::GrpcMessageReply(Ok(MessageReply::GetRetains(retains)))
}
Expand Down
6 changes: 3 additions & 3 deletions rmqtt-plugins/rmqtt-retainer/src/retainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ impl Retainer {
}

#[inline]
pub(crate) fn inner(&self) -> Box<dyn RetainStorage> {
Box::new(self.inner)
pub(crate) fn inner(&self) -> &'static DefaultRetainStorage {
self.inner
}

#[inline]
Expand Down Expand Up @@ -70,7 +70,7 @@ impl RetainStorage for &'static Retainer {

///topic_filter - Topic filter
async fn get(&self, topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>> {
let mut retains = self.inner.get(topic_filter).await?;
let mut retains = self.inner.get_message(topic_filter).await?;
let grpc_clients = Runtime::instance().extends.shared().await.get_grpc_clients();
if grpc_clients.is_empty() {
return Ok(retains);
Expand Down
25 changes: 16 additions & 9 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,17 +1189,9 @@ impl DefaultRetainStorage {
}
Ok(())
}
}

#[async_trait]
impl RetainStorage for &'static DefaultRetainStorage {
#[inline]
async fn set(&self, topic: &TopicName, retain: Retain) -> Result<()> {
self.set_with_timeout(topic, retain, None).await
}

#[inline]
async fn get(&self, topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>> {
pub async fn get_message(&self, topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>> {
let topic = Topic::from_str(topic_filter)?;
let retains = self
.messages
Expand All @@ -1217,6 +1209,21 @@ impl RetainStorage for &'static DefaultRetainStorage {
.collect::<Vec<(TopicName, Retain)>>();
Ok(retains)
}
}

#[async_trait]
impl RetainStorage for &'static DefaultRetainStorage {
#[inline]
async fn set(&self, _topic: &TopicName, _retain: Retain) -> Result<()> {
log::warn!("Please use the \"rmqtt-retainer\" plugin as the main program no longer supports retain messages.");
Ok(())
}

#[inline]
async fn get(&self, _topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>> {
log::warn!("Please use the \"rmqtt-retainer\" plugin as the main program no longer supports retain messages.");
Ok(Vec::new())
}

#[inline]
fn count(&self) -> isize {
Expand Down

0 comments on commit c3418eb

Please sign in to comment.