Skip to content

Commit

Permalink
Optimize the codes
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Feb 29, 2024
1 parent 3b8e3ff commit da08d5e
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
10 changes: 5 additions & 5 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,7 @@ impl DefaultSharedSubscription {
impl SharedSubscription for &'static DefaultSharedSubscription {}

pub struct DefaultRetainStorage {
messages: RwLock<RetainTree<TimedValue<Retain>>>,
pub messages: RwLock<RetainTree<TimedValue<Retain>>>,
}

impl DefaultRetainStorage {
Expand All @@ -1157,7 +1157,7 @@ impl DefaultRetainStorage {
}

#[inline]
pub async fn remove_expired_messages(&self) {
pub async fn remove_expired_messages(&self) -> usize {
let mut messages = self.messages.write().await;
messages.retain(usize::MAX, |tv| {
if tv.is_expired() {
Expand All @@ -1166,7 +1166,7 @@ impl DefaultRetainStorage {
} else {
true
}
});
})
}

#[inline]
Expand Down Expand Up @@ -1226,12 +1226,12 @@ impl RetainStorage for &'static DefaultRetainStorage {
}

#[inline]
fn count(&self) -> isize {
async fn count(&self) -> isize {
Runtime::instance().stats.retaineds.count()
}

#[inline]
fn max(&self) -> isize {
async fn max(&self) -> isize {
Runtime::instance().stats.retaineds.max()
}
}
Expand Down
4 changes: 2 additions & 2 deletions rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,10 @@ pub trait RetainStorage: Sync + Send {
async fn get(&self, topic_filter: &TopicFilter) -> Result<Vec<(TopicName, Retain)>>;

///
fn count(&self) -> isize;
async fn count(&self) -> isize;

///
fn max(&self) -> isize;
async fn max(&self) -> isize;
}

#[async_trait]
Expand Down
6 changes: 6 additions & 0 deletions rmqtt/src/broker/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,12 @@ impl Stats {
self.message_storages.max_max(message_mgr.max().await);
}

{
let retain = Runtime::instance().extends.retain().await;
self.retaineds.current_set(retain.count().await);
self.retaineds.max_max(retain.max().await);
}

#[cfg(feature = "debug")]
let shared = Runtime::instance().extends.shared().await;

Expand Down
6 changes: 5 additions & 1 deletion rmqtt/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ impl Node {
.unwrap();
let runner = async {
if let Err(e) = Server::new().listen_and_serve().await {
log::error!("listen and serve failure, {:?}", e);
log::error!(
"listen and serve failure, {:?}, laddr: {:?}",
e,
Runtime::instance().settings.rpc.server_addr
);
}
};
rt.block_on(runner)
Expand Down

0 comments on commit da08d5e

Please sign in to comment.