Skip to content

Commit

Permalink
Implement subscription retain policy handling (#816)
Browse files Browse the repository at this point in the history
Modify SubscriptionDirectory's add methods to return the indication of the fact that the subscription was freshly created.
Updates forwarding of retained messages during subscription processing to apply the three retained policies.
  • Loading branch information
andsel committed Feb 11, 2024
1 parent 0b92f0f commit eff06d8
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 19 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Version 0.18-SNAPSHOT:
- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
- Implements handling of noLocal subscription option on MQTT5 connections. (#814)
- Implements subscription options retain as published feature. (#815)
- Implements subscription options retains handling policies. (#816)
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
Expand Down
24 changes: 20 additions & 4 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
Expand Down Expand Up @@ -370,12 +371,26 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}).collect(Collectors.toList());

final Set<Subscription> subscriptionToSendRetained = new HashSet<>();
for (Subscription subscription : newSubscriptions) {
boolean newlyAdded;
MqttSubscriptionOption subOptions = subscription.option();
if (subscriptionIdOpt.isPresent()) {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option(),
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions,
subscriptionIdOpt.get());
} else {
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option());
newlyAdded = subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subOptions);
}

switch (subOptions.retainHandling()) {
case SEND_AT_SUBSCRIBE:
subscriptionToSendRetained.add(subscription);
break;
case SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS:
if (newlyAdded) {
subscriptionToSendRetained.add(subscription);
}
break;
}
}

Expand All @@ -394,7 +409,8 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
// send ack message
mqttConnection.sendSubAckMessage(messageID, ackMessage);

publishRetainedMessagesForSubscriptions(clientID, newSubscriptions);
// shared subscriptions doesn't receive retained messages
publishRetainedMessagesForSubscriptions(clientID, subscriptionToSendRetained);

for (Subscription subscription : newSubscriptions) {
interceptor.notifyTopicSubscribed(subscription, username);
Expand Down Expand Up @@ -445,7 +461,7 @@ private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifie
}
}

