From 3ac2c8f98b230038460549ad3cfaaa26a3709f13 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Mon, 29 Apr 2024 15:42:45 +0200 Subject: [PATCH] Reduced memory use on busy Sessions by removing handled items from inflightTimeouts Items in the inflightTimeouts DelayQueue were only ever removed when they timed out. But in normal operation the related messages would have been handled long before that. There should only ever be a number equal to the number of inflightSlots in the queue, but the queue would grow to the maximum number of messages ever handled in a 5 second interval. This made each session take much more memory than needed. --- broker/src/main/java/io/moquette/broker/Session.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 364722486..38100c712 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -192,6 +192,7 @@ boolean isClean() { public void processPubRec(int pubRecPacketId) { // Message discarded, make sure any buffers in it are released + inflightTimeouts.removeIf(d -> d.packetId == pubRecPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId); if (removed == null) { LOG.warn("Received a PUBREC with not matching packetId"); @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) { public void processPubComp(int messageID) { // Message discarded, make sure any buffers in it are released + inflightTimeouts.removeIf(d -> d.packetId == messageID); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); if (removed == null) { LOG.warn("Received a PUBCOMP with not matching packetId"); @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged + inflightTimeouts.removeIf(d -> d.packetId == ackPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed == null) { LOG.warn("Received a PUBACK with not matching packetId"); @@ -495,6 +498,7 @@ public void cleanUp() { // in case of in memory session queues all contained messages // has to be released. sessionQueue.closeAndPurge(); + inflightTimeouts.clear(); for (EnqueuedMessage msg : inflightWindow.values()) { msg.release(); }