0.14: IllegalReferenceCountException: refCnt: 0 #573

Closed
hylkevds opened this issue Feb 4, 2021 · 10 comments · Fixed by #584 or #600

Comments

@hylkevds
Collaborator

hylkevds commented Feb 4, 2021

This one is probably hard to track down. I've only seen it happen once in the last few days.
It did cause the connection to the client to be dropped.
I'm currently using the in-memory persistence store.

Expected behavior

No Exceptions

Actual behavior

10:02:45.602 [nioEventLoopGroup-3-1] ERROR i.m.broker.NewNettyMQTTHandler - Unexpected exception while processing MQTT message. Closing Netty channel. CId=Aggregator
io.netty.util.IllegalReferenceCountException: refCnt: 0
	at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1490)
	at io.netty.buffer.AbstractByteBuf.duplicate(AbstractByteBuf.java:1201)
	at io.netty.buffer.AbstractByteBuf.retainedDuplicate(AbstractByteBuf.java:1207)
	at io.moquette.broker.Session.resendInflightNotAcked(Session.java:308)
	at io.moquette.broker.MQTTConnection.resendNotAckedPublishes(MQTTConnection.java:502)
	at io.moquette.broker.NewNettyMQTTHandler.userEventTriggered(NewNettyMQTTHandler.java:112)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
	at io.netty.handler.codec.ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
	at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:117)
	at io.moquette.broker.MoquetteIdleTimeoutHandler.userEventTriggered(MoquetteIdleTimeoutHandler.java:48)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:346)
	at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:332)
	at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:324)
	at io.moquette.broker.InflightResender.resendNotAcked(InflightResender.java:160)
	at io.moquette.broker.InflightResender.access$100(InflightResender.java:32)
	at io.moquette.broker.InflightResender$WriterIdleTimeoutTask.run(InflightResender.java:58)
	at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

Steps to reproduce

See below

Minimal yet complete reproducer code (or URL to code) or complete log file

None yet

Moquette MQTT version

0.14 (with the patches from #568, #570, #572)

JVM version (e.g. java -version)

OpenJDK 11

OS version (e.g. uname -a)

Linux / Docker openjdk:11

@hylkevds
Collaborator Author

hylkevds commented Feb 4, 2021

Ok, I can reproduce it:

  • Have a client that never sends a QOS1 Ack (Set a breakpoint in Paho:ComsCallback.java:422 )
  • Send a message
  • Wait for the resend to happen

I noticed it doesn't happen when using broker.internalPublish(mqttPublishMessage, clientId);, but there the opposite happens, unless I do a payload.release(); right after the internalPublish:

ERROR  i.n.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:96)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
	io.netty.buffer.ByteBufUtil.writeUtf8(ByteBufUtil.java:514)
	tests.moquettetests.Publisher.publishDirect(Publisher.java:92)
	tests.moquettetests.Publisher.run(Publisher.java:76)
	java.base/java.lang.Thread.run(Thread.java:834)
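
One possible reading of the leak report above, as a minimal sketch: if the broker takes its own reference to the payload and releases only that one, the reference created by the caller is never dropped unless the caller releases it. `CountedBuf` is a hypothetical, single-threaded stand-in for Netty's `ByteBuf` reference counting, and the `internalPublish` here is an assumed simplification, not moquette's actual implementation.

```java
class LeakDemo {
    // ASSUMPTION: the broker retains its own reference and releases it after delivery.
    static void internalPublish(CountedBuf payload) {
        CountedBuf copy = payload.retainedDuplicate(); // broker's reference
        copy.release();                                // dropped once the message is written
    }

    static int publishWithoutRelease() {
        CountedBuf payload = new CountedBuf(); // caller's reference, refCnt = 1
        internalPublish(payload);
        return payload.refCnt();               // caller's reference is still held
    }

    static int publishAndRelease() {
        CountedBuf payload = new CountedBuf();
        internalPublish(payload);
        payload.release();                     // caller drops its own reference
        return payload.refCnt();
    }

    public static void main(String[] args) {
        System.out.println("without release: " + publishWithoutRelease());
        System.out.println("with release:    " + publishAndRelease());
    }
}

/** Hypothetical stand-in for ByteBuf; models only the reference count. */
final class CountedBuf {
    private int refCnt = 1; // a new buffer starts with one reference, as in Netty

    int refCnt() { return refCnt; }

    /** Simplification: the real call returns a separate view; here only the count matters. */
    CountedBuf retainedDuplicate() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt++;
        return this;
    }

    void release() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt--;
    }
}
```

In this model the count only reaches zero when the caller releases after publishing, which matches the observation that `payload.release();` makes the leak warning go away.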

@hylkevds
Collaborator Author

Do I understand the idea behind the ByteBuf correctly, in that a method that receives or creates a ByteBuf should either

  • Pass it on further
  • Release it

And if the method passes it on more than once, it should retain it for each case?

If that is how it should work, I can see if I can find the missing call to retain() or release()

hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue Feb 17, 2021
The main place where ByteBufs were not released was the inflightWindow.
Also improves two postoffice methods that passed the payload and retain
information twice, once direct and once in the message.
@hylkevds
Collaborator Author

I think PR #576 fixes the ByteBuf retain/release problems. One of the most important ones was the inflightWindow messages that were not released and retained properly. Of course, these fixes affected lots of tests that played loose with the buffers too.

@hylkevds
Collaborator Author

hylkevds commented Feb 18, 2021

PR #464 partially fixed the problem and also added a test, but that test never runs: the name of the class it is in ends with Tests rather than Test, so the test class is never executed.

hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue Feb 18, 2021
@andsel
Collaborator

andsel commented May 8, 2021

The reasoning on buffer retain and release is that:

  • once a piece of code consumes the buffer, for example by sending it to a channel, it should release it.
  • if a piece of code forwards a buffer to n channels, it should create n duplicate buffers, one for each channel, and call retain so that each duplicated buffer has a refCount of 1.
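
The two rules above can be sketched as follows. This is an illustration only: `CountedBuf` is a hypothetical stand-in that models just the reference count of a Netty `ByteBuf`, `send` stands in for a channel write that consumes one reference, and the stand-in throws `IllegalStateException` where Netty would throw `IllegalReferenceCountException`.

```java
class FanOutDemo {
    /** Stands in for a channel write: the consumer releases one reference when done. */
    static void send(CountedBuf buf) {
        buf.release();
    }

    /** Forwards one payload to n consumers, retaining once per hand-off. */
    static int forwardTo(int n) {
        CountedBuf payload = new CountedBuf();  // this method's own reference
        for (int i = 0; i < n; i++) {
            send(payload.retainedDuplicate());  // one extra reference per consumer
        }
        payload.release();                      // drop this method's own reference
        return payload.refCnt();
    }

    /** Same fan-out without retaining: returns true if a send hits a dead buffer. */
    static boolean forwardWithoutRetain(int n) {
        CountedBuf payload = new CountedBuf();
        try {
            for (int i = 0; i < n; i++) {
                send(payload);                  // the first send consumes the only reference
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("refCnt after retained fan-out: " + forwardTo(3));
        System.out.println("failure without retain: " + forwardWithoutRetain(2));
    }
}

/** Hypothetical stand-in for ByteBuf; models only the reference count. */
final class CountedBuf {
    private int refCnt = 1; // a new buffer starts with one reference, as in Netty

    int refCnt() { return refCnt; }

    /** Simplification: the real call returns a separate view; here only the count matters. */
    CountedBuf retainedDuplicate() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt++;
        return this;
    }

    void release() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt--;
    }
}
```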

@andsel
Collaborator

andsel commented May 8, 2021

As you suggested, I think the problem arises when a PublishedMessage is inserted into inflightWindow and then sent to the client with mqttConnection.sendPublish:

inflightWindow.put(packetId, new SessionRegistry.PublishedMessage(topic, qos, payload));
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
MqttPublishMessage publishMsg = MQTTConnection.notRetainedPublishWithMessageId(topic.toString(), qos,
    payload, packetId);
mqttConnection.sendPublish(publishMsg);

In this case the send consumes the payload buffer's reference count, and during a resend caused by a missed ACK from the client,

final ByteBuf payload = msg.payload;
final ByteBuf copiedPayload = payload.retainedDuplicate();
the payload is invalid because its refCount reached 0 in the previous send.

The solution could be to handle insertions into and removals from inflightWindow correctly: on each put the payload should be stored as a retainedDuplicate(), and on removal from the map the payload should be release()-ed.
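
A minimal sketch of that proposal, under stated assumptions: the window keeps its own reference on put, each send consumes one reference, a resend retains again from the stored copy, and the acknowledgement releases the window's reference. `CountedBuf`, `publish`, `resend` and `ack` are hypothetical names for illustration and model only the counting, not moquette's Session code.

```java
import java.util.HashMap;
import java.util.Map;

class InflightWindowDemo {
    private final Map<Integer, CountedBuf> inflightWindow = new HashMap<>();

    /** Stands in for writing to the channel: consumes one reference. */
    static void send(CountedBuf buf) {
        buf.release();
    }

    void publish(int packetId, CountedBuf payload) {
        inflightWindow.put(packetId, payload.retainedDuplicate()); // the window's own reference
        send(payload);                                             // consumes the caller's reference
    }

    void resend(int packetId) {
        CountedBuf stored = inflightWindow.get(packetId);
        send(stored.retainedDuplicate()); // safe: the window still holds a reference
    }

    void ack(int packetId) {
        CountedBuf stored = inflightWindow.remove(packetId);
        if (stored != null) {
            stored.release();             // the window gives up its reference
        }
    }

    /** Returns the refCnt observed after the first send, after a resend, and after the ack. */
    static int[] run() {
        InflightWindowDemo session = new InflightWindowDemo();
        CountedBuf payload = new CountedBuf();
        session.publish(1, payload);
        int afterSend = payload.refCnt();
        session.resend(1);
        int afterResend = payload.refCnt();
        session.ack(1);
        int afterAck = payload.refCnt();
        return new int[] {afterSend, afterResend, afterAck};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}

/** Hypothetical stand-in for ByteBuf; models only the reference count. */
final class CountedBuf {
    private int refCnt = 1; // a new buffer starts with one reference, as in Netty

    int refCnt() { return refCnt; }

    /** Simplification: the real call returns a separate view; here only the count matters. */
    CountedBuf retainedDuplicate() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt++;
        return this;
    }

    void release() {
        if (refCnt == 0) throw new IllegalStateException("refCnt: 0");
        refCnt--;
    }
}
```

With the window holding its own reference, the count stays at 1 between sends, so the resend no longer fails, and it drops to 0 only once the packet is acknowledged and removed.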

@andsel
Collaborator

andsel commented May 8, 2021

This problem could also happen in Qos2 processing, because that also uses the inflightWindow.

hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 8, 2021
The main place where ByteBufs were not released was the inflightWindow.
@hylkevds
Collaborator Author

hylkevds commented May 8, 2021

I've recreated PR #576, and split the postoffice cleanup into #598

Qos2 should now also be properly reference-counted, though the inflightWindow handling has other issues than just reference counting, which have separate PRs (#582, #584 & #588).

@hylkevds
Collaborator Author

hylkevds commented May 8, 2021

By the way, those other PRs will probably need to be rebased as well, once you get to them. Since all these fixes are on a relatively small section of the code, it's impossible to have the commits not influence each other.

hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 9, 2021
@hylkevds
Collaborator Author

hylkevds commented May 9, 2021

I've rebased the other PRs. These three are directly related to this issue:

hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 22, 2021
hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 22, 2021
andsel pushed a commit that referenced this issue May 23, 2021
- Add unit test to check that a second publish is sent if no PUBACK is received within the `FLIGHT_BEFORE_RESEND_MS` interval.
- Fixed ServerLowLevel tests not running.

* Added a test for #573
hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 24, 2021
Moved buffer release/retain calls to the outside interfaces of moquette.
hylkevds added a commit to FraunhoferIOSB/moquette that referenced this issue May 24, 2021
Moved buffer release/retain calls to the outside interfaces of moquette.
andsel pushed a commit that referenced this issue Jun 6, 2021
Moved buffer release/retain calls to the outside interfaces of moquette.