private void publishRetainedMessagesForSubscriptions(String clientID, List<Subscription> newSubscriptions) {
private void publishRetainedMessagesForSubscriptions(String clientID, Collection<Subscription> newSubscriptions) {
Session targetSession = this.sessionRegistry.retrieve(clientID);
for (Subscription subscription : newSubscriptions) {
final String topicFilter = subscription.getTopicFilter().toString();
Expand Down
11 changes: 8 additions & 3 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ interface IVisitor<T> {
private static final INode NO_PARENT = null;

private enum Action {
OK, REPEAT
OK, REPEAT,
OK_NEW // used to indicate that the action was successful and the subscription created a new branch
}

INode root;
Expand Down Expand Up @@ -270,11 +271,15 @@ private List<Subscription> recursiveMatch(Topic topicName, INode inode, int dept
return subscriptions;
}

public void addToTree(SubscriptionRequest request) {
/**
* @return true if the subscription didn't exist.
* */
public boolean addToTree(SubscriptionRequest request) {
Action res;
do {
res = insert(request.getTopicFilter(), this.root, request);
} while (res == Action.REPEAT);
return res == Action.OK_NEW;
}

private Action insert(Topic topic, final INode inode, SubscriptionRequest request) {
Expand Down Expand Up @@ -315,7 +320,7 @@ private Action createNodeAndInsertSubscription(Topic topic, INode inode, CNode c
}
updatedCnode.add(newInode);

return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK_NEW : Action.REPEAT;
}

private INode createPathRec(Topic topic, SubscriptionRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,21 @@ private static List<Subscription> selectSubscriptionsWithHigherQoSForEachSession
}

@Override
public void add(String clientId, Topic filter, MqttSubscriptionOption option) {
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option);
addNonSharedSubscriptionRequest(subRequest);
return addNonSharedSubscriptionRequest(subRequest);
}

@Override
public void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
public boolean add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId) {
SubscriptionRequest subRequest = SubscriptionRequest.buildNonShared(clientId, filter, option, subscriptionId);
addNonSharedSubscriptionRequest(subRequest);
return addNonSharedSubscriptionRequest(subRequest);
}

private void addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
ctrie.addToTree(subRequest);
private boolean addNonSharedSubscriptionRequest(SubscriptionRequest subRequest) {
boolean notExistingSubscription = ctrie.addToTree(subRequest);
subscriptionsRepository.addNewSubscription(subRequest.subscription());
return notExistingSubscription;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
package io.moquette.broker.subscriptions;

import io.moquette.broker.ISubscriptionsRepository;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;

import java.util.List;
import java.util.Set;

/**
* Contains all topic filters that are used to match against topic names.
Expand All @@ -33,9 +31,9 @@ public interface ISubscriptionsDirectory {

List<Subscription> matchQosSharpening(Topic topic);

void add(String clientId, Topic filter, MqttSubscriptionOption option);
boolean add(String clientId, Topic filter, MqttSubscriptionOption option);

void add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId);
boolean add(String clientId, Topic filter, MqttSubscriptionOption option, SubscriptionIdentifier subscriptionId);

void addShared(String clientId, ShareName name, Topic topicFilter, MqttSubscriptionOption option);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ public void testLookup() {
@Test
public void testAddNewSubscriptionOnExistingNode() {
final SubscriptionRequest existingSubscription = clientSubOnTopic("TempSensor1", "/temp");
sut.addToTree(existingSubscription);
assertTrue(sut.addToTree(existingSubscription), "First created subscription on topic filter MUST return true");

//Exercise
final SubscriptionRequest newSubscription = clientSubOnTopic("TempSensor2", "/temp");
sut.addToTree(newSubscription);
assertFalse(sut.addToTree(newSubscription), "Not new created subscription on topic filter MUST return false");

//Verify
final Optional<CNode> matchedNode = sut.lookup(asTopic("/temp"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5RetainHandling;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode;
import org.eclipse.paho.mqttv5.client.IMqttMessageListener;
import org.eclipse.paho.mqttv5.client.IMqttToken;
Expand Down Expand Up @@ -243,6 +244,89 @@ public void givenSubscriptionWithRetainAsPublishedUnsetThenRetainedFlagIsUnsetOn
assertFalse(publishCollector.receivedMessage.isRetained());
}

@Test
public void givenFirstSubscriptionWithRetainPolicyToSendAtSubscribeIfNotYetExistsAndARetainedMessagedExistsThenPublishIsReceived() throws Exception {
Mqtt5BlockingClient publisher = createPublisherClient();
//publish a retained message
publisher.publishWith()
.topic("metric/temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.retain(true)
.qos(MqttQos.AT_LEAST_ONCE)
.send();

// receive retained only if new subscription
PublishCollector publishCollector = new PublishCollector();
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST.getCode());

// verify retain flag is respected
publishCollector.assertReceivedMessageIn(1, TimeUnit.SECONDS);
verifyTopicPayloadAndQoSAsExpected(publishCollector);
}

@Test
public void givenNonFirstSubscriptionWithRetainPolicyToSendAtSubscribeIfAlreadyExistsAndARetainedMessagedExistsThenPublishIsNotReceived() throws Exception {
Mqtt5BlockingClient publisher = createPublisherClient();
//publish a retained message
publisher.publishWith()
.topic("metric/temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.retain(true)
.qos(MqttQos.AT_LEAST_ONCE)
.send();

// create first subscriber and subscribe to the topic
final PublishCollector unusedCollector = new PublishCollector();
createSubscriberClient(unusedCollector, "firstSubscriber");

// create second subscriber to same topic with RetainPolicy to SendAtSubscribeIfAlreadyExists
PublishCollector publishCollector = new PublishCollector();
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.SEND_IF_SUBSCRIPTION_DOES_NOT_EXIST.getCode());

// verify no retained message is received
publishCollector.assertNotReceivedMessageIn(2, TimeUnit.SECONDS);
}

@Test
public void givenSubscriptionWithRetainPolicyToDoNotSendAndARetainedMessagedExistsThenPublishIsNotReceived() throws Exception {
Mqtt5BlockingClient publisher = createPublisherClient();
//publish a retained message
publisher.publishWith()
.topic("metric/temperature/living")
.payload("18".getBytes(StandardCharsets.UTF_8))
.retain(true)
.qos(MqttQos.AT_LEAST_ONCE)
.send();

// subscriber subscribe to same topic matching the retained but with DO_NOT_SEND policy
PublishCollector publishCollector = new PublishCollector();
createClientWithRetainPolicy(publishCollector, Mqtt5RetainHandling.DO_NOT_SEND.getCode());

// verify no retained message is received
publishCollector.assertNotReceivedMessageIn(1, TimeUnit.SECONDS);
}

private static void createSubscriberClient(PublishCollector publishCollector, String clientId) throws MqttException {
MqttClient subscriber = new MqttClient("tcp://localhost:1883", clientId, new MemoryPersistence());
subscriber.connect();
MqttSubscription subscription = new MqttSubscription("metric/temperature/living", MqttQos.AT_LEAST_ONCE.getCode());

IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
verifySubscribedSuccessfully(subscribeToken);
}

private static void createClientWithRetainPolicy(PublishCollector publishCollector, int retainPolicy) throws MqttException {
MqttClient subscriber = new MqttClient("tcp://localhost:1883", "subscriber", new MemoryPersistence());
subscriber.connect();
MqttSubscription subscription = new MqttSubscription("metric/temperature/living", MqttQos.AT_LEAST_ONCE.getCode());
subscription.setRetainHandling(retainPolicy);

IMqttToken subscribeToken = subscriber.subscribe(new MqttSubscription[]{subscription},
new IMqttMessageListener[] {publishCollector});
verifySubscribedSuccessfully(subscribeToken);
}

private static MqttClient createSubscriberClientWithRetainAsPublished(PublishCollector publishCollector, String topic) throws MqttException {
return createSubscriberClient(publishCollector, topic, true);
}
Expand Down

0 comments on commit eff06d8

Please sign in to comment.