Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update message expiry remaining time and drop queued messages if expired #823

Merged
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Version 0.18-SNAPSHOT:
[feature] message expiry interval: (issue #818)
- Implements the management of message expiry for retained part. (#819)
- Avoid to publish messages that has elapsed its expire property. (#822)
- Update the message expiry property remaining seconds when a publish is forwarded. (#823)
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
Expand Down
4 changes: 2 additions & 2 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return PostOffice.RouteResult.failed(clientId);
}

final Instant expiry = extractExpiryFromPropery(msg);
final Instant expiry = extractExpiryFromProperty(msg);

// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
Expand Down Expand Up @@ -672,7 +672,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
}
}

private Instant extractExpiryFromPropery(MqttPublishMessage msg) {
private Instant extractExpiryFromProperty(MqttPublishMessage msg) {
MqttProperties.MqttProperty expiryProp = msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (evt instanceof IdleStateEvent) {
IdleState e = ((IdleStateEvent) evt).state();
if (e == IdleState.READER_IDLE) {
LOG.info("Firing channel inactive event. MqttClientId = {}.", NettyUtils.clientID(ctx.channel()));
LOG.warn("Close channel because it's inactive, passed keep alive. MqttClientId = {}.", NettyUtils.clientID(ctx.channel()));
// fire a close that then fire channelInactive to trigger publish of Will
ctx.close().addListener(CLOSE_ON_FAILURE);
}
Expand Down
27 changes: 15 additions & 12 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@

import java.net.InetSocketAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -269,9 +262,11 @@ private void sendPublishQos0(PublishedMessage publishRequest) {
LOG.debug("Sending publish at QoS0 already expired, drop it");
return;
}

MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd();
MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(publishRequest.getTopic().toString(),
publishRequest.getPublishingQos(), publishRequest.getPayload(), 0,
publishRequest.retained, false, publishRequest.mqttProperties);
publishRequest.retained, false, mqttProperties);
mqttConnection.sendPublish(publishMsg);
}

Expand All @@ -281,7 +276,7 @@ private void sendPublishQos1(PublishedMessage publishRequest) {
return;
}
if (publishRequest.isExpired()) {
LOG.debug("Sending publish at QoS1 already expired, drop it");
LOG.debug("Sending publish at QoS1 already expired, expected to happen before {}, drop it", publishRequest.messageExpiry);
return;
}

Expand All @@ -307,6 +302,8 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect
inflightSlots.decrementAndGet();
int packetId = localMqttConnectionRef.nextPacketId();

LOG.debug("Adding into inflight for session {} at QoS {}", getClientID(), publishRequest.getPublishingQos());

EnqueuedMessage old = inflightWindow.put(packetId, publishRequest);
// If there already was something, release it.
if (old != null) {
Expand All @@ -316,9 +313,11 @@ private void sendPublishInFlightWindowOrQueueing(MQTTConnection localMqttConnect
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}

MqttProperties.MqttProperty[] mqttProperties = publishRequest.updatePublicationExpiryIfPresentOrAdd();
MqttPublishMessage publishMsg = MQTTConnection.createPublishMessage(
publishRequest.topic.toString(), publishRequest.getPublishingQos(),
publishRequest.payload, packetId, publishRequest.retained, false, publishRequest.mqttProperties);
publishRequest.payload, packetId, publishRequest.retained, false, mqttProperties);
localMqttConnectionRef.sendPublish(publishMsg);

drainQueueToConnection();
Expand Down Expand Up @@ -352,6 +351,7 @@ void pubAckReceived(int ackPacketId) {
removed.release();

inflightSlots.incrementAndGet();
LOG.debug("Received PUBACK {} for session {}", ackPacketId, getClientID());
drainQueueToConnection();
}

Expand Down Expand Up @@ -440,12 +440,15 @@ private void drainQueueToConnection() {
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
}

MqttProperties.MqttProperty[] mqttProperties = msgPub.updatePublicationExpiryIfPresentOrAdd();

MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(
msgPub.topic.toString(),
msgPub.publishingQos,
msgPub.payload,
sendPacketId,
msgPub.mqttProperties);
mqttProperties);
mqttConnection.sendPublish(publishMsg);

// we fetched msg from a map, but the release is cancelled out by the above retain
Expand Down
62 changes: 62 additions & 0 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,71 @@ public MqttProperties.MqttProperty[] getMqttProperties() {
public boolean isExpired() {
return messageExpiry != Instant.MAX && Instant.now().isAfter(messageExpiry);
}

public MqttProperties.MqttProperty[] updatePublicationExpiryIfPresentOrAdd() {
if (messageExpiry == Instant.MAX) {
return mqttProperties;
}

Duration duration = Duration.between(Instant.now(), messageExpiry);
// do some math rounding so that 2.9999 seconds remains 3 seconds
long remainingSeconds = Math.round(duration.toMillis() / 1_000.0);
final int indexOfExpiry = findPublicationExpiryProperty(mqttProperties);
MqttProperties.IntegerProperty updatedProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), (int) remainingSeconds);

// update existing property
if (indexOfExpiry != -1) {
mqttProperties[indexOfExpiry] = updatedProperty;
return mqttProperties;
}

// insert a new property
MqttProperties.MqttProperty[] newProperties = Arrays.copyOf(mqttProperties, mqttProperties.length + 1);
newProperties[newProperties.length - 1] = updatedProperty;
return newProperties;
}

