
Fixed #573 ByteBuf reference counting #576

Closed

Conversation

hylkevds
Copy link
Collaborator

The main place where ByteBufs were not released was the inflightWindow.
It also improves two postoffice methods that passed the payload and retain information twice: once directly and once inside the message.

@andsel
Copy link
Collaborator

andsel commented May 8, 2021

Hi @hylkevds, to simplify reasoning about buffer allocation, could you:

  • leave the postoffice-method improvements out of this PR
  • fix the conflicts.

That should make the PR easier to follow, thanks.

@hylkevds
Copy link
Collaborator Author

hylkevds commented May 8, 2021

I'll update the PR, and split out the postoffice bits in a separate one.

Collaborator

@andsel andsel left a comment


Thanks @hylkevds for the contributions. I have some concerns, which I've tried to address in PR #599.

@@ -376,8 +377,9 @@ void processPublish(MqttPublishMessage msg) {
}
case EXACTLY_ONCE: {
bindedSession.receivedPublishQos2(messageID, msg);
// Second pass-on, retain
Collaborator


I think this payload retain is better kept close to the retain of the full message, so I would move it into Session.receivedPublishQos2.

Collaborator Author


That works, as long as Session.receivedPublishQos2 is not called from other places.
I don't think it would make the code easier to read, though. MQTTConnection.processPublish() makes the two passes, so it is also responsible for retaining the buffer. Otherwise Session.receivedPublishQos2() would have two retains, even though it only passes the buffer on one extra time.

The easiest way to keep the buffer handling consistent is to stick to a simple rule.
If a method makes or receives a buffer, it must

  • retain it once for every call that passes it on
  • release it before returning

By sticking to this rule in every method that does anything with buffers, you never have to check what the receiving methods do with the buffer to decide whether you need to add a retain or release, which makes memory leaks a lot easier to find.
In practice, one retain cancels out the release, so for methods that pass the buffer on only once, nothing needs to be done.
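The rule above can be sketched with a minimal, self-contained example. ToyBuf is a hypothetical stand-in for Netty's reference-counted ByteBuf (not the real class), and fanOut/store are invented names:

```java
// Toy stand-in for a reference-counted buffer; illustrative only, not Netty's ByteBuf.
public class RefCountRule {
    static final class ToyBuf {
        private int refCnt = 1;                 // allocation hands out one reference
        int refCnt() { return refCnt; }
        ToyBuf retain() { refCnt++; return this; }
        void release() {
            if (--refCnt < 0) throw new IllegalStateException("buffer over-released");
        }
    }

    // Leaf consumer: receives the buffer, passes it on zero times, releases before returning.
    static void store(ToyBuf buf) {
        buf.release();
    }

    // Passes the buffer on twice: one retain per pass, plus its own release before returning.
    static void fanOut(ToyBuf buf) {
        buf.retain();       // for pass 1
        buf.retain();       // for pass 2
        store(buf);
        store(buf);
        buf.release();      // own release
    }

    public static void main(String[] args) {
        ToyBuf buf = new ToyBuf();
        // The caller passes the buffer on once: its one retain cancels its one
        // release, so nothing needs to be done here.
        fanOut(buf);
        System.out.println(buf.refCnt());   // 0: no leak
    }
}
```

With the rule applied uniformly, the final reference count after fanOut returns is 0 regardless of how many times any method passed the buffer on.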

Collaborator


One thing to reason about is that the msg.retain() done in Session.receivedPublishQos2 (the first step) is essentially another retain of the payload, so this is mainly a double retain.

I don't think that a method that receives a buffer has to retain it for every method call where it passes it on, and release it before the return. I think that every method that needs to work with a buffer has to retain it.
The responsibility for retaining a buffer lies with the method that works on it, not with the caller hierarchy.

@@ -308,6 +308,7 @@ public void internalPublish(MqttPublishMessage msg, final String clientId) {
}
LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
dispatcher.internalPublish(msg);
msg.payload().release();
Collaborator


This release call forces client code that uses Server.internalPublish to do a retain up front (like in https://github.com/moquette-io/moquette/pull/576/files#diff-3b795bc663c1934e2316bbeee5566fd65aa5c2e2a2804e2bdc6f812e54f4a53aR150) and to respect this implicit contract:

payload.retain
server.internalPublish

I think in this case it is the client code's responsibility to decide whether to retain/release the payload.

Collaborator Author


If the client code wants to keep using the payload, it should always do a retain. Passing the buffer to moquette means giving moquette control over the payload, with the expectation that moquette will release it when it is done with the buffer. This is the same as what Netty does when it passes a buffer to moquette, or when moquette passes a buffer back to Netty.
No one expects to have to release a buffer after passing it on.
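This ownership hand-off can be sketched as follows; ToyBuf and internalPublish here are hypothetical stand-ins for illustration, not the real Moquette/Netty API:

```java
// Toy sketch of ownership transfer: the callee releases, the caller retains only
// if it wants to keep using the buffer afterwards. Names are illustrative only.
public class OwnershipDemo {
    static final class ToyBuf {
        private int refCnt = 1;
        int refCnt() { return refCnt; }
        ToyBuf retain() { refCnt++; return this; }
        void release() {
            if (--refCnt < 0) throw new IllegalStateException("buffer over-released");
        }
    }

    // Stand-in for Server.internalPublish: takes ownership and releases when done.
    static void internalPublish(ToyBuf payload) {
        // ... broker processes the payload ...
        payload.release();
    }

    public static void main(String[] args) {
        // Fire-and-forget caller: just hand the buffer over.
        ToyBuf once = new ToyBuf();
        internalPublish(once);              // refCnt 1 -> 0, broker released it

        // Caller that still needs the payload: retain before handing it over.
        ToyBuf reused = new ToyBuf();
        reused.retain();                    // refCnt 2
        internalPublish(reused);            // refCnt 1, still safe to use here
        // ... reuse the payload ...
        reused.release();                   // refCnt 0
    }
}
```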

Collaborator


When the buffer is passed to moquette, responsibility passes to moquette, so it is moquette code that needs to retain and release it. Here the retain and release of the buffer should be the responsibility of internalPublish, which means the code becomes something like:

public void internalPublish(MqttPublishMessage msg, final String clientId) {
        final int messageID = msg.variableHeader().packetId();
        if (!initialized) {
            LOG.error("Moquette is not started, internal message cannot be published. CId: {}, messageId: {}", clientId,
                      messageID);
            throw new IllegalStateException("Can't publish on a integration is not yet started");
        }
        LOG.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
        msg.payload().retain();   // <-- added
        dispatcher.internalPublish(msg);
        msg.payload().release();
    }

dispatcher.internalPublish(msg) manages its own retains, so this method simply does

refCnt +1
dispatcher.internalPublish(msg);
refCnt - 1 

which is equivalent to doing nothing, so we could remove the retain and release completely.

inflightWindow.remove(packetId);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId);
if (removed != null) {
Collaborator


This case happens only if the broker receives a PUBREC (from a client) with a non-matching packetID, in which case the broker should ignore the message and not do any processing.

Collaborator Author

@hylkevds hylkevds May 16, 2021


Moved the discussion to #599

@@ -200,7 +205,12 @@ public void processPubRec(int packetId) {
}

public void processPubComp(int messageID) {
inflightWindow.remove(messageID);
// Message discarded, make sure any buffers in it are released
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
Collaborator


Same concerns as PUBREC above

@@ -283,7 +306,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightWindow.remove(ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
Collaborator


Same concern as PUBREC and PUBREL with invalid packetID


@andsel andsel left a comment


Hi @hylkevds, thanks for the effort you are putting into this.
I have some comments on your suggestions, with the intent of simplifying the management of retains.


@hylkevds
Collaborator Author

You are correct that:

refCnt +1
dispatcher.internalPublish(msg);
refCnt - 1 

is the same as not doing any retains/releases. Hence my statement: "In practice, one retain cancels out the release, so for methods that pass the buffer only once, nothing needs to be done."

In the end it comes down to maintainability: how easy is it to see whether a method, viewed on its own, handles buffers correctly? It is very hard to tell from the name of a method call alone whether that call will consume the buffer, since the name does not say whether the call just passes the buffer on within moquette or is actually a call to an external library. As a result, when reviewing code, the reviewer has to follow every method call to see whether a retain is needed.

@andsel
Collaborator

andsel commented May 22, 2021

Correct. I'm thinking about the best way to avoid instructions that are not strictly related to a method's logic, such as a retain before passing a buffer to a method call and a release after that method returns, because I think they make the code a little harder to understand.

I thought that if a method consumes a buffer, we could also require that it retains the buffer before touching it and releases it before returning. That way only the outer "shell" method that received the buffer from Netty (or created it) has the duty to call the final release.

@hylkevds
Collaborator Author

Yes, it is also possible to do those retains/releases only at the "edges" of Moquette.
Let's see, that would mean:

Release at the end of every method that

  • receives data from Netty or other external sources. It's important that those methods are never called from within Moquette. Maybe explicitly naming them would help there. Adding a javadoc comment about it would also help. Renaming internalPublish to directPublish (or adding a directPublish that releases) may be a good idea to make this clearer.
  • creates a buffer.
  • calls remove() on a queue/buffer. (also, put() can remove something!)
  • calls 'get()' on a queue/buffer, unless:
    • the value is never passed on, then also no retain()
    • it's certain that another thread can not release() the value before the method is done

Retain when

  • Handing to external libraries
    • channel.write()
    • channel.writeAndFlush()
    • others?
  • put() on a queue/buffer
  • get() on a queue/buffer, unless
    • the value is never passed on, then also no release()
    • it's certain that another thread can not release() the value before the method is done

I think that's all the cases...
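A sketch of what such an "edge" method could look like, again with a toy buffer class and invented names (onExternalMessage, route); the try/finally ensures the edge release also happens when processing throws:

```java
// Toy sketch of the "release at the edges" policy. Only the edge method that
// received the buffer from outside (e.g. from Netty) releases it; internal
// methods neither retain nor release. Names are illustrative only.
public class EdgeReleaseDemo {
    static final class ToyBuf {
        private int refCnt = 1;
        int refCnt() { return refCnt; }
        void release() {
            if (--refCnt < 0) throw new IllegalStateException("buffer over-released");
        }
    }

    // Internal method: relies on the edge method to keep the buffer alive.
    static void route(ToyBuf buf) {
        // ... inspect and forward the buffer ...
    }

    // Edge method: the single owner of the incoming reference.
    static void onExternalMessage(ToyBuf buf) {
        try {
            route(buf);
        } finally {
            buf.release();  // released exactly once, even if route() throws
        }
    }

    public static void main(String[] args) {
        ToyBuf buf = new ToyBuf();
        onExternalMessage(buf);
        System.out.println(buf.refCnt());   // 0: no leak
    }
}
```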

@andsel
Collaborator

andsel commented May 23, 2021

Adding a javadoc comment about it would also help.

This is good in general: the Javadoc should state that the ByteBuf the method receives is going to be released.

Renaming internalPublish to directPublish

Yes, I'm in favor of this, maybe as a separate PR.

calls remove() on a queue

In this case it depends on whether the method is passing the ByteBuf to a Netty codec, in which case it is Netty that invokes release() on it. Otherwise, if the buffer has been processed and is no longer usable, it can be dropped, so the method should invoke release().

(also, put() can remove something!)

I think in this case you mean a put on a hash map that overwrites an existing value, is that correct?

(also, put() can remove something!)

Or do you mean a pop operation that also removes the buffer from the head of the queue?

@hylkevds
Collaborator Author

I forgot the pop(), right.
I did indeed mean put() on a Map, which returns a possible previous entry with the same key.
Is there a different check somewhere to see whether a client is re-using a message ID?
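The put()-can-remove case comes from java.util.Map.put, which returns the previous value mapped to the key. A minimal sketch (ToyBuf is again a toy stand-in for ByteBuf, and the inflight map here is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of why put() on a map can leak: re-using a key silently displaces the
// previous value, which must then be released.
public class PutDisplacesDemo {
    static final class ToyBuf {
        private int refCnt = 1;
        int refCnt() { return refCnt; }
        void release() {
            if (--refCnt < 0) throw new IllegalStateException("buffer over-released");
        }
    }

    public static void main(String[] args) {
        Map<Integer, ToyBuf> inflight = new HashMap<>();
        ToyBuf first = new ToyBuf();
        inflight.put(42, first);

        // A client re-using packet id 42 displaces `first`; Map.put returns it.
        ToyBuf displaced = inflight.put(42, new ToyBuf());
        if (displaced != null) {
            displaced.release();            // without this, `first` would leak
        }
        System.out.println(first.refCnt()); // 0: displaced buffer was released
    }
}
```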

I've given it a try, based on the branch that has my set of PRs, and it seems quite feasible; the commit is smaller than expected:
FraunhoferIOSB@c9cbffd?branch=c9cbffd66217330469a6d878a66f25f0a7f27cec&diff=split

I did notice that just doing a retain() before sending to Netty is not enough; it actually has to be a call to retainedDuplicate(). This is a bit tricky, since the method that does the sending has a generic MqttMessage, which may or may not be a ByteBufHolder...
Currently testing; no obvious memory leaks after 5e6 messages...
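The retainedDuplicate() point can be illustrated with a toy model: a plain retain() shares the reader index, so the first write consumes the bytes and a second consumer would see an empty buffer, while retainedDuplicate() gives each consumer its own indices over shared content. This is a sketch of the behavior under those assumptions, not Netty's implementation:

```java
// Toy model of ByteBuf duplicates: duplicates share content and reference count
// but have independent reader indices. Illustrative only, not Netty code.
public class RetainedDuplicateDemo {
    static final class ToyBuf {
        final byte[] data;
        int readerIndex;                      // per-view state
        private final int[] refCnt;           // shared between duplicates
        ToyBuf(byte[] data) { this(data, new int[]{1}); }
        private ToyBuf(byte[] data, int[] refCnt) { this.data = data; this.refCnt = refCnt; }
        int readableBytes() { return data.length - readerIndex; }
        ToyBuf retainedDuplicate() { refCnt[0]++; return new ToyBuf(data, refCnt); }
        void release() {
            if (--refCnt[0] < 0) throw new IllegalStateException("buffer over-released");
        }
    }

    // Stand-in for channel.writeAndFlush(): the encoder reads the buffer to the
    // end (advancing its reader index) and then releases it.
    static void channelWrite(ToyBuf buf) {
        buf.readerIndex = buf.data.length;
        buf.release();
    }

    public static void main(String[] args) {
        ToyBuf payload = new ToyBuf(new byte[]{1, 2, 3});
        channelWrite(payload.retainedDuplicate());   // subscriber 1
        channelWrite(payload.retainedDuplicate());   // subscriber 2: still sees all bytes
        System.out.println(payload.readableBytes()); // 3: original index untouched
        payload.release();
    }
}
```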

By the way, what formatting rules, for which IDE do you use?

@andsel
Collaborator

andsel commented May 23, 2021

Is there a different check somewhere to see if a client is re-using a message ID?

No. Moquette doesn't actually check for duplicate packet identifiers; indeed, reading
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718025, a second message with the same packet ID is admissible because it could be a re-send. In that case the DUP flag should also be true, but Moquette checks neither the uniqueness of the packet ID nor the DUP flag in case of a re-send.

By the way, what formatting rules, for which IDE do you use?

I use IntelliJ IDEA, but I have never enforced style rules. I tend to follow the Sun style guide, but we can enforce a style with a plugin in another PR.


This PR is shaping up well in commit FraunhoferIOSB@c9cbffd?branch=c9cbffd66217330469a6d878a66f25f0a7f27cec&diff=split

I'll think about whether we can add some verification of buffer reference counting to the existing unit tests, so we don't get a future regression on this.

I also think that a brief description of the policy we are introducing here for buffer usage and reference counting should be added to the README.md or a DEVELOPER.md.

@hylkevds
Collaborator Author

I've made a new PR that moves the ByteBuf reference counting to the outer interfaces of Moquette: #600
That means this PR can be closed.

@andsel andsel closed this in #600 Jun 6, 2021
@hylkevds hylkevds deleted the fix_573_ByteBufRefCount branch April 30, 2024 07:20