
Dropping session queues leaks netty ByteBufs, resulting in OOM #608

Closed
jbutler opened this issue Jul 20, 2021 · 74 comments

Comments

@jbutler
Contributor

jbutler commented Jul 20, 2021

Expected behavior

The steps described below should not result in a Moquette OOM.

Actual behavior

After a couple hours, Moquette is unable to allocate memory for incoming PUBLISH messages.

[nioEventLoopGroup-3-6] ERROR NewNettyMQTTHandler exceptionCaught 93  - Unexpected exception while processing MQTT message. Closing Netty channel. CId=pubclient
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 4294967296, max: 4294967296)
	at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:776)
	at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:731)
	at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
	at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
	at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
	at io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:138)
	at io.netty.buffer.PoolArena.allocate(PoolArena.java:128)
	at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:378)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
	at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
	at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
	at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:150)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

OOM can be reproduced using two clients.

Client A publishes to a topic at QoS 1 at a high TPS.

Client B subscribes to the topic at QoS 1.

Client B is a slow device. This can either be a slow physical device (e.g. Raspberry Pi) or simulate a slow device by artificially delaying PUBACKs. Client B will connect and disconnect, alternating between clean and persistent sessions.

To accelerate reproduction, I also reduced INFLIGHT_WINDOW_SIZE from 10 to 1.

Minimal yet complete reproducer code (or URL to code) or complete log file

Publisher code:

#!/usr/bin/python3

import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client(client_id="pubclient")
mqttc.connect("localhost", 1883, 60)
mqttc.loop_start()

while True:
    for i in range(100):
        mqttc.publish("my/topic", "a" * 1024 * 4, 1)
    time.sleep(.1)

Subscriber code:

#!/usr/bin/python3

import time
import paho.mqtt.client as mqtt

mqttc = mqtt.Client()
clean_session=True
while True:
    mqttc.reinitialise(client_id="sub_client", clean_session=clean_session)
    clean_session = not clean_session

    mqttc.connect("localhost", 1883, 60)
    mqttc.loop_start()
    mqttc.subscribe("my/topic", 1)

    time.sleep(5)
    mqttc.disconnect()
    mqttc.loop_stop()
    time.sleep(.5)

I suspect this leak occurs when a client connects using a persistent session, disconnects, and reconnects using a clean session. When this happens, the SessionRegistry drops the session queue. Dropping the session queue then leaks all messages in the queue, unless I'm missing something.
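For illustration, a leak-free queue drop would have to drain and release every queued message before discarding the queue. A minimal sketch, using a stand-in payload type in place of Netty's ByteBuf (none of these names are Moquette's actual API):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: RefCountedPayload stands in for a Netty
// ByteBuf; dropQueueReleasingMessages shows what a leak-free queue
// drop could look like. Not Moquette's actual code.
class QueueDropSketch {

    // Minimal stand-in for ByteBuf reference counting.
    static final class RefCountedPayload {
        final AtomicInteger refCnt = new AtomicInteger(1);

        void release() {
            if (refCnt.decrementAndGet() < 0) {
                throw new IllegalStateException("double release");
            }
        }

        boolean leaked() {
            return refCnt.get() > 0;
        }
    }

    // Instead of just removing the queue from the map, drain it and
    // release every payload so no buffer is left holding a reference.
    static void dropQueueReleasingMessages(Queue<RefCountedPayload> queue) {
        RefCountedPayload msg;
        while ((msg = queue.poll()) != null) {
            msg.release();
        }
    }

    public static void main(String[] args) {
        Queue<RefCountedPayload> sessionQueue = new ArrayDeque<>();
        for (int i = 0; i < 5; i++) {
            sessionQueue.add(new RefCountedPayload());
        }
        dropQueueReleasingMessages(sessionQueue);
        System.out.println("messages left: " + sessionQueue.size());
    }
}
```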

Thoughts?

Moquette MQTT version

0.14

JVM version (e.g. java -version)

openjdk version "1.8.0_282"
OpenJDK Runtime Environment (AdoptOpenJDK)(build 1.8.0_282-b08)
OpenJDK 64-Bit Server VM (AdoptOpenJDK)(build 25.282-b08, mixed mode)

OS version (e.g. uname -a)

Darwin 19.6.0 Darwin Kernel Version 19.6.0

@jbutler
Contributor Author

jbutler commented Jul 30, 2021

I've made a change to address this issue. I need to do some more testing, but hope to send a PR shortly.

@hylkevds also interested in your opinion on this since you've been chasing down memory leaks. This, IMO, is a significant one.

My change involves simplifying session creation logic, and adding a terminateSession() method to the Session class.

  • Simplify Session creation logic

    • Clean sessions always result in a new Session and session queue. This guarantees that a Session is truly clean
      • Related, if a client connects with clean_session=false and messages get added to the queue, and then that client reconnects with clean_session=true, it will still receive the old queued messages. This is a bug
    • Persistent sessions will have a new Session/queue created if none exists. If one does exist, the old Session is resumed. If the Session is connected, then the connection is dropped
    • SessionRegistry no longer maintains queues and it no longer needs to worry about copying Session state or dropping queues
      • Related, the dropQueuesForClient method leaks all buffers in the queue
  • Session termination logic

    • Releases messages in sessionQueue
    • Releases messages in inflightWindow

Working changes in https://github.com/jbutler/moquette/tree/fix/session-management

@hylkevds
Collaborator

Are you still testing with 0.14? That version has known memory leaks. 0.15 also has one known ByteBuf leak left, plus a known leak where subscriptions are not cleaned from sessions on unsubscribe, so you should apply PRs #609 and #612.

I've done some testing with clients that reconnect every 15 seconds, alternating clean session and persistent sessions and I've not found obvious leaks yet. One tricky bit is that when making a heap dump, it should be done after the clients connected and disconnected with clean sessions, otherwise it's expected that the server stores a lot of stuff :)

The session-creation code does seem more complex than it needs to be, but I've not yet traced it through for correctness.

Still, more testing to be done. The HiveMQ client generates lots of warnings when alternating clean sessions and persistent sessions, so something is not quite right yet.

@jbutler
Contributor Author

jbutler commented Jul 31, 2021

I've reproduced the issue on the mainline. The easiest way is to use a modified client that does not send PUBACKs. This results in the inflight window filling up, and then subsequent messages are stored in the session queue.

Everything in the inflight window is leaked when the Session objects are GC'd. "Case 2" in the SessionRegistry results in the session queue being dropped from the map, which means that a new session queue is created the next time a connection with that client ID is made. So, the sequence of events leading up to the session queue being leaked is a bit more complex, but is still present. The sequence would be:

  1. client creates persistent session
  2. client disconnects
  3. client connects with clean session
  4. client disconnects

Interestingly enough, the clean session in step 3 above also re-uses the previous Session object. So that may be why the HiveMQ client generates warnings. Moquette is resuming a previous session rather than creating a new one. Changing the session_present flag to false and using newSession instead of oldSession may solve that issue for you. You could try pulling my change and see if it makes a difference.

Anyway, I will try pulling in your subscription fix. I need to iterate on my PR for this issue still. My change appears to fix this issue in my simple testing, but I know there will still be problems if sendPublishOnSessionAtQos is called while a Session is being terminated.

@hylkevds
Collaborator

I had a bit of a look into the SessionRegistry, and one direct issue: It drops the session queue without releasing the messages from it...

Also, unsubscribe does not remove the subscription from the session:

private void unsubscribe(Session session) {

And, as you mentioned, there are the inflightWindow, inflightTimeouts and qos2Receiving that are not emptied, and inflightSlots is not reset either if the session object is re-used.

@hylkevds
Collaborator

hylkevds commented Aug 2, 2021

I've added my take on the refactor of SessionRegistry in #619
One important factor I noticed is that new sessions share session queues with old sessions, due to:

queues.computeIfAbsent(clientId, (String cli) -> queueRepository.createQueue(cli, clean));

This means we actually have to be careful when cleaning up the old session. We can't just throw out the queue.

@jbutler
Contributor Author

jbutler commented Aug 2, 2021

Thanks for taking a look. I'm not sure if you took a look at the change I made. I actually took things one step further and completely got rid of queue sharing.

private final ConcurrentMap<String, Queue<SessionRegistry.EnqueuedMessage>> queues = new ConcurrentHashMap<>();

Rather than copying session state into a new Session object, I've opted to re-use the old Session when resuming a session. A new Session is only created in the other cases. Each newly created session gets a brand new queue. IMO that's much easier to reason about.

Another thing to mention: cleaning up Session state is not as simple as just calling a "clean" method when replacing an old Session. There is another race that we need to protect against. We need to ensure that messages cannot be published to the Session while we are tearing it down. For example, a message could be added to inflightWindow as soon as we clean it, and that message will still be leaked. E.g.

public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) {

sendPublishQos1(topic, qos, payload);

EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload));

I've gotten around this by adding reference counting to the Session object. I can't think of a better way to stop incoming messages that doesn't involve locking / synchronizing between threads, which would obviously hurt performance.

Let me send out a PR instead of linking back to my dev branch. It will be easier to discuss there

@hylkevds
Collaborator

hylkevds commented Aug 2, 2021

I've not looked too closely at your version yet, mainly trying to understand the current code first.

Good point about the race with the session clean up. There can be many threads adding messages to the Session, and they already use an AtomicInteger for the inflightSlots to see if they can directly send, or need to add to a queue. The Session status is also an AtomicReference.
I suppose one option would be to add a new status flag indicating that a Session is terminated, so that no new messages may be added any more.
The other option would be to use a separate clean up thread that waits long enough to be certain that the Session object is no longer held by any other thread, before freeing all its resources.

@jbutler
Contributor Author

jbutler commented Aug 2, 2021

I thought about adding a new TERMINATED state. However, you still need a lock for that to work otherwise you have a TOCTOU issue. And that hurts performance.

Delayed clean up could work. Reference counting is pretty simple though. Basically just extend AbstractReferenceCounted and then have public methods first call a tryRetain() method to prevent session cleanup while the operation is being handled.

class Session extends AbstractReferenceCounted {

    ....

    // Called when the Session reference count reaches 0
    @Override
    protected void deallocate() {
        terminateSession();
    }

    private boolean tryRetain() {
        try {
            retain();
        } catch (IllegalStateException e) {
            LOG.debug("Session has already been deallocated");
            return false;
        }
        return true;
    }

    ....

    public void sendPublishOnSessionAtQos(Topic topic, MqttQoS qos, ByteBuf payload) {
        // Add reference to this Session to prevent it from being deallocated
        // while we are processing this Publish message.
        if (!tryRetain()) {
            return;
        }

        switch (qos) {
            ....
        }

        release();
    }

The last thing I'm trying to work through is how best to disconnect the old connection. Closing the inbound channel associated with the old connection triggers handleConnectionLost, which calls into the SessionRegistry to either remove or disconnect the bound Session.

void handleConnectionLost() {
    String clientID = NettyUtils.clientID(channel);
    if (clientID == null || clientID.isEmpty()) {
        return;
    }
    LOG.info("Notifying connection lost event");
    if (bindedSession.hasWill()) {
        postOffice.fireWill(bindedSession.getWill());
    }
    if (bindedSession.isClean()) {
        LOG.debug("Remove session for client");
        sessionRegistry.remove(bindedSession);
    } else {
        bindedSession.disconnect();
    }
}

Of course, this would be bad! Easiest thing might be to just add some sort of detachSession() method to MQTTConnection so that we can call connection.dropConnection() without triggering a call back in to the SessionRegistry.

@hylkevds
Collaborator

hylkevds commented Aug 3, 2021

Reference counting on the Session is an interesting idea!

Could we solve the handleConnectionLost() issue using reference counting? If we give the Session a reference to the SessionRegistry, then the Session can remove itself from the Registry when it terminates. This would be the cleanest solution, since only terminated Sessions should be removed from the Registry, and terminated sessions should always be removed from the Registry.
The MQTTConnection could just release() the Session.
For persistent Sessions, the registry itself can do a retain, so that the Session is not cleaned up when the connection releases the Session. For a clean session, the Session would be cleaned up as soon as the connection drops it and no other threads are in the publish process.
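A rough sketch of that ownership scheme, with a plain AtomicInteger standing in for Netty's AbstractReferenceCounted (all names here are illustrative, not Moquette's API):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the ownership scheme: the registry keeps its own reference
// to a persistent Session, so the Session outlives its connection.
// A plain AtomicInteger stands in for Netty's reference counting.
class OwnershipSketch {

    static class Session {
        private final AtomicInteger refCnt = new AtomicInteger(1); // creator's ref
        boolean deallocated = false;

        void retain() {
            if (refCnt.getAndIncrement() <= 0) {
                throw new IllegalStateException("retain on a dead session");
            }
        }

        void release() {
            if (refCnt.decrementAndGet() == 0) {
                deallocate();
            }
        }

        private void deallocate() {
            // here the real code would release sessionQueue / inflightWindow buffers
            deallocated = true;
        }
    }

    public static void main(String[] args) {
        // Persistent session: the registry retains, so the connection's
        // release does not tear the session down.
        Session persistent = new Session(); // connection holds the initial ref
        persistent.retain();                // registry's reference
        persistent.release();               // connection drops
        System.out.println("persistent alive: " + !persistent.deallocated);

        // Clean session: only the connection holds a reference, so the
        // session is cleaned up as soon as the connection drops it.
        Session clean = new Session();
        clean.release();
        System.out.println("clean deallocated: " + clean.deallocated);
    }
}
```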

@jbutler
Contributor Author

jbutler commented Aug 3, 2021

I like that! Then the only time the Registry releases a persistent Session is when a new clean Session is created to replace it.

@andsel curious on your thoughts when you have a moment. Most of the coding is done (implementing the changes @hylkevds suggested won't take long) but I'd appreciate your buy-in before I put in more effort.

@andsel
Collaborator

andsel commented Aug 6, 2021

Thanks folks for putting down the discussion. I need some time to understand the whole picture you have listed above. I don't like too much using reference counting for sessions, because we offload the work of memory management to the application code when that is a matter for the GC. However, if that's the more practical solution I'm not against it, I only need time to think about it.
I think that the SessionRegistry is the only point that should decide about the liveness of a Session, and that's the reason why all the session-creation code stays there.

@hylkevds
Collaborator

hylkevds commented Aug 6, 2021

Summary for the Reference Counting discussion:

  1. The Session holds references to ByteBufs (sessionQueue, inflightWindow, qos2Receiving) and thus must be cleaned up before GC.
  2. At the time the SessionRegistry knows the Session is no longer needed, many different Threads may be adding/removing items to/from those Queues.
    • The Thread of the "old" MqttConnection, if a clean re-connect happens before the old connection was closed
    • Publishing Threads from other connections
  3. The clean up cannot happen while other Threads can still access the Session.

Options:

  • Locking: Prevent multiple threads from accessing the Session.
    • Very reliable
    • Very bad for performance in our heavy multi-threaded situation
  • Check AtomicReference to SessionStatus before each action
    • Unreliable, a Thread may have been suspended right after it checked the Status, and woken up again after the clean up started.
  • Delayed clean up: Park the session in a clean-up queue for X seconds/minutes
    • Not perfectly reliable, but close (Active Threads are unlikely to be suspended for a long time)
    • Keeps Sessions in memory longer than needed
  • Reference Counting: Increase an atomic counter when a Thread starts work in the Session, decrease when the Thread is done. Only the SessionRegistry holds an external "reference" to the Session, all other "references" happen in the Session itself.
    • Reliable.
    • Clean up happens as soon as it can, but not sooner.

Ideally we would use the Java 9 Cleaner API, but that would tie us to Java 9+.

@andsel
Collaborator

andsel commented Aug 8, 2021

Hi @hylkevds, thanks for the great summary.

The reference counting seems promising. I foresee some points where we need to pay attention.
If I understand correctly, each thread does a +1 when it starts using a session and a -1 when it finishes using it; plus there is a hard reference from the SessionRegistry. So when the session-creation code in the SessionRegistry is asked to drop an existing old session, there are 2 cases:

  • the reference counter == 0: the buffers the session references can be released (fields sessionQueue, inflightWindow, qos2Receiving)
  • the reference counter != 0: the session's referenced buffers can't be released because another thread is still using the session.

Possible problems

  • in case 1 we could suffer from the TOCTOU problem raised by @jbutler: the thread executing the connection flow gets a positive check, so it decides to start dropping the buffers, but in the meantime it's suspended and another thread starts using the session. Maybe that second thread is unable to send data to the underlying channel because the session is not in the CONNECTED state.
  • in case 2, if the thread that's executing the connect and creating a new session finds that it can't clean up the old session, then that job must be handled by the last thread exiting the session. So every time the code does a -1 on the counter it should check whether the counter == 0 and then call a method on the SessionRegistry to do the cleanup, but this still suffers from the TOCTOU problem.

@hylkevds
Collaborator

hylkevds commented Aug 8, 2021

Yeah, reference counting helps, but doesn't completely solve all problems. There are actually two problems:

  1. When the SessionRegistry has decided that a Session can be terminated, how do we clean up that Session without running into issues with Threads working on Messages in the Session?
  2. Which Thread gets the Session when multiple Threads try to revive/terminate the Session at the same time?

Reference counting solves the first problem. When the counter goes to 0, the deallocate() method gets called automatically (by AbstractReferenceCounted.handleRelease()), and as soon as the counter hits 0 (before the deallocate happens), all guarded methods are blocked. So there should not be any TOCTOU problems on that front.

There is, however, the second issue of one thread trying to terminate the Session, while another thread is trying to revive/claim the Session. I think the worst-case scenario is when a live Session is claimed simultaneously by two new connections, one with cleanSession and one without. In the MQTT 5 case it could be between a Connection trying to revive a Session, and the SessionCleaning trying to terminate that Session after it timed out.

I think we are going to need a bit of locking to decide who wins that fight, but that locking should not involve any working Threads that add or remove Messages to/from the session. This locking should not happen often, but only on connect events with an existing Session and only involve other Threads that try to claim/terminate the Session. I don't think it'll impact performance much.

@jbutler
Contributor Author

jbutler commented Aug 10, 2021

Good call out on problem 2. I think we can avoid that problem entirely for clean sessions if we re-factor Session creation logic like I've done in #620. If clean sessions always get a brand new Session and that Session can never be resumed by another connection, then there's nothing to worry about. But I still have a bug when two clients connect simultaneously using persistent sessions.

Related to your concern are problems created by having the MQTTConnection bind itself to the Session after createOrReopenSession returns. If an MQTTConnection can modify the binding then there's potential for weird stuff to happen. I left a comment describing one such use case in my PR - the flow would be:

  1. New client connects and gets a Session
  2. Another client connects with clean session after createOrReopenSession returns but before the first Session binds
  3. SessionRegistry attempts to disconnect the first client, but the Session has not yet bound so the MQTTConnection is not closed
  4. The first MQTTConnection remains open, but with a reference to a terminated Session
    1. The client remains connected but cannot send or receive messages
    2. The MQTTConnection won't even close itself since MQTTConnection itself handles ping responses

Locking in createOrReopenSession still doesn't prevent this issue, but it does prevent SessionCorruptedException.

Steering the conversation to Session management for a second, since I've made some significant changes in this area and I'd like to justify them. I'll propose that we should distill cases 1-4 down into 3. Logic is as follows:

  1. Case 1 - No session exists, the old session was clean, or the new session is clean
    1. A new session is needed, along with a new Session queue
    2. If an old connection is present, terminate it
      1. Disconnect the bound MQTTConnection
      2. If the session is persistent, remove the registry reference to cause the deallocate method to be called
    3. If the new session is persistent, add a SessionRegistry reference to the Session
  2. Case 2 - A persistent session exists, the new session is persistent, and old client is disconnected
    1. Simply resume old session
  3. Case 3 - A persistent session exists, the new session is persistent, and the old client is connected
    1. Disconnect old client
    2. Resume old session

With this approach, we never need to re-use session queues. In fact, the SessionRegistry itself doesn't even need to track session queues. That responsibility can live entirely within the QueueRepository.
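The decision matrix above could be sketched roughly as follows. Session, the pool map, and the helper are stand-ins; the real Moquette types and flow differ:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the three-case decision matrix. All names illustrative.
class CreationSketch {

    static class Session {
        final boolean clean;
        boolean connected;
        Session(boolean clean) { this.clean = clean; }
    }

    static final Map<String, Session> pool = new HashMap<>();

    // Stand-in for dropping the old client's channel.
    static void disconnectOldClient(Session s) {
        s.connected = false;
    }

    static Session createOrReopenSession(String clientId, boolean newIsClean) {
        Session old = pool.get(clientId);
        // Case 1: no old session, the old session was clean, or the
        // new session is clean -> brand new Session and queue.
        if (old == null || old.clean || newIsClean) {
            if (old != null && old.connected) {
                disconnectOldClient(old);
            }
            Session fresh = new Session(newIsClean);
            fresh.connected = true;
            pool.put(clientId, fresh);
            return fresh;
        }
        // Case 3: the old client is still connected -> disconnect it first.
        if (old.connected) {
            disconnectOldClient(old);
        }
        // Case 2 (and the tail of case 3): simply resume the old session.
        old.connected = true;
        return old;
    }
}
```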

This fixes two issues:

  1. Resumed sessions drop publish messages that were sent but never acknowledged (e.g. case 4 returns newSession so it re-uses the session queue but not inflightWindow or qos2Receiving).
  2. Clean sessions receive messages in an old sessionQueue, despite being clean (case 2)

Of course, it would be possible to address those problems, but I believe that starting with whether a session is clean or not simplifies the decision matrix quite a bit. Thoughts?

@jbutler
Contributor Author

jbutler commented Aug 10, 2021

BTW - Java 9 Cleaners look neat! But I'll selfishly advocate for a Java 8 compatible solution. My project allows my customers to run on Java 8, and I'd prefer not to maintain diffs in this particular code path.

@hylkevds
Collaborator

hylkevds commented Aug 11, 2021

Yes, first checking if a new session is in order makes things a lot simpler.

If the session is persistent, remove the registry reference to cause the deallocate method to be called
If the new session is persistent, add a SessionRegistry reference to the Session

I think adding / removing the registry reference should always happen, not just for persistent sessions. All sessions have a registry reference, otherwise they'd be cleaned up immediately :)

On the session handling: whenever we try to resume, take over or terminate a session we should:

  1. Get a lock on the session
  2. Check if the session is in the state we expect it to be
  3. Do what we want to do
  4. Release the lock

The same bit of code should never need to hold a lock on two Session instances, to avoid deadlocks. The only case where two instances are involved is when creating a new session and terminating an old one, and in that case we don't need a lock on the new session, since adding it to the registry is the last step. Only after that step do we have the old Session for termination.
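The lock, check, act, release protocol above could look roughly like this (illustrative names and a stand-in Session, not Moquette's code):

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the lock -> check state -> act -> unlock protocol for
// claiming a session. All names illustrative.
class TakeoverSketch {

    enum Status { CONNECTED, DISCONNECTED, TERMINATED }

    static class Session {
        final ReentrantLock lock = new ReentrantLock();
        Status status = Status.DISCONNECTED;
    }

    // Returns true if this thread won the session; false means another
    // thread changed its state first and the caller must start over.
    static boolean tryResume(Session session) {
        session.lock.lock();                        // 1. get the lock
        try {
            if (session.status != Status.DISCONNECTED) {
                return false;                       // 2. state check failed
            }
            session.status = Status.CONNECTED;      // 3. do what we want
            return true;
        } finally {
            session.lock.unlock();                  // 4. release the lock
        }
    }
}
```

Since the lock is only taken on connect, take-over and terminate events, not on the message path, it should not affect publish throughput.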

@andsel
Collaborator

andsel commented Aug 11, 2021

Hi folks, I have some doubts about some sentences:

@hylkevds

Which Thread gets the Session when multiple Threads try to revive/terminate the Session at the same time

All transitions of a Session's state in the SessionRegistry are guarded by CAS / atomic operations, so in that case only one thread is able to move a Session from DISCONNECTED to CONNECTING.

@jbutler

Case 1 - No session exists, the old session was clean, or the new session is clean

It's not clear to me what you mean: if there is no session in the registry, how could there be an old session that was clean? We should only speak of the new session, or am I missing something?
The idea of overwriting an old Session which is clean with a freshly created one could be nice; that avoids contention on resources when re-opening.

@jbutler
Contributor Author

jbutler commented Aug 11, 2021

I think adding / removing the registry reference should always happen, not just for persistent sessions. All sessions have a registry reference, otherwise they'd be cleaned up immediately :)

Sorry, you are absolutely correct! The SessionRegistry removes its reference while terminating the Session, regardless of whether it is clean. My code does this the correct way, I just explained it wrong here :) This was pre morning coffee

this is not clear to me what you mean, if there is no session in the registry how there could be an old session that was clean?

Sorry, that was confusing. I suppose it is actually 3 unrelated cases all in 1. Here it is in code:

final boolean newIsClean = msg.variableHeader().isCleanSession();
final Session oldSession = pool.get(clientId);

// A new session should be created in the following 3 cases:
//  1. clean session = true for the new session
//  2. No old session for the given client ID exists
//  3. An old session for the client ID does exist, but it itself is a clean session
if (newIsClean || oldSession == null || oldSession.isClean()) {
    < create new Session... >

Essentially, in all of these cases, the client should get a brand new Session (from an MQTT spec perspective). If the Session doesn't exist, there isn't one to resume (even if we wanted to). If the new Session is clean, then we don't want any state from the old Session. And if the old Session is clean, then it should not be resumed (according to the spec).

@jbutler
Contributor Author

jbutler commented Aug 11, 2021

Which Thread gets the Session when multiple Threads try to revive/terminate the Session at the same time

All transitions of Session's state in SessionRegistry are guarded by CAS or atomic operation, so in that case only one thread is able to move a Session from DISCONNECTED to CONNECTING.

It is no longer necessary to replace anything in the SessionRegistry with the new changes when a Session is resumed. We need to be careful since replacing a connection on an existing Session is a multi-step process (e.g. disconnect old client, bind new connection, move Session state back to CONNECTING/CONNECTED). There is some discussion in the PR that highlights a new potential race condition

@andsel
Collaborator

andsel commented Aug 23, 2021

@jbutler I would suggest creating a PR which focuses only on the reference counting, which solves the problem of the missed ByteBuf deallocation when a Session is cleaned; this would simplify my job of reviewing and understanding it.

From this discussion a couple of interesting points come out, each of which should be tracked in a separate issue:

  • simplifying session creation: create a fresh new session when an old one in closed status is reopened switching from persistent to clean session, instead of reusing and cleaning the old one.
  • a synchronization problem between a Session and its binding to an MQTTConnection. If the creating thread is suspended after a session is created but before it's bound, and another thread connects with the same client ID and force-closes the old one, that creates trouble for the old session's thread. Maybe the MQTTConnection should also be guarded by a state, but this is a manifestation of the TOCTOU problem.

@andsel
Collaborator

andsel commented Aug 27, 2021

Looking at a possible thread interleaving that manifests the problem:

time  thread 1 (publisher)        thread 2 (subscriber disconnect)  thread 3 (new connecting client)
1     if connected()
2                                 disconnect()
3                                                                   old is disconnected && new is clean
                                                                    => case 2, wipe queue and inflight
4     inflight + send to channel

When thread 1 resumes execution at time 4, it puts a message in the wrong inflight zone and sends it to a client that wasn't subscribed.

I think that here we have 2 distinct problems:

  • thread 1 and thread 2 race on the same old client connection, and we can't synchronize them, even with atomic status variables
  • thread 1 and thread 3 race on the same session object, but they are guarded by the status variable that tracks the various session states.

Problem 2 is due to the fact that the broker needs a unique status variable per clientId to coordinate the connection flow; this is the main motivation why case 2 of the connection flow needs to wipe the queues and inflight: the Session instance needs to be stable across various client connections.
Solution: avoid keeping the status of the clientId's session in the Session instance, and use another variable that is stable across client connections. In this way Session instances could be freshly created, instead of wiping an existing one as in case 2.

Problem 1 regards 2 threads that are trying to update an established Session. A Session is bound to an MQTTConnection, and a connection is linked to a Netty Channel. In Netty, all operations related to a socket's Channel are always executed in the same thread, so there is no contention at the Channel level.
Idea: use the same principle for the processing of Session management. When a publisher thread has to publish to another Session, once it identifies the target Session, it executes all the remaining logic to publish the message in the Netty thread that executes that Channel's operations. In this way we serialize the operations on a Session, so a publish can no longer be interleaved with a disconnect, and hence there is no need to synchronize on that, only to follow a linear flow. In that way the publishes on a Session become atomic.

What do you think @hylkevds and @jbutler ?

@hylkevds
Collaborator

The interaction between threads 1 and 3 would be solved by the simplification of the connect logic + reference counting: never re-use a Session unless it's a resumed session, and never re-use queues. If a publishing thread adds messages to a disconnecting Session, the messages will simply be cleaned up when the last release() happens.

Always using the Netty-thread for message handling is a good idea, since in the above example thread 1 probably runs into an exception when trying to send the message. Always putting it on the queue would fix that, but can a Netty-thread be notified that there is work waiting for it on the sending-side?

That only leaves the problem of handing over a non-clean Session from one connection to another. Especially when there are multiple connections trying to "grab" the Session.

@jbutler
Contributor Author

jbutler commented Sep 3, 2021

Sorry for the delay. Good discussion here. I agree simplifying Session creation logic does eliminate some of the problems.

I will break up my PR and start with just adding reference counting to the Session object so it's easier to review. It's a holiday weekend in the states so it will take me a few days, but I am planning to pick this up mid next week.

@andsel
Collaborator

andsel commented Sep 3, 2021

Thanks @jbutler! There is no rush; I'll prepare the PR to switch the execution of the publish to the peer's socket thread.

@andsel
Collaborator

andsel commented Sep 6, 2021

Tinkering with the code to understand whether the "Netty thread per session" approach is feasible, it turned out that we can't use it, because the broker can have sessions that are still live (they keep subscriptions and store messages) but are not linked to any MQTTConnection (the client dropped it, for example).

So we can think of a pool of threads that manages the sessions. Each session is pinned to a specific thread, and a thread can own multiple sessions. The mapping happens by clientId, so that every interaction with or change to the session is physically executed by one thread, and always the same one. The change operations are simply serialized actions enqueued to a specific thread.
This would eliminate the need for synchronization, and also the reference-counting problem of deciding which thread has to release the buffers referenced by the session.

It comes at the cost of one more CPU context switch per published message. Now we have 1 context switch per publish: the publisher thread has to switch to the subscriber thread. Then we would have publisher thread -> session's thread -> subscriber thread, so 2 context switches.

WDYT of this idea?

@hylkevds
Collaborator

hylkevds commented Sep 6, 2021 via email

@andsel
Collaborator

andsel commented Sep 6, 2021

What needs to be done for a disconnected Session?

Storing messages on the queue.

Publish Threads add messages to the queue, but that's it.

My idea is to let a single thread do all the handling of a session; enqueueing messages is part of the job of this management thread.

Having a separate Thread manage the messages doesn't solve the need for reference counting, since we still have a queue that is accessed by multiple Threads.

No, in this new shape the writing of messages, and the writing on the Session object in general, is done by only one thread, which receives its jobs from a queue.

Obviously there isn't a thread per session, but we could create a pool of threads equal to the number of cores, and have one thread manage the state changes for a subset of the Session instances. In this way the access to a Session is serialized.
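A sketch of this pool (illustrative names, not Moquette's actual classes): N single-threaded executors with a stable clientId-to-executor mapping, so every command touching a given Session runs on the same thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch: N single-threaded executors, one per core; every
// command for a given clientId is routed to the same executor, so all
// mutations of that client's Session are serialized without locks.
final class SessionCommandExecutor {
    private final ExecutorService[] loops;

    SessionCommandExecutor(int eventLoops) {
        loops = new ExecutorService[eventLoops];
        for (int i = 0; i < eventLoops; i++) {
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Stable clientId -> loop mapping; Math.floorMod keeps the index non-negative.
    int loopFor(String clientId) {
        return Math.floorMod(clientId.hashCode(), loops.length);
    }

    void execute(String clientId, Runnable sessionCommand) {
        loops[loopFor(clientId)].execute(sessionCommand);
    }
}
```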

The problem with the current design is that we need complex "status and CAS" logic for interaction with the session, plus we'd also need to introduce reference counting to avoid memory errors.

In-flight queues probably have to be per-connection instead of per-Session.

Could be, but the MQTT spec states that on client reconnection the broker has to resend all the previously in-flight messages that weren't ACKed. If we put them in the MQTTConnection, then when the MQTTConnection is dropped (due to a client error, for example) we lose all the in-flight messages.

@hylkevds
Collaborator

After more testing I noticed that failedPublishes currently doesn't work: adding to the queue never causes an exception, so .filter(CompletableFuture::isCompletedExceptionally) never triggers.

While fixing that, I noticed that both the Paho and HiveMQ clients never seem to re-send failed publishes. Instead they just "hang". The async client of HiveMQ does continue, but still doesn't send any DUP packets. Having a quick look at the Paho code, it seems it only ever sets the DUP flag when restoring after a disconnect. (I found a bit more info in this old email thread: https://www.eclipse.org/lists/paho-dev/msg03429.html )

This is, of course, quite bad for our plan to ignore messages, and we may have to go back to delaying the response instead of completely dropping it...
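The mechanism behind the broken filter can be shown with a small self-contained demo (not Moquette code): isCompletedExceptionally() is only true for a future that was actually completed with an exception, and enqueueing into an unbounded queue never produces one, so the filter matches nothing today.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Demo of why the failedPublishes filter never fires: only a future that was
// explicitly completed with an exception passes isCompletedExceptionally().
public class FailedPublishDemo {

    static long countFailed(List<CompletableFuture<Void>> futures) {
        return futures.stream()
            .filter(CompletableFuture::isCompletedExceptionally)
            .count();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("queue full"));

        System.out.println(countFailed(List.of(ok, ok)));     // 0: enqueue never throws today
        System.out.println(countFailed(List.of(ok, failed))); // 1: only a real failure is counted
    }
}
```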

On a brighter note, I did experiment with creating only one message per queue for a publish, and that does indeed improve things massively. The implementation is quite an ugly hack, but I've added the commit to #637 so you can have a look. (88104d0)
I also added the queue-skip-if-same-thread (8c78ace)

@andsel
Collaborator

andsel commented Jan 11, 2022

After more testing I noticed that failedPublishes currently doesn't work, since adding to the queue never causes an exception, so .filter(CompletableFuture::isCompletedExceptionally) never triggers.

In the next weeks I'll fix this; I missed that the exception wasn't raised.

Fixing that noticed that both the Paho and HiveMQ clients never seem to re-send failed publishes.

From the email thread you linked, it's a specification change introduced in MQTT v3.1.1; I know that v5.0 has the ability to return negative acks.
The only way the server has to signal upstream that it can't proceed is to close the connection from the broker side.
Your suggestion to have a blocking call in enqueue would address the problem, but it would also block a Netty thread, and the rule of thumb for Netty is "never block a Netty thread", because that would stall the event loop. WDYT, is the connection drop a viable solution? (In the meanwhile I'm creating an integration test to reproduce the problem with small queues and many forwarded publishes.)

@hylkevds
Collaborator

With the change to only have 1 command-per-queue, and making sure the queue size is configurable, dropping the connection may actually be fine. We will have to come up with an alternative for the internalPublish method for embedded use, since there is no connection to drop. A checked exception would be suitable.

Using a tiny (configurable) timeout with a blocking queue may still improve things as well, since a publish with many subscribers will still result in a large number of Acks, though those are more spread out due to the nature of the network. On a slow 4-core laptop, using a tiny queue (size 32 and 1ms block limit) with 100 listeners and sending bursts of 20 QOS-2 messages does cause quite a few overflows in ack messages, but only very rarely dropped publish messages.

Logging a warning about the queue being overrun, with the suggestion to increase the queue size, is very important of course.
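The tiny-timeout idea could look roughly like this (an illustrative sketch with assumed sizes, not the actual implementation): try to enqueue, waiting at most 1 ms for a slot, and only report an overflow if the slot never frees.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of a bounded command queue with a tiny blocking limit (size 32 and
// 1 ms are the example values from the discussion, both configurable).
final class BoundedCommandQueue {
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(32);

    // Returns false on overflow; the caller should log a warning suggesting
    // a larger queue size, and possibly drop the connection.
    boolean enqueue(Runnable command) throws InterruptedException {
        return queue.offer(command, 1, TimeUnit.MILLISECONDS);
    }
}
```

The short wait absorbs brief bursts (such as the flood of acks after a publish to many subscribers) without truly blocking the caller for long.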

@andsel
Collaborator

andsel commented Jan 16, 2022

@hylkevds with the latest commits to #631 I've improved a flood test, plus reworked your idea to squash the publishes a little. Nothing far from your original idea: I just kept all the collection and iteration over the batched publishes in one place, and changed publishToSession to accept a list of Subscription instances. It also calculates the best QoS, so there's no need for a class to carry a Subscription and the matching QoS for that subscription.
In the test I tried concurrent subscribes, but I hit the queue-full problem. So what do you think: when we hit a full queue, should we drop the connection instead of returning an ACK, so that the client is forced to resend the in-flight PUB/SUB/UNSUB messages? The problem I foresee is that the more clients we drop, the more those clients reconnect and could hit the queue-full problem again during connection. I think MQTT5 has special error codes in ACK messages, which are essentially NACKs, so that the client knows to slow down, but that's outside the scope of this PR.

@hylkevds
Collaborator

#631 starts looking good :)

Dropping the connection on a sub or unsub sounds like a bad idea. Especially for a clean session, since that will result in lost packets. Also especially for the unsub, since the client is trying to reduce our load...
On the other hand, a full queue is a very bad situation that should not happen in the first place, and the disconnection is most likely to hit the client that causes the full queue in the first place. Also, there is not much else we can do at that point.

For the test, it makes sense that the queue overflows with subscriptions, when making that many in a short time :)
Maybe you can do something with a barrier across all of them, sized to the command queue? That would allow parallel processing of the subs, without sending enough subs to cause an overflow.

@andsel
Collaborator

andsel commented Jan 17, 2022

#631 starts looking good :)

That's great! Could you list all the tasks it needs to get approved, so that we can sort out all the items, kind of a TODO list?

Dropping the connection on a sub or unsub sounds like a bad idea. Especially for a clean session, since that will result in lost packets. Also especially for the unsub, since the client is trying to reduce our load...

Right, so disconnecting the client when the queue is full is not a viable solution; we can just log a warning for it.

On the other hand, a full queue is a very bad situation that should not happen in the first place, and the disconnection is most likely to hit the client that causes the full queue in the first place

It could be that we drop the unlucky client that sends 1 message every minute, but has the bad luck to send exactly the message that makes the queue overflow. So under normal circumstances, when there is a heavy sender, it's reasonable that it has a higher probability of being hit, but it's a coarse-grained solution. However, as said before, dropping the connection is not viable.

For the test, it makes sense that the queue overflows with subscriptions, when making that many in a short time :)
Maybe you can do something with a barrier across all of them, with a size of the command queue? That would allow parallel processing of the subs, without sending enough subs to cause an overflow.

Yes, we can assume that the clientIds are equally distributed across all queues, so we could simply use a barrier initialized at queue_size * number_of_cores, or, in the worst case, use at most queue_size parallel subscriptions.

@hylkevds
Collaborator

hylkevds commented Jan 17, 2022

  • Check the Thread, and don't queue if the thread is the same as the processor thread of the target queue (I've updated b2bc4a2 so you could cherry pick it)
  • Improve the PublishToManySubscribersUseCaseTest (fixed with commit 0889d7f)
  • Clean up the FailedPublishCollection when clients disconnect (or add it to the todo list for the queues/Session cleanup) (fixed with fca4921)

I think that's it. After that we can actually get back to the original topic of this Issue, dropping queues and the Session handling :)

@andsel
Collaborator

andsel commented Jan 21, 2022

Hi @hylkevds, thanks for the "same thread" fix, I've cherry-picked it. I've also implemented the other 2 tasks, so the PR #631 is now ready for a final review, if you can. Then we can go forward with other PRs targeting the original issue and the queue-drop fix.

@hylkevds
Collaborator

hylkevds commented Jan 22, 2022

Great! I found and fixed two small issues while testing:

  • A disconnect can be queued right before a PubRec, resulting in a NPE on mqttConnection. Fix: 435fd68
  • On my machine the PublishToManySubscribersUseCaseTest flooded a few of the queues. I guess the distribution between the queues is not fully homogeneous. I also noticed that the test did not wait for all subscriptions to complete before publishing. In 364d8c2 I changed the subscription procedure from latched batches to a continuous flow with a Semaphore. The Semaphore allows for a continuous flow of subscribe commands, and allows for a final check to ensure all subscribes are finished. It also takes less code :)
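
The Semaphore-based flow can be sketched like this (illustrative, not the actual test code): at most a bounded number of subscribes are outstanding at any moment, and draining every permit at the end proves that all subscribes finished.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch of the continuous-flow subscribe throttle: at most
// 'permits' subscribes are outstanding at once; re-acquiring every permit
// at the end guarantees no subscribe is still pending.
final class SubscribeThrottle {
    private final int permits;
    private final Semaphore inFlight;

    SubscribeThrottle(int permits) {
        this.permits = permits;
        this.inFlight = new Semaphore(permits);
    }

    void beforeSubscribe() throws InterruptedException {
        inFlight.acquire(); // blocks only when 'permits' subscribes are pending
    }

    void onSubAck() {
        inFlight.release(); // called from the SUBACK handler
    }

    void awaitAllFinished() throws InterruptedException {
        inFlight.acquire(permits); // all permits back => nothing outstanding
        inFlight.release(permits);
    }

    int available() {
        return inFlight.availablePermits();
    }
}
```

Unlike latched batches, this never stalls the whole group waiting for the slowest member of a batch; a new subscribe starts as soon as any earlier one is acknowledged.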

With that this branch is ready to merge!

Edit: did a quick log of how the queues are assigned on my machine: https://gist.github.com/hylkevds/e50091cab9ba1d17d11d49adbe32d901
Fascinating pattern. Quite fair in the long run, but with a clear bias in the short term.

@andsel
Collaborator

andsel commented Jan 22, 2022

Thanks @hylkevds good catch, commits cherry-picked.

About the .hashCode() function and the partitioning: I think it really depends on the algorithm used by Java's String. I tried switching to Murmur3 hashing, provided by Apache's commons-codec, using:

final int murmur = MurmurHash3.hash32x86(name.getBytes(UTF_8));
final int targetQueueId = Math.abs(murmur) % EVENT_LOOPS;

And got maybe more uneven results:
https://gist.github.com/andsel/101fa65110c921305535dd5eff4b370a

@hylkevds
Collaborator

Interesting. The Java String hash does quite well, especially considering the input is not really evenly distributed either, since the input is readable text, not random binary data. Plenty good for our purpose anyway. Here are some graphs:

Java String hash:
[graph: javaHashFunction]

Murmur3 hash:
[graph: Murmur3HashFunction]

@andsel
Collaborator

andsel commented Jan 23, 2022

I thought the problem could be in the abs function. When hash % event_loops returns a negative value, applying abs "inverts the orientation of the rotation": a -1 means the last element and a -2 the one before the last, so -1 -> 3 and -2 -> 2, while applying abs maps -1 -> 1, -2 -> 2, and so on. To avoid this inversion I tried:

((hash % EVENT_LOOPS) + EVENT_LOOPS) % EVENT_LOOPS

but the results are pretty consistent with abs. Since an extra sum and modulo are more costly than abs (not measured, but by gut feeling they should be), I think we can stay with |hash % event_loops|.
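For reference, the JDK's Math.floorMod computes exactly this non-inverting mapping. A small comparison sketch; note also that applying Math.abs before the modulo (as in the Murmur snippet above) has an edge case, because Math.abs(Integer.MIN_VALUE) is still negative, while abs applied after the modulo always lands in [0, EVENT_LOOPS):

```java
public class PartitionIndex {
    static final int EVENT_LOOPS = 3;

    // abs after the modulo: always in [0, EVENT_LOOPS), but mirrors negatives (-1 -> 1)
    static int absAfterMod(int hash) {
        return Math.abs(hash % EVENT_LOOPS);
    }

    // floorMod: always in [0, EVENT_LOOPS), keeps the rotation (-1 -> 2 for 3 loops)
    static int byFloorMod(int hash) {
        return Math.floorMod(hash, EVENT_LOOPS);
    }

    public static void main(String[] args) {
        System.out.println(absAfterMod(-1)); // 1
        System.out.println(byFloorMod(-1));  // 2
        // abs *before* the modulo is the risky variant:
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so
        // Math.abs(Integer.MIN_VALUE) % 3 == -2, a negative index.
        System.out.println(Math.abs(Integer.MIN_VALUE) % EVENT_LOOPS); // -2
    }
}
```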

@hylkevds
Collaborator

I agree. |hash % event_loops| is easy to understand, not very costly, and gives results that seem to be good enough.
I think with that this branch is ready to be merged 👍

@hylkevds
Collaborator

hylkevds commented Feb 2, 2022

Now that the Thread safety is fixed, we can get back to the original topic of session and session queue handling. Now that session handling is always done by the same Thread, we should be able to simplify createOrReopenSession.

The issues I see:

  • IQueueRepository has no way to remove queues. Obvious problem...
  • IQueueRepository.createQueue either silently overwrites an existing old queue handle (MemoryQR) or returns the existing queue (H2)
  • Creating a Session requires a Queue but due to the above
    • it may have old stuff in it (H2)
    • we may have already overwritten the handle to the old queue, even if we don't need this new session
  • Queues that are dropped are not checked for ByteBuffers to release.
  • Sessions that are removed are not checked for ByteBuffers to release.
  • There are two stores for queues: queueRepository and queues. The reason is probably that the IQueueRepository doesn't have a getQueue method and the H2 implementation may create non-persistent queues without holding a reference to them.

@hylkevds
Collaborator

hylkevds commented Feb 2, 2022

I've started working on it in #648.
Open issues:

  • In case of a hostile takeover of a non-disconnected, non-clean session we need some way to make sure the old MQTT connection (or any commands pending in the command queue) don't negatively affect the session after it has been taken over. PUBACK, PUBCOMP and PUBREL are fine and can be processed after the takeover, but DISCONNECT obviously not. Since the Session has a reference to the MQTTConnection, we could check that reference to ensure it's the right connection sending the message.
  • A configurable timeout for persistent sessions, to make sure we don't gather queues for all eternity.
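
The connection-identity check for the hostile-takeover case could be sketched like this (hypothetical classes, not the actual Moquette types): the Session remembers its current connection, and a DISCONNECT queued by a superseded connection is ignored.

```java
// Illustrative guard: after a hostile takeover, commands still queued by the
// old connection must not affect the taken-over Session. PUBACK/PUBCOMP/PUBREL
// are harmless, but DISCONNECT must only be honored from the current owner.
final class TakeoverGuard {
    static final class Connection { /* stands in for MQTTConnection */ }

    private volatile Connection boundConnection;

    void takeover(Connection newConnection) {
        boundConnection = newConnection;
    }

    // Identity check: only the connection currently bound to the Session may
    // disconnect it; a stale DISCONNECT from the old connection is rejected.
    boolean allowDisconnect(Connection sender) {
        return sender == boundConnection;
    }
}
```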

@andsel
Collaborator

andsel commented Feb 3, 2022

I'm working on a just discovered buffer leak, reproduced with #649

@andsel
Collaborator

andsel commented Feb 5, 2022

About the queues management:

IQueueRepository existed only to separate the various implementations of the persistent queues (once there was MapDB, then I added H2 and removed the MapDB one). There is also a MemoryQueueRepository, only for testing purposes.
It's not good to have two places that manage queues: the queues ConcurrentMap field in SessionRegistry and the 2 implementations of IQueueRepository. We could think of:

  • remove the MemoryQueueRepository
    • remove the persistent_store configuration variable and change the behavior: if the user doesn't specify it, the store goes into a data/ subfolder
    • in tests that used the MemoryQueueRepository, switch to a temp folder and H2QueueRepository
  • rename H2QueueRepository to something else and remove the interface
  • move all queue-related code from the SessionRegistry into the renamed H2QueueRepository, and provide new methods to handle the cleanup of queued message buffers (in case the queue is in memory).

After the queues are fine and don't leak, we can think about simplifying createOrReopenSession and the issues you identified in your previous comment.

@hylkevds
Copy link
Collaborator

hylkevds commented Feb 5, 2022

I think the concept of having an interface for the queues is not bad, though it may be a good idea to merge the subscriptions store (ISubscriptionsDirectory) and the queue store (IQueueRepository) into one Session Store. I can imagine use cases where a PostgreSQL Session store would be nice...
I can also see use cases where a pure in-memory store is needed, so the MemoryQueueRepository may actually be useful.

The IQueueRepository interface itself is fine, except for the missing void removeQueue(String cli) method, but adding that is trivial as you can see in this commit: c3c13db (Adding just 12 lines)

The problem with queues is mainly how they are handled in SessionRegistry and a bit in Session as you can see from f97c158.
Changes in that commit:

  • make the Session the only place where the Queue lives outside of the repository,
  • Tie a Queue and a Session in a 1-to-1 way,
  • remove and clean the old Session and Queue before creating a new pair for a ClientId,
  • Clean Session and Queue when they are no longer needed

That solves the problems in this issue, except for the two listed above.

@andsel
Collaborator

andsel commented Feb 6, 2022

I think the concept of having an interface for the queues is not bad, though it may be a good idea to merge the subscriptions store (ISubscriptionsDirectory) and the queue store (IQueueRepository) into one Session Store. I can imagine use cases where a PostgreSQL Session store would be nice...

I think keeping the SubscriptionsDirectory separate from the QueueRepository is best, because this permits implementing them with different storage technologies; they are sibling concepts but different from one another.

The IQueueRepository interface itself is fine, except for the missing void removeQueue(String cli) method, but adding that is trivial as you can see in this commit: c3c13db (Adding just 12 lines)

Seems ok to me.

The problem with queues is mainly how they are handled in SessionRegistry and a bit in Session as you can see from f97c158.

The idea is ok. When #648 is ready to be reviewed, please describe what it does and how it solves the problem, and mark me as reviewer.

Then we can approach the problem of pending commands for a dropped session when a new session with the same clientId has just been recreated, and the other problem of the timeout.

@hylkevds
Collaborator

hylkevds commented Feb 6, 2022

Ok, I updated the comment for #648.

@hylkevds
Collaborator

hylkevds commented Mar 1, 2022

In #662, which builds on #648, I've added a timeout to disconnected Sessions, to get some feeling for the effect and implications. Of course there are many ways to implement this; here I just picked one to have something to discuss.

First result:
[graph: SessionCleaner]
The timeout is currently hard-coded to one minute (it should be configurable, of course). In this test I have 2 publishers sending 500 messages each every 2 seconds, all going to 20 listeners, for 20,000 messages per batch every 2 seconds.
Every 30 seconds all 20 listeners re-connect with a random ClientId, and cleanSession=false.

Clearly visible is the initial increase in memory use, until the sessions start timing out. After that things calm down and stay nice and constant.

@hylkevds
Collaborator

hylkevds commented Jun 2, 2022

I've updated #662 a bit, to make the session timeout configurable.
