Skip to content

Commit

Permalink
Use enumeration types to define reasons for disconnection and message…
Browse files Browse the repository at this point in the history
… drop
  • Loading branch information
rmqtt committed Jul 27, 2023
1 parent 52ab365 commit d08ebe7
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 76 deletions.
2 changes: 1 addition & 1 deletion rmqtt-plugins/rmqtt-http-api/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn build_result(s: Option<Session>, c: Option<ClientInfo>) -> SearchResult
let connected = c.is_connected();
let connected_at = c.connected_at / 1000;
let disconnected_at = c.disconnected_at() / 1000;
let disconnected_reason = c.get_disconnected_reason().await.unwrap_or_default();
let disconnected_reason = c.get_disconnected_reason().await.to_string();
let expiry_interval = if connected {
s.listen_cfg.session_expiry_interval.as_secs() as i64
} else {
Expand Down
4 changes: 2 additions & 2 deletions rmqtt-plugins/rmqtt-http-api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rmqtt::node::{BrokerInfo, NodeInfo, NodeStatus};
use rmqtt::plugin::PluginInfo;
use rmqtt::settings::{deserialize_datetime_option, serialize_datetime_option};
use rmqtt::Result;
use rmqtt::{anyhow, bincode, chrono, serde_json, HashMap, MqttError, QoS, Reason};
use rmqtt::{anyhow, bincode, chrono, serde_json, HashMap, MqttError, QoS};
use rmqtt::{metrics::Metrics, stats::Stats};
use rmqtt::{ClientId, NodeId, Timestamp, TopicFilter, TopicName, UserName};

Expand Down Expand Up @@ -127,7 +127,7 @@ pub struct ClientSearchResult {
pub connected: bool,
pub connected_at: Timestamp,
pub disconnected_at: Timestamp,
pub disconnected_reason: Reason,
pub disconnected_reason: String,
pub keepalive: u16,
pub clean_start: bool,
pub session_present: bool,
Expand Down
6 changes: 3 additions & 3 deletions rmqtt-plugins/rmqtt-web-hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl Handler for WebHookHandler {
"clientid": client.id.client_id,
"username": client.id.username_ref(),
"disconnected_at": client.disconnected_at(),
"reason": reason,
"reason": reason.to_string(),
"time": now_time
});
vec![(None, body)]
Expand Down Expand Up @@ -620,7 +620,7 @@ impl Handler for WebHookHandler {
"ipaddress": client.id.remote_addr,
"clientid": client.id.client_id,
"username": client.id.username_ref(),
"reason": reason,
"reason": reason.to_string(),
"time": now_time
});
vec![(None, body)]
Expand Down Expand Up @@ -686,7 +686,7 @@ impl Handler for WebHookHandler {
"topic": publish.topic(),
"packet_id": publish.packet_id(),
"payload": base64::encode(publish.payload()),
"reason": reason,
"reason": reason.to_string(),
"pts": publish.create_time(),
"ts": now.timestamp_millis(),
"time": now_time
Expand Down
4 changes: 4 additions & 0 deletions rmqtt/src/broker/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tokio::sync::mpsc::error::SendError;
use tokio::task::JoinError;
use tokio::time::Duration;

use super::types::Reason;

#[derive(Error, Debug)]
pub enum MqttError {
#[error("service unavailable")]
Expand Down Expand Up @@ -43,6 +45,8 @@ pub enum MqttError {
ParseIntError(ParseIntError),
#[error("listener config is error")]
ListenerConfigError,
#[error("{0}")]
Reason(Reason),
#[error("None")]
None,
}
Expand Down
98 changes: 42 additions & 56 deletions rmqtt/src/broker/session.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bytestring::ByteString;
use std::convert::AsRef;
use std::convert::From as _f;
use std::convert::TryFrom;
Expand Down Expand Up @@ -112,7 +113,7 @@ impl SessionState {
tokio::select! {
_ = &mut keep_alive_delay => { //, if !keep_alive_delay.is_elapsed()
log::debug!("{:?} keep alive is timeout, is_elapsed: {:?}", state.id, keep_alive_delay.is_elapsed());
state.client.add_disconnected_reason(Reason::from_static("Timeout(Read/Write)")).await;
state.client.add_disconnected_reason(Reason::ConnectKeepaliveTimeout).await;
break
},
msg = msg_rx.next() => {
Expand All @@ -123,7 +124,7 @@ impl SessionState {
if let Err((from, p)) = deliver_queue_tx.send((from, p)).await{
log::warn!("{:?} deliver_dropped, from: {:?}, {:?}", state.id, from, p);
//hook, message_dropped
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::from_static("deliver queue is full")).await;
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::MessageQueueFull).await;
}
},
Message::Kick(sender, by_id, is_admin) => {
Expand All @@ -136,17 +137,16 @@ impl SessionState {
if is_admin {
flags.insert(StateFlags::ByAdminKick);
}
state.client.add_disconnected_reason(Reason::from(format!("Kicked by {:?}, is_admin: {}", by_id, is_admin))).await;
state.client.add_disconnected_reason(Reason::ConnectKicked(is_admin)).await;
break
}else{
log::warn!("{:?} Message::Kick, kick sender is closed, to {:?}, is_admin: {}", state.id, by_id, is_admin);
}
},
Message::Disconnect(d) => {
flags.insert(StateFlags::DisconnectReceived);
state.client.add_disconnected_reason(d.reason()).await;
state.client.set_mqtt_disconnect(d).await;
state.client.add_disconnected_reason("Disconnect(true) message is received".into()).await;

},
Message::Closed(reason) => {
log::debug!("{:?} Closed({}) message received, reason: {}", state.id, flags.contains(StateFlags::DisconnectReceived), reason);
Expand Down Expand Up @@ -235,27 +235,26 @@ impl SessionState {
state.sink.close();

//hook, client_disconnected
let reason = state
.client
.get_disconnected_reason()
.await
.unwrap_or(Reason::from_static("Remote close connect"));
let reason = if state.client.has_disconnected_reason().await {
state.client.get_disconnected_reason().await
} else {
state.client.add_disconnected_reason(Reason::ConnectRemoteClose).await;
Reason::ConnectRemoteClose
};
state.hook.client_disconnected(reason).await;

if flags.contains(StateFlags::Kicked) {
if flags.contains(StateFlags::ByAdminKick) {
state.clean(state.client.get_disconnected_reason().await.unwrap_or_default()).await;
state.clean(state.client.take_disconnected_reason().await).await;
}
} else if state.clean_session().await {
state.clean(state.client.take_disconnected_reason().await).await;
} else {
if state.clean_session().await {
state.clean(state.client.get_disconnected_reason().await.unwrap_or_default()).await;
} else {
//Start offline event loop
Self::offline_start(state.clone(), &mut msg_rx, &deliver_queue_tx, &mut flags).await;
log::debug!("{:?} offline flags: {:?}", state.id, flags);
if !flags.contains(StateFlags::Kicked) {
state.clean(Reason::from_static("session expired")).await;
}
//Start offline event loop
Self::offline_start(state.clone(), &mut msg_rx, &deliver_queue_tx, &mut flags).await;
log::debug!("{:?} offline flags: {:?}", state.id, flags);
if !flags.contains(StateFlags::Kicked) {
state.clean(Reason::SessionExpiration).await;
}
}
});
Expand Down Expand Up @@ -285,7 +284,7 @@ impl SessionState {
if let Err((from, p)) = deliver_queue_tx.send((from, p)).await{
log::warn!("{:?} offline deliver_dropped, from: {:?}, {:?}", state.id, from, p);
//hook, message_dropped
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::from_static("deliver queue is full")).await;
Runtime::instance().extends.hook_mgr().await.message_dropped(Some(state.id.clone()), from, p, Reason::MessageQueueFull).await;
}
},
Message::Kick(sender, by_id, is_admin) => {
Expand Down Expand Up @@ -326,7 +325,7 @@ impl SessionState {
let res = if let Some(ref tx) = self.tx {
if let Err(e) = tx.unbounded_send(Message::Forward(from, p)) {
if let Message::Forward(from, p) = e.into_inner() {
Err((from, p, "Send Publish message error, Tx is closed"))
Err((from, p, Reason::from("Send Publish message error, Tx is closed")))
} else {
Ok(())
}
Expand All @@ -335,7 +334,7 @@ impl SessionState {
}
} else {
log::warn!("{:?} Message Sender is None", self.id);
Err((from, p, "Message Sender is None"))
Err((from, p, Reason::from("Send Publish message error, Tx is None")))
};

if let Err((from, p, reason)) = res {
Expand All @@ -344,7 +343,7 @@ impl SessionState {
.extends
.hook_mgr()
.await
.message_dropped(Some(self.id.clone()), from, p, Reason::from_static(reason))
.message_dropped(Some(self.id.clone()), from, p, reason)
.await;
}
}
Expand Down Expand Up @@ -414,12 +413,7 @@ impl SessionState {
.extends
.hook_mgr()
.await
.message_dropped(
Some(self.id.clone()),
from,
publish,
Reason::from_static("message is expired"),
)
.message_dropped(Some(self.id.clone()), from, publish, Reason::MessageExpiration)
.await;
return Ok(());
}
Expand Down Expand Up @@ -581,7 +575,9 @@ impl SessionState {
match self.publish(Publish::try_from(publish)?).await {
Err(e) => {
Metrics::instance().client_publish_error_inc();
self.client.add_disconnected_reason(Reason::from(format!("Publish failed, {:?}", e))).await;
self.client
.add_disconnected_reason(Reason::PublishFailed(ByteString::from(e.to_string())))
.await;
Err(e)
}
Ok(false) => {
Expand All @@ -597,7 +593,9 @@ impl SessionState {
match self.publish(Publish::try_from(publish)?).await {
Err(e) => {
Metrics::instance().client_publish_error_inc();
self.client.add_disconnected_reason(Reason::from(format!("Publish failed, {:?}", e))).await;
self.client
.add_disconnected_reason(Reason::PublishFailed(ByteString::from(e.to_string())))
.await;
Err(e)
}
Ok(false) => {
Expand All @@ -623,15 +621,7 @@ impl SessionState {
.extends
.hook_mgr()
.await
.message_dropped(
None,
self.id.clone(),
publish,
Reason::from(format!(
"hook::message_publish_check_acl, publish rejected, disconnect:{}",
disconnect
)),
)
.message_dropped(None, self.id.clone(), publish, Reason::PublishRefused)
.await;
return if disconnect {
Err(MqttError::from(
Expand Down Expand Up @@ -985,11 +975,10 @@ impl ClientInfo {
)),
);

if let Some(reason) = self.get_disconnected_reason().await {
json.insert("disconnected_reason".into(), serde_json::Value::String(reason.to_string()));
} else {
json.insert("disconnected_reason".into(), serde_json::Value::Null);
}
json.insert(
"disconnected_reason".into(),
serde_json::Value::String(self.get_disconnected_reason().await.to_string()),
);

json.insert(
"extra_attrs".into(),
Expand Down Expand Up @@ -1043,20 +1032,17 @@ impl ClientInfo {
}

pub(crate) async fn set_mqtt_disconnect(&self, d: Disconnect) {
if let Some(r) = d.reason() {
self.add_disconnected_reason(r.clone()).await;
}
self.disconnect.write().await.replace(d);
}

#[inline]
pub async fn get_disconnected_reason(&self) -> Option<Reason> {
let reason = self.disconnected_reason.read().await;
if reason.is_empty() {
None
} else {
Some(Reason::from(reason.join(",")))
}
pub async fn get_disconnected_reason(&self) -> Reason {
Reason::Reasons(self.disconnected_reason.read().await.clone())
}

#[inline]
pub async fn take_disconnected_reason(&self) -> Reason {
Reason::Reasons(self.disconnected_reason.write().await.drain(..).collect())
}

#[inline]
Expand Down

0 comments on commit d08ebe7

Please sign in to comment.