Skip to content

Commit

Permalink
Switch serializer of Retained messages in H2 store to also handle MQT…
Browse files Browse the repository at this point in the history
…T properties. (#828)

Switch serializer of Retained messages in H2 store to also handle MQTT properties.
Implement a new H2 DataType for Retained Message.
  • Loading branch information
andsel committed Apr 20, 2024
1 parent ac2a9be commit 073cd63
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;

import java.time.Instant;
Expand Down Expand Up @@ -44,17 +45,22 @@ public void cleanRetained(Topic topic) {
@Override
public void retain(Topic topic, MqttPublishMessage msg) {
byte[] rawPayload = payloadToByteArray(msg);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, extractPropertiesArray(msg));
storage.put(topic, toStore);
}

@Override
public void retain(Topic topic, MqttPublishMessage msg, Instant expiryTime) {
byte[] rawPayload = payloadToByteArray(msg);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, expiryTime);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, extractPropertiesArray(msg), expiryTime);
storageExpire.put(topic, toStore);
}

private static MqttProperties.MqttProperty[] extractPropertiesArray(MqttPublishMessage msg) {
MqttProperties properties = msg.variableHeader().properties();
return properties.listAll().toArray(new MqttProperties.MqttProperty[0]);
}

private static byte[] payloadToByteArray(MqttPublishMessage msg) {
final ByteBuf payload = msg.content();
byte[] rawPayload = new byte[payload.readableBytes()];
Expand Down
12 changes: 3 additions & 9 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -532,8 +525,9 @@ private void publishRetainedMessagesForSubscriptions(String clientID, Collection
LOG.info("No retained messages matching topic filter {}", topicFilter);
continue;
}
MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription, Collections.emptyList());

for (RetainedMessage retainedMsg : retainedMsgs) {
MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription, Arrays.asList(retainedMsg.getMqttProperties()));
final MqttQoS retainedQos = retainedMsg.qosLevel();
MqttQoS qos = lowerQosToTheSubscriptionDesired(subscription, retainedQos);

Expand Down
15 changes: 11 additions & 4 deletions broker/src/main/java/io/moquette/broker/RetainedMessage.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
package io.moquette.broker;

import io.moquette.broker.subscriptions.Topic;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;

import java.io.Serializable;
import java.time.Instant;

