From b36af550627cf541e5236ad829cff2771cb96920 Mon Sep 17 00:00:00 2001 From: Kazetsukai Date: Mon, 1 May 2023 12:33:33 -0400 Subject: [PATCH] Connection filter interface for IP Bans --- .../java/io/moquette/BrokerConstants.java | 2 ++ .../io/moquette/broker/MQTTConnection.java | 26 ++++++++++++++- .../broker/MQTTConnectionFactory.java | 7 ++-- .../main/java/io/moquette/broker/Server.java | 19 +++++++++-- .../broker/security/IConnectionFilter.java | 7 ++++ .../broker/MQTTConnectionConnectTest.java | 20 ++++++++++- .../broker/MQTTConnectionPublishTest.java | 4 ++- .../moquette/broker/MockConnectionFilter.java | 33 +++++++++++++++++++ .../broker/PostOfficeInternalPublishTest.java | 3 +- .../broker/PostOfficePublishTest.java | 3 +- .../broker/PostOfficeSubscribeTest.java | 3 +- .../broker/PostOfficeUnsubscribeTest.java | 3 +- .../moquette/broker/SessionRegistryTest.java | 3 +- .../java/io/moquette/broker/SessionTest.java | 3 +- ...nPahoCanPublishOnReadBlockedTopicTest.java | 2 +- 15 files changed, 122 insertions(+), 16 deletions(-) create mode 100644 broker/src/main/java/io/moquette/broker/security/IConnectionFilter.java create mode 100644 broker/src/test/java/io/moquette/broker/MockConnectionFilter.java diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index d3e926d00..66b2e1b06 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -66,6 +66,8 @@ public final class BrokerConstants { public static final String REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT = "reauthorize_subscriptions_on_connect"; public static final String ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME = "allow_zero_byte_client_id"; public static final String ACL_FILE_PROPERTY_NAME = "acl_file"; + + public static final String CONNECTION_FILTER_CLASS_NAME = "connection_filter_class"; public static final String AUTHORIZATOR_CLASS_NAME = "authorizator_class"; public static final String AUTHENTICATOR_CLASS_NAME = "authenticator_class"; public static final String DB_AUTHENTICATOR_DRIVER = "authenticator.db.driver"; diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index a4a16035f..6690b59f2 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -16,6 +16,7 @@ package io.moquette.broker; import io.moquette.BrokerConstants; +import io.moquette.broker.security.IConnectionFilter; import io.moquette.broker.subscriptions.Topic; import io.moquette.broker.security.IAuthenticator; import io.netty.buffer.ByteBuf; @@ -50,6 +51,7 @@ final class MQTTConnection { final Channel channel; private final BrokerConfiguration brokerConfig; private final IAuthenticator authenticator; + private final IConnectionFilter connectionFilter; private final SessionRegistry sessionRegistry; private final PostOffice postOffice; private volatile boolean connected; @@ -57,10 +59,11 @@ final class MQTTConnection { private Session bindedSession; MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator, - SessionRegistry sessionRegistry, PostOffice postOffice) { + IConnectionFilter connectionFilter, SessionRegistry sessionRegistry, PostOffice postOffice) { this.channel = channel; this.brokerConfig = brokerConfig; this.authenticator = authenticator; + this.connectionFilter = connectionFilter; this.sessionRegistry = sessionRegistry; this.postOffice = postOffice; this.connected = false; @@ -180,6 +183,12 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { return PostOffice.RouteResult.failed(clientId); } + if (!connectionAllowed(clientId)) { + abortConnection(CONNECTION_REFUSED_BANNED); + channel.close().addListener(CLOSE_ON_FAILURE); + return PostOffice.RouteResult.failed(clientId); + } + final String sessionId = clientId; return postOffice.routeCommand(clientId, "CONN", () -> { checkMatchSessionLoop(sessionId); @@ -188,6 +197,10 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { }); } + private boolean connectionAllowed(String clientId) { + return connectionFilter == null || connectionFilter.allowConnection(clientDescriptor(clientId)); + } + private void checkMatchSessionLoop(String clientId) { if (!sessionLoopDebug) { return; @@ -632,10 +645,21 @@ public String toString() { return "MQTTConnection{channel=" + channel + ", connected=" + connected + '}'; } + // TODO : Unsafe cast, this is something else during testing (EmbeddedSocketAddress) InetSocketAddress remoteAddress() { return (InetSocketAddress) channel.remoteAddress(); } + ClientDescriptor clientDescriptor(String clientId) { + if (channel.remoteAddress() instanceof InetSocketAddress) { + return new ClientDescriptor( + clientId, + ((InetSocketAddress) channel.remoteAddress()).getHostString(), + ((InetSocketAddress) channel.remoteAddress()).getPort()); + } + return new ClientDescriptor(clientId, "unknown", -1); + } + public void readCompleted() { LOG.debug("readCompleted client CId: {}", getClientId()); if (getClientId() != null) { diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnectionFactory.java b/broker/src/main/java/io/moquette/broker/MQTTConnectionFactory.java index f171dcc66..3bfef8f80 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnectionFactory.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnectionFactory.java @@ -16,24 +16,27 @@ package io.moquette.broker; import io.moquette.broker.security.IAuthenticator; +import io.moquette.broker.security.IConnectionFilter; import io.netty.channel.Channel; class MQTTConnectionFactory { private final BrokerConfiguration brokerConfig; private final IAuthenticator authenticator; + private final IConnectionFilter connectionFilter; private final SessionRegistry sessionRegistry; private final PostOffice postOffice; MQTTConnectionFactory(BrokerConfiguration brokerConfig, IAuthenticator authenticator, - SessionRegistry sessionRegistry, PostOffice postOffice) { + IConnectionFilter connectionFilter, SessionRegistry sessionRegistry, PostOffice postOffice) { this.brokerConfig = brokerConfig; this.authenticator = authenticator; + this.connectionFilter = connectionFilter; this.sessionRegistry = sessionRegistry; this.postOffice = postOffice; } MQTTConnection create(Channel channel) { - return new MQTTConnection(channel, brokerConfig, authenticator, sessionRegistry, postOffice); + return new MQTTConnection(channel, brokerConfig, authenticator, connectionFilter, sessionRegistry, postOffice); } } diff --git a/broker/src/main/java/io/moquette/broker/Server.java b/broker/src/main/java/io/moquette/broker/Server.java index aaa55475d..f1cdc9808 100644 --- a/broker/src/main/java/io/moquette/broker/Server.java +++ b/broker/src/main/java/io/moquette/broker/Server.java @@ -26,6 +26,7 @@ import io.moquette.broker.security.DenyAllAuthorizatorPolicy; import io.moquette.broker.security.IAuthenticator; import io.moquette.broker.security.IAuthorizatorPolicy; +import io.moquette.broker.security.IConnectionFilter; import io.moquette.broker.security.PermitAllAuthorizatorPolicy; import io.moquette.broker.security.ResourceAuthenticator; import io.moquette.broker.unsafequeues.QueueException; @@ -167,11 +168,11 @@ public void startServer(IConfig config) throws IOException { */ public void startServer(IConfig config, List handlers) throws IOException { LOG.debug("Starting moquette integration using IConfig instance and intercept handlers"); - startServer(config, handlers, null, null, null); + startServer(config, handlers, null, null, null, null); } public void startServer(IConfig config, List handlers, ISslContextCreator sslCtxCreator, - IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException { + IConnectionFilter connectionFilter, IAuthenticator authenticator, IAuthorizatorPolicy authorizatorPolicy) throws IOException { final long start = System.currentTimeMillis(); if (handlers == null) { handlers = Collections.emptyList(); @@ -190,6 +191,7 @@ public void startServer(IConfig config, List handler LOG.info("Using default SSL context creator"); sslCtxCreator = new DefaultMoquetteSslContextCreator(config); } + connectionFilter = initializeConnectionFilter(connectionFilter, config); authenticator = initializeAuthenticator(authenticator, config); authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config); @@ -245,7 +247,7 @@ public void startServer(IConfig config, List handler dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, interceptor, authorizator, loopsGroup); final BrokerConfiguration brokerConfig = new BrokerConfiguration(config); - MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions, + MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, connectionFilter, sessions, dispatcher); final NewNettyMQTTHandler mqttHandler = new NewNettyMQTTHandler(connectionFactory); @@ -262,6 +264,7 @@ public void startServer(IConfig config, List handler initialized = true; } + private static IQueueRepository initQueuesRepository(IConfig config, Path dataPath, H2Builder h2Builder) throws IOException { final IQueueRepository queueRepository; final String queueType = config.getProperty(BrokerConstants.PERSISTENT_QUEUE_TYPE_PROPERTY_NAME); @@ -459,6 +462,16 @@ private IAuthorizatorPolicy initializeAuthorizatorPolicy(IAuthorizatorPolicy aut return authorizatorPolicy; } + private IConnectionFilter initializeConnectionFilter(IConnectionFilter connectionFilter, IConfig props) { + LOG.debug("Configuring MQTT Connection Filter"); + String connectionFilterClassName = props.getProperty(BrokerConstants.CONNECTION_FILTER_CLASS_NAME, ""); + + if (connectionFilter == null && !connectionFilterClassName.isEmpty()) { + connectionFilter = loadClass(connectionFilterClassName, IConnectionFilter.class, IConfig.class, props); + } + return connectionFilter; + } + private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, IConfig props) { LOG.debug("Configuring MQTT authenticator"); String authenticatorClassName = props.getProperty(BrokerConstants.AUTHENTICATOR_CLASS_NAME, ""); diff --git a/broker/src/main/java/io/moquette/broker/security/IConnectionFilter.java b/broker/src/main/java/io/moquette/broker/security/IConnectionFilter.java new file mode 100644 index 000000000..101e673b4 --- /dev/null +++ b/broker/src/main/java/io/moquette/broker/security/IConnectionFilter.java @@ -0,0 +1,7 @@ +package io.moquette.broker.security; + +import io.moquette.broker.ClientDescriptor; + +public interface IConnectionFilter { + boolean allowConnection(ClientDescriptor clientDescriptor); +} diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java index c0313aed7..1ba463870 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionConnectTest.java @@ -55,6 +55,7 @@ public class MQTTConnectionConnectTest { private SessionRegistry sessionRegistry; private MqttMessageBuilders.ConnectBuilder connMsg; private static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); + private MockConnectionFilter connectionFilter = new MockConnectionFilter(); private IAuthenticator mockAuthenticator; private PostOffice postOffice; private MemoryQueueRepository queueRepository; @@ -92,7 +93,7 @@ private MQTTConnection createMQTTConnectionWithPostOffice(BrokerConfiguration co } private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel, PostOffice postOffice) { - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, postOffice); } @Test @@ -194,6 +195,23 @@ public void validAuthentication() throws ExecutionException, InterruptedExceptio assertTrue(channel.isOpen(), "Connection is accepted and therefore must remain open"); } + + @Test + public void validAuthenticationBannedClient() throws ExecutionException, InterruptedException { + MqttConnectMessage msg = connMsg.clientId(FAKE_CLIENT_ID) + .username(TEST_USER).password(TEST_PWD).build(); + + connectionFilter.banClientId(FAKE_CLIENT_ID); + + // Exercise + PostOffice.RouteResult result = sut.processConnect(msg); + assertFalse(result.isSuccess()); + + // Verify + assertEqualsConnAck(CONNECTION_REFUSED_BANNED, channel.readOutbound()); + assertFalse(channel.isOpen(), "Connection is refused/baned and therefore must not remain open"); + } + @Test public void noPasswdAuthentication() { MqttConnectMessage msg = connMsg.clientId(FAKE_CLIENT_ID) diff --git a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java index b264bf50c..9d1b5e3df 100644 --- a/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/MQTTConnectionPublishTest.java @@ -15,6 +15,7 @@ */ package io.moquette.broker; +import io.moquette.broker.security.IConnectionFilter; import io.moquette.broker.security.PermitAllAuthorizatorPolicy; import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory; import io.moquette.broker.subscriptions.ISubscriptionsDirectory; @@ -71,6 +72,7 @@ private void createMQTTConnection(BrokerConfiguration config) { private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { IAuthenticator mockAuthenticator = new MockAuthenticator(singleton(FAKE_CLIENT_ID), singletonMap(TEST_USER, TEST_PWD)); + IConnectionFilter connectionFilter = new MockConnectionFilter(); ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory(); ISubscriptionsRepository subscriptionsRepository = new MemorySubscriptionsRepository(); @@ -83,7 +85,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, 1024); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sessionRegistry, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup); - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, postOffice); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, postOffice); } // @NotNull diff --git a/broker/src/test/java/io/moquette/broker/MockConnectionFilter.java b/broker/src/test/java/io/moquette/broker/MockConnectionFilter.java new file mode 100644 index 000000000..68575a705 --- /dev/null +++ b/broker/src/test/java/io/moquette/broker/MockConnectionFilter.java @@ -0,0 +1,33 @@ +package io.moquette.broker; + +import io.moquette.broker.security.IConnectionFilter; + +import java.util.Set; +import java.util.HashSet; +import java.util.stream.Stream; + +public class MockConnectionFilter implements IConnectionFilter { + private Set bannedClientIds = new HashSet<>(); + private Set bannedAddresses = new HashSet<>(); + @Override + public boolean allowConnection(ClientDescriptor clientDescriptor) { + return !bannedClientIds.contains(clientDescriptor.getClientID()) + && !bannedAddresses.contains(clientDescriptor.getAddress()); + } + + public MockConnectionFilter banClientId(String clientId) { + bannedClientIds.add(clientId); + return this; + } + + public MockConnectionFilter banAddress(String address) { + bannedAddresses.add(address); + return this; + } + + public MockConnectionFilter reset() { + bannedClientIds = new HashSet<>(); + bannedAddresses = new HashSet<>(); + return this; + } +} diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java index 4aab271f0..efd15c700 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeInternalPublishTest.java @@ -54,6 +54,7 @@ public class PostOfficeInternalPublishTest { private ISubscriptionsDirectory subscriptions; private MqttConnectMessage connectMessage; private SessionRegistry sessionRegistry; + private MockConnectionFilter connectionFilter = new MockConnectionFilter(); private MockAuthenticator mockAuthenticator; private static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); @@ -79,7 +80,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config) { } private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, sut); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, sut); } private void initPostOfficeAndSubsystems() { diff --git a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java index 1c52b52e8..18635d74f 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficePublishTest.java @@ -66,6 +66,7 @@ public class PostOfficePublishTest { public static final String FAKE_USER_NAME = "UnAuthUser"; private MqttConnectMessage connectMessage; private SessionRegistry sessionRegistry; + private MockConnectionFilter connectionFilter = new MockConnectionFilter(); private MockAuthenticator mockAuthenticator; static final BrokerConfiguration ALLOW_ANONYMOUS_AND_ZERO_BYTES_CLID = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); @@ -93,7 +94,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config) { } private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, sut); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, sut); } private void initPostOfficeAndSubsystems() { diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java index 7a7d43fc4..675c7dcb3 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeSubscribeTest.java @@ -67,6 +67,7 @@ public class PostOfficeSubscribeTest { private ISubscriptionsDirectory subscriptions; public static final String FAKE_USER_NAME = "UnAuthUser"; private MqttConnectMessage connectMessage; + private MockConnectionFilter connectionFilter = new MockConnectionFilter(); private IAuthenticator mockAuthenticator; private SessionRegistry sessionRegistry; public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); @@ -104,7 +105,7 @@ private void prepareSUT() { } private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, sut); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, sut); } protected void connect() { diff --git a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java index 763af9ef9..9e0ece94e 100644 --- a/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java +++ b/broker/src/test/java/io/moquette/broker/PostOfficeUnsubscribeTest.java @@ -58,6 +58,7 @@ public class PostOfficeUnsubscribeTest { private PostOffice sut; private ISubscriptionsDirectory subscriptions; private MqttConnectMessage connectMessage; + private MockConnectionFilter connectionFilter = new MockConnectionFilter(); private IAuthenticator mockAuthenticator; private SessionRegistry sessionRegistry; public static final BrokerConfiguration CONFIG = new BrokerConfiguration(true, true, false, NO_BUFFER_FLUSH); @@ -95,7 +96,7 @@ private void prepareSUT() { } private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { - return new MQTTConnection(channel, config, mockAuthenticator, sessionRegistry, sut); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sessionRegistry, sut); } protected static void connect(MQTTConnection connection, String clientId) { diff --git a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java index e4fd626d5..087a0d038 100644 --- a/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionRegistryTest.java @@ -79,6 +79,7 @@ private void createMQTTConnection(BrokerConfiguration config) { private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel channel) { IAuthenticator mockAuthenticator = new MockAuthenticator(singleton(FAKE_CLIENT_ID), singletonMap(TEST_USER, TEST_PWD)); + MockConnectionFilter connectionFilter = new MockConnectionFilter(); ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory(); ISubscriptionsRepository subscriptionsRepository = new MemorySubscriptionsRepository(); @@ -91,7 +92,7 @@ private MQTTConnection createMQTTConnection(BrokerConfiguration config, Channel final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, 1024); final PostOffice postOffice = new PostOffice(subscriptions, new MemoryRetainedRepository(), sut, ConnectionTestUtils.NO_OBSERVERS_INTERCEPTOR, permitAll, loopsGroup); - return new MQTTConnection(channel, config, mockAuthenticator, sut, postOffice); + return new MQTTConnection(channel, config, mockAuthenticator, connectionFilter, sut, postOffice); } @Test diff --git a/broker/src/test/java/io/moquette/broker/SessionTest.java b/broker/src/test/java/io/moquette/broker/SessionTest.java index 00fb2b63b..8305cf9eb 100644 --- a/broker/src/test/java/io/moquette/broker/SessionTest.java +++ b/broker/src/test/java/io/moquette/broker/SessionTest.java @@ -19,7 +19,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static io.moquette.BrokerConstants.NO_BUFFER_FLUSH; -import static org.junit.jupiter.api.Assertions.*; public class SessionTest { @@ -122,7 +121,7 @@ public void testRemoveSubscription() { private void createConnection(Session client) { BrokerConfiguration brokerConfiguration = new BrokerConfiguration(true, false, false, NO_BUFFER_FLUSH); - MQTTConnection mqttConnection = new MQTTConnection(testChannel, brokerConfiguration, null, null, null); + MQTTConnection mqttConnection = new MQTTConnection(testChannel, brokerConfiguration, null, null, null, null); client.markConnecting(); client.bind(mqttConnection); client.completeConnection(); diff --git a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java index 09370b073..aadcec5df 100644 --- a/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java +++ b/broker/src/test/java/io/moquette/integration/ServerIntegrationPahoCanPublishOnReadBlockedTopicTest.java @@ -96,7 +96,7 @@ public boolean canRead(Topic topic, String user, String client) { } }; - m_server.startServer(m_config, EMPTY_OBSERVERS, null, new AcceptAllAuthenticator(), switchingAuthorizator); + m_server.startServer(m_config, EMPTY_OBSERVERS, null, null, new AcceptAllAuthenticator(), switchingAuthorizator); } @BeforeEach