Skip to content

Commit

Permalink
Add 'debug_runtime_exec_stats' and 'forwards' member fields.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 25, 2023
1 parent 1ad046d commit 2ec13e9
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions rmqtt/src/broker/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub struct Stats {
pub message_queues: Counter,
pub out_inflights: Counter,
pub in_inflights: Counter,
pub forwards: Counter,

topics_map: HashMap<NodeId, Counter>,
routes_map: HashMap<NodeId, Counter>,
Expand All @@ -142,6 +143,8 @@ pub struct Stats {
debug_subscriptions: usize,
#[cfg(feature = "debug")]
pub debug_session_channels: Counter,
#[cfg(feature = "debug")]
debug_runtime_exec_stats: Option<RuntimeExecStats>,
}

impl Stats {
Expand All @@ -160,6 +163,7 @@ impl Stats {
message_queues: Counter::new(),
out_inflights: Counter::new(),
in_inflights: Counter::new(),
forwards: Counter::new(),

topics_map: HashMap::default(),
routes_map: HashMap::default(),
Expand All @@ -174,6 +178,8 @@ impl Stats {
debug_subscriptions: 0,
#[cfg(feature = "debug")]
debug_session_channels: Counter::new(),
#[cfg(feature = "debug")]
debug_runtime_exec_stats: None,
})
}

Expand Down Expand Up @@ -211,6 +217,8 @@ impl Stats {
self.debug_shared_peers.current_set(shared.sessions_count() as isize);
#[cfg(feature = "debug")]
let debug_subscriptions = shared.subscriptions_count().await;
#[cfg(feature = "debug")]
let debug_runtime_exec_stats = Some(RuntimeExecStats::new());

Self {
handshakings: self.handshakings.clone(),
Expand All @@ -224,6 +232,7 @@ impl Stats {
message_queues: self.message_queues.clone(),
out_inflights: self.out_inflights.clone(),
in_inflights: self.in_inflights.clone(),
forwards: self.forwards.clone(),

topics_map,
routes_map,
Expand All @@ -238,6 +247,8 @@ impl Stats {
debug_subscriptions,
#[cfg(feature = "debug")]
debug_session_channels: self.debug_session_channels.clone(),
#[cfg(feature = "debug")]
debug_runtime_exec_stats,
}
}

Expand All @@ -254,6 +265,7 @@ impl Stats {
self.message_queues.add(&other.message_queues);
self.out_inflights.add(&other.out_inflights);
self.in_inflights.add(&other.in_inflights);
self.forwards.add(&other.forwards);

self.topics_map.extend(other.topics_map);
self.routes_map.extend(other.routes_map);
Expand All @@ -265,6 +277,14 @@ impl Stats {
self.debug_shared_peers.add(&other.debug_shared_peers);
self.debug_subscriptions += other.debug_subscriptions;
self.debug_session_channels.add(&other.debug_session_channels);

if let Some(other) = other.debug_runtime_exec_stats.as_ref() {
if let Some(stats) = self.debug_runtime_exec_stats.as_mut() {
stats.add(other);
} else {
self.debug_runtime_exec_stats.replace(other.clone());
}
}
}
}

Expand Down Expand Up @@ -298,6 +318,8 @@ impl Stats {
"out_inflights.max": self.out_inflights.max(),
"in_inflights.count": self.in_inflights.count(),
"in_inflights.max": self.in_inflights.max(),
"forwards.count": self.forwards.count(),
"forwards.max": self.forwards.max(),

"topics.count": topics.count(),
"topics.max": topics.max(),
Expand All @@ -314,9 +336,43 @@ impl Stats {
obj.insert("debug_subscriptions.count".into(), json!(self.debug_subscriptions));
obj.insert("debug_session_channels.count".into(), json!(self.debug_session_channels.count()));
obj.insert("debug_session_channels.max".into(), json!(self.debug_session_channels.max()));
obj.insert("debug_runtime_exec_stats".into(), json!(self.debug_runtime_exec_stats));
}
}

json_val
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RuntimeExecStats {
active_count: isize,
completed_count: isize,
pending_wakers_count: usize,
waiting_count: isize,
rate: f64,
}

impl RuntimeExecStats {
#[allow(dead_code)]
pub fn new() -> Self {
let exec = &Runtime::instance().exec;
Self {
active_count: exec.active_count(),
completed_count: exec.completed_count(),
pending_wakers_count: exec.pending_wakers_count(),
waiting_count: exec.waiting_count(),
rate: exec.rate(),
}
}

#[allow(dead_code)]
#[inline]
fn add(&mut self, other: &Self) {
self.active_count += other.active_count;
self.completed_count += other.completed_count;
self.pending_wakers_count += other.pending_wakers_count;
self.waiting_count += other.waiting_count;
self.rate += other.rate;
}
}

0 comments on commit 2ec13e9

Please sign in to comment.