From cfca15e44584bc044a1defec0493e94997c01d48 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Sat, 20 Jan 2024 14:52:59 +0100 Subject: [PATCH] Extracted same code used in both sendPublishQos1 and sendPublishQos2 (#809) --- .../main/java/io/moquette/broker/Session.java | 45 +++++-------------- 1 file changed, 10 insertions(+), 35 deletions(-) diff --git a/broker/src/main/java/io/moquette/broker/Session.java b/broker/src/main/java/io/moquette/broker/Session.java index 4e545e8a9..860f615ec 100644 --- a/broker/src/main/java/io/moquette/broker/Session.java +++ b/broker/src/main/java/io/moquette/broker/Session.java @@ -282,45 +282,19 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload, boolean return; } - // TODO From this down is identical to sendPublishQos2 final MQTTConnection localMqttConnectionRef = mqttConnection; - if (canSkipQueue(localMqttConnectionRef)) { - inflightSlots.decrementAndGet(); - int packetId = localMqttConnectionRef.nextPacketId(); - - // Adding to a map, retain. - payload.retain(); - EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload, retained, mqttProperties)); - // If there already was something, release it. - if (old != null) { - old.release(); - inflightSlots.incrementAndGet(); - } - if (resendInflightOnTimeout) { - inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); - } - - MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos, - payload, packetId, mqttProperties); - localMqttConnectionRef.sendPublish(publishMsg); - LOG.debug("Write direct to the peer, inflight slots: {}", inflightSlots.get()); - if (inflightSlots.get() == 0) { - localMqttConnectionRef.flush(); - } - - // TODO drainQueueToConnection();? - } else { - final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload, retained, mqttProperties); - // Adding to a queue, retain. - msg.retain(); - sessionQueue.enqueue(msg); - LOG.debug("Enqueue to peer session"); - } + sendPublishInFlightWindowOrQueueing(topic, qos, payload, retained, localMqttConnectionRef, mqttProperties); } private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained, MqttProperties.MqttProperty... mqttProperties) { final MQTTConnection localMqttConnectionRef = mqttConnection; + sendPublishInFlightWindowOrQueueing(topic, qos, payload, retained, localMqttConnectionRef, mqttProperties); + } + + private void sendPublishInFlightWindowOrQueueing(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained, + MQTTConnection localMqttConnectionRef, + MqttProperties.MqttProperty... mqttProperties) { if (canSkipQueue(localMqttConnectionRef)) { inflightSlots.decrementAndGet(); int packetId = localMqttConnectionRef.nextPacketId(); @@ -337,15 +311,16 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload, boolean inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS)); } MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos, - payload, packetId, mqttProperties); + payload, packetId, mqttProperties); localMqttConnectionRef.sendPublish(publishMsg); drainQueueToConnection(); } else { - final SessionRegistry.PublishedMessage msg = new PublishedMessage(topic, qos, payload, retained, mqttProperties); + final PublishedMessage msg = new PublishedMessage(topic, qos, payload, retained, mqttProperties); // Adding to a queue, retain. msg.retain(); sessionQueue.enqueue(msg); + LOG.debug("Enqueue to peer session {} at QoS {}", getClientID(), qos); } }