Skip to content

Commit

Permalink
Remove max_awaiting_rel and await_rel_timeout, improve messages with …
Browse files Browse the repository at this point in the history
…inflight (QoS=2).
  • Loading branch information
rmqtt committed Aug 24, 2023
1 parent b4db2ff commit 61dbd38
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 227 deletions.
66 changes: 0 additions & 66 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,69 +107,3 @@ pub(crate) struct FixedHeader {
/// including data in the variable header and the payload.
pub(crate) remaining_length: u32,
}

use dequemap::DequeMap;
use std::num::NonZeroU16;
use std::time::Duration;
type TimestampMillis = i64;

#[derive(Default)]
pub(crate) struct AwaitingRelSet {
pub rels: DequeMap<NonZeroU16, TimestampMillis>,
pub max_awaiting: usize,
pub await_timeout: TimestampMillis,
}

impl AwaitingRelSet {
#[inline]
pub(crate) fn new(max_awaiting: usize, await_timeout: Duration) -> Self {
Self {
rels: DequeMap::default(),
max_awaiting,
await_timeout: await_timeout.as_millis() as TimestampMillis,
}
}

#[inline]
pub(crate) fn contains(&self, packet_id: &NonZeroU16) -> bool {
self.rels.contains_key(packet_id)
}

#[inline]
pub(crate) fn is_full(&self) -> bool {
self.max_awaiting > 0 && self.rels.len() >= self.max_awaiting
}

#[inline]
pub(crate) fn remove(&mut self, packet_id: &NonZeroU16) {
self.rels.remove(packet_id);
}

#[inline]
pub(crate) fn pop(&mut self) -> Option<NonZeroU16> {
self.rels.pop_front().map(|(packet_id, _)| packet_id)
}

#[inline]
pub(crate) fn push(&mut self, packet_id: NonZeroU16) {
self.rels.insert(packet_id, chrono::Local::now().timestamp_millis());
}

#[inline]
pub(crate) fn remove_timeouts(&mut self) {
if self.await_timeout == 0 {
return;
}
let now = chrono::Local::now().timestamp_millis();
while let Some((packet_id, t)) = self.rels.front() {
if (now - *t) < self.await_timeout {
break;
}
log::warn!(
"Timeout awating release QoS2 messages found, will be removed, packet id is {}",
*packet_id
);
self.rels.pop_front();
}
}
}
41 changes: 3 additions & 38 deletions src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::cell::{Cell, RefCell};
use std::task::{Context, Poll};
use std::time::Duration;
use std::{future::Future, marker::PhantomData, num::NonZeroU16, pin::Pin, rc::Rc};

use ntex::service::{fn_factory_with_config, Service, ServiceFactory};
Expand All @@ -17,15 +16,12 @@ use super::{
Session,
};
use crate::error::MqttError;
use crate::types::AwaitingRelSet;

/// mqtt3 protocol dispatcher
pub(super) fn factory<St, T, C, E>(
publish: T,
control: C,
inflight: usize,
max_awaiting_rel: usize,
await_rel_timeout: Duration,
) -> impl ServiceFactory<
Config = Session<St>,
Request = codec::Packet,
Expand Down Expand Up @@ -66,8 +62,6 @@ where
cfg,
publish?,
control?,
max_awaiting_rel,
await_rel_timeout,
),
),
)
Expand All @@ -87,7 +81,6 @@ pub(crate) struct Dispatcher<St, T: Service<Error = MqttError<E>>, C, E> {
struct Inner {
sink: MqttSink,
inflight: RefCell<HashSet<NonZeroU16>>,
awaiting_rels: RefCell<AwaitingRelSet>,
}

impl<St, T, C, E> Dispatcher<St, T, C, E>
Expand All @@ -99,8 +92,6 @@ where
session: Session<St>,
publish: T,
control: C,
max_awaiting_rel: usize,
await_rel_timeout: Duration,
) -> Self {
let sink = session.sink().clone();
Self {
Expand All @@ -111,10 +102,6 @@ where
inner: Rc::new(Inner {
sink,
inflight: RefCell::new(HashSet::default()),
awaiting_rels: RefCell::new(AwaitingRelSet::new(
max_awaiting_rel,
await_rel_timeout,
)),
}),
}
}
Expand Down Expand Up @@ -169,33 +156,11 @@ where
if let Some(pid) = packet_id {
// check for duplicated packet id
if !inner.inflight.borrow_mut().insert(pid) {
log::trace!("Duplicated packet id for publish packet: {:?}", pid);
log::warn!("Duplicated packet id for publish packet: {:?}", pid);
return Either::Right(Either::Left(Ready::Err(
MqttError::V3ProtocolError,
)));
}

//qos == 2
if codec::QoS::ExactlyOnce == qos {
let mut awaiting_rels = inner.awaiting_rels.borrow_mut();
if awaiting_rels.contains(&pid) {
log::warn!(
"Duplicated sending of QoS2 message, packet id is {:?}",
pid
);
return Either::Right(Either::Left(Ready::Ok(None)));
}
//Remove the timeout awating release QoS2 messages, if it exists
awaiting_rels.remove_timeouts();
if awaiting_rels.is_full() {
// Too many awating release QoS2 messages, the earliest ones will be removed
if let Some(packet_id) = awaiting_rels.pop() {
log::warn!("Too many awating release QoS2 messages, remove the earliest, packet id is {}", packet_id);
}
}
//Stored message identifier
awaiting_rels.push(pid)
}
}

