Skip to content

Commit

Permalink
Reduced memory use on busy Sessions by removing handled items from in…
Browse files Browse the repository at this point in the history
…flightTimeouts

Items in the inflightTimeouts DelayQueue were only ever removed when they
timed out. But in normal operation the related messages would have been
handled long before that. There should only ever be a number equal to the
number of inflightSlots in the queue, but the queue would grow to the
maximum number of messages ever handled in a 5 second interval. This made
each session take much more memory than needed.
  • Loading branch information
hylkevds committed Apr 29, 2024
1 parent 073cd63 commit 3ac2c8f
Showing 1 changed file with 4 additions and 0 deletions.
4 changes: 4 additions & 0 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ boolean isClean() {

public void processPubRec(int pubRecPacketId) {
// Message discarded, make sure any buffers in it are released
inflightTimeouts.removeIf(d -> d.packetId == pubRecPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId);
if (removed == null) {
LOG.warn("Received a PUBREC with not matching packetId");
Expand All @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) {

public void processPubComp(int messageID) {
// Message discarded, make sure any buffers in it are released
inflightTimeouts.removeIf(d -> d.packetId == messageID);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID);
if (removed == null) {
LOG.warn("Received a PUBCOMP with not matching packetId");
Expand Down Expand Up @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() {

void pubAckReceived(int ackPacketId) {
// TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged
inflightTimeouts.removeIf(d -> d.packetId == ackPacketId);
SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId);
if (removed == null) {
LOG.warn("Received a PUBACK with not matching packetId");
Expand Down Expand Up @@ -495,6 +498,7 @@ public void cleanUp() {
// in case of in memory session queues all contained messages
// has to be released.
sessionQueue.closeAndPurge();
inflightTimeouts.clear();
for (EnqueuedMessage msg : inflightWindow.values()) {
msg.release();
}
Expand Down

0 comments on commit 3ac2c8f

Please sign in to comment.