Skip to content

Commit

Permalink
Count the number of subscriptions through containers
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 23, 2023
1 parent 6c0a46a commit 5854651
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 0 deletions.
5 changes: 5 additions & 0 deletions rmqtt-plugins/rmqtt-cluster-broadcast/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,11 @@ impl Shared for &'static ClusterShared {
replys
}

#[inline]
async fn subscriptions_count(&self) -> usize {
self.inner.subscriptions_count().await
}

#[inline]
fn get_grpc_clients(&self) -> GrpcClients {
self.grpc_clients.clone()
Expand Down
4 changes: 4 additions & 0 deletions rmqtt-plugins/rmqtt-cluster-raft/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ impl Shared for &'static ClusterShared {
self.inner.query_subscriptions(q).await
}

#[inline]
async fn subscriptions_count(&self) -> usize {
self.inner.subscriptions_count().await
}
#[inline]
fn get_grpc_clients(&self) -> GrpcClients {
self.grpc_clients.clone()
Expand Down
10 changes: 10 additions & 0 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,16 @@ impl Shared for &'static DefaultShared {
async fn query_subscriptions(&self, q: SubsSearchParams) -> Vec<SubsSearchResult> {
self._query_subscriptions(&q).await
}

#[inline]
async fn subscriptions_count(&self) -> usize {
futures::future::join_all(
self.peers.iter().map(|entry| async move { entry.s.subscriptions.len().await }),
)
.await
.iter()
.sum()
}
}

pub struct DefaultIter<'a> {
Expand Down
2 changes: 2 additions & 0 deletions rmqtt/src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ pub trait Shared: Sync + Send {
///
async fn query_subscriptions(&self, q: SubsSearchParams) -> Vec<SubsSearchResult>;

async fn subscriptions_count(&self) -> usize;

///This node is not included
#[inline]
fn get_grpc_clients(&self) -> GrpcClients {
Expand Down

0 comments on commit 5854651

Please sign in to comment.