Skip to content

Commit

Permalink
Fix session expiration time anomalies
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Jul 28, 2023
1 parent 4704b11 commit e685253
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 52 deletions.
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-http-api/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ async fn build_result(s: Option<Session>, c: Option<ClientInfo>) -> SearchResult
let disconnected_at = c.disconnected_at() / 1000;
let disconnected_reason = c.get_disconnected_reason().await.to_string();
let expiry_interval = if connected {
s.listen_cfg.session_expiry_interval.as_secs() as i64
s.fitter.session_expiry_interval().await.as_secs() as i64
} else {
s.listen_cfg.session_expiry_interval.as_secs() as i64
s.fitter.session_expiry_interval().await.as_secs() as i64
- (chrono::Local::now().timestamp() - disconnected_at)
};
let inflight = s.inflight_win.read().await.len();
Expand Down
24 changes: 18 additions & 6 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,8 @@ impl DefaultFitterManager {

impl FitterManager for &'static DefaultFitterManager {
#[inline]
fn get(&self, client: ClientInfo, id: Id, listen_cfg: Listener) -> std::rc::Rc<dyn Fitter> {
std::rc::Rc::new(DefaultFitter::new(client, id, listen_cfg))
fn get(&self, client: ClientInfo, id: Id, listen_cfg: Listener) -> Box<dyn Fitter> {
Box::new(DefaultFitter::new(client, id, listen_cfg))
}
}

Expand Down Expand Up @@ -1187,11 +1187,23 @@ impl Fitter for DefaultFitter {
}

#[inline]
fn session_expiry_interval(&self) -> Duration {
if let ConnectInfo::V5(_, connect) = &self.client.connect_info {
Duration::from_secs(connect.session_expiry_interval_secs.unwrap_or_default() as u64)
async fn session_expiry_interval(&self) -> Duration {
let expiry_interval = || {
if let ConnectInfo::V5(_, connect) = &self.client.connect_info {
Duration::from_secs(connect.session_expiry_interval_secs.unwrap_or_default() as u64)
} else {
self.listen_cfg.session_expiry_interval
}
};

if let Some(Disconnect::V5(d)) = self.client.disconnect.read().await.as_ref() {
if let Some(interval_secs) = d.session_expiry_interval_secs {
Duration::from_secs(interval_secs as u64)
} else {
expiry_interval()
}
} else {
self.listen_cfg.session_expiry_interval
expiry_interval()
}
}

Expand Down
4 changes: 2 additions & 2 deletions rmqtt/src/broker/fitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::Result;

#[async_trait]
pub trait FitterManager: Sync + Send {
fn get(&self, client: ClientInfo, id: Id, listen_cfg: Listener) -> std::rc::Rc<dyn Fitter>;
fn get(&self, client: ClientInfo, id: Id, listen_cfg: Listener) -> Box<dyn Fitter>;
}

#[async_trait]
Expand All @@ -28,7 +28,7 @@ pub trait Fitter: Sync + Send {
fn max_inflight(&self) -> std::num::NonZeroU16;

///session expiry interval
fn session_expiry_interval(&self) -> Duration;
async fn session_expiry_interval(&self) -> Duration;

///max packet size
fn max_packet_size(&self) -> u32;
Expand Down
44 changes: 17 additions & 27 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::convert::AsRef;
use std::convert::From as _f;
use std::convert::TryFrom;
use std::fmt;
use std::num::NonZeroU16;
use std::ops::Deref;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
Expand Down Expand Up @@ -32,7 +33,6 @@ pub struct SessionState {
pub sink: Sink,
pub hook: Rc<dyn Hook>,
pub deliver_queue_tx: Option<MessageSender>,
pub fitter: Rc<dyn Fitter>,
}

impl fmt::Debug for SessionState {
Expand All @@ -50,14 +50,8 @@ impl fmt::Debug for SessionState {

impl SessionState {
#[inline]
pub(crate) fn new(
session: Session,
client: ClientInfo,
sink: Sink,
hook: Rc<dyn Hook>,
fitter: Rc<dyn Fitter>,
) -> Self {
Self { tx: None, session, client, sink, hook, deliver_queue_tx: None, fitter }
pub(crate) fn new(session: Session, client: ClientInfo, sink: Sink, hook: Rc<dyn Hook>) -> Self {
Self { tx: None, session, client, sink, hook, deliver_queue_tx: None }
}

#[inline]
Expand Down Expand Up @@ -276,10 +270,14 @@ impl SessionState {
deliver_queue_tx: &MessageSender,
flags: &mut StateFlags,
) {
log::debug!("{:?} start offline event loop", state.id);
log::debug!(
"{:?} start offline event loop, session_expiry_interval: {:?}",
state.id,
state.fitter.session_expiry_interval().await
);

//state.client.disconnect
let session_expiry_delay = tokio::time::sleep(state.fitter.session_expiry_interval());
let session_expiry_delay = tokio::time::sleep(state.fitter.session_expiry_interval().await);
tokio::pin!(session_expiry_delay);

loop {
Expand Down Expand Up @@ -315,6 +313,7 @@ impl SessionState {
},
_ => {
log::info!("{:?} offline receive message is {:?}", state.id, msg);
break;
}
}
}else{
Expand Down Expand Up @@ -774,25 +773,12 @@ impl SessionState {
Ok(())
}

#[inline]
async fn session_expiry_interval(&self) -> Duration {
if let Some(Disconnect::V5(d)) = self.client.disconnect.read().await.as_ref() {
if let Some(interval_secs) = d.session_expiry_interval_secs {
Duration::from_secs(interval_secs as u64)
} else {
self.fitter.session_expiry_interval()
}
} else {
self.fitter.session_expiry_interval()
}
}

#[inline]
async fn clean_session(&self) -> bool {
if let ConnectInfo::V3(_, conn_info) = &self.client.connect_info {
conn_info.clean_session
} else {
self.session_expiry_interval().await.as_secs() == 0
self.fitter.session_expiry_interval().await.is_zero()
}
}
}
Expand Down Expand Up @@ -837,16 +823,19 @@ impl Session {
#[inline]
pub(crate) fn new(
id: Id,
fitter: Box<dyn Fitter>,
listen_cfg: Listener,
max_mqueue_len: usize,
max_inflight: usize,
max_inflight: NonZeroU16,
created_at: TimestampMillis,
) -> Self {
let max_mqueue_len = fitter.max_mqueue_len();
let max_inflight = max_inflight.get() as usize;
let message_retry_interval = listen_cfg.message_retry_interval.as_millis() as TimestampMillis;
let message_expiry_interval = listen_cfg.message_expiry_interval.as_millis() as TimestampMillis;
Runtime::instance().stats.sessions.inc();
Self(Arc::new(_SessionInner {
id,
fitter,
listen_cfg,
subscriptions: SessionSubs::new(),
deliver_queue: Arc::new(MessageQueue::new(max_mqueue_len)),
Expand Down Expand Up @@ -901,6 +890,7 @@ impl std::fmt::Debug for Session {

pub struct _SessionInner {
pub id: Id,
pub fitter: Box<dyn Fitter>,
pub listen_cfg: Listener,
//Current subscription for this session
pub subscriptions: SessionSubs,
Expand Down
14 changes: 4 additions & 10 deletions rmqtt/src/broker/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,10 @@ async fn _handshake<Io: 'static>(
let created_at =
if let Some(ref offline_info) = offline_info { offline_info.created_at } else { connected_at };

let session = Session::new(
id,
listen_cfg,
fitter.max_mqueue_len(),
fitter.max_inflight().get() as usize,
created_at,
);
let max_inflight = fitter.max_inflight();
let session = Session::new(id, fitter, listen_cfg, max_inflight, created_at);

let keep_alive = match fitter.keep_alive(&mut packet.keep_alive) {
let keep_alive = match session.fitter.keep_alive(&mut packet.keep_alive) {
Ok(keep_alive) => keep_alive,
Err(e) => {
return Ok(refused_ack(
Expand All @@ -177,8 +172,7 @@ async fn _handshake<Io: 'static>(
hook.session_created().await;
}

let (state, tx) =
SessionState::new(session, client, Sink::V3(sink), hook, fitter).start(keep_alive).await;
let (state, tx) = SessionState::new(session, client, Sink::V3(sink), hook).start(keep_alive).await;
if let Err(e) = entry.set(state.session.clone(), tx, state.client.clone()).await {
return Ok(refused_ack(
handshake,
Expand Down
8 changes: 3 additions & 5 deletions rmqtt/src/broker/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ pub async fn _handshake<Io: 'static>(
if let Some(ref offline_info) = offline_info { offline_info.created_at } else { connected_at };

let max_inflight = fitter.max_inflight();
let session =
Session::new(id, listen_cfg, fitter.max_mqueue_len(), max_inflight.get() as usize, created_at);
let session = Session::new(id, fitter, listen_cfg, max_inflight, created_at);

let keep_alive = match fitter.keep_alive(&mut packet.keep_alive) {
let keep_alive = match session.fitter.keep_alive(&mut packet.keep_alive) {
Ok(keep_alive) => keep_alive,
Err(e) => {
return Ok(refused_ack(
Expand All @@ -185,8 +184,7 @@ pub async fn _handshake<Io: 'static>(
hook.session_created().await;
}

let (state, tx) =
SessionState::new(session, client, Sink::V5(sink), hook, fitter).start(keep_alive).await;
let (state, tx) = SessionState::new(session, client, Sink::V5(sink), hook).start(keep_alive).await;

if let Err(e) = entry.set(state.session.clone(), tx, state.client.clone()).await {
return Ok(refused_ack(
Expand Down

0 comments on commit e685253

Please sign in to comment.