Skip to content

Commit

Permalink
Make configurable the maximum qos that a server can handle (#811)
Browse files Browse the repository at this point in the history
Updates the PostOffice when process the subscribe message to cap the QoS that the server would handle.
  • Loading branch information
andsel authored Jan 22, 2024
1 parent 1cc5db6 commit 0ac5221
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 5 deletions.
2 changes: 1 addition & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Version 0.18-SNAPSHOT:
[feature] subscription option handling: (issue #808)
- Move from qos to subscription option implementing the persistence of SubscriptionOption to/from storage. (#810)

- Exposed the maximum granted QoS by the server with the config setting 'max_server_granted_qos'. (#811)
[feature] subscription identifiers: (issue #801)
- Implements the validation of subscription identifier properties in SUBSCRIBE. (#803)
- Store and retrieve the subscription identifier into the subscriptions directory. (#804)
Expand Down
51 changes: 48 additions & 3 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,38 @@
package io.moquette.broker;

import io.moquette.broker.scheduler.ScheduledExpirationService;
import io.moquette.broker.subscriptions.*;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.SubscriptionIdentifier;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.BrokerInterceptor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Clock;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -182,6 +203,7 @@ public RouteResult ifFailed(Runnable action) {
private final SessionEventLoopGroup sessionLoops;
private final Clock clock;
private final ScheduledExpirationService<ISessionsRepository.Will> willExpirationService;
private final MqttQoS maxServerGrantedQos;

/**
* Used only in tests
Expand All @@ -196,6 +218,14 @@ public RouteResult ifFailed(Runnable action) {
SessionRegistry sessionRegistry, ISessionsRepository sessionRepository, BrokerInterceptor interceptor,
Authorizator authorizator,
SessionEventLoopGroup sessionLoops, Clock clock) {
this(subscriptions, retainedRepository, sessionRegistry, sessionRepository, interceptor, authorizator,
sessionLoops, clock, EXACTLY_ONCE);
}

PostOffice(ISubscriptionsDirectory subscriptions, IRetainedRepository retainedRepository,
SessionRegistry sessionRegistry, ISessionsRepository sessionRepository, BrokerInterceptor interceptor,
Authorizator authorizator,
SessionEventLoopGroup sessionLoops, Clock clock, MqttQoS maxServerGrantedQos) {
this.authorizator = authorizator;
this.subscriptions = subscriptions;
this.retainedRepository = retainedRepository;
Expand All @@ -204,6 +234,7 @@ public RouteResult ifFailed(Runnable action) {
this.interceptor = interceptor;
this.sessionLoops = sessionLoops;
this.clock = clock;
this.maxServerGrantedQos = maxServerGrantedQos;

this.willExpirationService = new ScheduledExpirationService<>(clock, this::publishWill);
recreateWillExpires(sessionRepository);
Expand Down Expand Up @@ -319,6 +350,7 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
} else {
ackTopics = authorizator.verifyTopicsReadAccess(clientID, username, msg);
}
ackTopics = updateWithMaximumSupportedQoS(ackTopics);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);

// store topics of non-shared subscriptions in session
Expand Down Expand Up @@ -366,6 +398,19 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

private List<MqttTopicSubscription> updateWithMaximumSupportedQoS(List<MqttTopicSubscription> subscriptions) {
return subscriptions.stream()
.map(s -> new MqttTopicSubscription(s.topicName(), minQos(s.qualityOfService(), maxServerGrantedQos)))
.collect(Collectors.toList());
}

private static MqttQoS minQos(MqttQoS q1, MqttQoS q2) {
if (q1 == FAILURE || q2 == FAILURE) {
return FAILURE;
}
return q1.value() < q2.value() ? q1 : q2;
}

private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifier(MqttSubscribeMessage msg) {
final List<MqttProperties.MqttProperty<Integer>> subscriptionIdentifierProperties =
(List<MqttProperties.MqttProperty<Integer>>) msg.idAndPropertiesVariableHeader().properties()
Expand Down
31 changes: 30 additions & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.persistence.SegmentQueueRepository;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,6 +69,7 @@

import static io.moquette.broker.Session.INFINITE_EXPIRY;
import static io.moquette.logging.LoggingUtils.getInterceptorIds;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;

public class Server {

Expand Down Expand Up @@ -256,8 +258,10 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(interceptor, sessionQueueSize);
sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator, scheduler,
clock, globalSessionExpiry, loopsGroup);

final MqttQoS serverGrantedQoS = parseMaxGrantedQoS(config);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, sessionsRepository, interceptor,
authorizator, loopsGroup, clock);
authorizator, loopsGroup, clock, serverGrantedQoS);
final BrokerConfiguration brokerConfig = new BrokerConfiguration(config);
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions,
dispatcher);
Expand All @@ -276,6 +280,31 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
initialized = true;
}

private static MqttQoS parseMaxGrantedQoS(IConfig config) {
final String qosValue = config.getProperty(IConfig.MAX_SERVER_GRANTED_QOS_PROPERTY_NAME, "2");
try {
int qosIntValue = Integer.parseInt(qosValue);
if (qosIntValue < 0 || qosIntValue > 2) {
LOG.warn("Error parsing max_server_granted_qos int value, found {} but should be [0..2]", qosIntValue);
throw new IllegalArgumentException("QoS must in range [0..2] but was " + qosIntValue);
}
return MqttQoS.valueOf(qosIntValue);
} catch (NumberFormatException ex) {
// try to parse the string form
if (MqttQoS.AT_MOST_ONCE.toString().equalsIgnoreCase(qosValue)) {
return MqttQoS.AT_MOST_ONCE;
}
if (MqttQoS.AT_LEAST_ONCE.toString().equalsIgnoreCase(qosValue)) {
return MqttQoS.AT_LEAST_ONCE;
}
if (MqttQoS.EXACTLY_ONCE.toString().equalsIgnoreCase(qosValue)) {
return MqttQoS.EXACTLY_ONCE;
}
LOG.warn("Error parsing max_server_granted_qos string value, found {} but should be on of 'at_most_once', 'at_least_once', 'exactly_once'", qosValue);
throw new IllegalArgumentException("QoS must be one of 'at_most_once', 'at_least_once', 'exactly_once' but was " + qosValue);
}
}

private static IQueueRepository initQueuesRepository(IConfig config, Path dataPath, H2Builder h2Builder) throws IOException {
final IQueueRepository queueRepository;
final String queueType = config.getProperty(IConfig.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public abstract class IConfig {
public static final String KEY_STORE_PASSWORD_PROPERTY_NAME = "key_store_password";
public static final String KEY_MANAGER_PASSWORD_PROPERTY_NAME = "key_manager_password";
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = "netty.mqtt.message_size";
public static final String MAX_SERVER_GRANTED_QOS_PROPERTY_NAME = "max_server_granted_qos";
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = 8092;

public abstract void setProperty(String name, String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public List<Subscription> matchWithoutQosSharpening(Topic topicName) {
public List<Subscription> matchQosSharpening(Topic topicName) {
final List<Subscription> subscriptions = matchWithoutQosSharpening(topicName);

// for each session select the subscription with higher QoS
return selectSubscriptionsWithHigherQoSForEachSession(subscriptions);
}

private static List<Subscription> selectSubscriptionsWithHigherQoSForEachSession(List<Subscription> subscriptions) {
// for each session select the subscription with higher QoS
Map<String, Subscription> subsGroupedByClient = new HashMap<>();
for (Subscription sub : subscriptions) {
Expand Down
1 change: 1 addition & 0 deletions broker/src/test/resources/config/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ allow_anonymous true

reauthorize_subscriptions_on_connect false
telemetry_enabled false
max_server_granted_qos exactly_once
10 changes: 10 additions & 0 deletions distribution/src/main/resources/moquette.conf
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,13 @@ password_file config/password_file.conf
#*********************************************************************
# persistent_client_expiration 3d

#*********************************************************************
# Maximum QoS that the server can grant (byu default it's QoS2)
#
# max_server_granted_qos:
# This option permit to customize the QoS that the broker can grant. By default it's QoS2 (exactly_once)
# but in certain circumstances could be limited by the server itself.
# Possible values are 0, 1, 2 or the extended string versions at_most_once, at_least_once, exactly_once.
# default: 2 (exactly_once)
#*********************************************************************
# max_server_granted_qos 2

0 comments on commit 0ac5221

Please sign in to comment.