Skip to content

Commit

Permalink
Add statistics for message_queues and inflights.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 29, 2023
1 parent 152e2d3 commit 086c57c
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 13 deletions.
58 changes: 54 additions & 4 deletions rmqtt/src/broker/inflight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use rust_box::dequemap::DequeMap;

use crate::broker::queue::OnEventFn;
use crate::broker::types::{
From, Packet, PacketId, PacketV3, PacketV5, Publish, PublishAck2, PublishAck2Reason, TimestampMillis,
UserProperties,
Expand Down Expand Up @@ -79,13 +80,40 @@ pub struct Inflight {
interval: TimestampMillis,
next: Arc<AtomicU16>,
queues: Queues,
on_push_fn: Option<Arc<dyn OnEventFn<()>>>,
on_pop_fn: Option<Arc<dyn OnEventFn<()>>>,
}

impl Inflight {
#[inline]
pub fn new(cap: usize, retry_interval: TimestampMillis, expiry_interval: TimestampMillis) -> Self {
let interval = Self::interval(retry_interval, expiry_interval);
Self { cap, interval, next: Arc::new(AtomicU16::new(1)), queues: Queues::default() }
Self {
cap,
interval,
next: Arc::new(AtomicU16::new(1)),
queues: Queues::default(),
on_push_fn: None,
on_pop_fn: None,
}
}

#[inline]
pub fn on_push<F>(mut self, f: F) -> Self
where
F: OnEventFn<()>,
{
self.on_push_fn = Some(Arc::new(f));
self
}

#[inline]
pub fn on_pop<F>(mut self, f: F) -> Self
where
F: OnEventFn<()>,
{
self.on_pop_fn = Some(Arc::new(f));
self
}

#[inline]
Expand Down Expand Up @@ -139,7 +167,14 @@ impl Inflight {

#[inline]
pub fn pop_front(&mut self) -> Option<InflightMessage> {
self.queues.pop_front().map(|(_, m)| m)
if let Some(msg) = self.queues.pop_front().map(|(_, m)| m) {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
}
Some(msg)
} else {
None
}
}

#[inline]
Expand All @@ -154,15 +189,30 @@ impl Inflight {
#[inline]
pub fn push_back(&mut self, m: InflightMessage) {
if let Some(packet_id) = m.publish.packet_id() {
self.queues.insert(packet_id, m);
if let Some(f) = self.on_push_fn.as_ref() {
f(&());
}
let old = self.queues.insert(packet_id, m);
if old.is_some() {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
}
}
} else {
log::warn!("packet_id is None, inflight message: {:?}", m);
}
}

#[inline]
pub fn remove(&mut self, packet_id: &PacketId) -> Option<InflightMessage> {
self.queues.remove(packet_id)
if let Some(msg) = self.queues.remove(packet_id) {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
}
Some(msg)
} else {
None
}
}

#[inline]
Expand Down
35 changes: 33 additions & 2 deletions rmqtt/src/broker/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub trait PolicyFn<P>: 'static + Fn(&P) -> Policy {}

impl<T, P> PolicyFn<P> for T where T: 'static + Clone + ?Sized + Fn(&P) -> Policy {}

pub trait OnEventFn<P>: 'static + Sync + Send + Fn(&P) {}
impl<T, P> OnEventFn<P> for T where T: 'static + Sync + Send + Clone + ?Sized + Fn(&P) {}

#[derive(Clone)]
pub struct Sender<T> {
tx: mpsc::Sender<()>,
Expand Down Expand Up @@ -150,6 +153,8 @@ impl Limiter {
pub struct Queue<T> {
cap: usize,
inner: SegQueue<T>,
on_push_fn: Option<Arc<dyn OnEventFn<T>>>,
on_pop_fn: Option<Arc<dyn OnEventFn<T>>>,
}

impl<T> Drop for Queue<T> {
Expand All @@ -162,21 +167,47 @@ impl<T> Drop for Queue<T> {
impl<T> Queue<T> {
#[inline]
pub fn new(cap: usize) -> Self {
Self { cap, inner: SegQueue::new() }
Self { cap, inner: SegQueue::new(), on_push_fn: None, on_pop_fn: None }
}

#[inline]
pub fn on_push<F>(&mut self, f: F)
where
F: OnEventFn<T>,
{
self.on_push_fn = Some(Arc::new(f));
}

#[inline]
pub fn on_pop<F>(&mut self, f: F)
where
F: OnEventFn<T>,
{
self.on_pop_fn = Some(Arc::new(f));
}

#[inline]
pub fn push(&self, v: T) -> Result<(), T> {
if self.inner.len() > self.cap {
return Err(v);
}
if let Some(f) = self.on_push_fn.as_ref() {
f(&v);
}
self.inner.push(v);
Ok(())
}

#[inline]
pub fn pop(&self) -> Option<T> {
self.inner.pop()
if let Some(v) = self.inner.pop() {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&v);
}
Some(v)
}else{
None
}
}

#[inline]
Expand Down
27 changes: 20 additions & 7 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ impl SessionState {
},
Message::Disconnect(d) => {
flags.insert(StateFlags::DisconnectReceived);
state.client.add_disconnected_reason(d.reason()).await;
state.client.set_mqtt_disconnect(d).await;
},
Message::Closed(reason) => {
Expand Down Expand Up @@ -976,18 +975,31 @@ impl Session {
let max_inflight = max_inflight.get() as usize;
let message_retry_interval = listen_cfg.message_retry_interval.as_millis() as TimestampMillis;
let message_expiry_interval = listen_cfg.message_expiry_interval.as_millis() as TimestampMillis;
let mut deliver_queue = MessageQueue::new(max_mqueue_len);
deliver_queue.on_push(|_v|{
Runtime::instance().stats.message_queues.inc();
});
deliver_queue.on_pop(|_v|{
Runtime::instance().stats.message_queues.dec();
});
let out_inflight = Inflight::new(
max_inflight,
message_retry_interval,
message_expiry_interval,
).on_push(|_|{
Runtime::instance().stats.inflights.inc();
}).on_pop(|_|{
Runtime::instance().stats.inflights.dec();
});

Runtime::instance().stats.sessions.inc();
Self(Arc::new(_SessionInner {
id,
fitter,
listen_cfg,
subscriptions: SessionSubs::new(),
deliver_queue: Arc::new(MessageQueue::new(max_mqueue_len)),
inflight_win: Arc::new(RwLock::new(Inflight::new(
max_inflight,
message_retry_interval,
message_expiry_interval,
))),
deliver_queue: Arc::new(deliver_queue),
inflight_win: Arc::new(RwLock::new(out_inflight)),
created_at,
}))
}
Expand Down Expand Up @@ -1187,6 +1199,7 @@ impl ClientInfo {
}

pub(crate) async fn set_mqtt_disconnect(&self, d: Disconnect) {
self.add_disconnected_reason(d.reason()).await;
self.disconnect.write().await.replace(d);
}

Expand Down
13 changes: 13 additions & 0 deletions rmqtt/src/broker/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub struct Stats {
pub subscriptions: Counter,
pub subscriptions_shared: Counter,
pub retaineds: Counter,
pub message_queues: Counter,
pub inflights: Counter,

topics_map: HashMap<NodeId, Counter>,
routes_map: HashMap<NodeId, Counter>,
Expand All @@ -150,6 +152,8 @@ impl Stats {
subscriptions: Counter::new(),
subscriptions_shared: Counter::new(),
retaineds: Counter::new(),
message_queues: Counter::new(),
inflights: Counter::new(),

topics_map: HashMap::default(),
routes_map: HashMap::default(),
Expand Down Expand Up @@ -201,6 +205,8 @@ impl Stats {
subscriptions: self.subscriptions.clone(),
subscriptions_shared: self.subscriptions_shared.clone(),
retaineds: self.retaineds.clone(), //retained messages
message_queues: self.message_queues.clone(),
inflights: self.inflights.clone(),

topics_map,
routes_map,
Expand All @@ -224,6 +230,8 @@ impl Stats {
self.subscriptions.add(&other.subscriptions);
self.subscriptions_shared.add(&other.subscriptions_shared);
self.retaineds.add(&other.retaineds);
self.message_queues.add(&other.message_queues);
self.inflights.add(&other.inflights);

self.topics_map.extend(other.topics_map);
self.routes_map.extend(other.routes_map);
Expand Down Expand Up @@ -260,6 +268,11 @@ impl Stats {
"retained.count": self.retaineds.count(),
"retained.max": self.retaineds.max(),

"message_queues.count": self.message_queues.count(),
"message_queues.max": self.message_queues.max(),
"inflights.count": self.inflights.count(),
"inflights.max": self.inflights.max(),

"topics.count": topics.count(),
"topics.max": topics.max(),
"routes.count": routes.count(),
Expand Down

0 comments on commit 086c57c

Please sign in to comment.