
Feature disconnects #744

Merged
merged 17 commits into from
Jun 3, 2023
Conversation

source-c
Contributor

@source-c source-c commented Apr 29, 2023

Also fixed a race condition that caused an exception on connect and is very easy to reproduce:

import random
import paho.mqtt.client as mqtt

mqttBroker = "127.0.0.1"
randNumber = random.uniform(20.0, 21.0)

client = mqtt.Client(client_id=b"Temperature_Inside")
client.connect(mqttBroker)
res = client.publish("/TEMPERATURE", randNumber)

that causes:

20:33:56.213 [nioEventLoopGroup-13-1] DEBUG io.moquette.broker.MQTTConnection -- Received MQTT message, type: CONNECT
20:33:56.214 [nioEventLoopGroup-13-1] DEBUG io.moquette.broker.PostOffice -- Routing cmd [CONN] for session [Temperature_Inside] to event processor 7
20:33:56.215 [nioEventLoopGroup-13-1] DEBUG io.moquette.broker.metrics.MQTTMessageLogger -- C->B PUBLISH <null> to topics </TEMPERATURE>
20:33:56.215 [nioEventLoopGroup-13-1] DEBUG io.moquette.broker.MQTTConnection -- Received MQTT message, type: PUBLISH
20:33:56.216 [nioEventLoopGroup-13-1] ERROR io.moquette.broker.NewNettyMQTTHandler -- Error processing protocol message: PUBLISH
java.lang.NullPointerException: Cannot invoke "String.hashCode()" because the return value of "io.moquette.broker.SessionCommand.getSessionId()" is null
        at io.moquette.broker.PostOffice.routeCommand(PostOffice.java:632)
        at io.moquette.broker.MQTTConnection.processPublish(MQTTConnection.java:427)
        at io.moquette.broker.MQTTConnection.handleMessage(MQTTConnection.java:80)
        at io.moquette.broker.NewNettyMQTTHandler.channelRead(NewNettyMQTTHandler.java:58)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
....

The disconnection feature works like this (traces):

======> MQTT Clients: ({:id Temperature_Inside, :address 127.0.0.1, :port 54641})
======> MQTT Advanced Broker disconnecting .........
20:05:45.234 [main] DEBUG com.dkdhub.mqtt_broker.AdvancedBroker -- Will disconnect client Temperature_Inside
20:05:45.234 [main] DEBUG io.moquette.broker.SessionRegistry -- Disconnecting client: Temperature_Inside
20:05:45.235 [main] DEBUG io.moquette.broker.SessionRegistry -- Remove session state for client Temperature_Inside
20:05:45.235 [main] DEBUG io.moquette.broker.SessionRegistry -- Client Temperature_Inside successfully disconnected from broker
20:05:45.235 [main] DEBUG com.dkdhub.mqtt_broker.AdvancedBroker -- Disconnect result: true
======> MQTT Clients: ()
20:05:45.236 [nioEventLoopGroup-3-1] INFO io.moquette.broker.metrics.MQTTMessageLogger -- Channel Inactive
20:05:45.236 [nioEventLoopGroup-3-1] DEBUG io.moquette.broker.MQTTConnection -- Notifying connection lost event
20:05:45.237 [nioEventLoopGroup-3-1] DEBUG io.moquette.broker.SessionEventLoopGroup -- Routing cmd [CONN LOST] for session [Temperature_Inside] to event processor 7
20:05:45.237 [Session Executor 7] DEBUG io.moquette.broker.MQTTConnection -- Cleaning Temperature_Inside
20:05:45.237 [Session Executor 7] DEBUG io.moquette.interception.BrokerInterceptor -- Notifying unexpected MQTT client disconnection to interceptor CId=Temperature_Inside, username=null, interceptorId=3456
20:05:45.238 [pool-2-thread-1] DEBUG basic -- MQTT: client  #object[clojure.core$_PLUS_ 0x46f9cda1 clojure.core$_PLUS_@46f9cda1] Temperature_Inside #object[clojure.core$_PLUS_ 0x46f9cda1 clojure.core$_PLUS_@46f9cda1]  disconnected (lost)

We maintain a Clojure library that wraps Moquette, so the tests are written in Clojure (this functionality is actually on trunk now and awaits upstream updates). However, they are quite simple, so you may take a look yourself - https://github.com/dkdhub/clj-mqtt-broker/blob/3d59a2244afec43ae2a3522ac9173ae683a9dc3d/test/basic.clj#L129

Looking forward to hearing from you!

@source-c
Contributor Author

source-c commented May 1, 2023

@andsel may I ask you to take a look at this, please? Or to assign this request to someone who can address it?

A small note regarding the CONNECT+PUBLISH issue: the root cause is that the implementation mainly relies on the NettyUtils.clientID(channel) call, which requires the channel to be associated with the connection. At the moment of CONNECT, the new association is created by firing off a FutureTask, and since immediate publishing is not restricted by the MQTT spec, the message cannot be processed and should be ignored. It is enough to return PostOffice.RouteResult.failed for this case. That one-line fix is included in this request.
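The guard described above can be sketched as follows. This is a minimal Python illustration with hypothetical names (`RouteResult`, `route_command`, `NUM_PROCESSORS` are simplified stand-ins; Moquette's actual code is Java):

```python
from enum import Enum

class RouteResult(Enum):
    SUCCESS = "success"
    FAILED = "failed"

NUM_PROCESSORS = 8

def route_command(session_id):
    """Route a session command to an event processor by hashing the session id."""
    # A PUBLISH that arrives before the CONNECT task has bound the clientID
    # to the channel sees no session id yet; ignore it instead of raising.
    if session_id is None:
        return RouteResult.FAILED, None
    processor = hash(session_id) % NUM_PROCESSORS
    return RouteResult.SUCCESS, processor

# The too-early PUBLISH is dropped; a post-CONNECT publish is routed normally.
print(route_command(None)[0])                  # RouteResult.FAILED
print(route_command("Temperature_Inside")[0])  # RouteResult.SUCCESS
```

The point is simply that a missing session id becomes a failed routing result rather than a NullPointerException inside the hash-based processor selection.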

Thank you.

@andsel andsel self-requested a review May 3, 2023 07:20
@andsel
Collaborator

andsel commented May 3, 2023

Hi @source-c
thanks for your contribution. To ease the review, discussion, and tracking of the issues, I would ask a couple of things.
Please:

@source-c
Contributor Author

source-c commented May 3, 2023

Thank you, @andsel. Both PRs were modified in accordance with your comments.

@andsel
Collaborator

andsel commented May 3, 2023

Thanks @source-c, I'll review it in the next few days; thank you for contributing to the project. I'll comment on them if I have any doubts.

@andsel
Collaborator

andsel commented May 6, 2023

@source-c #743 has been merged into main, so you can rebase this, please.

@source-c
Contributor Author

Hi there! @andsel, I see the ubuntu-18.04 checks all failed without running, twice. Is there a chance for this commit to be merged into main without them?

@andsel
Collaborator

andsel commented May 17, 2023

Hi @source-c, yes it's possible, but I have to review it and had no time in the past weeks :-( I'll try to do it next weekend.

@source-c
Contributor Author

Hi @source-c, yes it's possible, but I have to review it and had no time in the past weeks :-( I'll try to do it next weekend.

Oh, fine then. Thank you!

@andsel
Collaborator

andsel commented May 19, 2023

Hi @source-c
I tried to understand the original problem statement, the one that reports an NPE on command routing.
From the comment it is not clear why the PUBLISH should be processed before the CONNECT completes its association.
In Moquette all MQTT messages are wrapped as commands and executed by an event loop.
If in one TCP connection a client sends a CONNECT and a PUBLISH, the two are serialized, so the CONNECT correctly associates the clientId with the Netty channel.

I tried to reproduce it with the following (run it with jbang), but had no luck reproducing your issue.

///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5
package io.moquette.testembedded;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class TestClient {
    public static void main(String[] args) throws MqttException {
        MqttClientPersistence dataStore = new MemoryPersistence();
        MqttClient client = new MqttClient("tcp://localhost:1883", "TestClient", dataStore);

        client.connect();
        client.publish("/temperature", new byte[] {0x14}, 0, false);
        client.disconnect();
        client.close();
    }
}

Please, could you expand your explanation of the problem?

@source-c
Contributor Author

source-c commented May 19, 2023

It seems you cannot reproduce it because of implementation differences:

org.eclipse.paho.client.mqttv3.MqttClient:

aClient.connect(options, null, null).waitForCompletion(getTimeToWait());

while paho.mqtt.client does just:

self.connect_async(host, port, keepalive, bind_address, bind_port, clean_start, properties)

As you can see, there is no await in the Python client (which is very widely used).
So, when one uses this approach:

client = mqtt.Client(client_id=cid)
client.connect(mqttBroker)
res = client.publish(topic, value)

both CONNECT and PUBLISH may (and will) go out almost simultaneously.
On the other hand, we have the association creation at the broker, which yields a FutureTask fired by the incoming CONNECT.
For the next packet containing the PUBLISH, the broker searches for the client id (in processPublish) using getClientId(), which is in turn just a wrapper around:

NettyUtils.clientID(channel);

and this channel is not yet completely initialized.
As a result we get an NPE there.
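The timing can be simulated without a broker or network. The following is a hypothetical sketch (not paho or Moquette code): the CONNECT handler binds the client id on a background task, while the PUBLISH lookup runs before that task has completed:

```python
import threading
import time

channel = {}  # stands in for the Netty channel's attribute map

def handle_connect(client_id):
    """CONNECT schedules the channel<->clientID association asynchronously."""
    def associate():
        time.sleep(0.05)  # the FutureTask runs later on the event loop
        channel["client_id"] = client_id
    threading.Thread(target=associate).start()

def handle_publish():
    """PUBLISH looks up the client id bound to the channel (may be missing)."""
    return channel.get("client_id")  # None if CONNECT hasn't finished binding

handle_connect("Temperature_Inside")
early = handle_publish()  # arrives right after CONNECT: no id bound yet
time.sleep(0.1)
late = handle_publish()   # after the association completes
print(early, late)        # None Temperature_Inside
```

The first lookup returning None is exactly the state in which NettyUtils.clientID(channel) yields a null session id and the routing code throws.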

@andsel
Collaborator

andsel commented May 20, 2023

I've also tried with the Java Paho async client and the HiveMQ async client, but neither of the two exhibits such behavior.
Reading the MQTT 3.1.1 spec, it doesn't prohibit sending other control packets before the CONNACK has been received, requirement [MQTT-3.1.4-5]:

Clients are allowed to send further Control Packets immediately after sending a CONNECT Packet; 
Clients need not wait for a CONNACK Packet to arrive from the Server. 
If the Server rejects the CONNECT, it MUST NOT process any data sent by the Client after the 
CONNECT Packet [MQTT-3.1.4-5].

although it includes this non-normative comment:

Clients typically wait for a CONNACK Packet, However, if the Client exploits its freedom to send Control Packets before it receives a CONNACK, it might simplify the Client implementation as it does not have to police the connected state. The Client accepts that any data that it sends before it receives a CONNACK packet from the Server will not be processed if the Server rejects the connection.

So it seems that if the broker receives the PUBLISH before sending the CONNACK, and the connection is accepted, then it has to process the PUBLISH as if it were valid.

I think that to implement such behavior we should move the assignment of clientId to channel, which happens in the session loop while processing the connect

NettyUtils.clientID(channel, clientId);
, to he point before enqueuing the command:
final String sessionId = clientId;

In this case:

  • if the CONNECT completes with a positive result, then the PUBLISH is routed to the correct session loop, and given the serialization of commands, by the time the session loop starts to process the PUBLISH all the session data is set up and both the session and the connection have valid data.
  • if the CONNECT completes with a negative result and closes the channel, then when the session loop encounters any command (PUB, SUB, etc.) related to that channel/clientId, it doesn't have a channel/session created and should log an error or warning.
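The proposed ordering can be sketched as follows. This is a hypothetical simplification of the session event loop (all names here are stand-ins, not Moquette's API): the clientID is bound to the channel before the CONNECT command is enqueued, so a PUBLISH enqueued right behind it already resolves the id and is serialized after the CONNECT on the same loop:

```python
import queue

channel = {}
commands = queue.Queue()  # stands in for one session event loop's queue

def receive_connect(client_id):
    # Bind the association *before* enqueuing, instead of inside the
    # CONNECT command itself, so later packets can already resolve the id.
    channel["client_id"] = client_id
    commands.put(("CONNECT", client_id))

def receive_publish(topic):
    session_id = channel.get("client_id")
    if session_id is None:
        return "dropped"  # rejected CONNECT / unknown session: log and skip
    commands.put(("PUBLISH", topic))
    return "queued"

receive_connect("Temperature_Inside")
result = receive_publish("/TEMPERATURE")
print(result)                # queued
print(list(commands.queue))  # CONNECT first, then PUBLISH, in order
```

Because both commands land on the same queue, the CONNECT is always processed before the PUBLISH, which matches the serialization argument above.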

Collaborator

@andsel andsel left a comment


I left a request to change the layout of the code; additionally, I would ask you to cover this with a unit test that verifies the behaviors:

  • dropping a connection for a non existing session.
  • dropping a connection for an existing session.

However it's not clear to me why SessionRegistry.closeSession is not enough for your use case.

BTW in the suggestion I removed

client.cleanUp();
pool.remove(clientId);

because they are implicit in the call to purgeSessionState.

Review thread on distribution/pom.xml (outdated, resolved)
@source-c
Contributor Author

I think that to implement such behavior we should move the assignment of the clientId to the channel, which happens in the session loop while processing the CONNECT (MQTTConnection.java, line 217 in ec4e84a):

NettyUtils.clientID(channel, clientId);

to the point before enqueuing the command (MQTTConnection.java, line 183 in ec4e84a):

final String sessionId = clientId;

In this case:

  • if the CONNECT completes with a positive result, then the PUBLISH is routed to the correct session loop, and given the serialization of commands, by the time the session loop starts to process the PUBLISH all the session data is set up and both the session and the connection have valid data.
  • if the CONNECT completes with a negative result and closes the channel, then when the session loop encounters any command (PUB, SUB, etc.) related to that channel/clientId, it doesn't have a channel/session created and should log an error or warning.

Of course, implementing it that way follows the spec more precisely. But that doesn't mean the empty clientId case shouldn't also be covered at routing, at least to identify a collision/race if any. Also, I suppose changing that behaviour should be done separately to avoid a mess, shouldn't it?

@source-c
Contributor Author

However it's not clear to me why SessionRegistry.closeSession is not enough for your use case.

There are a lot of use cases where forcing a disconnect is used under hard pressure and in accordance with security policies. Some of these use cases are already discussed at #747, BTW.

@andsel
Collaborator

andsel commented May 20, 2023

changing that behaviour should be done separately to avoid a mess, shouldn't it?

Yes, it's better to separate it into another PR.

@andsel
Collaborator

andsel commented May 20, 2023

There are a lot of use cases where forcing a disconnect

I understand, but it's not clear to me whether the intended effect of the new method should be like a standard close, which shuts down the MQTTConnection and cleans the Session state (meaning topic subscriptions and queues) if needed (clean session = false).
Or does it have to remove all session state even if the session was a non-clean session?
In the second case every reference to the session is brutally removed, while the first is like a normal disconnection flow.

@source-c
Contributor Author

source-c commented May 20, 2023

In the second case every reference to the session is brutally removed, while the first is like a normal disconnection flow.

Unfortunately, the second case is the preferred one. We should remove all the session internals, disregarding its state. The state might not be clean, the channel might not be operational, and the server (broker) might be overwhelmed by unwanted (sometimes broken) requests. By forcing the session drop (i.e. kicking the client off) we preserve server stability, among other reasons, trying to keep it operational for all other clients.

@source-c
Contributor Author

source-c commented May 20, 2023

However, you are right - this should be decided by the end user, so I reworked the cleanup contract: session cleanup is now specified explicitly by an outer parameter.

@andsel andsel self-requested a review May 21, 2023 18:22
Collaborator

@andsel andsel left a comment


Hi @source-c, I'm not a big fan of a boolean flag parameter to customize the behavior of a method; I prefer to create 2 different methods that better express the intention.
On the public API I avoid feature flags; internally, if avoiding one generates more work and code duplication, I can accept it.

We are almost ready to go; please write a unit test to cover the introduced functionality, or let me know if you want me to write it :-) by adding some commits to this PR.
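The shape being asked for might look like this. A hypothetical sketch (not Moquette's actual API; class and method names are invented for illustration): two explicitly named public methods delegating to one private implementation, instead of a public boolean flag:

```python
class BrokerSketch:
    """Illustrative stand-in for a broker's session registry."""

    def __init__(self):
        # One connected client with a persistent (non-clean) session.
        self.sessions = {"Temperature_Inside": {"clean": False}}

    def disconnect_client(self, client_id):
        """Close the connection; keep state for non-clean sessions."""
        return self._disconnect(client_id, purge=False)

    def disconnect_and_purge_client(self, client_id):
        """Close the connection and remove all session state."""
        return self._disconnect(client_id, purge=True)

    def _disconnect(self, client_id, purge):
        session = self.sessions.get(client_id)
        if session is None:
            return False  # nothing to disconnect
        if purge or session["clean"]:
            del self.sessions[client_id]  # drop subscriptions and queues
        return True

broker = BrokerSketch()
print(broker.disconnect_and_purge_client("Temperature_Inside"))  # True
print("Temperature_Inside" in broker.sessions)                   # False
```

The boolean still exists internally, but callers of the public API pick the intent by method name, which is the style preference expressed above.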

Review thread on broker/src/main/java/io/moquette/broker/Server.java (outdated, resolved)
source-c and others added 3 commits May 22, 2023 11:32
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
Co-authored-by: Andrea Selva <selva.andre@gmail.com>
@source-c
Contributor Author

source-c commented May 22, 2023

We are almost ready to go; please write a unit test to cover the introduced functionality, or let me know if you want me to write it :-) by adding some commits to this PR.

May I ask you to do it yourself, please? It seems we have quite different styling views, so I have no doubt your implementation will be much clearer than mine. :)
Thank you!

@andsel andsel merged commit 9c4769a into moquette-io:main Jun 3, 2023
4 checks passed
andsel added a commit to andsel/moquette that referenced this pull request Jun 3, 2023
@andsel andsel mentioned this pull request Jun 3, 2023
andsel added a commit that referenced this pull request Jun 3, 2023
@andsel andsel added the v0.17.0 label Aug 2, 2023

Successfully merging this pull request may close these issues.

How to forcefully disconnect the client from the broker?