From d784d67694e172e9ed17aee79d196d559b594e1e Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Wed, 1 May 2024 11:34:57 +0200 Subject: [PATCH] Reduced memory use on busy Sessions by removing handled items from inflightTimeouts (#834) Items in the inflightTimeouts DelayQueue were only ever removed when they timed out. But in normal operation the related messages would have been handled long before that. There should only ever be a number equal to the number of inflightSlots in the queue, but the queue would grow to the maximum number of messages ever handled in a 5 second interval. This made each session take much more memory than needed. --- broker/src/main/java/io/moquette/broker/Session.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 364722486..df631e63f 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -192,6 +192,7 @@ boolean isClean() { public void processPubRec(int pubRecPacketId) { // Message discarded, make sure any buffers in it are released + cleanFromInflight(pubRecPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId); if (removed == null) { LOG.warn("Received a PUBREC with not matching packetId"); @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) { public void processPubComp(int messageID) { // Message discarded, make sure any buffers in it are released + cleanFromInflight(messageID); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); if (removed == null) { LOG.warn("Received a PUBCOMP with not matching packetId"); @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged + cleanFromInflight(ackPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed == null) { LOG.warn("Received a PUBACK with not matching packetId"); @@ -355,6 +358,10 @@ void pubAckReceived(int ackPacketId) { drainQueueToConnection(); } + private void cleanFromInflight(int ackPacketId) { + inflightTimeouts.removeIf(d -> d.packetId == ackPacketId); + } + public void flushAllQueuedMessages() { drainQueueToConnection(); } @@ -495,6 +502,7 @@ public void cleanUp() { // in case of in memory session queues all contained messages // has to be released. sessionQueue.closeAndPurge(); + inflightTimeouts.clear(); for (EnqueuedMessage msg : inflightWindow.values()) { msg.release(); }