Skip to content

Commit

Permalink
Improve MQTT V5 protocol: Assigned client identifier
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 20, 2023
1 parent 4822e23 commit 0e08d49
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions rmqtt/src/broker/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::net::SocketAddr;

use ntex_mqtt::v5;
use ntex_mqtt::v5::codec::{Auth, DisconnectReasonCode};
use uuid::Uuid;

use crate::broker::executor::get_handshake_exec;
use crate::broker::{inflight::MomentStatus, types::*};
Expand Down Expand Up @@ -36,7 +37,7 @@ async fn refused_ack<Io>(
#[inline]
pub async fn handshake<Io: 'static>(
listen_cfg: Listener,
handshake: v5::Handshake<Io>,
mut handshake: v5::Handshake<Io>,
remote_addr: SocketAddr,
local_addr: SocketAddr,
) -> Result<v5::HandshakeAck<Io, SessionState>, MqttError> {
Expand All @@ -48,6 +49,14 @@ pub async fn handshake<Io: 'static>(
listen_cfg
);

let assigned_client_id = if handshake.packet().client_id.is_empty() {
handshake.packet_mut().client_id =
ClientId::from(Uuid::new_v4().as_simple().encode_lower(&mut Uuid::encode_buffer()).to_owned());
true
} else {
false
};

let id = Id::new(
Runtime::instance().node.id(),
Some(local_addr),
Expand All @@ -59,7 +68,7 @@ pub async fn handshake<Io: 'static>(
Runtime::instance().stats.handshakings.max_max(handshake.handshakings());

let exec = get_handshake_exec(local_addr.port(), listen_cfg.clone());
match exec.spawn(_handshake(id.clone(), listen_cfg, handshake)).await {
match exec.spawn(_handshake(id.clone(), listen_cfg, handshake, assigned_client_id)).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => {
log::warn!("{:?} Connection Refused, handshake error, reason: {:?}", id, e);
Expand All @@ -77,6 +86,7 @@ pub async fn _handshake<Io: 'static>(
id: Id,
listen_cfg: Listener,
mut handshake: v5::Handshake<Io>,
is_assigned_client_id: bool,
) -> Result<v5::HandshakeAck<Io, SessionState>, MqttError> {
let connect_info = ConnectInfo::V5(id.clone(), Box::new(handshake.packet().clone()));

Expand Down Expand Up @@ -227,6 +237,7 @@ pub async fn _handshake<Io: 'static>(
let max_packet_size = state.fitter.max_packet_size();
let shared_subscription_available =
Runtime::instance().extends.shared_subscription().await.is_supported(&state.listen_cfg);
let assigned_client_id = if is_assigned_client_id { Some(state.id.client_id.clone()) } else { None };
Ok(handshake.ack(state).keep_alive(keep_alive).with(|ack: &mut v5::codec::ConnectAck| {
ack.session_present = session_present;
ack.server_keepalive_sec = Some(server_keepalive_sec);
Expand All @@ -235,12 +246,11 @@ pub async fn _handshake<Io: 'static>(
ack.max_qos = Some(max_qos);
ack.retain_available = Some(retain_available);
ack.max_packet_size = Some(max_packet_size);
//ack.assigned_client_id = None; //@TODO ... If the client ID is assigned by the broker, the server needs to return the client ID to the terminal.
ack.assigned_client_id = assigned_client_id;
ack.topic_alias_max = 0; //@TODO ...
ack.wildcard_subscription_available = Some(true);
ack.subscription_identifiers_available = Some(false);
ack.shared_subscription_available = Some(shared_subscription_available);

log::debug!("{:?} handshake.ack: {:?}", id, ack);
}))
}
Expand Down

0 comments on commit 0e08d49

Please sign in to comment.