Skip to content

Commit

Permalink
Add debug_session_channels
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 23, 2023
1 parent 19beb35 commit 1ad046d
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
3 changes: 3 additions & 0 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl SessionState {
pub(crate) async fn start(mut self, keep_alive: u16) -> (Self, Tx) {
log::debug!("{:?} start online event loop", self.id);
let (msg_tx, mut msg_rx) = futures::channel::mpsc::unbounded();
let msg_tx = SessionTx::new(msg_tx);
self.tx.replace(msg_tx.clone());
let mut state = self.clone();

Expand Down Expand Up @@ -151,6 +152,8 @@ impl SessionState {
msg = msg_rx.next() => {
log::debug!("{:?} recv msg: {:?}", state.id, msg);
if let Some(msg) = msg{
#[cfg(feature = "debug")]
Runtime::instance().stats.debug_session_channels.dec();
match msg{
Message::Forward(from, p) => {
if let Err((from, p)) = deliver_queue_tx.send((from, p)).await{
Expand Down
13 changes: 11 additions & 2 deletions rmqtt/src/broker/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ pub struct Stats {
debug_shared_peers: Counter,
#[cfg(feature = "debug")]
debug_subscriptions: usize,
#[cfg(feature = "debug")]
pub debug_session_channels: Counter,
}

impl Stats {
Expand Down Expand Up @@ -170,6 +172,8 @@ impl Stats {
debug_shared_peers: Counter::new(),
#[cfg(feature = "debug")]
debug_subscriptions: 0,
#[cfg(feature = "debug")]
debug_session_channels: Counter::new(),
})
}

Expand Down Expand Up @@ -232,6 +236,8 @@ impl Stats {
debug_shared_peers: self.debug_shared_peers.clone(),
#[cfg(feature = "debug")]
debug_subscriptions,
#[cfg(feature = "debug")]
debug_session_channels: self.debug_session_channels.clone(),
}
}

Expand All @@ -258,6 +264,7 @@ impl Stats {
self.debug_topics_tree_map.extend(other.debug_topics_tree_map);
self.debug_shared_peers.add(&other.debug_shared_peers);
self.debug_subscriptions += other.debug_subscriptions;
self.debug_session_channels.add(&other.debug_session_channels);
}
}

Expand Down Expand Up @@ -303,8 +310,10 @@ impl Stats {
if let Some(obj) = json_val.as_object_mut() {
obj.insert("debug_client_states_map".into(), json!(self.debug_client_states_map));
obj.insert("debug_topics_tree_map".into(), json!(self.debug_topics_tree_map));
obj.insert("debug_shared_peers".into(), json!(self.debug_shared_peers.count()));
obj.insert("debug_subscriptions".into(), json!(self.debug_subscriptions));
obj.insert("debug_shared_peers.count".into(), json!(self.debug_shared_peers.count()));
obj.insert("debug_subscriptions.count".into(), json!(self.debug_subscriptions));
obj.insert("debug_session_channels.count".into(), json!(self.debug_session_channels.count()));
obj.insert("debug_session_channels.max".into(), json!(self.debug_session_channels.max()));
}
}

Expand Down
33 changes: 32 additions & 1 deletion rmqtt/src/broker/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub type IsAdmin = bool;
pub type LimiterName = u16;
pub type CleanStart = bool;

pub type Tx = futures::channel::mpsc::UnboundedSender<Message>;
pub type Tx = SessionTx; //futures::channel::mpsc::UnboundedSender<Message>;
pub type Rx = futures::channel::mpsc::UnboundedReceiver<Message>;

pub type DashSet<V> = dashmap::DashSet<V, ahash::RandomState>;
Expand All @@ -83,6 +83,37 @@ pub type HookUnsubscribeResult = Vec<Option<TopicFilter>>;

pub(crate) const UNDEFINED: &str = "undefined";

#[derive(Clone)]
pub struct SessionTx {
tx: futures::channel::mpsc::UnboundedSender<Message>,
}

impl SessionTx {
pub fn new(tx: futures::channel::mpsc::UnboundedSender<Message>) -> Self {
Self { tx }
}

#[inline]
pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}

#[inline]
pub fn unbounded_send(
&self,
msg: Message,
) -> std::result::Result<(), futures::channel::mpsc::TrySendError<Message>> {
match self.tx.unbounded_send(msg) {
Ok(()) => {
#[cfg(feature = "debug")]
Runtime::instance().stats.debug_session_channels.inc();
Ok(())
}
Err(e) => Err(e),
}
}
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ConnectInfo {
V3(Id, ConnectV3),
Expand Down

0 comments on commit 1ad046d

Please sign in to comment.