Skip to content

Commit

Permalink
Add config item 'allow_zero_keepalive', a value of zero indicates dis…
Browse files Browse the repository at this point in the history
…abling the keep-alive feature, where the server doesn't need to disconnect due to client inactivity. default: true
  • Loading branch information
rmqtt committed Aug 18, 2023
1 parent 26315b5 commit e253f4c
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 27 deletions.
4 changes: 4 additions & 0 deletions rmqtt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ listener.tcp.external.allow_anonymous = true
#Minimum allowable keepalive value for mqtt connection,
#less than this value will reject the connection, default: 0, unit: seconds
listener.tcp.external.min_keepalive = 0
#A value of zero indicates disabling the keep-alive feature, where the server
#doesn't need to disconnect due to client inactivity, default: true
listener.tcp.external.allow_zero_keepalive = true

# > 0.5, Keepalive * backoff * 2
listener.tcp.external.keepalive_backoff = 0.75
#Flight window size. The flight window is used to store the unanswered QoS 1 and QoS 2 messages
Expand Down
35 changes: 24 additions & 11 deletions rmqtt/src/broker/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ impl super::Entry for LockEntry {
}

if let Some((s, _, _)) = self._remove(clear_subscriptions).await {
Ok(Some(s.to_offline_info().await))
if clean_start {
Ok(None)
} else {
Ok(Some(s.to_offline_info().await))
}
} else {
Ok(None)
}
Expand Down Expand Up @@ -1154,19 +1158,28 @@ impl Fitter for DefaultFitter {
#[inline]
fn keep_alive(&self, keep_alive: &mut u16) -> Result<u16> {
if *keep_alive == 0 {
return Err(MqttError::from("Keepalive must be greater than 0"));
}
if *keep_alive < self.listen_cfg.min_keepalive {
if self.client.protocol() == MQTT_LEVEL_5 {
*keep_alive = self.listen_cfg.min_keepalive;
if self.listen_cfg.allow_zero_keepalive {
Ok(0)
} else {
Err(MqttError::from("Keepalive must be greater than 0"))
}
} else {
if *keep_alive < self.listen_cfg.min_keepalive {
if self.client.protocol() == MQTT_LEVEL_5 {
*keep_alive = self.listen_cfg.min_keepalive;
} else {
return Err(MqttError::from(format!(
"Keepalive is too small, cannot be less than {}",
self.listen_cfg.min_keepalive
)));
}
}
if *keep_alive < 6 {
Ok(*keep_alive + 3)
} else {
return Err(MqttError::from(format!(
"Keepalive is too small, cannot be less than {}",
self.listen_cfg.min_keepalive
)));
Ok(((*keep_alive as f32 * self.listen_cfg.keepalive_backoff) * 2.0) as u16)
}
}
Ok(((*keep_alive as f32 * self.listen_cfg.keepalive_backoff) * 2.0) as u16)
}

#[inline]
Expand Down
38 changes: 22 additions & 16 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,33 @@ impl SessionState {

#[inline]
pub(crate) async fn start(mut self, keep_alive: u16) -> (Self, Tx) {
log::debug!("{:?} start online event loop", self.id);
let (msg_tx, mut msg_rx) = futures::channel::mpsc::unbounded();
self.tx.replace(msg_tx.clone());
let mut state = self.clone();

let keep_alive_interval = if keep_alive == 0 {
Duration::from_secs(u32::MAX as u64)
} else {
Duration::from_secs(keep_alive as u64)
};
log::debug!("{:?} keep_alive_interval is {:?}", state.id, keep_alive_interval);
let keep_alive_delay = tokio::time::sleep(keep_alive_interval);

let deliver_timeout_delay = tokio::time::sleep(Duration::from_secs(60));

let limiter = {
let (burst, replenish_n_per) = state.fitter.mqueue_rate_limit();
Limiter::new(burst, replenish_n_per)
};

let mut flags = StateFlags::empty();

log::debug!("{:?} there are {} offline messages ...", state.id, state.deliver_queue.len());

ntex::rt::spawn(async move {
log::debug!("{:?} there are {} offline messages ...", state.id, state.deliver_queue.len());
Runtime::instance().stats.connections.inc();

let limiter = {
let (burst, replenish_n_per) = state.fitter.mqueue_rate_limit();
Limiter::new(burst, replenish_n_per)
};
let (deliver_queue_tx, mut deliver_queue_rx) = limiter.channel(state.deliver_queue.clone());
//When the message queue is full, the message dropping policy is implemented
let deliver_queue_tx = deliver_queue_tx.policy(|(_, p): &(From, Publish)| -> Policy {
Expand All @@ -78,18 +94,8 @@ impl SessionState {
});
state.deliver_queue_tx.replace(deliver_queue_tx.clone());

let mut flags = StateFlags::empty();
log::debug!("{:?} start online event loop", state.id);
let keep_alive_interval = if keep_alive < 10 {
Duration::from_secs(10)
} else {
Duration::from_secs((keep_alive + 10) as u64)
};
log::debug!("{:?} keep_alive_interval is {:?}", state.id, keep_alive_interval);
let keep_alive_delay = tokio::time::sleep(keep_alive_interval);
tokio::pin!(keep_alive_delay);

let deliver_timeout_delay = tokio::time::sleep(Duration::from_secs(60));
tokio::pin!(deliver_timeout_delay);

loop {
Expand Down Expand Up @@ -153,6 +159,7 @@ impl SessionState {
break
},
Message::Keepalive => {
log::debug!("{:?} Message::Keepalive ... ", state.id);
keep_alive_delay.as_mut().reset(Instant::now() + keep_alive_interval);
},
Message::Subscribe(sub, reply_tx) => {
Expand Down Expand Up @@ -313,7 +320,6 @@ impl SessionState {
},
_ => {
log::info!("{:?} offline receive message is {:?}", state.id, msg);
break;
}
}
}else{
Expand Down
7 changes: 7 additions & 0 deletions rmqtt/src/settings/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub struct ListenerInner {
//deserialize_with = "deserialize_duration"
)]
pub min_keepalive: u16,
#[serde(default = "ListenerInner::allow_zero_keepalive_default")]
pub allow_zero_keepalive: bool,
#[serde(default = "ListenerInner::keepalive_backoff_default")]
pub keepalive_backoff: f32,
#[serde(default = "ListenerInner::max_inflight_default")]
Expand Down Expand Up @@ -246,6 +248,7 @@ impl Default for ListenerInner {
idle_timeout: ListenerInner::idle_timeout_default(),
allow_anonymous: ListenerInner::allow_anonymous_default(),
min_keepalive: ListenerInner::min_keepalive_default(),
allow_zero_keepalive: ListenerInner::allow_zero_keepalive_default(),
keepalive_backoff: ListenerInner::keepalive_backoff_default(),
max_inflight: ListenerInner::max_inflight_default(),
handshake_timeout: ListenerInner::handshake_timeout_default(),
Expand Down Expand Up @@ -317,6 +320,10 @@ impl ListenerInner {
0
}
#[inline]
fn allow_zero_keepalive_default() -> bool {
true
}
#[inline]
fn keepalive_backoff_default() -> f32 {
0.75
}
Expand Down

0 comments on commit e253f4c

Please sign in to comment.