From ac2a9be0cdce8997fee54349f61f9490b53c7824 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Sat, 20 Apr 2024 11:48:54 +0200 Subject: [PATCH] Avoid decomposition of Publish fields into method params (#827) Updated all PostOffice methods that accepted QoS, topic, retained flag and payload, components of a publish message to move around just the publish message without decomposing it. --- ChangeLog.txt | 3 +- .../io/moquette/broker/MQTTConnection.java | 4 +- .../java/io/moquette/broker/PostOffice.java | 102 +++++++++++------- .../broker/PostOfficePublishTest.java | 18 ++-- .../broker/PostOfficeSubscribeTest.java | 6 +- .../broker/PostOfficeUnsubscribeTest.java | 18 ++-- 6 files changed, 87 insertions(+), 64 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 764c34671..b81845c93 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,5 +1,6 @@ Version 0.18-SNAPSHOT: - [feature] Add Netty native trsansport support on MacOS. Bundle all the native transport module by default (#806) + [refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827) + [feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806) [feature] message expiry interval: (issue #818) - Implements the management of message expiry for retained part. (#819) - Avoid to publish messages that has elapsed its expire property. (#822) diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index 3036f5a6b..0d0bcbd91 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -637,7 +637,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { if (!isBoundToSession()) { return null; } - postOffice.receivedPublishQos0(topic, username, clientId, msg, expiry); + postOffice.receivedPublishQos0(username, clientId, msg, expiry); return null; }).ifFailed(msg::release); case AT_LEAST_ONCE: @@ -645,7 +645,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) { checkMatchSessionLoop(clientId); if (!isBoundToSession()) return null; - postOffice.receivedPublishQos1(this, topic, username, messageID, msg, expiry); + postOffice.receivedPublishQos1(this, username, messageID, msg, expiry); return null; }).ifFailed(msg::release); case EXACTLY_ONCE: { diff --git a/broker/src/main/java/io/moquette/broker/PostOffice.java b/broker/src/main/java/io/moquette/broker/PostOffice.java index 421a2b9c9..b7a4b592e 100644 --- a/broker/src/main/java/io/moquette/broker/PostOffice.java +++ b/broker/src/main/java/io/moquette/broker/PostOffice.java @@ -27,6 +27,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttFixedHeader; +import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; @@ -61,7 +62,6 @@ import static io.moquette.broker.Utils.messageId; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; -import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; @@ -312,8 +312,14 @@ private void trackWillSpecificationForFutureFire(Session bindedSession, ISession private void publishWill(ISessionsRepository.Will will) { final Instant messageExpiryInstant = willMessageExpiry(will); - publish2Subscribers(WILL_PUBLISHER, Unpooled.copiedBuffer(will.payload), new Topic(will.topic), - will.qos, will.retained, messageExpiryInstant); + MqttPublishMessage willPublishMessage = MqttMessageBuilders.publish() + .topicName(will.topic) + .retained(will.retained) + .qos(will.qos) + .payload(Unpooled.copiedBuffer(will.payload)) + .build(); + + publish2Subscribers(WILL_PUBLISHER, messageExpiryInstant, willPublishMessage); } private static Instant willMessageExpiry(ISessionsRepository.Will will) { @@ -526,7 +532,7 @@ private void publishRetainedMessagesForSubscriptions(String clientID, Collection LOG.info("No retained messages matching topic filter {}", topicFilter); continue; } - MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription); + MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(subscription, Collections.emptyList()); for (RetainedMessage retainedMsg : retainedMsgs) { final MqttQoS retainedQos = retainedMsg.qosLevel(); MqttQoS qos = lowerQosToTheSubscriptionDesired(subscription, retainedQos); @@ -612,14 +618,15 @@ public void unsubscribe(List topics, MQTTConnection mqttConnection, int mqttConnection.sendUnsubAckMessage(topics, clientID, messageId); } - CompletableFuture receivedPublishQos0(Topic topic, String username, String clientID, MqttPublishMessage msg, + CompletableFuture receivedPublishQos0(String username, String clientID, MqttPublishMessage msg, Instant messageExpiry) { + final Topic topic = new Topic(msg.variableHeader().topicName()); if (!authorizator.canWrite(topic, username, clientID)) { LOG.error("client is not authorized to publish on topic: {}", topic); ReferenceCountUtil.release(msg); return CompletableFuture.completedFuture(null); } - final RoutingResults publishResult = publish2Subscribers(clientID, msg.payload(), topic, AT_MOST_ONCE, msg.fixedHeader().isRetain(), Instant.MAX); + final RoutingResults publishResult = publish2Subscribers(clientID, messageExpiry, msg); if (publishResult.isAllFailed()) { LOG.info("No one publish was successfully enqueued to session loops"); ReferenceCountUtil.release(msg); @@ -637,9 +644,10 @@ CompletableFuture receivedPublishQos0(Topic topic, String username, String }); } - RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, String username, int messageID, + RoutingResults receivedPublishQos1(MQTTConnection connection, String username, int messageID, MqttPublishMessage msg, Instant messageExpiry) { // verify if topic can be written + final Topic topic = new Topic(msg.variableHeader().topicName()); topic.getTokens(); if (!topic.isValid()) { LOG.warn("Invalid topic format, force close the connection"); @@ -654,14 +662,12 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin return RoutingResults.preroutingError(); } - ByteBuf payload = msg.payload(); - boolean retainPublish = msg.fixedHeader().isRetain(); final RoutingResults routes; if (msg.fixedHeader().isDup()) { final Set failedClients = failedPublishes.listFailed(clientId, messageID); - routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, failedClients, retainPublish, messageExpiry); + routes = publish2Subscribers(clientId, failedClients, messageExpiry, msg); } else { - routes = publish2Subscribers(clientId, payload, topic, AT_LEAST_ONCE, retainPublish, messageExpiry); + routes = publish2Subscribers(clientId, messageExpiry, msg); } if (LOG.isTraceEnabled()) { LOG.trace("subscriber routes: {}", routes); @@ -684,7 +690,7 @@ RoutingResults receivedPublishQos1(MQTTConnection connection, Topic topic, Strin } private void manageRetain(Topic topic, MqttPublishMessage msg) { - if (msg.fixedHeader().isRetain()) { + if (isRetained(msg)) { if (!msg.payload().isReadable()) { retainedRepository.cleanRetained(topic); // clean also the tracker @@ -715,9 +721,10 @@ private static int getIntProperty(MqttProperties props, MqttPropertyType prop) { return mqttProperty.value(); } - private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, - MqttQoS publishingQos, boolean isPublishRetained, Instant messageExpiry) { - return publish2Subscribers(publisherClientId, payload, topic, publishingQos, NO_FILTER, isPublishRetained, messageExpiry); + private RoutingResults publish2Subscribers(String publisherClientId, + Instant messageExpiry, + MqttPublishMessage msg) { + return publish2Subscribers(publisherClientId, NO_FILTER, messageExpiry, msg); } private class BatchingPublishesCollector { @@ -783,8 +790,12 @@ public int countBatches() { } } - private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf payload, Topic topic, MqttQoS publishingQos, - Set filterTargetClients, boolean retainPublish, Instant messageExpiry) { + private RoutingResults publish2Subscribers(String publisherClientId, + Set filterTargetClients, Instant messageExpiry, + MqttPublishMessage msg) { + final boolean retainPublish = msg.fixedHeader().isRetain(); + final Topic topic = new Topic(msg.variableHeader().topicName()); + final MqttQoS publishingQos = msg.fixedHeader().qosLevel(); List topicMatchingSubscriptions = subscriptions.matchQosSharpening(topic); if (topicMatchingSubscriptions.isEmpty()) { // no matching subscriptions, clean exit @@ -815,11 +826,11 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null)); } - payload.retain(subscriptionCount); + msg.retain(subscriptionCount); List publishResults = collector.routeBatchedPublishes((batch) -> { - publishToSession(payload, topic, batch, publishingQos, retainPublish, messageExpiry); - payload.release(); + publishToSession(topic, batch, publishingQos, retainPublish, messageExpiry, msg); + msg.release(); }); final CompletableFuture[] publishFutures = publishResults.stream() @@ -833,7 +844,7 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay Collection subscibersIds = collector.subscriberIdsByEventLoop(rr.clientId); if (rr.status == RouteResult.Status.FAIL) { failedRoutings.addAll(subscibersIds); - payload.release(); + msg.release(); } else { successedRoutings.addAll(subscibersIds); } @@ -841,28 +852,30 @@ private RoutingResults publish2Subscribers(String publisherClientId, ByteBuf pay return new RoutingResults(successedRoutings, failedRoutings, publishes); } - private void publishToSession(ByteBuf payload, Topic topic, Collection subscriptions, - MqttQoS publishingQos, boolean retainPublish, Instant messageExpiry) { - ByteBuf duplicatedPayload = payload.duplicate(); + private void publishToSession(Topic topic, Collection subscriptions, + MqttQoS publishingQos, boolean retainPublish, Instant messageExpiry, MqttPublishMessage msg) { + ByteBuf duplicatedPayload = msg.payload().duplicate(); for (Subscription sub : subscriptions) { MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos); boolean retained = false; if (sub.option().isRetainAsPublished()) { retained = retainPublish; } - publishToSession(duplicatedPayload, topic, sub, qos, retained, messageExpiry); + publishToSession(duplicatedPayload, topic, sub, qos, retained, messageExpiry, msg); } } private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, MqttQoS qos, boolean retained, - Instant messageExpiry) { + Instant messageExpiry, MqttPublishMessage msg) { Session targetSession = this.sessionRegistry.retrieve(sub.getClientId()); boolean isSessionPresent = targetSession != null; if (isSessionPresent) { LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", sub.getClientId(), sub.getTopicFilter(), qos); - final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub); + + Collection existingProperties = msg.variableHeader().properties().listAll(); + final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub, existingProperties); final SessionRegistry.PublishedMessage publishedMessage = new SessionRegistry.PublishedMessage(topic, qos, payload, retained, messageExpiry, properties); targetSession.sendPublishOnSessionAtQos(publishedMessage); @@ -874,15 +887,23 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq } } - private MqttProperties.MqttProperty[] prepareSubscriptionProperties(Subscription sub) { - MqttProperties.MqttProperty[] properties; + private MqttProperties.MqttProperty[] prepareSubscriptionProperties(Subscription sub, + Collection existingProperties) { + + // copy all properties except SubscriptionId + Collection properties = new ArrayList<>(existingProperties.size() + 1); + for (MqttProperties.MqttProperty property : existingProperties) { + // skip SUBSCRIPTION_IDENTIFIER because could be added by the subscription + if (property.propertyId() != MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()) { + properties.add(property); + } + } if (sub.hasSubscriptionIdentifier()) { MqttProperties.IntegerProperty subscriptionId = createSubscriptionIdProperty(sub); - properties = new MqttProperties.MqttProperty[] { subscriptionId }; - } else { - properties = new MqttProperties.MqttProperty[0]; + properties.add(subscriptionId); } - return properties; + + return properties.toArray(new MqttProperties.MqttProperty[0]); } private MqttProperties.IntegerProperty createSubscriptionIdProperty(Subscription sub) { @@ -899,7 +920,6 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage Instant messageExpiry) { LOG.trace("Processing PUB QoS2 message on connection: {}", connection); final Topic topic = new Topic(msg.variableHeader().topicName()); - final ByteBuf payload = msg.payload(); final String clientId = connection.getClientId(); if (!authorizator.canWrite(topic, username, clientId)) { @@ -910,13 +930,12 @@ RoutingResults receivedPublishQos2(MQTTConnection connection, MqttPublishMessage } final int messageID = msg.variableHeader().packetId(); - boolean retainPublish = msg.fixedHeader().isRetain(); final RoutingResults publishRoutings; if (msg.fixedHeader().isDup()) { final Set failedClients = failedPublishes.listFailed(clientId, messageID); - publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, failedClients, retainPublish, messageExpiry); + publishRoutings = publish2Subscribers(clientId, failedClients, messageExpiry, msg); } else { - publishRoutings = publish2Subscribers(clientId, payload, topic, EXACTLY_ONCE, retainPublish, messageExpiry); + publishRoutings = publish2Subscribers(clientId, messageExpiry, msg); } if (publishRoutings.isAllSuccess()) { // QoS2 PUB message was enqueued successfully to every event loop @@ -958,11 +977,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg) { final ByteBuf payload = msg.payload(); LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qos); - boolean retainPublish = msg.fixedHeader().isRetain(); - final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, payload, topic, qos, retainPublish, Instant.MAX); + final RoutingResults publishResult = publish2Subscribers(INTERNAL_PUBLISHER, Instant.MAX, msg); LOG.trace("after routed publishes: {}", publishResult); - if (!retainPublish) { + if (!isRetained(msg)) { return publishResult; } if (qos == AT_MOST_ONCE || payload.readableBytes() == 0) { @@ -974,6 +992,10 @@ public RoutingResults internalPublish(MqttPublishMessage msg) { return publishResult; } + private static boolean isRetained(MqttPublishMessage msg) { + return msg.fixedHeader().isRetain(); + } + /** * notify MqttConnectMessage after connection established (already pass login). * @param msg diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 9afa55a07..d6942ee5c 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -124,7 +124,7 @@ public void testPublishQoS0ToItself() throws ExecutionException, InterruptedExce // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -226,7 +226,7 @@ public void testPublishToMultipleSubscribers() throws ExecutionException, Interr // Exercise final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -253,7 +253,7 @@ public void testPublishWithEmptyPayloadClearRetainedStore() throws ExecutionExce // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, FAKE_CLIENT_ID, + sut.receivedPublishQos0(TEST_USER, FAKE_CLIENT_ID, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_MOST_ONCE) @@ -278,7 +278,7 @@ public void testPublishWithQoS1() throws ExecutionException, InterruptedExceptio // Exercise final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(senderConnection, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(senderConnection, TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_LEAST_ONCE) @@ -327,7 +327,7 @@ public void forwardQoS1PublishesWhenNotCleanSessionReconnects() throws Execution ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(pubConn, TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload.retainedDuplicate()) .qos(MqttQoS.AT_LEAST_ONCE) @@ -366,7 +366,7 @@ public void checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession( ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(pubConn, TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload.retainedDuplicate()) .qos(MqttQoS.AT_LEAST_ONCE) @@ -392,7 +392,7 @@ public void noPublishToInactiveSession() throws ExecutionException, InterruptedE ConnectionTestUtils.assertConnectAccepted(pubChannel); final ByteBuf anyPayload = Unpooled.copiedBuffer("Any payload", Charset.defaultCharset()); - sut.receivedPublishQos1(pubConn, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(pubConn, TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_LEAST_ONCE) @@ -424,7 +424,7 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() thr .retained(true) .topicName(NEWS_TOPIC) .build(); - sut.receivedPublishQos1(senderConnection, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(senderConnection, TEST_USER, 1, publishMsg, Instant.MAX); assertMessageIsRetained(NEWS_TOPIC, anyPayload); @@ -432,7 +432,7 @@ public void cleanRetainedMessageStoreWhenPublishWithRetainedQos0IsReceived() thr // publish a QoS0 retained message // Exercise final ByteBuf qos0Payload = Unpooled.copiedBuffer("QoS0 payload", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, connection.getClientId(), + sut.receivedPublishQos0(TEST_USER, connection.getClientId(), MqttMessageBuilders.publish() .payload(qos0Payload) .qos(MqttQoS.AT_MOST_ONCE) diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 8b629c96c..aa34a7eb4 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -255,7 +255,7 @@ public void testCleanSession_maintainClientSubscriptions() throws ExecutionExcep assertEquals(1, subscriptions.size(), "After a reconnect, subscription MUST be still present"); final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -300,7 +300,7 @@ public void testCleanSession_correctlyClientSubscriptions() throws ExecutionExce // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload) .qos(MqttQoS.AT_MOST_ONCE) @@ -322,7 +322,7 @@ public void testReceiveRetainedPublishRespectingSubscriptionQoSAndNotPublisher() .qos(MqttQoS.AT_LEAST_ONCE) .retained(true) .topicName(NEWS_TOPIC).build(); - sut.receivedPublishQos1(connection, new Topic(NEWS_TOPIC), TEST_USER, 1, + sut.receivedPublishQos1(connection, TEST_USER, 1, retainedPubQoS1Msg, Instant.MAX); // subscriber connects subscribe to topic /news and receive the last retained message diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index b8c809148..612899702 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -152,7 +152,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -165,7 +165,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) @@ -190,7 +190,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -210,7 +210,7 @@ public void testDontNotifyClientSubscribedToTopicAfterDisconnectedAndReconnectOn // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2) .qos(MqttQoS.AT_MOST_ONCE) @@ -257,7 +257,7 @@ private void connectPublishDisconnectFromAnotherClient(String firstPayload, Stri // publish from another channel final ByteBuf anyPayload = Unpooled.copiedBuffer(firstPayload, Charset.defaultCharset()); - sut.receivedPublishQos1(anotherConnection, new Topic(topic), TEST_USER, 1, + sut.receivedPublishQos1(anotherConnection, TEST_USER, 1, MqttMessageBuilders.publish() .payload(anyPayload) .qos(MqttQoS.AT_LEAST_ONCE) @@ -288,7 +288,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t subscribe(connection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload = Unpooled.copiedBuffer("Hello world!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -308,7 +308,7 @@ public void testConnectSubPub_cycle_getTimeout_on_second_disconnect_issue142() t subscribe(subscriberConnection, NEWS_TOPIC, AT_MOST_ONCE); // publish on /news final ByteBuf payload2 = Unpooled.copiedBuffer("Hello world2!", Charset.defaultCharset()); - sut.receivedPublishQos0(new Topic(NEWS_TOPIC), TEST_USER, TEST_PWD, + sut.receivedPublishQos0(TEST_USER, TEST_PWD, MqttMessageBuilders.publish() .payload(payload2.retainedDuplicate()) .qos(MqttQoS.AT_MOST_ONCE) @@ -346,12 +346,12 @@ public void checkReplayofStoredPublishResumeAfter_a_disconnect_cleanSessionFalse private void publishQos1(MQTTConnection publisher, String topic, String payload, int messageID) { final ByteBuf bytePayload = Unpooled.copiedBuffer(payload, Charset.defaultCharset()); try { - sut.receivedPublishQos1(publisher, new Topic(topic), TEST_USER, messageID, + sut.receivedPublishQos1(publisher, TEST_USER, messageID, MqttMessageBuilders.publish() .payload(bytePayload) .qos(MqttQoS.AT_LEAST_ONCE) .retained(false) - .topicName(NEWS_TOPIC).build(), Instant.MAX).completableFuture().get(5, TimeUnit.SECONDS); + .topicName(topic).build(), Instant.MAX).completableFuture().get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new RuntimeException(e); }