Skip to content

Commit

Permalink
Disconnect at pub qos > server max qos
Browse files Browse the repository at this point in the history
- "If the Server included a Maximum QoS in its CONNACK response
to a Client and it receives a PUBLISH packet with a QoS greater than this
then it uses DISCONNECT with Reason Code 0x9B (QoS not supported)"
- only affects mqtt v5, server max qos is 1
  • Loading branch information
ChunyiLyu committed Mar 3, 2023
1 parent 290974d commit 3f7f5cf
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
20 changes: 17 additions & 3 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Expand Up @@ -263,7 +263,7 @@ process_connect(State0) ->

-spec process_packet(mqtt_packet(), state()) ->
{ok, state()} |
{stop, disconnect, state()} |
{stop, {disconnect, client_initiated | server_initiated}, state()} |
{error, Reason :: term(), state()}.
process_packet(Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = Type}},
State = #state{auth_state = #auth_state{}})
Expand All @@ -272,7 +272,7 @@ process_packet(Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = Type}},

-spec process_request(packet_type(), mqtt_packet(), state()) ->
{ok, state()} |
{stop, disconnect, state()} |
{stop, {disconnect, client_initiated | server_initiated}, state()} |
{error, Reason :: term(), state()}.
process_request(?PUBACK,
#mqtt_packet{variable = #mqtt_packet_puback{packet_id = PacketId}},
Expand All @@ -293,6 +293,20 @@ process_request(?PUBACK,
{ok, State}
end;

%% MQTT 5 spec 3.3.1.2 QoS
%% If the Server included a Maximum QoS in its CONNACK response
%% to a Client and it receives a PUBLISH packet with a QoS greater than this
%% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported).
process_request(?PUBLISH,
#mqtt_packet{fixed = #mqtt_packet_fixed{qos = ?QOS_2}},
State = #state{cfg = #cfg{
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientID}}) ->
?LOG_INFO("Terminating MQTT connection. QoS not supported, client ID: ~s, "
"protocol version: ~p, QoS: ~p",
[ClientID, ?MQTT_PROTO_V5, ?QOS_2]),
send_disconnect(?RC_QOS_NOT_SUPPORTED, State),
{stop, {disconnect, server_initiated}, State};
process_request(?PUBLISH,
#mqtt_packet{
fixed = #mqtt_packet_fixed{qos = Qos,
Expand Down Expand Up @@ -424,7 +438,7 @@ process_request(?PINGREQ, #mqtt_packet{}, State = #state{cfg = #cfg{client_id =

process_request(?DISCONNECT, #mqtt_packet{}, State) ->
?LOG_DEBUG("Received a DISCONNECT"),
{stop, disconnect, State}.
{stop, {disconnect, client_initiated}, State}.

check_protocol_version(ProtoVersion) ->
case lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)) of
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Expand Up @@ -362,7 +362,10 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
{error, Reason, ProcState1} ->
?LOG_ERROR("MQTT protocol error on connection ~ts: ~tp", [ConnName, Reason]),
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
{stop, disconnect, ProcState1} ->
{stop, {disconnect, server_initiated} = Reason, ProcState1} ->
?LOG_ERROR("MQTT protocol error on connection ~ts: ~tp", [ConnName, Reason]),
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
{stop, {disconnect, client_initiated}, ProcState1} ->
{stop, normal, {_SendWill = false, pstate(State, ProcState1)}}
end
end;
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbitmq_mqtt/test/v5_SUITE.erl
Expand Up @@ -39,7 +39,8 @@ cluster_size_1_tests() ->
client_set_max_packet_size_connack,
client_set_max_packet_size_invalid,
message_expiry_interval,
message_expiry_interval_will_message
message_expiry_interval_will_message,
client_publish_qos2
].

cluster_size_3_tests() ->
Expand Down Expand Up @@ -205,6 +206,14 @@ message_expiry_interval_will_message(Config) ->
assert_nothing_received(),
ok = emqtt:disconnect(Sub2).

client_publish_qos2(Config) ->
Topic = ClientId = atom_to_binary(?FUNCTION_NAME),
{C, Connect} = start_client(ClientId, Config, 0, []),
ok, {_, Props} = Connect(C),
?assertEqual(1, maps:get('Maximum-QoS', Props)),
error, {_, Response} = emqtt:publish(C, Topic, #{}, <<"msg">>, [{qos, 2}]),
?assertEqual({disconnected, 155, #{}}, Response).

satisfy_bazel(_Config) ->
ok.

Expand Down
5 changes: 4 additions & 1 deletion deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Expand Up @@ -285,7 +285,10 @@ handle_data1(Data, State = #state{socket = Socket,
proc_state = ProcState1});
{error, Reason, _} ->
stop_mqtt_protocol_error(State, Reason, ConnName);
{stop, disconnect, ProcState1} ->
{stop, {disconnect, server_initiated}, _} ->
self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect},
{[], State};
{stop, {disconnect, client_initiated}, ProcState1} ->
stop({_SendWill = false, State#state{proc_state = ProcState1}})
end
end;
Expand Down

0 comments on commit 3f7f5cf

Please sign in to comment.