
Errors when updating moquette-broker from 0.15 to 0.16 #705

Closed
Bas83 opened this issue Jan 20, 2023 · 16 comments · Fixed by #708
Bas83 commented Jan 20, 2023

Running on JVM 17, yesterday we updated moquette-broker from 0.15 to 0.16 and started seeing these errors in our logs:

Error processing protocol message: PUBLISH

logger name: io.moquette.broker.NewNettyMQTTHandler

java.lang.NullPointerException: Cannot invoke "String.hashCode()" because the return value of "io.moquette.broker.SessionCommand.getSessionId()" is null
	at io.moquette.broker.PostOffice.routeCommand(PostOffice.java:632)
	at io.moquette.broker.MQTTConnection.processPublish(MQTTConnection.java:427)
	at io.moquette.broker.MQTTConnection.handleMessage(MQTTConnection.java:80)
	at io.moquette.broker.NewNettyMQTTHandler.channelRead(NewNettyMQTTHandler.java:58)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        ...
CONNACK send failed, cleanup session and close the connection

logger name: io.moquette.broker.MQTTConnection

io.netty.channel.StacklessClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)

And both before and after this:

An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.

logger name: io.netty.channel.DefaultChannelPipeline

io.netty.channel.StacklessClosedChannelException: null
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)

Also, our consuming application started reporting errors, so these weren't just warnings; things were actually failing.

Mind you, these didn't start happening immediately; they took about half a day to appear.

Any idea what this could be?

@andsel andsel self-assigned this Jan 24, 2023
andsel commented Jan 24, 2023

This happens because the routeCommand method receives a null clientId:

public RouteResult routeCommand(String clientId, String actionDescription, Callable<String> action) {
    SessionCommand cmd = new SessionCommand(clientId, action);
    final int targetQueueId = Math.abs(cmd.getSessionId().hashCode()) % this.eventLoops;
    ...
That clientId is extracted from the packet. Do you have any logs from around the time the problem manifests?

Bas83 commented Jan 24, 2023

@andsel I'm afraid not, definitely no logs of the contents of packets. We have a number of clients integrating with us. Do you think it's pure coincidence that this started happening after the update, caused by some misbehaving client, or is there new code that could have affected this?

Bas83 commented Jan 24, 2023

@andsel This is the order of things happening:
(screenshot: ordering of the log messages)

andsel commented Jan 24, 2023

Version 0.16 introduced event loops to isolate the processing of each session's requests. Sessions are now sticky to a session event loop, and the same loop can handle many sessions.
Every loop is a thread, and there are as many loops as CPU cores.
Every loop has a command queue containing the SessionCommands to be executed, so the routing method is responsible for creating the session command and enqueueing it to the appropriate queue.

So your problem is not a coincidence, because this code wasn't present before.
The motivation for its introduction is to simplify concurrency management in Moquette.

So the problem could be that:

  • either the client is misbehaving with its client Id,
  • or there is a bug in the code that doesn't decode or forward the clientId.
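
For illustration, here is a minimal, self-contained sketch of the session-sticky routing described above (hypothetical class, not Moquette's actual code): each session id always hashes to the same single-threaded loop, so all commands for one session execute sequentially, and a null sessionId fails with exactly the NullPointerException seen in the stack trace.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SessionEventLoops {
    private final ExecutorService[] loops;

    SessionEventLoops(int eventLoops) {
        loops = new ExecutorService[eventLoops];
        for (int i = 0; i < eventLoops; i++) {
            loops[i] = Executors.newSingleThreadExecutor(); // one thread per loop
        }
    }

    // Route a command to the loop that owns this session; the same sessionId
    // always lands on the same loop.
    Future<String> route(String sessionId, Callable<String> action) {
        // sessionId == null throws the same NPE as reported in this issue
        int targetQueueId = Math.abs(sessionId.hashCode()) % loops.length;
        return loops[targetQueueId].submit(action);
    }
}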

Bas83 commented Jan 24, 2023

@andsel also, we saw many additional warning logs that did not accompany those exceptions and that we hadn't seen before.

(screenshot: additional warning logs)

Bas83 commented Jan 24, 2023

That clientId is extracted from the packet

Do you not already know the clientId from the open connection itself? Does it not make sense to get the clientId from a more permanent place to avoid misbehaving clients, if that is a possibility?

andsel commented Jan 24, 2023

Do you have a plain text form of the logs to share?

andsel commented Jan 24, 2023

Does it not make sense to get the clientId

The clientId (if present) is passed by the client to the server during the CONNECT; the broker then stores the clientID in MQTTConnection and in Session.

hylkevds commented

Is it possible for the Publish packet to be handled before the code is run that sets the ClientID on the Channel?

channel.writeAndFlush(ackMessage).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            LOG.trace("CONNACK sent, channel: {}", channel);
            if (!result.session.completeConnection()) {
                // send DISCONNECT and close the channel
                final MqttMessage disconnectMsg = MqttMessageBuilders.disconnect().build();
                channel.writeAndFlush(disconnectMsg).addListener(CLOSE);
                LOG.warn("CONNACK is sent but the session created can't transition in CONNECTED state");
            } else {
                NettyUtils.clientID(channel, clientIdUsed);
                connected = true;
                // OK continue with sending queued messages and normal flow
                if (result.mode == SessionRegistry.CreationModeEnum.REOPEN_EXISTING) {
                    result.session.sendQueuedMessagesWhileOffline();
                }
                initializeKeepAliveTimeout(channel, msg, clientIdUsed);
                setupInflightResender(channel);
                postOffice.dispatchConnection(msg);
                LOG.trace("dispatch connection: {}", msg);
            }
        } else {
            bindedSession.disconnect();
            sessionRegistry.remove(bindedSession.getClientID());
            LOG.error("CONNACK send failed, cleanup session and close the connection", future.cause());
            channel.close();
        }
    }
});

(The actual setting happens at lines 215 and 223.)

Maybe we should move the call to NettyUtils.clientID(channel, clientIdUsed); directly into executeConnect instead of that ChannelFutureListener?
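
For context, NettyUtils.clientID sets the ClientID on the Channel; the usual Netty way to do that is a channel attribute. A minimal sketch of that pattern (an assumption about the helper's internals, with illustrative names, not Moquette's exact source):

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;

final class ClientIdAttr {
    // the attribute name is illustrative, not necessarily Moquette's
    private static final AttributeKey<String> CLIENT_ID = AttributeKey.valueOf("clientID");

    static void set(Channel channel, String clientId) {
        channel.attr(CLIENT_ID).set(clientId);
    }

    static String get(Channel channel) {
        return channel.attr(CLIENT_ID).get(); // null until set(...) has run
    }
}

With this pattern any handler on the pipeline can recover the clientID from the channel, but only after set(...) has run, which is exactly the ordering question raised here.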

Bas83 commented Jan 24, 2023

Do you have a plain test form of the logs to share?

I had to do some redaction but this is what I have:

{
	"message": "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.",
	"attributes": {
		"@timestamp": "2023-01-20T02:07:56.862Z",
		"level": "WARN",
		"thread_name": "nioEventLoopGroup-3-2",
		"level_value": 30000,
		"logger": {
			"name": "io.netty.channel.DefaultChannelPipeline"
		},
		"@version": "1",
		"logger_name": "io.netty.channel.DefaultChannelPipeline",
		"stack_trace": "io.netty.channel.StacklessClosedChannelException: null\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)\n",
		"env": "production"
	}
}

{
	"message": "Error processing protocol message: PUBLISH",
	"attributes": {
		"msg": {
			"type": "[PUBLISH]"
		},
		"@timestamp": "2023-01-20T02:07:56.903Z",
		"level": "ERROR",
		"thread_name": "nioEventLoopGroup-3-1",
		"level_value": 40000,
		"logger": {
			"name": "io.moquette.broker.NewNettyMQTTHandler"
		},
		"@version": "1",
		"channel": "[id: 0xa08d1d44, L:/10.0.2.68:1883 - R:/10.0.2.243:45852]",
		"logger_name": "io.moquette.broker.NewNettyMQTTHandler",
		"stack_trace": "java.lang.NullPointerException: Cannot invoke "String.hashCode()" because the return value of "io.moquette.broker.SessionCommand.getSessionId()" is null
	at io.moquette.broker.PostOffice.routeCommand(PostOffice.java:632)
	at io.moquette.broker.MQTTConnection.processPublish(MQTTConnection.java:427)
	at io.moquette.broker.MQTTConnection.handleMessage(MQTTConnection.java:80)
	at io.moquette.broker.NewNettyMQTTHandler.channelRead(NewNettyMQTTHandler.java:58)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.moquette.broker.metrics.MQTTMessageLogger.channelRead(MQTTMessageLogger.java:47)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.moquette.broker.metrics.MessageMetricsHandler.channelRead(MessageMetricsHandler.java:50)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)",
		"env": "production"
	}
}

{
	"message": "Closed client channel due to exception in processing",
	"attributes": {
		"msg": {
			"type": "[PUBLISH]"
		},
		"@timestamp": "2023-01-20T02:07:56.905Z",
		"level": "INFO",
		"thread_name": "nioEventLoopGroup-3-1",
		"level_value": 20000,
		"logger": {
			"name": "io.moquette.broker.NewNettyMQTTHandler"
		},
		"@version": "1",
		"channel": "[id: 0xa08d1d44, L:/10.0.2.68:1883 - R:/10.0.2.243:45852]",
		"logger_name": "io.moquette.broker.NewNettyMQTTHandler",
		"env": "production"
	}
}

{
	"message": "CONNACK send failed, cleanup session and close the connection",
	"attributes": {
		"@timestamp": "2023-01-20T02:07:57Z",
		"level": "ERROR",
		"thread_name": "nioEventLoopGroup-3-1",
		"level_value": 40000,
		"logger": {
			"name": "io.moquette.broker.MQTTConnection"
		},
		"@version": "1",
		"logger_name": "io.moquette.broker.MQTTConnection",
		"stack_trace": "io.netty.channel.StacklessClosedChannelException: null\n\tat io.netty.channel.AbstractChannel$AbstractUnsafe.write(Object, ChannelPromise)(Unknown Source)\n",
		"env": "production"
		}
}

andsel commented Jan 24, 2023

@hylkevds

Maybe we should move the call to NettyUtils.clientID(channel, clientIdUsed); directly in executeConnect and not in that ChannelFutureListener?

Yes, that would actually be safer; however, I don't think it's related to this. The publish on the new connection is done after the clientId is set in the channel's attributes:

result.session.sendQueuedMessagesWhileOffline();

The idea was to set up all connection-related state only after the broker has a working connection with the client, hence setting the clientId into the attributes after the broker has successfully sent the CONNACK.

hylkevds commented

That's for sending queued messages with cleanSession=false.
But what happens when the client sends a new publish before the connect handshake is finished? Before that ChannelFutureListener has fired... is that possible?

andsel commented Jan 25, 2023

If cleanSession=true, the client is not yet subscribed to any topic. If I recall correctly, the MQTT specification says that a client is not allowed to send any message other than a DISCONNECT until it receives a CONNACK.
@hylkevds your case could manifest when a client sends a CONNECT, sends a SUBSCRIBE before receiving the CONNACK, and another client sends a PUBLISH that matches the topic.
Now I don't recall exactly whether the protocol prohibits sending a SUBSCRIBE/PUBLISH after a CONNECT and before a CONNACK. If Moquette should enforce that, it should close the connection when the connecting client sends a PUB on an MQTTConnection that has not yet transitioned to the connected state (aka connected = true;).
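
A minimal sketch of such an enforcement guard (hypothetical names, not Moquette's actual code), assuming a connected flag held by the connection:

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;

class ConnectStateGuard {
    private volatile boolean connected = false;

    void markConnected() {
        connected = true;
    }

    // Returns true if the packet may be processed; otherwise closes the channel.
    boolean admit(Channel channel, MqttMessage msg) {
        MqttMessageType type = msg.fixedHeader().messageType();
        if (!connected && type != MqttMessageType.CONNECT && type != MqttMessageType.DISCONNECT) {
            // packet arrived before the session transitioned to CONNECTED
            channel.close();
            return false;
        }
        return true;
    }
}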

hylkevds commented

If I understand the exception right, in this case it is the client itself that publishes, not another client.

So is it possible for a client to send a PUBLISH before the CONNACK has gone fully through? Considering that from the client perspective the CONNACK may have been sent, but not processed yet due to our short buffering?

andsel commented Jan 25, 2023

From [MQTT-3.1.4-5] in MQTT 3.1.1 spec http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Clients are allowed to send further Control Packets immediately after sending a CONNECT Packet; 
Clients need not wait for a CONNACK Packet to arrive from the Server. If the Server rejects the CONNECT, 
it MUST NOT process any data sent by the Client after the CONNECT Packet [MQTT-3.1.4-5]

Non normative comment
Clients typically wait for a CONNACK Packet, However, if the Client exploits its freedom to send 
Control Packets before it receives a CONNACK, it might simplify the Client implementation as 
it does not have to police the connected state. The Client accepts that any data that it sends before 
it receives a CONNACK packet from the Server will not be processed if the Server rejects the connection.

So a PUB can be sent by the client after the CONNECT and before it receives the CONNACK, which means the clientId has to be set up in the Channel attributes before the server replies with the CONNACK.

andsel commented Jan 25, 2023

So this is a bug, and the solution is to move the clientID assignment into the Channel's attributes before the CONNACK message is written to the Channel itself.
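
Sketched against the listener quoted earlier, the reordering would look roughly like this (an illustration of the direction of the fix, not the code actually merged in #708):

// assign the clientID before writing the CONNACK, so a PUBLISH arriving
// right after the CONNECT already finds a non-null clientId in the channel
NettyUtils.clientID(channel, clientIdUsed);
channel.writeAndFlush(ackMessage).addListener((ChannelFutureListener) future -> {
    if (future.isSuccess()) {
        // ... complete the connection, set connected = true, normal flow
    } else {
        // ... cleanup session and close the connection
    }
});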
