From 231e08ada23673bbec70b54d6a9ff8a97f8b4c3c Mon Sep 17 00:00:00 2001 From: onlyMIT Date: Tue, 29 Jan 2019 17:58:36 +0800 Subject: [PATCH 1/2] ARTEMIS-2226 last consumer connection should close the previous consumer connection Multiple consumers using the same clientId in the cluster, the last consumer connection should close the previous consumer connection! ARTEMIS-2226 last consumer connection should close the previous consumer connection to address apache-rat-plugin:0.12:check ARTEMIS-2226 last consumer connection should close the previous consumer connection to address checkstyle ARTEMIS-2226 last consumer connection should close the previous consumer connection adjust the code structure --- .../api/core/management/ManagementHelper.java | 4 + .../protocol/mqtt/MQTTConnectionManager.java | 4 +- .../protocol/mqtt/MQTTProtocolManager.java | 46 +- .../core/protocol/mqtt/MQTTSession.java | 39 +- .../cluster/impl/ClusterConnectionImpl.java | 8 + .../core/server/impl/ServerConsumerImpl.java | 5 + .../core/server/impl/ServerSessionImpl.java | 8 + .../MqttClusterRemoteSubscribeTest.java | 1132 +++++++++++++++-- 8 files changed, 1148 insertions(+), 98 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index bba84190222..53cb08716a0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -84,6 +84,10 @@ public final class ManagementHelper { public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID"); + public static final SimpleString HDR_PROTOCOL_NAME = new SimpleString("_AMQ_Protocol_Name"); + + public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID"); + // Attributes ---------------------------------------------------- // Static -------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index bc511ea1dc7..b533d50ba3b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -163,10 +163,10 @@ synchronized void disconnect(boolean failure) { private MQTTSessionState getSessionState(String clientId) { /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ - MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); + MQTTSessionState state = session.getSessionStateFromSessionMap(clientId); if (state == null) { state = new MQTTSessionState(clientId); - MQTTSession.SESSIONS.put(clientId, state); + session.putSessionStateIntoSessionMap(clientId, state); } return state; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index c8832bafaad..3b43d63ee55 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +31,15 @@ import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; @@ -40,6 +48,9 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.collections.TypedProperties; + +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CREATED; /** * MQTTProtocolManager @@ -62,11 +73,44 @@ class MQTTProtocolManager extends AbstractProtocolManager outgoingInterceptors) { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); + server.getManagementService().addNotificationListener(this); } @Override public void onNotification(Notification notification) { - // TODO handle notifications + if (!(notification.getType() instanceof CoreNotificationType)) + return; + + CoreNotificationType type = (CoreNotificationType) notification.getType(); + if (type != CONSUMER_CREATED) + return; + + TypedProperties props = notification.getProperties(); + + SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + + if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME)) + return; + + int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE); + + if (distance > 0) { + SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME); + + Binding binding = server.getPostOffice().getBinding(queueName); + if (binding != null) { + Queue queue = (Queue) binding.getBindable(); + String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString(); + //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client. + //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time + Collection consumersSet = queue.getConsumers(); + for (Consumer consumer : consumersSet) { + ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; + if (clientId.equals(serverConsumer.getConnectionClientID())) + serverConsumer.getRemotingConnection().destroy(); + } + } + } } @Override diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 640b893958c..67551eaedea 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -30,10 +30,12 @@ public class MQTTSession { - static Map SESSIONS = new ConcurrentHashMap<>(); + private static Map SESSIONS = new ConcurrentHashMap<>(); private final String id = UUID.randomUUID().toString(); + private final String identity; + private MQTTProtocolHandler protocolHandler; private MQTTSubscriptionManager subscriptionManager; @@ -72,6 +74,8 @@ public MQTTSession(MQTTProtocolHandler protocolHandler, this.protocolManager = protocolManager; this.wildcardConfiguration = wildcardConfiguration; + identity = protocolHandler.getServer().getIdentity(); + this.connection = connection; mqttConnectionManager = new MQTTConnectionManager(this); @@ -108,7 +112,7 @@ synchronized void stop() throws Exception { if (isClean()) { clean(); - SESSIONS.remove(connection.getClientID()); + removeSessionStateFromSessionMap(connection.getClientID()); } } stopped = true; @@ -204,4 +208,35 @@ public CoreMessageObjectPools getCoreMessageObjectPools() { public static Map getSessions() { return new HashMap<>(SESSIONS); } + + public MQTTSessionState getSessionStateFromSessionMap(String clientId) { + return SESSIONS.get(generateSessionStateKey(clientId)); + } + + public void putSessionStateIntoSessionMap(String clientId, MQTTSessionState state) { + SESSIONS.put(generateSessionStateKey(clientId), state); + } + + public void removeSessionStateFromSessionMap(String clientId) { + SESSIONS.remove(generateSessionStateKey(clientId)); + } + + /** + * When performing cluster testing, different nodes of the cluster are actually started in the same JVM process, + * and their class data is shared. {@link #SESSIONS} is shared, cause the test abnormal when multiple consumers + * use the same clientId to connect to different nodes simultaneously abnormal. + * if {@link ActiveMQServer#getIdentity()} is not null, indicates that the test is in progress. + * {@link ActiveMQServer#getIdentity()} is unique among different nodes. Use "identity+clientId" as the key to + * call {@link #SESSIONS} during testing, to ensure the correctness of test cases. + * + * @param clientId + * @return + */ + private String generateSessionStateKey(String clientId) { + String key = clientId; + if (identity != null) { + key = identity + key; + } + return key; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 4b884b5c1c5..d2cc152bf4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1336,6 +1336,14 @@ private synchronized void doConsumerCreated(final ClientMessage message) throws // Need to propagate the consumer add TypedProperties props = new TypedProperties(); + SimpleString protocolName = message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + if (protocolName != null) + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, protocolName); + + SimpleString clientId = message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID); + if (clientId != null) + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, clientId); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 48e2f06bf31..077d4e01431 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.FutureLatch; @@ -1544,4 +1545,8 @@ public String getConnectionLocalAddress() { public String getConnectionRemoteAddress() { return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress(); } + + public RemotingConnection getRemotingConnection() { + return this.session.getRemotingConnection(); + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 56c60f53dab..fbb9af05416 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -540,6 +540,14 @@ public ServerConsumer createConsumer(final long consumerID, props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); } + String protocolName = remotingConnection.getProtocolName(); + if (protocolName != null) + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, SimpleString.toSimpleString(protocolName)); + + String clientId = remotingConnection.getClientID(); + if (clientId != null) + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(clientId)); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); if (logger.isDebugEnabled()) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 8caba17d148..2f64892f756 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -43,10 +43,112 @@ public boolean isNetty() { } @Test - public void unsubscribeRemoteQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -54,18 +156,28 @@ public void unsubscribeRemoteQueue() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); - connection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages @@ -73,9 +185,9 @@ public void unsubscribeRemoteQueue() throws Exception { String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); @@ -89,12 +201,19 @@ public void unsubscribeRemoteQueue() throws Exception { assertEquals(payload2, new String(message2.getPayload())); assertEquals(payload3, new String(message3.getPayload())); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection2.unsubscribe(new String[]{TOPIC}); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -103,7 +222,6 @@ public void unsubscribeRemoteQueue() throws Exception { Message message31 = connection1.receive(5, TimeUnit.SECONDS); message31.ack(); - String message11String = new String(message11.getPayload()); String message21String = new String(message21.getPayload()); String message31String = new String(message31.getPayload()); @@ -111,14 +229,13 @@ public void unsubscribeRemoteQueue() throws Exception { assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); - } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -127,10 +244,114 @@ public void unsubscribeRemoteQueue() throws Exception { } @Test - public void unsubscribeRemoteQueueWildCard() throws Exception { - final String TOPIC = "test/+/some/#"; + public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -138,18 +359,141 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + connection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); + + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + + } + + @Test + public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); // Publish Messages @@ -157,9 +501,105 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); @@ -174,11 +614,17 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { assertEquals(payload3, new String(message3.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -187,22 +633,232 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { Message message31 = connection1.receive(5, TimeUnit.SECONDS); message31.ack(); + String message11String = new String(message11.getPayload()); String message21String = new String(message21.getPayload()); String message31String = new String(message31.getPayload()); + assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) ); + assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); + assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); - assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String)); - assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String)); - assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + } + + @Test + public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); + + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); + + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -211,11 +867,13 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { } @Test - public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; final String TOPIC2 = "sample"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -223,19 +881,28 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - connection1.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE)}); - connection2.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + connection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + connection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages String payload1 = "This is message 1"; @@ -243,9 +910,9 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); @@ -263,11 +930,19 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { assertEquals(payload3, new String(message3.getPayload())); assertEquals(payload4, new String(message4.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); @@ -289,12 +964,12 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { } finally { - if (connection1 != null) { - connection1.unsubscribe(new String[]{TOPIC}); + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(new String[]{ANYCAST_TOPIC}); connection1.disconnect(); } - if (connection2 != null) { - connection2.unsubscribe(new String[]{TOPIC, TOPIC2}); + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); connection2.disconnect(); } } @@ -302,10 +977,248 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { } @Test - public void unsubscribeExistingQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String TOPIC2 = "sample"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + // Subscribe to topics + subConnection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection2.receive(5, TimeUnit.SECONDS); + message4.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload4, new String(message4.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + message11.ack(); + assertEquals(payload4, new String(message11.getPayload())); + + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + Message message41 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message41); + + } finally { + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(new String[]{ANYCAST_TOPIC}); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useSameClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection subConnection3 = null; + BlockingConnection pubConnection = null; + try { + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + subConnection3 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + + //the second sub connection will be closed, when the third sub connection is connected + assertFalse(subConnection2.isConnected()); + + // Subscribe to topics + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + + subConnection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + + subConnection3.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection also will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection3.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection3.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection3.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection3.receive(5, TimeUnit.SECONDS); + message4.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload4, new String(message4.getPayload())); + + + subConnection3.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (subConnection3 != null && subConnection3.isConnected()) { + subConnection3.unsubscribe(topics); + subConnection3.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.unsubscribe(topics); + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + final String clientId3 = "clientId3"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); BlockingConnection connection1 = null; @@ -313,22 +1226,43 @@ public void unsubscribeExistingQueue() throws Exception { BlockingConnection connection3 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); - connection3 = retrieveMQTTConnection("tcp://localhost:61617"); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + connection2.subscribe(topics); - connection3.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 2, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - waitForBindings(0, TOPIC, 1, 2, false); - waitForBindings(1, TOPIC, 1, 1, false); + connection3.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 2, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 2, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages String payload1 = "This is message 1"; @@ -336,10 +1270,10 @@ public void unsubscribeExistingQueue() throws Exception { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); message1.ack(); @@ -356,11 +1290,20 @@ public void unsubscribeExistingQueue() throws Exception { assertEquals(payload4, new String(message4.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -377,16 +1320,16 @@ public void unsubscribeExistingQueue() throws Exception { assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } - if (connection3 != null) { + if (connection3 != null && connection3.isConnected()) { connection3.unsubscribe(topics); connection3.disconnect(); } @@ -395,9 +1338,12 @@ public void unsubscribeExistingQueue() throws Exception { } - private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); + mqtt.setClientId(clientId); + mqtt.setConnectAttemptsMax(0); + mqtt.setReconnectAttemptsMax(0); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); return connection; From be1a939d080fdb29a4bd1d8eb2777988e3b7021d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Fri, 1 Feb 2019 02:12:50 +0000 Subject: [PATCH 2/2] ARTEMIS-2226 last consumer connection should close the previous consumer connection Addressing review comments. --- .../protocol/mqtt/MQTTConnectionManager.java | 8 +--- .../protocol/mqtt/MQTTProtocolManager.java | 32 ++++++++++++---- .../core/protocol/mqtt/MQTTSession.java | 37 +------------------ .../core/remoting/impl/AbstractAcceptor.java | 5 +++ .../activemq/artemis/core/server/Queue.java | 4 ++ .../artemis/core/server/impl/QueueImpl.java | 13 +++++++ .../core/server/impl/ServerSessionImpl.java | 6 ++- .../impl/ScheduledDeliveryHandlerTest.java | 6 +++ .../integration/mqtt/imported/MQTTTest.java | 15 +++----- .../mqtt/imported/MQTTTestSupport.java | 29 ++++++++++++--- .../unit/core/postoffice/impl/FakeQueue.java | 7 ++++ 11 files changed, 95 insertions(+), 67 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index b533d50ba3b..9587d54ab7d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -163,13 +163,7 @@ synchronized void disconnect(boolean failure) { private MQTTSessionState getSessionState(String clientId) { /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ - MQTTSessionState state = session.getSessionStateFromSessionMap(clientId); - if (state == null) { - state = new MQTTSessionState(clientId); - session.putSessionStateIntoSessionMap(clientId, state); - } - - return state; + return session.getProtocolManager().getSessionState(clientId); } private String validateClientId(String clientId, boolean cleanSession) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 3b43d63ee55..6e8446aa083 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,8 +37,7 @@ import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.Consumer; +import org.apache.activemq.artemis.core.server.*; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.management.Notification; @@ -55,7 +55,7 @@ /** * MQTTProtocolManager */ -class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { +public class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { private static final List websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); @@ -66,7 +66,8 @@ class MQTTProtocolManager extends AbstractProtocolManager outgoingInterceptors = new ArrayList<>(); //TODO Read in a list of existing client IDs from stored Sessions. - private Map connectedClients = new ConcurrentHashMap<>(); + private final Map connectedClients = new ConcurrentHashMap<>(); + private final Map sessionStates = new ConcurrentHashMap<>(); MQTTProtocolManager(ActiveMQServer server, List incomingInterceptors, @@ -103,11 +104,13 @@ public void onNotification(Notification notification) { String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString(); //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client. //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time - Collection consumersSet = queue.getConsumers(); + Collection consumersSet = queue.getConsumers((c) -> (c instanceof ServerConsumer) && clientId.equals(((ServerConsumer) c).getConnectionClientID())); for (Consumer consumer : consumersSet) { - ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; - if (clientId.equals(serverConsumer.getConnectionClientID())) - serverConsumer.getRemotingConnection().destroy(); + try { + ((ServerConsumer) consumer).close(false); + } catch (Exception e) { + log.error(e); + } } } } @@ -239,4 +242,17 @@ public void removeConnectedClient(String clientId) { public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { return connectedClients.put(clientId, connection); } + + public MQTTSessionState getSessionState(String clientId) { + /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ + return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new); + } + + public MQTTSessionState removeSessionState(String clientId) { + return sessionStates.remove(clientId); + } + + public Map getSessionStates() { + return new HashMap<>(sessionStates); + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 67551eaedea..20d5535068a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -30,7 +30,6 @@ public class MQTTSession { - private static Map SESSIONS = new ConcurrentHashMap<>(); private final String id = UUID.randomUUID().toString(); @@ -112,7 +111,7 @@ synchronized void stop() throws Exception { if (isClean()) { clean(); - removeSessionStateFromSessionMap(connection.getClientID()); + protocolManager.removeSessionState(connection.getClientID()); } } stopped = true; @@ -205,38 +204,4 @@ public CoreMessageObjectPools getCoreMessageObjectPools() { return coreMessageObjectPools; } - public static Map getSessions() { - return new HashMap<>(SESSIONS); - } - - public MQTTSessionState getSessionStateFromSessionMap(String clientId) { - return SESSIONS.get(generateSessionStateKey(clientId)); - } - - public void putSessionStateIntoSessionMap(String clientId, MQTTSessionState state) { - SESSIONS.put(generateSessionStateKey(clientId), state); - } - - public void removeSessionStateFromSessionMap(String clientId) { - SESSIONS.remove(generateSessionStateKey(clientId)); - } - - /** - * When performing cluster testing, different nodes of the cluster are actually started in the same JVM process, - * and their class data is shared. {@link #SESSIONS} is shared, cause the test abnormal when multiple consumers - * use the same clientId to connect to different nodes simultaneously abnormal. - * if {@link ActiveMQServer#getIdentity()} is not null, indicates that the test is in progress. - * {@link ActiveMQServer#getIdentity()} is unique among different nodes. Use "identity+clientId" as the key to - * call {@link #SESSIONS} during testing, to ensure the correctness of test cases. - * - * @param clientId - * @return - */ - private String generateSessionStateKey(String clientId) { - String key = clientId; - if (identity != null) { - key = identity + key; - } - return key; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java index a2f30f3f626..1aa1dff7545 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.remoting.impl; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,4 +44,8 @@ public void updateInterceptors(List incomingInterceptors, } } + public Map getProtocolMap() { + return Collections.unmodifiableMap(protocolMap); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 031d01a58a8..851ce51e5ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -19,8 +19,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -287,6 +289,8 @@ int moveReferences(int flushLimit, Collection getConsumers(); + Set getConsumers(Predicate predicate); + Map getGroups(); void resetGroup(SimpleString groupID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8209995e429..97490a58fba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; @@ -1204,6 +1205,18 @@ public Set getConsumers() { return consumersSet; } + @Override + public Set getConsumers(Predicate predicate) { + Set consumersSet = new HashSet<>(); + for (ConsumerHolder consumerHolder : consumers) { + if (predicate.test(consumerHolder.consumer)) { + consumersSet.add(consumerHolder.consumer); + } + } + return consumersSet; + } + + @Override public synchronized Map getGroups() { return new HashMap<>(groups); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index fbb9af05416..883e90d5b74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -541,12 +541,14 @@ public ServerConsumer createConsumer(final long consumerID, } String protocolName = remotingConnection.getProtocolName(); - if (protocolName != null) + if (protocolName != null) { props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, SimpleString.toSimpleString(protocolName)); + } String clientId = remotingConnection.getClientID(); - if (clientId != null) + if (clientId != null) { props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(clientId)); + } Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2e3b691dca8..2d8e3666f56 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; @@ -1301,6 +1302,11 @@ public Collection getConsumers() { return null; } + @Override + public Set getConsumers(Predicate predicate) { + return null; + } + @Override public Map getGroups() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 9fc6cfd4d35..72eb624ded5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -82,9 +82,6 @@ public class MQTTTest extends MQTTTestSupport { @Override @Before public void setUp() throws Exception { - Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); - sessions.setAccessible(true); - sessions.set(null, new ConcurrentHashMap<>()); super.setUp(); } @@ -1100,7 +1097,7 @@ public void testCleanSessionForSubscriptions() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST receive message from existing subscription from previous not clean session notClean = mqttNotClean.blockingConnection(); @@ -1112,7 +1109,7 @@ public void testCleanSessionForSubscriptions() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session as existing subscription should be gone final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1124,7 +1121,7 @@ public void testCleanSessionForSubscriptions() throws Exception { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); // MUST NOT receive message from previous clean session as existing subscription should be gone notClean = mqttNotClean.blockingConnection(); @@ -1133,7 +1130,7 @@ public void testCleanSessionForSubscriptions() throws Exception { assertNull(msg); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); } @Test(timeout = 60 * 1000) @@ -1147,7 +1144,7 @@ public void testCleanSessionForMessages() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session even when creating a new subscription final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1159,7 +1156,7 @@ public void testCleanSessionForMessages() throws Exception { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); } @Test(timeout = 60 * 1000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index e49ec92ed66..bd12a86d387 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -17,6 +17,12 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; import javax.jms.ConnectionFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -28,11 +34,6 @@ import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.mqtt.MqttMessage; @@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.fusesource.mqtt.client.MQTT; @@ -383,6 +389,19 @@ private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exce return mqtt; } + public Map getSessions() { + Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT"); + if (acceptor instanceof AbstractAcceptor) { + ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); + if (protocolManager instanceof MQTTProtocolManager) { + MQTTProtocolManager mqttProtocolManager = (MQTTProtocolManager) protocolManager; + return ((MQTTProtocolManager) protocolManager).getSessionStates(); + } + + } + return Collections.emptyMap(); + } + protected Tracer createTracer() { return new Tracer() { @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 15ac691f6c8..baff378a2d8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -429,6 +430,12 @@ public Set getConsumers() { return null; } + @Override + public Set getConsumers(Predicate predicate) { + // no-op + return null; + } + @Override public Map getGroups() { return null;