diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 364722486..38100c712 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -192,6 +192,7 @@ boolean isClean() { public void processPubRec(int pubRecPacketId) { // Message discarded, make sure any buffers in it are released + inflightTimeouts.removeIf(d -> d.packetId == pubRecPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId); if (removed == null) { LOG.warn("Received a PUBREC with not matching packetId"); @@ -218,6 +219,7 @@ public void processPubRec(int pubRecPacketId) { public void processPubComp(int messageID) { // Message discarded, make sure any buffers in it are released + inflightTimeouts.removeIf(d -> d.packetId == messageID); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); if (removed == null) { LOG.warn("Received a PUBCOMP with not matching packetId"); @@ -343,6 +345,7 @@ private boolean inflightHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged + inflightTimeouts.removeIf(d -> d.packetId == ackPacketId); SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); if (removed == null) { LOG.warn("Received a PUBACK with not matching packetId"); @@ -495,6 +498,7 @@ public void cleanUp() { // in case of in memory session queues all contained messages // has to be released. sessionQueue.closeAndPurge(); + inflightTimeouts.clear(); for (EnqueuedMessage msg : inflightWindow.values()) { msg.release(); }