From 3070946e853ae573e9da755a9de694183bff8f94 Mon Sep 17 00:00:00 2001 From: andsel Date: Sun, 6 Jun 2021 17:08:25 +0200 Subject: [PATCH] Fix, avoid to increment inflight window counter in case of invalid ACK with invalid packet ID is received --- .../main/java/io/moquette/broker/Session.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 7e1323407..156127735 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -184,20 +184,24 @@ boolean isClean() { return clean; } - public void processPubRec(int packetId) { + public void processPubRec(int pubRecPacketId) { // Message discarded, make sure any buffers in it are released - SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(packetId); - if (removed != null) { - removed.release(); + SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(pubRecPacketId); + if (removed == null) { + LOG.warn("Received a PUBREC with not matching packetId"); + return; + } + if (removed instanceof SessionRegistry.PubRelMarker) { + LOG.info("Received a PUBREC for packetId that was already moved in second step of Qos2"); + return; } inflightSlots.incrementAndGet(); if (canSkipQueue()) { inflightSlots.decrementAndGet(); - int pubRelPacketId = packetId/*mqttConnection.nextPacketId()*/; - inflightWindow.put(pubRelPacketId, new SessionRegistry.PubRelMarker()); - inflightTimeouts.add(new InFlightPacket(pubRelPacketId, FLIGHT_BEFORE_RESEND_MS)); - MqttMessage pubRel = MQTTConnection.pubrel(pubRelPacketId); + inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker()); + inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS)); + MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId); mqttConnection.sendIfWritableElseDrop(pubRel); drainQueueToConnection(); @@ -209,12 +213,12 @@ public void processPubRec(int packetId) { public void processPubComp(int messageID) { // Message discarded, make sure any buffers in it are released SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(messageID); - if (removed != null) { - removed.release(); + if (removed == null) { + LOG.warn("Received a PUBCOMP with not matching packetId"); + return; } - + removed.release(); inflightSlots.incrementAndGet(); - drainQueueToConnection(); // TODO notify the interceptor @@ -323,9 +327,11 @@ private boolean inflighHasSlotsAndConnectionIsUp() { void pubAckReceived(int ackPacketId) { // TODO remain to invoke in somehow m_interceptor.notifyMessageAcknowledged SessionRegistry.EnqueuedMessage removed = inflightWindow.remove(ackPacketId); - if (removed != null) { - removed.release(); + if (removed == null) { + LOG.warn("Received a PUBACK with not matching packetId"); + return; } + removed.release(); inflightSlots.incrementAndGet(); drainQueueToConnection();