Skip to content

Commit

Permalink
Fix: "Max QOS in Connect ACK Protocol Question" #54
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Feb 7, 2024
1 parent 930f95e commit f0ba628
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
12 changes: 4 additions & 8 deletions rmqtt-bin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ async fn listen(name: String, listen_cfg: &Listener) -> Result<()> {
let max_inflight = listen_cfg.max_inflight.get() as usize;
let handshake_timeout = listen_cfg.handshake_timeout();
let max_size = listen_cfg.max_packet_size.as_u32();
let max_qos = listen_cfg.max_qos_allowed;
ntex::server::Server::build()
.bind(name, listen_cfg.addr, move || {
MqttServer::new()
Expand Down Expand Up @@ -163,7 +162,7 @@ async fn listen(name: String, listen_cfg: &Listener) -> Result<()> {
.receive_max(max_inflight as u16)
.handshake_timeout(handshake_timeout)
.max_size(max_size)
.max_qos(max_qos)
// .max_qos(max_qos)
//.max_topic_alias(max_topic_alias),
.publish(fn_factory_with_config(|session: v5::Session<SessionState>| {
ok::<_, MqttError>(fn_service(move |req| publish_v5(session.clone(), req)))
Expand Down Expand Up @@ -218,7 +217,6 @@ async fn listen_tls(name: String, listen_cfg: &Listener) -> Result<()> {
let max_inflight = listen_cfg.max_inflight.get() as usize;
let handshake_timeout = listen_cfg.handshake_timeout();
let max_size = listen_cfg.max_packet_size.as_u32();
let max_qos = listen_cfg.max_qos_allowed;
ntex::server::Server::build()
.bind(name, listen_cfg.addr, move || {
pipeline_factory(tls_acceptor.clone())
Expand Down Expand Up @@ -283,7 +281,7 @@ async fn listen_tls(name: String, listen_cfg: &Listener) -> Result<()> {
.receive_max(max_inflight as u16)
.handshake_timeout(handshake_timeout)
.max_size(max_size)
.max_qos(max_qos)
// .max_qos(max_qos)
//.max_topic_alias(max_topic_alias)
.publish(fn_factory_with_config(|session: v5::Session<SessionState>| {
ok::<_, MqttError>(fn_service(move |req| {
Expand Down Expand Up @@ -328,7 +326,6 @@ async fn listen_ws(name: String, listen_cfg: &Listener) -> Result<()> {
let max_inflight = listen_cfg.max_inflight.get() as usize;
let handshake_timeout = listen_cfg.handshake_timeout();
let max_size = listen_cfg.max_packet_size.as_u32();
let max_qos = listen_cfg.max_qos_allowed;
ntex::server::Server::build()
.bind(name, listen_cfg.addr, move || {
pipeline_factory(ws::WSServer::new(Duration::from_secs(handshake_timeout as u64))).and_then(
Expand Down Expand Up @@ -385,7 +382,7 @@ async fn listen_ws(name: String, listen_cfg: &Listener) -> Result<()> {
.receive_max(max_inflight as u16)
.handshake_timeout(handshake_timeout)
.max_size(max_size)
.max_qos(max_qos)
// .max_qos(max_qos)
//.max_topic_alias(max_topic_alias),
.publish(fn_factory_with_config(|session: v5::Session<SessionState>| {
ok::<_, MqttError>(fn_service(move |req| publish_v5(session.clone(), req)))
Expand Down Expand Up @@ -441,7 +438,6 @@ async fn listen_wss(name: String, listen_cfg: &Listener) -> Result<()> {
let max_inflight = listen_cfg.max_inflight.get() as usize;
let handshake_timeout = listen_cfg.handshake_timeout();
let max_size = listen_cfg.max_packet_size.as_u32();
let max_qos = listen_cfg.max_qos_allowed;
ntex::server::Server::build()
.bind(name, listen_cfg.addr, move || {
pipeline_factory(tls_acceptor.clone())
Expand Down Expand Up @@ -504,7 +500,7 @@ async fn listen_wss(name: String, listen_cfg: &Listener) -> Result<()> {
.receive_max(max_inflight as u16)
.handshake_timeout(handshake_timeout)
.max_size(max_size)
.max_qos(max_qos)
// .max_qos(max_qos)
//.max_topic_alias(max_topic_alias)
.publish(fn_factory_with_config(|session: v5::Session<SessionState>| {
ok::<_, MqttError>(fn_service(move |req| publish_v5(session.clone(), req)))
Expand Down
10 changes: 7 additions & 3 deletions rmqtt/src/broker/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn _handshake<Io: 'static>(
is_assigned_client_id: bool,
) -> Result<v5::HandshakeAck<Io, SessionState>, MqttError> {
let connect_info = Arc::new(ConnectInfo::V5(id.clone(), Box::new(handshake.packet().clone())));

log::debug!("handshake.packet(): {:?}", handshake.packet());
//hook, client connect
let _user_props = Runtime::instance().extends.hook_mgr().await.client_connect(&connect_info).await;

Expand Down Expand Up @@ -274,7 +274,11 @@ pub async fn _handshake<Io: 'static>(
let id = state.id.clone();
let session_expiry_interval_secs = packet.session_expiry_interval_secs;
let server_keepalive_sec = packet.keep_alive;
let max_qos = state.listen_cfg().max_qos_allowed;
let max_qos = if state.listen_cfg().max_qos_allowed == QoS::ExactlyOnce {
None
} else {
Some(state.listen_cfg().max_qos_allowed)
};
let retain_available = Runtime::instance().extends.retain().await.is_supported(state.listen_cfg());
let max_server_packet_size = state.listen_cfg().max_packet_size.as_u32();
let shared_subscription_available =
Expand All @@ -285,7 +289,7 @@ pub async fn _handshake<Io: 'static>(
ack.server_keepalive_sec = Some(server_keepalive_sec);
ack.session_expiry_interval_secs = session_expiry_interval_secs;
ack.receive_max = Some(max_inflight);
ack.max_qos = Some(max_qos);
ack.max_qos = max_qos;
ack.retain_available = Some(retain_available);
ack.max_packet_size = Some(max_server_packet_size);
ack.assigned_client_id = assigned_client_id;
Expand Down

0 comments on commit f0ba628

Please sign in to comment.