Skip to content

Commit

Permalink
Optimize the codes
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 29, 2023
1 parent 6cd7803 commit 13766a5
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 30 deletions.
16 changes: 8 additions & 8 deletions rmqtt/src/broker/inflight.rs
Expand Up @@ -80,8 +80,8 @@ pub struct Inflight {
interval: TimestampMillis,
next: Arc<AtomicU16>,
queues: Queues,
on_push_fn: Option<Arc<dyn OnEventFn<()>>>,
on_pop_fn: Option<Arc<dyn OnEventFn<()>>>,
on_push_fn: Option<Arc<dyn OnEventFn>>,
on_pop_fn: Option<Arc<dyn OnEventFn>>,
}

impl Inflight {
Expand All @@ -101,7 +101,7 @@ impl Inflight {
#[inline]
pub fn on_push<F>(mut self, f: F) -> Self
where
F: OnEventFn<()>,
F: OnEventFn,
{
self.on_push_fn = Some(Arc::new(f));
self
Expand All @@ -110,7 +110,7 @@ impl Inflight {
#[inline]
pub fn on_pop<F>(mut self, f: F) -> Self
where
F: OnEventFn<()>,
F: OnEventFn,
{
self.on_pop_fn = Some(Arc::new(f));
self
Expand Down Expand Up @@ -169,7 +169,7 @@ impl Inflight {
pub fn pop_front(&mut self) -> Option<InflightMessage> {
if let Some(msg) = self.queues.pop_front().map(|(_, m)| m) {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
f();
}
Some(msg)
} else {
Expand All @@ -190,12 +190,12 @@ impl Inflight {
pub fn push_back(&mut self, m: InflightMessage) {
if let Some(packet_id) = m.publish.packet_id() {
if let Some(f) = self.on_push_fn.as_ref() {
f(&());
f();
}
let old = self.queues.insert(packet_id, m);
if old.is_some() {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
f();
}
}
} else {
Expand All @@ -207,7 +207,7 @@ impl Inflight {
pub fn remove(&mut self, packet_id: &PacketId) -> Option<InflightMessage> {
if let Some(msg) = self.queues.remove(packet_id) {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&());
f();
}
Some(msg)
} else {
Expand Down
22 changes: 11 additions & 11 deletions rmqtt/src/broker/queue.rs
Expand Up @@ -32,8 +32,8 @@ pub trait PolicyFn<P>: 'static + Fn(&P) -> Policy {}

impl<T, P> PolicyFn<P> for T where T: 'static + Clone + ?Sized + Fn(&P) -> Policy {}

pub trait OnEventFn<P>: 'static + Sync + Send + Fn(&P) {}
impl<T, P> OnEventFn<P> for T where T: 'static + Sync + Send + Clone + ?Sized + Fn(&P) {}
pub trait OnEventFn: 'static + Sync + Send + Fn() {}
impl<T> OnEventFn for T where T: 'static + Sync + Send + Clone + ?Sized + Fn() {}

#[derive(Clone)]
pub struct Sender<T> {
Expand Down Expand Up @@ -153,8 +153,8 @@ impl Limiter {
pub struct Queue<T> {
cap: usize,
inner: SegQueue<T>,
on_push_fn: Option<Arc<dyn OnEventFn<T>>>,
on_pop_fn: Option<Arc<dyn OnEventFn<T>>>,
on_push_fn: Option<Arc<dyn OnEventFn>>,
on_pop_fn: Option<Arc<dyn OnEventFn>>,
}

impl<T> Drop for Queue<T> {
Expand All @@ -172,16 +172,16 @@ impl<T> Queue<T> {

#[inline]
pub fn on_push<F>(&mut self, f: F)
where
F: OnEventFn<T>,
where
F: OnEventFn,
{
self.on_push_fn = Some(Arc::new(f));
}

#[inline]
pub fn on_pop<F>(&mut self, f: F)
where
F: OnEventFn<T>,
where
F: OnEventFn,
{
self.on_pop_fn = Some(Arc::new(f));
}
Expand All @@ -192,7 +192,7 @@ impl<T> Queue<T> {
return Err(v);
}
if let Some(f) = self.on_push_fn.as_ref() {
f(&v);
f();
}
self.inner.push(v);
Ok(())
Expand All @@ -202,10 +202,10 @@ impl<T> Queue<T> {
pub fn pop(&self) -> Option<T> {
if let Some(v) = self.inner.pop() {
if let Some(f) = self.on_pop_fn.as_ref() {
f(&v);
f();
}
Some(v)
}else{
} else {
None
}
}
Expand Down
20 changes: 9 additions & 11 deletions rmqtt/src/broker/session.rs
Expand Up @@ -976,21 +976,19 @@ impl Session {
let message_retry_interval = listen_cfg.message_retry_interval.as_millis() as TimestampMillis;
let message_expiry_interval = listen_cfg.message_expiry_interval.as_millis() as TimestampMillis;
let mut deliver_queue = MessageQueue::new(max_mqueue_len);
deliver_queue.on_push(|_v|{
deliver_queue.on_push(|| {
Runtime::instance().stats.message_queues.inc();
});
deliver_queue.on_pop(|_v|{
deliver_queue.on_pop(|| {
Runtime::instance().stats.message_queues.dec();
});
let out_inflight = Inflight::new(
max_inflight,
message_retry_interval,
message_expiry_interval,
).on_push(|_|{
Runtime::instance().stats.inflights.inc();
}).on_pop(|_|{
Runtime::instance().stats.inflights.dec();
});
let out_inflight = Inflight::new(max_inflight, message_retry_interval, message_expiry_interval)
.on_push(|| {
Runtime::instance().stats.inflights.inc();
})
.on_pop(|| {
Runtime::instance().stats.inflights.dec();
});

Runtime::instance().stats.sessions.inc();
Self(Arc::new(_SessionInner {
Expand Down

0 comments on commit 13766a5

Please sign in to comment.