Skip to content

Commit

Permalink
Payload format verification and content type properties handling (#826)
Browse files Browse the repository at this point in the history
Updates the processing of publish messages (for all QoS) to verify the payload to be encoded in UTF-8 when property `payload format indicator` is set. If the validation fails, return an acknowledgement message (PUBACK for QoS1 and PUBREC for QoS2, and DISCONNECT for QoS0) with an appropriate return code.
Manage also `content_type` property.
  • Loading branch information
andsel committed May 25, 2024
1 parent 1264ce6 commit 031e77b
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 14 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Manage payload format indicator property, when set verify payload format. (#826)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
[feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806)
[feature] message expiry interval: (issue #818)
Expand Down
28 changes: 27 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession()) {
return null;
}
postOffice.receivedPublishQos0(username, clientId, msg, expiry);
postOffice.receivedPublishQos0(this, username, clientId, msg, expiry);
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
Expand Down Expand Up @@ -693,6 +693,17 @@ void sendPubRec(int messageID) {
sendIfWritableElseDrop(pubRecMessage);
}

void sendPubRec(int messageID, MqttReasonCodes.PubRec reasonCode) {
LOG.trace("sendPubRec for messageID: {}, reason code: {}", messageID, reasonCode);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE,
false, 0);
MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(messageID,
reasonCode.byteValue(), MqttProperties.NO_PROPERTIES);
MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, variableHeader);

sendIfWritableElseDrop(pubRecMessage);
}

private void processPubRel(MqttMessage msg) {
final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
final String clientID = bindedSession.getClientID();
Expand Down Expand Up @@ -764,6 +775,17 @@ void sendPubAck(int messageID) {
sendIfWritableElseDrop(pubAckMessage);
}

void sendPubAck(int messageID, MqttReasonCodes.PubAck reasonCode) {
LOG.trace("sendPubAck for messageID: {}, reason code: {}", messageID, reasonCode);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE,
false, 0);
MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(messageID,
reasonCode.byteValue(), MqttProperties.NO_PROPERTIES);
MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);

sendIfWritableElseDrop(pubAckMessage);
}

private void sendPubCompMessage(int messageID) {
LOG.trace("Sending PUBCOMP message messageId: {}", messageID);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
Expand Down Expand Up @@ -867,4 +889,8 @@ void brokerDisconnect(MqttReasonCodes.Disconnect reasonCode) {
channel.writeAndFlush(disconnectMsg)
.addListener(ChannelFutureListener.CLOSE);
}

void disconnectSession() {
bindedSession.disconnect();
}
}
100 changes: 99 additions & 1 deletion broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
Expand All @@ -42,6 +43,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -612,14 +616,26 @@ public void unsubscribe(List<String> topics, MQTTConnection mqttConnection, int
mqttConnection.sendUnsubAckMessage(topics, clientID, messageId);
}

CompletableFuture<Void> receivedPublishQos0(String username, String clientID, MqttPublishMessage msg,
CompletableFuture<Void> receivedPublishQos0(MQTTConnection connection, String username, String clientID, MqttPublishMessage msg,
Instant messageExpiry) {
final Topic topic = new Topic(msg.variableHeader().topicName());
if (!authorizator.canWrite(topic, username, clientID)) {
LOG.error("client is not authorized to publish on topic: {}", topic);
ReferenceCountUtil.release(msg);
return CompletableFuture.completedFuture(null);
}

if (isPayloadFormatToValidate(msg)) {
if (!validatePayloadAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS0)");
ReferenceCountUtil.release(msg);
connection.brokerDisconnect(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID);
connection.disconnectSession();
connection.dropConnection();
return CompletableFuture.completedFuture(null);
}
}

final RoutingResults publishResult = publish2Subscribers(clientID, messageExpiry, msg);
if (publishResult.isAllFailed()) {
LOG.info("No one publish was successfully enqueued to session loops");
Expand Down Expand Up @@ -656,6 +672,28 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
return RoutingResults.preroutingError();
}

if (isPayloadFormatToValidate(msg)) {
if (!validatePayloadAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS1)");
connection.sendPubAck(messageID, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);

ReferenceCountUtil.release(msg);
return RoutingResults.preroutingError();
}
}

if (isContentTypeToValidate(msg)) {
if (!validateContentTypeAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 content type (QoS1)");
ReferenceCountUtil.release(msg);
connection.brokerDisconnect(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
connection.disconnectSession();
connection.dropConnection();

return RoutingResults.preroutingError();
}
}

final RoutingResults routes;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
Expand Down Expand Up @@ -683,6 +721,20 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i
return routes;
}

private static boolean validatePayloadAsUTF8(MqttPublishMessage msg) {
byte[] rawPayload = Utils.readBytesAndRewind(msg.payload());

boolean isValid = true;
try {
// Decoder instance is stateful so shouldn't be invoked concurrently, hence one instance per call.
// Possible optimization is to use one instance per thread.
StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(rawPayload));
} catch (CharacterCodingException ex) {
isValid = false;
}
return isValid;
}

