Skip to content

Commit

Permalink
Extracted same code used in both sendPublishQos1 and sendPublishQos2 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Jan 20, 2024
1 parent 4973627 commit cfca15e
Showing 1 changed file with 10 additions and 35 deletions.
45 changes: 10 additions & 35 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,45 +282,19 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload, boolean
return;
}

// TODO From this down is identical to sendPublishQos2
final MQTTConnection localMqttConnectionRef = mqttConnection;
if (canSkipQueue(localMqttConnectionRef)) {
inflightSlots.decrementAndGet();
int packetId = localMqttConnectionRef.nextPacketId();

// Adding to a map, retain.
payload.retain();
EnqueuedMessage old = inflightWindow.put(packetId, new PublishedMessage(topic, qos, payload, retained, mqttProperties));
// If there already was something, release it.
if (old != null) {
old.release();
inflightSlots.incrementAndGet();
}
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}

MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos,
payload, packetId, mqttProperties);
localMqttConnectionRef.sendPublish(publishMsg);
LOG.debug("Write direct to the peer, inflight slots: {}", inflightSlots.get());
if (inflightSlots.get() == 0) {
localMqttConnectionRef.flush();
}

// TODO drainQueueToConnection();?
} else {
final SessionRegistry.PublishedMessage msg = new SessionRegistry.PublishedMessage(topic, qos, payload, retained, mqttProperties);
// Adding to a queue, retain.
msg.retain();
sessionQueue.enqueue(msg);
LOG.debug("Enqueue to peer session");
}
sendPublishInFlightWindowOrQueueing(topic, qos, payload, retained, localMqttConnectionRef, mqttProperties);
}

private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained,
MqttProperties.MqttProperty... mqttProperties) {
final MQTTConnection localMqttConnectionRef = mqttConnection;
sendPublishInFlightWindowOrQueueing(topic, qos, payload, retained, localMqttConnectionRef, mqttProperties);
}

private void sendPublishInFlightWindowOrQueueing(Topic topic, MqttQoS qos, ByteBuf payload, boolean retained,
MQTTConnection localMqttConnectionRef,
MqttProperties.MqttProperty... mqttProperties) {
if (canSkipQueue(localMqttConnectionRef)) {
inflightSlots.decrementAndGet();
int packetId = localMqttConnectionRef.nextPacketId();
Expand All @@ -337,15 +311,16 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload, boolean
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}
MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos,
payload, packetId, mqttProperties);
payload, packetId, mqttProperties);
localMqttConnectionRef.sendPublish(publishMsg);

drainQueueToConnection();
} else {
final SessionRegistry.PublishedMessage msg = new PublishedMessage(topic, qos, payload, retained, mqttProperties);
final PublishedMessage msg = new PublishedMessage(topic, qos, payload, retained, mqttProperties);
// Adding to a queue, retain.
msg.retain();
sessionQueue.enqueue(msg);
LOG.debug("Enqueue to peer session {} at QoS {}", getClientID(), qos);
}
}

Expand Down

0 comments on commit cfca15e

Please sign in to comment.