Skip to content

Commit

Permalink
Optimize code to reduce session memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 22, 2023
1 parent 4f18193 commit 6fff379
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 99 deletions.
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-http-api/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async fn build_result(s: Option<Session>, c: Option<ClientInfo>) -> SearchResult
session_present: c.session_present,
expiry_interval,
created_at: s.created_at / 1000,
subscriptions_cnt: s.subscriptions.len(),
subscriptions_cnt: s.subscriptions.len().await,
max_subscriptions: s.listen_cfg.max_subscriptions,
extra_attrs,
last_will,
Expand Down
29 changes: 15 additions & 14 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl LockEntry {
async fn _unsubscribe(&self, id: Id, topic_filter: &str) -> Result<()> {
Runtime::instance().extends.router().await.remove(topic_filter, id).await?;
if let Some(s) = self.session() {
s.subscriptions.remove(topic_filter);
s.subscriptions.remove(topic_filter).await;
}
Ok(())
}
Expand All @@ -75,7 +75,8 @@ impl LockEntry {
{ self.shared.peers.remove_if(&self.id.client_id, |_, entry| &entry.c.id == with_id) }
{
if clear_subscriptions {
for topic_filter in peer.s.subscriptions.to_topic_filters() {
let topic_filters = peer.s.subscriptions.to_topic_filters().await;
for topic_filter in topic_filters {
if let Err(e) = self._unsubscribe(peer.c.id.clone(), &topic_filter).await {
log::warn!(
"{:?} remove._unsubscribe, topic_filter: {}, {:?}",
Expand All @@ -97,7 +98,8 @@ impl LockEntry {
pub async fn _remove(&mut self, clear_subscriptions: bool) -> Option<(Session, Tx, ClientInfo)> {
if let Some((_, peer)) = { self.shared.peers.remove(&self.id.client_id) } {
if clear_subscriptions {
for topic_filter in peer.s.subscriptions.to_topic_filters() {
let topic_filters = peer.s.subscriptions.to_topic_filters().await;
for topic_filter in topic_filters {
if let Err(e) = self._unsubscribe(peer.c.id.clone(), &topic_filter).await {
log::warn!(
"{:?} remove._unsubscribe, topic_filter: {}, {:?}",
Expand Down Expand Up @@ -268,7 +270,7 @@ impl super::Entry for LockEntry {
.await
.add(&sub.topic_filter, self.id(), sub.opts.clone())
.await?;
let prev_opts = peer.s.subscriptions.add(sub.topic_filter.clone(), sub.opts.clone());
let prev_opts = peer.s.subscriptions.add(sub.topic_filter.clone(), sub.opts.clone()).await;
Ok(SubscribeReturn::new_success(sub.opts.qos(), prev_opts))
}

Expand All @@ -286,7 +288,7 @@ impl super::Entry for LockEntry {
{
log::warn!("{:?} unsubscribe, error:{:?}", self.id, e);
}
let remove_ok = peer.s.subscriptions.remove(&unsubscribe.topic_filter).is_some();
let remove_ok = peer.s.subscriptions.remove(&unsubscribe.topic_filter).await.is_some();
Ok(remove_ok)
}

Expand All @@ -312,16 +314,15 @@ impl super::Entry for LockEntry {
if let Some(s) = self.session() {
let subs = s
.subscriptions
.read()
.await
.iter()
.map(|entry| {
let (topic_filter, opts) = entry.pair();
SubsSearchResult {
node_id: self.id.node_id,
clientid: self.id.client_id.clone(),
client_addr: self.id.remote_addr,
topic: TopicFilter::from(topic_filter.as_ref()),
opts: opts.clone(),
}
.map(|(topic_filter, opts)| SubsSearchResult {
node_id: self.id.node_id,
clientid: self.id.client_id.clone(),
client_addr: self.id.remote_addr,
topic: TopicFilter::from(topic_filter.as_ref()),
opts: opts.clone(),
})
.collect::<Vec<_>>();
Some(subs)
Expand Down
19 changes: 9 additions & 10 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ impl SessionState {
let session_expiry_delay = tokio::time::sleep(session_expiry_interval);
tokio::pin!(session_expiry_delay);

let will_delay_interval_delay =
tokio::time::sleep(will_delay_interval.unwrap_or(Duration::MAX));
let will_delay_interval_delay = tokio::time::sleep(will_delay_interval.unwrap_or(Duration::MAX));
tokio::pin!(will_delay_interval_delay);

loop {
Expand Down Expand Up @@ -632,7 +631,7 @@ impl SessionState {
#[inline]
async fn _subscribe(&self, mut sub: Subscribe) -> Result<SubscribeReturn> {
if self.listen_cfg.max_subscriptions > 0
&& (self.subscriptions.len() >= self.listen_cfg.max_subscriptions)
&& (self.subscriptions.len().await >= self.listen_cfg.max_subscriptions)
{
return Err(MqttError::TooManySubscriptions);
}
Expand Down Expand Up @@ -755,7 +754,7 @@ impl SessionState {
log::debug!("{:?} publish: {:?}", self.id, publish);
let mut p = Publish::from(publish);
if let Some(client_topic_aliases) = &self.client_topic_aliases {
p.topic = client_topic_aliases.set_and_get(p.properties.topic_alias, p.topic)?;
p.topic = client_topic_aliases.set_and_get(p.properties.topic_alias, p.topic).await?;
}
self.publish(p).await
}
Expand Down Expand Up @@ -900,7 +899,7 @@ impl SessionState {

//Subscription transfer from previous session
if !clear_subscriptions {
self.subscriptions.extend(offline_info.subscriptions);
self.subscriptions.extend(offline_info.subscriptions).await;
}

//Send previous session unacked messages
Expand Down Expand Up @@ -1005,7 +1004,7 @@ impl Session {
#[inline]
pub async fn to_offline_info(&self) -> SessionOfflineInfo {
let id = self.id.clone();
let subscriptions = self.subscriptions.drain();
let subscriptions = self.subscriptions.drain().await;
let mut offline_messages = Vec::new();
while let Some(item) = self.deliver_queue.pop() {
//@TODO ..., check message expired
Expand Down Expand Up @@ -1056,21 +1055,21 @@ pub struct _SessionInner {
impl Drop for _SessionInner {
fn drop(&mut self) {
Runtime::instance().stats.sessions.dec();
self.subscriptions.clear();
}
}

impl _SessionInner {
#[inline]
pub async fn to_json(&self) -> serde_json::Value {
let count = self.subscriptions.len();
let count = self.subscriptions.len().await;

let subs = self
.subscriptions
.read()
.await
.iter()
.enumerate()
.filter_map(|(i, entry)| {
let (tf, opts) = entry.pair();
.filter_map(|(i, (tf, opts))| {
if i < 100 {
Some(json!({
"topic_filter": tf.to_string(),
Expand Down
Loading

0 comments on commit 6fff379

Please sign in to comment.