private void manageRetain(Topic topic, MqttPublishMessage msg) {
if (isRetained(msg)) {
if (!msg.payload().isReadable()) {
Expand Down Expand Up @@ -924,6 +976,16 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage
}

final int messageID = msg.variableHeader().packetId();
if (isPayloadFormatToValidate(msg)) {
if (!validatePayloadAsUTF8(msg)) {
LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS2)");
connection.sendPubRec(messageID, MqttReasonCodes.PubRec.PAYLOAD_FORMAT_INVALID);

ReferenceCountUtil.release(msg);
return RoutingResults.preroutingError();
}
}

final RoutingResults publishRoutings;
if (msg.fixedHeader().isDup()) {
final Set<String> failedClients = failedPublishes.listFailed(clientId, messageID);
Expand Down Expand Up @@ -990,6 +1052,42 @@ private static boolean isRetained(MqttPublishMessage msg) {
return msg.fixedHeader().isRetain();
}

private static boolean isPayloadFormatToValidate(MqttPublishMessage msg) {
MqttProperties.MqttProperty payloadFormatProperty = msg.variableHeader().properties()
.getProperty(MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value());
if (payloadFormatProperty == null) {
return false;
}

if (payloadFormatProperty instanceof MqttProperties.IntegerProperty) {
return ((MqttProperties.IntegerProperty) payloadFormatProperty).value() == 1;
}
return false;
}

private static boolean isContentTypeToValidate(MqttPublishMessage msg) {
MqttProperties.MqttProperty contentTypeProperty = msg.variableHeader().properties()
.getProperty(MqttPropertyType.CONTENT_TYPE.value());
return contentTypeProperty != null;
}

private static boolean validateContentTypeAsUTF8(MqttPublishMessage msg) {
MqttProperties.StringProperty contentTypeProperty = (MqttProperties.StringProperty) msg.variableHeader().properties()
.getProperty(MqttPropertyType.CONTENT_TYPE.value());

byte[] rawPayload = contentTypeProperty.value().getBytes();

boolean isValid = true;
try {
// Decoder instance is stateful so shouldn't be invoked concurrently, hence one instance per call.
// Possible optimization is to use one instance per thread.
StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(rawPayload));
} catch (CharacterCodingException ex) {
isValid = false;
}
return isValid;
}

/**
* notify MqttConnectMessage after connection established (already pass login).
* @param msg
Expand Down
6 changes: 6 additions & 0 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public static int messageId(MqttMessage msg) {
return ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
}

public static byte[] readBytesAndRewind(ByteBuf payload) {
byte[] payloadContent = new byte[payload.readableBytes()];
payload.getBytes(payload.readerIndex(), payloadContent, 0, payload.readableBytes());
return payloadContent;
}

public static MqttVersion versionFromConnect(MqttConnectMessage msg) {
return MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testPublishQoS0ToItself() throws ExecutionException, InterruptedExce

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID,
sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down Expand Up @@ -226,7 +226,7 @@ public void testPublishToMultipleSubscribers() throws ExecutionException, Interr

// Exercise
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID,
sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -253,7 +253,7 @@ public void testPublishWithEmptyPayloadClearRetainedStore() throws ExecutionExce

// Exercise
final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID,
sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID,
MqttMessageBuilders.publish()
.payload(anyPayload)
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down Expand Up @@ -432,7 +432,7 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() thr
// publish a QoS0 retained message
// Exercise
final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, connection.getClientId(),
sut.receivedPublishQos0(connection, TEST_USER, connection.getClientId(),
MqttMessageBuilders.publish()
.payload(qos0Payload)
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public void testCleanSession_maintainClientSubscriptions() throws ExecutionExcep
assertEquals(1, subscriptions.size(), "After a reconnect, subscription MUST be still present");

final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down Expand Up @@ -300,7 +300,7 @@ public void testCleanSession_correctlyClientSubscriptions() throws ExecutionExce

// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload)
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn

// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -165,7 +165,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn

// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload2)
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -190,7 +190,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn
subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -210,7 +210,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn

// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(null, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload2)
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down Expand Up @@ -288,7 +288,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t
subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand All @@ -308,7 +308,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t
subscribe(subscriberConnection, NEWS_TOPIC, AT_MOST_ONCE);
// publish on /news
final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world2!", Charset.defaultCharset());
sut.receivedPublishQos0(TEST_USER, TEST_PWD,
sut.receivedPublishQos0(null, TEST_USER, TEST_PWD,
MqttMessageBuilders.publish()
.payload(payload2.retainedDuplicate())
.qos(MqttQoS.AT_MOST_ONCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType)
assertEquals(mqttMessageType, received.fixedHeader().messageType());
}

static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer<Mqtt5Publish> assertion) throws InterruptedException {
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt5Publish> publishMessage = publishes.receive(1, TimeUnit.SECONDS);
if (!publishMessage.isPresent()) {
fail("Expected to receive a publish message");
return;
}
Mqtt5Publish msgPub = publishMessage.get();
assertion.accept(msgPub);
}
}

@NotNull
Mqtt5BlockingClient createSubscriberClient() {
String clientId = clientName();
Expand Down

0 comments on commit 031e77b

Please sign in to comment.