public class RetainedMessage implements Serializable{
public class RetainedMessage /*implements Serializable*/{

private final Topic topic;
private final MqttQoS qos;
private final byte[] payload;
private final MqttProperties.MqttProperty[] properties;
private Instant expiryTime;

public RetainedMessage(Topic topic, MqttQoS qos, byte[] payload) {
public RetainedMessage(Topic topic, MqttQoS qos, byte[] payload, MqttProperties.MqttProperty[] properties) {
this.topic = topic;
this.qos = qos;
this.payload = payload;
this.properties = properties;
}

public RetainedMessage(Topic topic, MqttQoS qos, byte[] rawPayload, Instant expiryTime) {
this(topic, qos, rawPayload);
public RetainedMessage(Topic topic, MqttQoS qos, byte[] rawPayload, MqttProperties.MqttProperty[] properties, Instant expiryTime) {
this(topic, qos, rawPayload, properties);
this.expiryTime = expiryTime;
}

Expand All @@ -39,4 +42,8 @@ public byte[] getPayload() {
public Instant getExpiryTime() {
return expiryTime;
}

public MqttProperties.MqttProperty[] getMqttProperties() {
return properties;
}
}
120 changes: 116 additions & 4 deletions broker/src/main/java/io/moquette/persistence/H2RetainedRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@

import io.moquette.broker.IRetainedRepository;
import io.moquette.broker.RetainedMessage;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.BasicDataType;
import org.h2.mvstore.type.StringDataType;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -19,9 +26,14 @@ public class H2RetainedRepository implements IRetainedRepository {
private final MVMap<Topic, RetainedMessage> retainedMap;
private final MVMap<Topic, RetainedMessage> retainedExpireMap;

private final MVMap.Builder<Topic, RetainedMessage> retainedBuilder = new MVMap.Builder<Topic, RetainedMessage>()
.valueType(new RetainedMessageValueType());

public H2RetainedRepository(MVStore mvStore) {
this.retainedMap = mvStore.openMap("retained_store");
this.retainedExpireMap = mvStore.openMap("retained_expiry_store");
// this.retainedMap = mvStore.openMap("retained_store");
this.retainedMap = mvStore.openMap("retained_store", retainedBuilder);
// this.retainedExpireMap = mvStore.openMap("retained_expiry_store");
this.retainedExpireMap = mvStore.openMap("retained_expiry_store", retainedBuilder);
}

@Override
Expand All @@ -33,14 +45,19 @@ public void cleanRetained(Topic topic) {
@Override
public void retain(Topic topic, MqttPublishMessage msg) {
byte[] rawPayload = payloadToByteArray(msg);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, extractPropertiesArray(msg));
retainedMap.put(topic, toStore);
}

private static MqttProperties.MqttProperty[] extractPropertiesArray(MqttPublishMessage msg) {
MqttProperties properties = msg.variableHeader().properties();
return properties.listAll().toArray(new MqttProperties.MqttProperty[0]);
}

@Override
public void retain(Topic topic, MqttPublishMessage msg, Instant expiryTime) {
byte[] rawPayload = payloadToByteArray(msg);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, expiryTime);
final RetainedMessage toStore = new RetainedMessage(topic, msg.fixedHeader().qosLevel(), rawPayload, extractPropertiesArray(msg), expiryTime);
retainedExpireMap.put(topic, toStore);
}

Expand Down Expand Up @@ -81,4 +98,99 @@ private List<RetainedMessage> findMatching(Topic searchTopic, MVMap<Topic, Retai
public Collection<RetainedMessage> listExpirable() {
return retainedExpireMap.values();
}

private static final class RetainedMessageValueType extends BasicDataType<RetainedMessage> {
// Layout for RetainedMessage:
// - topic String
// - qos int
// - payload byte[]
// - flag map to say if contains properties, expiry time (MSB, LSB)
// - (opt) expiry time in epoch millis long
// - (opt) properties

private final PropertiesDataType propertiesDataType = new PropertiesDataType();

private static final byte MESSAGE_EXPIRY_FLAG = 0x01;
private static final byte PROPERTIES_FLAG = MESSAGE_EXPIRY_FLAG << 1;

@Override
public int getMemory(RetainedMessage retainedMsg) {
int bytesSize = StringDataType.INSTANCE.getMemory(retainedMsg.getTopic().toString()) +
1 + // qos, 1 byte
4 + retainedMsg.getPayload().length + // length + bytes
1; // flags
if (retainedMsg.getExpiryTime() != null) {
bytesSize += 8; // long
}
int propertiesSize = retainedMsg.getMqttProperties().length > 0 ?
propertiesDataType.getMemory(retainedMsg.getMqttProperties()) :
0;

return bytesSize + propertiesSize;
}

@Override
public void write(WriteBuffer buff, RetainedMessage retainedMsg) {
StringDataType.INSTANCE.write(buff, retainedMsg.getTopic().toString());
buff.put((byte) retainedMsg.qosLevel().value());
buff.putInt(retainedMsg.getPayload().length);
buff.put(retainedMsg.getPayload());

byte flagsBitmask = 0x00;
if (retainedMsg.getExpiryTime() != null) {
flagsBitmask = (byte) (flagsBitmask | MESSAGE_EXPIRY_FLAG);
}
if (retainedMsg.getMqttProperties().length > 0) {
flagsBitmask = (byte) (flagsBitmask | PROPERTIES_FLAG);
}

buff.put(flagsBitmask);

if (retainedMsg.getExpiryTime() != null) {
buff.putLong(retainedMsg.getExpiryTime().toEpochMilli());
}

if (retainedMsg.getMqttProperties().length > 0) {
propertiesDataType.write(buff, retainedMsg.getMqttProperties());
}
}

@Override
public RetainedMessage read(ByteBuffer buff) {
final String topicStr = StringDataType.INSTANCE.read(buff);
final MqttQoS qos = MqttQoS.valueOf(buff.get());

final int payloadSize = buff.getInt();
byte[] payload = new byte[payloadSize];
buff.get(payload);

final byte flags = buff.get();

final Instant expiry;
if ((flags & MESSAGE_EXPIRY_FLAG) > 0) {
long millis = buff.getLong();
expiry = Instant.ofEpochMilli(millis);
} else {
expiry = null;
}

final MqttProperties.MqttProperty[] mqttProperties;
if ((flags & PROPERTIES_FLAG) > 0) {
mqttProperties = propertiesDataType.read(buff);
} else {
mqttProperties = new MqttProperties.MqttProperty[0];
}

if ((flags & MESSAGE_EXPIRY_FLAG) > 0) {
return new RetainedMessage(new Topic(topicStr), qos, payload, mqttProperties, expiry);
} else {
return new RetainedMessage(new Topic(topicStr), qos, payload, mqttProperties);
}
}

@Override
public RetainedMessage[] createStorage(int size) {
return new RetainedMessage[size];
}
}
}

0 comments on commit 073cd63

Please sign in to comment.