/**
* Linear search of PUBLICATION_EXPIRY_INTERVAL.
* @param properties the array of properties.
* @return the index of matched property or -1.
* */
private static int findPublicationExpiryProperty(MqttProperties.MqttProperty[] properties) {
for (int i = 0; i < properties.length; i++) {
if (isPublicationExpiryProperty(properties[i])) {
return i;
}
}
return -1;
}

private static boolean isPublicationExpiryProperty(MqttProperties.MqttProperty property) {
return property instanceof MqttProperties.IntegerProperty &&
property.propertyId() == MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value();
}

public Instant getMessageExpiry() {
return messageExpiry;
}

@Override
public String toString() {
return "PublishedMessage{" +
"topic=" + topic +
", publishingQos=" + publishingQos +
", payload=" + payload +
", retained=" + retained +
", messageExpiry=" + messageExpiry +
", mqttProperties=" + Arrays.toString(mqttProperties) +
'}';
}
}

public static final class PubRelMarker extends EnqueuedMessage {
@Override
public String toString() {
return "PubRelMarker{}";
}
}

public enum CreationModeEnum {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.unsafequeues.Queue;
import io.moquette.broker.unsafequeues.QueueException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Optional;

public class SegmentPersistentQueue extends AbstractSessionMessageQueue<SessionRegistry.EnqueuedMessage> {

private static final Logger LOG = LoggerFactory.getLogger(SegmentPersistentQueue.class);

private final Queue segmentedQueue;
private final SegmentedPersistentQueueSerDes serdes = new SegmentedPersistentQueueSerDes();

Expand All @@ -19,6 +23,7 @@ public SegmentPersistentQueue(Queue segmentedQueue) {

@Override
public void enqueue(SessionRegistry.EnqueuedMessage message) {
LOG.debug("Adding message {}", message);
checkEnqueuePreconditions(message);

final ByteBuffer payload = serdes.toBytes(message);
Expand All @@ -40,11 +45,14 @@ public SessionRegistry.EnqueuedMessage dequeue() {
throw new RuntimeException(e);
}
if (!dequeue.isPresent()) {
LOG.debug("No data pulled out from the queue");
return null;
}

final ByteBuffer content = dequeue.get();
return serdes.fromBytes(content);
SessionRegistry.EnqueuedMessage message = serdes.fromBytes(content);
LOG.debug("Retrieved message {}", message);
return message;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
final String topic = casted.getTopic().toString();

writeTopic(buff, topic);
writeMessageExpiry(buff, casted.getMessageExpiry());
writePayload(buff, casted.getPayload());
if (EnqueuedMessageValueType.hasProperties(casted)) {
buff.put((byte) 1); // there are properties
Expand All @@ -49,6 +50,10 @@ private void write(SessionRegistry.EnqueuedMessage obj, ByteBuffer buff) {
}
}

private void writeMessageExpiry(ByteBuffer buff, Instant messageExpiry) {
writeString(buff, messageExpiry.toString());
}

private void writePayload(ByteBuffer target, ByteBuf source) {
final int payloadSize = source.readableBytes();
byte[] rawBytes = new byte[payloadSize];
Expand Down Expand Up @@ -112,6 +117,7 @@ private int getMemory(SessionRegistry.EnqueuedMessage obj) {
return 1 + // message type
1 + // qos
topicMemorySize(casted.getTopic()) +
messageExpirySize(casted.getMessageExpiry()) +
payloadMemorySize(casted.getPayload()) +
1 + // flag to indicate if there are MQttProperties or not
propertiesSize;
Expand All @@ -127,6 +133,11 @@ private int topicMemorySize(Topic topic) {
topic.toString().getBytes(StandardCharsets.UTF_8).length;
}

private int messageExpirySize(Instant messageExpiry) {
return 4 + // size
messageExpiry.toString().getBytes(StandardCharsets.UTF_8).length;
}

private int propertiesMemorySize(MqttProperties.MqttProperty[] properties) {
return 4 + // integer containing the number of properties
Arrays.stream(properties).mapToInt(SegmentedPersistentQueueSerDes::propertyMemorySize).sum();
Expand Down Expand Up @@ -164,12 +175,13 @@ public SessionRegistry.EnqueuedMessage fromBytes(ByteBuffer buff) {
} else if (messageType == MessageType.PUBLISHED_MESSAGE.ordinal()) {
final MqttQoS qos = MqttQoS.valueOf(buff.get());
final String topicStr = readTopic(buff);
final Instant messageExpiry = readExpiry(buff);
final ByteBuf payload = readPayload(buff);
if (SerdesUtils.containsProperties(buff)) {
MqttProperties.MqttProperty[] mqttProperties = readProperties(buff);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX, mqttProperties);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry, mqttProperties);
} else {
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, Instant.MAX);
return new SessionRegistry.PublishedMessage(Topic.asTopic(topicStr), qos, payload, false, messageExpiry);
}
} else {
throw new IllegalArgumentException("Can't recognize record of type: " + messageType);
Expand All @@ -185,12 +197,24 @@ private MqttProperties.MqttProperty readProperty(ByteBuffer buff) {
}

private String readTopic(ByteBuffer buff) {
return readString(buff);
}

private static String readString(ByteBuffer buff) {
final int stringLen = buff.getInt();
final byte[] rawString = new byte[stringLen];
buff.get(rawString);
return new String(rawString, StandardCharsets.UTF_8);
}

private Instant readExpiry(ByteBuffer buff) {
final String expiryText = readString(buff);
if (Instant.MAX.toString().equals(expiryText)) {
return Instant.MAX;
}
return Instant.parse(expiryText);
}

private ByteBuf readPayload(ByteBuffer buff) {
return Unpooled.wrappedBuffer(readByteArray(buff));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,9 @@ void connectLowLevel() {
MqttConnAckMessage connAck = lowLevelClient.connectV5();
assertConnectionAccepted(connAck, "Connection must be accepted");
}

void connectLowLevel(int keepAliveSecs) {
MqttConnAckMessage connAck = lowLevelClient.connectV5(keepAliveSecs);
assertConnectionAccepted(connAck, "Connection must be accepted");
}
}
Loading
Loading