diff --git a/ChangeLog.txt b/ChangeLog.txt index b81845c93..d9151c7d9 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [feature] Manage payload format indicator property, when set verify payload format. (#826) [refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827) [feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806) [feature] message expiry interval: (issue #818) diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 0d0bcbd91..37453103f 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -637,7 +637,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) { return null; } - postOffice.receivedPublishQos0(username, clientId, msg, expiry); + postOffice.receivedPublishQos0(this, username, clientId, msg, expiry); return null; }).ifFailed(msg::release); case AT_LEAST_ONCE: @@ -693,6 +693,17 @@ void sendPubRec(int messageID) { sendIfWritableElseDrop(pubRecMessage); } + void sendPubRec(int messageID, MqttReasonCodes.PubRec reasonCode) { + LOG.trace("sendPubRec for messageID: {}, reason code: {}", messageID, reasonCode); + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, + false, 0); + MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(messageID, + reasonCode.byteValue(), MqttProperties.NO_PROPERTIES); + MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, variableHeader); + + sendIfWritableElseDrop(pubRecMessage); + } + private void processPubRel(MqttMessage msg) { final int messageID = ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); final String clientID = bindedSession.getClientID(); @@ -764,6 +775,17 @@ void sendPubAck(int messageID) { sendIfWritableElseDrop(pubAckMessage); } + void sendPubAck(int messageID, MqttReasonCodes.PubAck reasonCode) { + LOG.trace("sendPubAck for messageID: {}, reason code: {}", messageID, reasonCode); + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, + false, 0); + MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(messageID, + reasonCode.byteValue(), MqttProperties.NO_PROPERTIES); + MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader); + + sendIfWritableElseDrop(pubAckMessage); + } + private void sendPubCompMessage(int messageID) { LOG.trace("Sending PUBCOMP message messageId: {}", messageID); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0); @@ -867,4 +889,8 @@ void brokerDisconnect(MqttReasonCodes.Disconnect reasonCode) { channel.writeAndFlush(disconnectMsg) .addListener(ChannelFutureListener.CLOSE); } + + void disconnectSession() { + bindedSession.disconnect(); + } } diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 95c30ba63..5ff719ac5 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -33,6 +33,7 @@ import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttReasonCodes; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; @@ -42,6 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Duration; import java.time.Instant; @@ -612,7 +616,7 @@ public void unsubscribe(List topics, MQTTConnection mqttConnection, int mqttConnection.sendUnsubAckMessage(topics, clientID, messageId); } - CompletableFuture receivedPublishQos0(String username, String clientID, MqttPublishMessage msg, + CompletableFuture receivedPublishQos0(MQTTConnection connection, String username, String clientID, MqttPublishMessage msg, Instant messageExpiry) { final Topic topic = new Topic(msg.variableHeader().topicName()); if (!authorizator.canWrite(topic, username, clientID)) { @@ -620,6 +624,18 @@ CompletableFuture receivedPublishQos0(String username, String clientID, Mq ReferenceCountUtil.release(msg); return CompletableFuture.completedFuture(null); } + + if (isPayloadFormatToValidate(msg)) { + if (!validatePayloadAsUTF8(msg)) { + LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS0)"); + ReferenceCountUtil.release(msg); + connection.brokerDisconnect(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID); + connection.disconnectSession(); + connection.dropConnection(); + return CompletableFuture.completedFuture(null); + } + } + final RoutingResults publishResult = publish2Subscribers(clientID, messageExpiry, msg); if (publishResult.isAllFailed()) { LOG.info("No one publish was successfully enqueued to session loops"); @@ -656,6 +672,28 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i return RoutingResults.preroutingError(); } + if (isPayloadFormatToValidate(msg)) { + if (!validatePayloadAsUTF8(msg)) { + LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS1)"); + connection.sendPubAck(messageID, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID); + + ReferenceCountUtil.release(msg); + return RoutingResults.preroutingError(); + } + } + + if (isContentTypeToValidate(msg)) { + if (!validateContentTypeAsUTF8(msg)) { + LOG.warn("Received not valid UTF-8 content type (QoS1)"); + ReferenceCountUtil.release(msg); + connection.brokerDisconnect(MqttReasonCodes.Disconnect.PROTOCOL_ERROR); + connection.disconnectSession(); + connection.dropConnection(); + + return RoutingResults.preroutingError(); + } + } + final RoutingResults routes; if (msg.fixedHeader().isDup()) { final Set failedClients = failedPublishes.listFailed(clientId, messageID); @@ -683,6 +721,20 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, String username, i return routes; } + private static boolean validatePayloadAsUTF8(MqttPublishMessage msg) { + byte[] rawPayload = Utils.readBytesAndRewind(msg.payload()); + + boolean isValid = true; + try { + // Decoder instance is stateful so shouldn't be invoked concurrently, hence one instance per call. + // Possible optimization is to use one instance per thread. + StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(rawPayload)); + } catch (CharacterCodingException ex) { + isValid = false; + } + return isValid; + } + private void manageRetain(Topic topic, MqttPublishMessage msg) { if (isRetained(msg)) { if (!msg.payload().isReadable()) { @@ -924,6 +976,16 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage } final int messageID = msg.variableHeader().packetId(); + if (isPayloadFormatToValidate(msg)) { + if (!validatePayloadAsUTF8(msg)) { + LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS2)"); + connection.sendPubRec(messageID, MqttReasonCodes.PubRec.PAYLOAD_FORMAT_INVALID); + + ReferenceCountUtil.release(msg); + return RoutingResults.preroutingError(); + } + } + final RoutingResults publishRoutings; if (msg.fixedHeader().isDup()) { final Set failedClients = failedPublishes.listFailed(clientId, messageID); @@ -990,6 +1052,42 @@ private static boolean isRetained(MqttPublishMessage msg) { return msg.fixedHeader().isRetain(); } + private static boolean isPayloadFormatToValidate(MqttPublishMessage msg) { + MqttProperties.MqttProperty payloadFormatProperty = msg.variableHeader().properties() + .getProperty(MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value()); + if (payloadFormatProperty == null) { + return false; + } + + if (payloadFormatProperty instanceof MqttProperties.IntegerProperty) { + return ((MqttProperties.IntegerProperty) payloadFormatProperty).value() == 1; + } + return false; + } + + private static boolean isContentTypeToValidate(MqttPublishMessage msg) { + MqttProperties.MqttProperty contentTypeProperty = msg.variableHeader().properties() + .getProperty(MqttPropertyType.CONTENT_TYPE.value()); + return contentTypeProperty != null; + } + + private static boolean validateContentTypeAsUTF8(MqttPublishMessage msg) { + MqttProperties.StringProperty contentTypeProperty = (MqttProperties.StringProperty) msg.variableHeader().properties() + .getProperty(MqttPropertyType.CONTENT_TYPE.value()); + + byte[] rawPayload = contentTypeProperty.value().getBytes(); + + boolean isValid = true; + try { + // Decoder instance is stateful so shouldn't be invoked concurrently, hence one instance per call. + // Possible optimization is to use one instance per thread. + StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(rawPayload)); + } catch (CharacterCodingException ex) { + isValid = false; + } + return isValid; + } + /** * notify MqttConnectMessage after connection established (already pass login). * @param msg diff --git a/broker/src/main/java/io/moquette/broker/Utils.java b/broker/src/main/java/io/moquette/broker/Utils.java index 0107932c8..1bbb173e4 100644 --- a/broker/src/main/java/io/moquette/broker/Utils.java +++ b/broker/src/main/java/io/moquette/broker/Utils.java @@ -41,6 +41,12 @@ public static int messageId(MqttMessage msg) { return ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId(); } + public static byte[] readBytesAndRewind(ByteBuf payload) { + byte[] payloadContent = new byte[payload.readableBytes()]; + payload.getBytes(payload.readerIndex(), payloadContent, 0, payload.readableBytes()); + return payloadContent; + } + public static MqttVersion versionFromConnect(MqttConnectMessage msg) { return MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version()); } diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index d6942ee5c..b3314fb6b 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -124,7 +124,7 @@ public void testPublishQoS0ToItself() throws ExecutionException, InterruptedExce // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -226,7 +226,7 @@ public void testPublishToMultipleSubscribers() throws ExecutionException, Interr // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -253,7 +253,7 @@ public void testPublishWithEmptyPayloadClearRetainedStore() throws ExecutionExce // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(connection, TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_MOST_ONCE) @@ -432,7 +432,7 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() thr // publish a QoS0 retained message // Exercise final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, connection.getClientId(), + sut.receivedPublishQos0(connection, TEST_USER, connection.getClientId(), MqttMessageBuilders.publish() .payload(qos0Payload) .qos(MqttQoS.AT_MOST_ONCE) diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index aa34a7eb4..f0b25dcff 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -255,7 +255,7 @@ public void testCleanSession_maintainClientSubscriptions() throws ExecutionExcep assertEquals(1, subscriptions.size(), "After a reconnect, subscription MUST be still present"); final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -300,7 +300,7 @@ public void testCleanSession_correctlyClientSubscriptions() throws ExecutionExce // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload) .qos(MqttQoS.AT_MOST_ONCE) diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 612899702..13bda420f 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -152,7 +152,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -165,7 +165,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) @@ -190,7 +190,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -210,7 +210,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(null, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) @@ -288,7 +288,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(connection, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -308,7 +308,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t subscribe(subscriberConnection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world2!", Charset.defaultCharset()); - sut.receivedPublishQos0(TEST_USER, TEST_PWD, + sut.receivedPublishQos0(null, TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java index b0aa8af29..848915d29 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -91,6 +91,18 @@ static void verifyOfType(MqttMessage received, MqttMessageType mqttMessageType) assertEquals(mqttMessageType, received.fixedHeader().messageType()); } + static void verifyPublishMessage(Mqtt5BlockingClient subscriber, Consumer assertion) throws InterruptedException { + try (Mqtt5BlockingClient.Mqtt5Publishes publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL)) { + Optional publishMessage = publishes.receive(1, TimeUnit.SECONDS); + if (!publishMessage.isPresent()) { + fail("Expected to receive a publish message"); + return; + } + Mqtt5Publish msgPub = publishMessage.get(); + assertion.accept(msgPub); + } + } + @NotNull Mqtt5BlockingClient createSubscriberClient() { String clientId = clientName(); diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java new file mode 100644 index 000000000..267bd7e96 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ContentTypeTest.java @@ -0,0 +1,108 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.integration.mqtt5; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.*; +import io.netty.handler.codec.mqtt.MqttProperties.BinaryProperty; +import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty; +import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; +import org.eclipse.paho.mqttv5.client.IMqttMessageListener; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.shadow.com.univocity.parsers.common.processor.InputValueSwitch; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static org.junit.jupiter.api.Assertions.*; + +public class ContentTypeTest extends AbstractServerIntegrationTest { + + // second octet is invalid + public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28}; + + @Override + public String clientName() { + return "subscriber"; + } + + @Test + public void givenAPublishWithContentTypeWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + Mqtt5BlockingClient subscriber = createSubscriberClient(); + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) + .contentType("application/json") + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + verifyPublishMessage(subscriber, msgPub -> { + assertTrue(msgPub.getContentType().isPresent(), "Content-type MUST be present"); + assertEquals("application/json", msgPub.getContentType().get().toString(), "Content-type MUST be untouched"); + }); + } + + @Test + public void givenAPublishWithContentTypeRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException { + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("{\"max\": 18}".getBytes(StandardCharsets.UTF_8)) + .retain(true) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_LEAST_ONCE) // retained works for QoS > 0 + .send(); + + MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); + client.connect(); + MqttSubscription subscription = new MqttSubscription("temperature/living", 1); + SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); + IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, + new IMqttMessageListener[] {publishCollector}); + TestUtils.verifySubscribedSuccessfully(subscribeToken); + + // Verify the message is also reflected back to the sender + publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); + assertEquals("temperature/living", publishCollector.receivedTopic()); + assertEquals("{\"max\": 18}", publishCollector.receivedPayload(), "Payload published on topic should match"); + org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); + assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); + assertTrue(receivedMessage.getProperties().getPayloadFormat()); + } +} diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java new file mode 100644 index 000000000..ff0b7b405 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/PayloadFormatIndicatorTest.java @@ -0,0 +1,183 @@ +/* + * + * Copyright (c) 2012-2024 The original author or authors + * ------------------------------------------------------ + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * and Apache License v2.0 which accompanies this distribution. + * + * The Eclipse Public License is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * The Apache License v2.0 is available at + * http://www.opensource.org/licenses/apache2.0.php + * + * You may elect to redistribute this code under either of these licenses. + * + */ + +package io.moquette.integration.mqtt5; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator; +import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAckReasonCode; +import com.hivemq.client.mqtt.mqtt5.message.publish.pubrec.Mqtt5PubRecReasonCode; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty; +import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; +import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; +import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader; +import io.netty.handler.codec.mqtt.MqttReasonCodes; +import org.eclipse.paho.mqttv5.client.IMqttMessageListener; +import org.eclipse.paho.mqttv5.client.IMqttToken; +import org.eclipse.paho.mqttv5.client.MqttClient; +import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class PayloadFormatIndicatorTest extends AbstractServerIntegrationTest { + + // second octet is invalid + public static final byte[] INVALID_UTF_8_BYTES = new byte[]{(byte) 0xC3, 0x28}; + + @Override + public String clientName() { + return "subscriber"; + } + + @Test + public void givenAPublishWithPayloadFormatIndicatorWhenForwardedToSubscriberThenIsPresent() throws InterruptedException { + Mqtt5BlockingClient subscriber = createSubscriberClient(); + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_MOST_ONCE) + .send(); + + verifyPublishMessage(subscriber, msgPub -> { + assertTrue(msgPub.getPayloadFormatIndicator().isPresent()); + }); + } + + @Test + public void givenAPublishWithPayloadFormatIndicatorRetainedWhenForwardedToSubscriberThenIsPresent() throws InterruptedException, MqttException { + Mqtt5BlockingClient publisher = createPublisherClient(); + publisher.publishWith() + .topic("temperature/living") + .payload("18".getBytes(StandardCharsets.UTF_8)) + .retain(true) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_LEAST_ONCE) // retained works for QoS > 0 + .send(); + + MqttClient client = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence()); + client.connect(); + MqttSubscription subscription = new MqttSubscription("temperature/living", 1); + SubscriptionOptionsTest.PublishCollector publishCollector = new SubscriptionOptionsTest.PublishCollector(); + IMqttToken subscribeToken = client.subscribe(new MqttSubscription[]{subscription}, + new IMqttMessageListener[] {publishCollector}); + TestUtils.verifySubscribedSuccessfully(subscribeToken); + + // Verify the message is also reflected back to the sender + publishCollector.assertReceivedMessageIn(2, TimeUnit.SECONDS); + assertEquals("temperature/living", publishCollector.receivedTopic()); + assertEquals("18", publishCollector.receivedPayload(), "Payload published on topic should match"); + org.eclipse.paho.mqttv5.common.MqttMessage receivedMessage = publishCollector.receivedMessage(); + assertEquals(MqttQos.AT_LEAST_ONCE.getCode(), receivedMessage.getQos()); + assertTrue(receivedMessage.getProperties().getPayloadFormat()); + } + + @Test + public void givenNotValidUTF8StringInPublishQoS1WhenPayloadFormatIndicatorIsSetThenShouldReturnBadPublishResponse() { + Mqtt5BlockingClient subscriber = createSubscriberClient(); + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + + Mqtt5BlockingClient publisher = createPublisherClient(); + try { + publisher.publishWith() + .topic("temperature/living") + .payload(INVALID_UTF_8_BYTES) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.AT_LEAST_ONCE) + .send(); + fail("Publish with an invalid UTF8 payload and payload format indicator set to UTF8 MUST throw an error"); + } catch (Mqtt5PubAckException pubAckEx) { + assertEquals(Mqtt5PubAckReasonCode.PAYLOAD_FORMAT_INVALID, pubAckEx.getMqttMessage().getReasonCode()); + } + } + + @Test + public void givenNotValidUTF8StringInPublishQoS2WhenPayloadFormatIndicatorIsSetThenShouldReturnBadPublishResponse() { + Mqtt5BlockingClient subscriber = createSubscriberClient(); + subscriber.subscribeWith() + .topicFilter("temperature/living") + .qos(MqttQos.EXACTLY_ONCE) + .send(); + + Mqtt5BlockingClient publisher = createPublisherClient(); + try { + publisher.publishWith() + .topic("temperature/living") + .payload(INVALID_UTF_8_BYTES) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .qos(MqttQos.EXACTLY_ONCE) + .send(); + fail("Publish with an invalid UTF8 payload and payload format indicator set to UTF8 MUST throw an error"); + } catch (Mqtt5PubRecException pubRecEx) { + assertEquals(Mqtt5PubRecReasonCode.PAYLOAD_FORMAT_INVALID, pubRecEx.getMqttMessage().getReasonCode()); + } + } + + @Test + public void givenNotValidUTF8StringInPublishQoS0WhenPayloadFormatIndicatorIsSetThenShouldReturnDisconnectWithBadPublishResponse() throws InterruptedException { + connectLowLevel(); + + MqttProperties props = new MqttProperties(); + IntegerProperty payloadFormatProperty = new IntegerProperty(MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value(), 1); + props.add(payloadFormatProperty); + + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, + false, 0); + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader("temperature/living", 1, props); + MqttPublishMessage publishQoS0 = new MqttPublishMessage(fixedHeader, variableHeader, Unpooled.wrappedBuffer(INVALID_UTF_8_BYTES)); + // in a reasonable amount of time (say 500 ms) it should receive a DISCONNECT + lowLevelClient.publish(publishQoS0, 500, TimeUnit.MILLISECONDS); + + // Verify a DISCONNECT is received with PAYLOAD_FORMAT_INVALID reason code and connection is closed + final MqttMessage receivedMessage = lowLevelClient.lastReceivedMessage(); + assertEquals(MqttMessageType.DISCONNECT, receivedMessage.fixedHeader().messageType()); + MqttReasonCodeAndPropertiesVariableHeader disconnectHeader = (MqttReasonCodeAndPropertiesVariableHeader) receivedMessage.variableHeader(); + assertEquals(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID.byteValue(), disconnectHeader.reasonCode(), + "Expected Disconnect to contain PAYLOAD_FORMAT_INVALID as reason code"); + // Ugly hack, but give the Netty channel some instants to receive the disconnection of the socket + Thread.sleep(100); + assertTrue(lowLevelClient.isConnectionLost(),"After the disconnect message the connection MUST be closed"); + } +} diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index e423d8663..7ca53e603 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -267,6 +267,33 @@ private void doSubscribe(MqttSubscribeMessage subscribeMessage, int timeout, Tim } } + public void publish(MqttPublishMessage publishMessage, int timeout, TimeUnit timeUnit) { + final CountDownLatch publishResponseLatch = new CountDownLatch(1); + this.setCallback(msg -> { + receivedMsg.getAndSet(msg); + LOG.debug("Publish callback invocation, received message {}", msg.fixedHeader().messageType()); + publishResponseLatch.countDown(); + + // clear the callback + setCallback(null); + }); + + LOG.debug("Sending PUBLISH message"); + sendMessage(publishMessage); + LOG.debug("Sent PUBLISH message"); + + boolean notExpired; + try { + notExpired = publishResponseLatch.await(timeout, timeUnit); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting", e); + } + + if (! notExpired) { + throw new RuntimeException("Cannot receive any message after PUBLISH in " + timeout + " " + timeUnit); + } + } + public MqttMessage subscribeWithError(String topic, MqttQoS qos) { final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe() .messageId(1)