Either::Left(PublishResponse {
Expand All @@ -219,7 +184,7 @@ where
}

codec::Packet::PublishRelease { packet_id } => {
self.inner.awaiting_rels.borrow_mut().remove(&packet_id);
self.inner.inflight.borrow_mut().remove(&packet_id);
Either::Right(Either::Left(Ready::Ok(Some(codec::Packet::PublishComplete {
packet_id,
}))))
Expand Down Expand Up @@ -309,9 +274,9 @@ where

match this.result {
PublishResult::PublishAck(Some(packet_id), qos) => {
this.inner.inflight.borrow_mut().remove(packet_id);
match qos {
codec::QoS::AtLeastOnce => {
this.inner.inflight.borrow_mut().remove(packet_id);
Poll::Ready(Ok(Some(codec::Packet::PublishAck {
packet_id: *packet_id,
})))
Expand Down
24 changes: 0 additions & 24 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ pub struct MqttServer<Io, St, C: ServiceFactory, Cn: ServiceFactory, P: ServiceF
inflight: usize,
handshake_timeout: u16,
disconnect_timeout: u16,
max_awaiting_rel: usize,
await_rel_timeout: Duration,
pool: Rc<MqttSinkPool>,
_t: PhantomData<(Io, St)>,
}
Expand Down Expand Up @@ -62,8 +60,6 @@ where
inflight: 16,
handshake_timeout: 0,
disconnect_timeout: 3000,
max_awaiting_rel: 0,
await_rel_timeout: Duration::default(),
pool: Default::default(),
_t: PhantomData,
}
Expand Down Expand Up @@ -124,18 +120,6 @@ where
self
}

///
pub fn max_awaiting_rel(mut self, val: usize) -> Self {
self.max_awaiting_rel = val;
self
}

///
pub fn await_rel_timeout(mut self, val: Duration) -> Self {
self.await_rel_timeout = val;
self
}

/// Service to handle control packets
///
/// All control packets are processed sequentially, max buffered
Expand All @@ -158,8 +142,6 @@ where
inflight: self.inflight,
handshake_timeout: self.handshake_timeout,
disconnect_timeout: self.disconnect_timeout,
max_awaiting_rel: self.max_awaiting_rel,
await_rel_timeout: self.await_rel_timeout,
pool: self.pool,
_t: PhantomData,
}
Expand All @@ -181,8 +163,6 @@ where
inflight: self.inflight,
handshake_timeout: self.handshake_timeout,
disconnect_timeout: self.disconnect_timeout,
max_awaiting_rel: self.max_awaiting_rel,
await_rel_timeout: self.await_rel_timeout,
pool: self.pool,
_t: PhantomData,
}
Expand Down Expand Up @@ -217,8 +197,6 @@ where
publish,
control,
self.inflight,
self.max_awaiting_rel,
self.await_rel_timeout,
),
|req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
Expand Down Expand Up @@ -275,8 +253,6 @@ where
publish,
control,
self.inflight,
self.max_awaiting_rel,
self.await_rel_timeout,
),
|req: DispatchItem<Rc<MqttShared>>, srv| match req {
DispatchItem::Item(req) => Either::Left(srv.call(req)),
Expand Down

0 comments on commit 61dbd38

Please sign in to comment.