Skip to content

Commit

Permalink
When the system is busy, restrict the rate of processing and deliveri…
Browse files Browse the repository at this point in the history
…ng messages.
  • Loading branch information
rmqtt committed Oct 16, 2023
1 parent 79019fa commit 159b137
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 19 deletions.
27 changes: 20 additions & 7 deletions rmqtt/src/broker/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,26 @@ pub async fn publish(state: v3::Session<SessionState>, pub_msg: v3::PublishMessa

match pub_msg {
v3::PublishMessage::Publish(publish) => {
if let Err(e) = state.publish_v3(&publish).await {
log::error!(
"{:?} Publish failed, reason: {}",
state.id,
state.client.get_disconnected_reason().await
);
return Err(e);
let publish_fut = async move {
if let Err(e) = state.publish_v3(&publish).await {
log::error!(
"{:?} Publish failed, reason: {}",
state.id,
state.client.get_disconnected_reason().await
);
Err(e)
} else {
Ok(())
}
};
if Runtime::instance().is_busy() {
Runtime::local_exec()
.spawn(publish_fut)
.result()
.await
.map_err(|e| MqttError::from(e.to_string()))??;
} else {
publish_fut.await?;
}
}
v3::PublishMessage::PublishAck(packet_id) => {
Expand Down
40 changes: 28 additions & 12 deletions rmqtt/src/broker/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::convert::From as _f;
use std::net::SocketAddr;

use ntex_mqtt::v5;
use ntex_mqtt::v5::codec::{Auth, DisconnectReasonCode};
use ntex_mqtt::v5::codec::{Auth, DisconnectReasonCode, PublishAckReason};
use ntex_mqtt::v5::PublishAck;
use ntex_mqtt::v5::PublishResult;
use rust_box::task_exec_queue::LocalSpawnExt;
use uuid::Uuid;

Expand Down Expand Up @@ -376,27 +378,41 @@ pub async fn publish(

let _ = state.send(Message::Keepalive);

match &pub_msg {
match pub_msg {
v5::PublishMessage::Publish(publish) => {
if let Err(e) = state.publish_v5(publish).await {
log::error!(
"{:?} Publish failed, reason: {:?}",
state.id,
state.client.get_disconnected_reason().await
);
return Err(e);
let publish_fut = async move {
if let Err(e) = state.publish_v5(&publish).await {
log::error!(
"{:?} Publish failed, reason: {}",
state.id,
state.client.get_disconnected_reason().await
);
Err(e)
} else {
Ok(())
}
};
if Runtime::instance().is_busy() {
Runtime::local_exec()
.spawn(publish_fut)
.result()
.await
.map_err(|e| MqttError::from(e.to_string()))??;
} else {
publish_fut.await?;
}
return Ok(PublishResult::PublishAck(PublishAck::new(PublishAckReason::Success)));
}
v5::PublishMessage::PublishAck(ack) => {
v5::PublishMessage::PublishAck(ref ack) => {
if let Some(iflt_msg) = state.inflight_win.write().await.remove(&ack.packet_id.get()) {
//hook, message_ack
state.hook.message_acked(iflt_msg.from, &iflt_msg.publish).await;
}
}
v5::PublishMessage::PublishReceived(ack) => {
v5::PublishMessage::PublishReceived(ref ack) => {
state.inflight_win.write().await.update_status(&ack.packet_id.get(), MomentStatus::UnComplete);
}
v5::PublishMessage::PublishComplete(ack2) => {
v5::PublishMessage::PublishComplete(ref ack2) => {
if let Some(iflt_msg) = state.inflight_win.write().await.remove(&ack2.packet_id.get()) {
//hook, message_ack
state.hook.message_acked(iflt_msg.from, &iflt_msg.publish).await;
Expand Down

0 comments on commit 159b137

Please sign in to comment.