diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index bba84190222..53cb08716a0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -84,6 +84,10 @@ public final class ManagementHelper { public static final SimpleString HDR_MESSAGE_ID = new SimpleString("_AMQ_Message_ID"); + public static final SimpleString HDR_PROTOCOL_NAME = new SimpleString("_AMQ_Protocol_Name"); + + public static final SimpleString HDR_CLIENT_ID = new SimpleString("_AMQ_Client_ID"); + // Attributes ---------------------------------------------------- // Static -------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index bc511ea1dc7..9587d54ab7d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -163,13 +163,7 @@ synchronized void disconnect(boolean failure) { private MQTTSessionState getSessionState(String clientId) { /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ - MQTTSessionState state = MQTTSession.SESSIONS.get(clientId); - if (state == null) { - state = new MQTTSessionState(clientId); - MQTTSession.SESSIONS.put(clientId, state); - } - - return state; + return session.getProtocolManager().getSessionState(clientId); } private String validateClientId(String clientId, boolean cleanSession) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index c8832bafaad..6e8446aa083 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -18,6 +18,8 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,8 +32,14 @@ import io.netty.handler.codec.mqtt.MqttMessage; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.*; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager; @@ -40,11 +48,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.collections.TypedProperties; + +import static org.apache.activemq.artemis.api.core.management.CoreNotificationType.CONSUMER_CREATED; /** * MQTTProtocolManager */ -class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { +public class MQTTProtocolManager extends AbstractProtocolManager implements NotificationListener { private static final List websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1"); @@ -55,18 +66,54 @@ class MQTTProtocolManager extends AbstractProtocolManager outgoingInterceptors = new ArrayList<>(); //TODO Read in a list of existing client IDs from stored Sessions. - private Map connectedClients = new ConcurrentHashMap<>(); + private final Map connectedClients = new ConcurrentHashMap<>(); + private final Map sessionStates = new ConcurrentHashMap<>(); MQTTProtocolManager(ActiveMQServer server, List incomingInterceptors, List outgoingInterceptors) { this.server = server; this.updateInterceptors(incomingInterceptors, outgoingInterceptors); + server.getManagementService().addNotificationListener(this); } @Override public void onNotification(Notification notification) { - // TODO handle notifications + if (!(notification.getType() instanceof CoreNotificationType)) + return; + + CoreNotificationType type = (CoreNotificationType) notification.getType(); + if (type != CONSUMER_CREATED) + return; + + TypedProperties props = notification.getProperties(); + + SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + + if (protocolName == null || !protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME)) + return; + + int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE); + + if (distance > 0) { + SimpleString queueName = props.getSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME); + + Binding binding = server.getPostOffice().getBinding(queueName); + if (binding != null) { + Queue queue = (Queue) binding.getBindable(); + String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString(); + //If the client ID represents a client already connected to the server then the server MUST disconnect the existing client. + //Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time + Collection consumersSet = queue.getConsumers((c) -> (c instanceof ServerConsumer) && clientId.equals(((ServerConsumer) c).getConnectionClientID())); + for (Consumer consumer : consumersSet) { + try { + ((ServerConsumer) consumer).close(false); + } catch (Exception e) { + log.error(e); + } + } + } + } } @Override @@ -195,4 +242,17 @@ public void removeConnectedClient(String clientId) { public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) { return connectedClients.put(clientId, connection); } + + public MQTTSessionState getSessionState(String clientId) { + /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */ + return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new); + } + + public MQTTSessionState removeSessionState(String clientId) { + return sessionStates.remove(clientId); + } + + public Map getSessionStates() { + return new HashMap<>(sessionStates); + } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 640b893958c..20d5535068a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -30,10 +30,11 @@ public class MQTTSession { - static Map SESSIONS = new ConcurrentHashMap<>(); private final String id = UUID.randomUUID().toString(); + private final String identity; + private MQTTProtocolHandler protocolHandler; private MQTTSubscriptionManager subscriptionManager; @@ -72,6 +73,8 @@ public MQTTSession(MQTTProtocolHandler protocolHandler, this.protocolManager = protocolManager; this.wildcardConfiguration = wildcardConfiguration; + identity = protocolHandler.getServer().getIdentity(); + this.connection = connection; mqttConnectionManager = new MQTTConnectionManager(this); @@ -108,7 +111,7 @@ synchronized void stop() throws Exception { if (isClean()) { clean(); - SESSIONS.remove(connection.getClientID()); + protocolManager.removeSessionState(connection.getClientID()); } } stopped = true; @@ -201,7 +204,4 @@ public CoreMessageObjectPools getCoreMessageObjectPools() { return coreMessageObjectPools; } - public static Map getSessions() { - return new HashMap<>(SESSIONS); - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java index a2f30f3f626..1aa1dff7545 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.remoting.impl; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,4 +44,8 @@ public void updateInterceptors(List incomingInterceptors, } } + public Map getProtocolMap() { + return Collections.unmodifiableMap(protocolMap); + } + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 031d01a58a8..851ce51e5ad 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -19,8 +19,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -287,6 +289,8 @@ int moveReferences(int flushLimit, Collection getConsumers(); + Set getConsumers(Predicate predicate); + Map getGroups(); void resetGroup(SimpleString groupID); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 4b884b5c1c5..d2cc152bf4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -1336,6 +1336,14 @@ private synchronized void doConsumerCreated(final ClientMessage message) throws // Need to propagate the consumer add TypedProperties props = new TypedProperties(); + SimpleString protocolName = message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME); + if (protocolName != null) + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, protocolName); + + SimpleString clientId = message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID); + if (clientId != null) + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, clientId); + props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress()); props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 8209995e429..97490a58fba 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; @@ -1204,6 +1205,18 @@ public Set getConsumers() { return consumersSet; } + @Override + public Set getConsumers(Predicate predicate) { + Set consumersSet = new HashSet<>(); + for (ConsumerHolder consumerHolder : consumers) { + if (predicate.test(consumerHolder.consumer)) { + consumersSet.add(consumerHolder.consumer); + } + } + return consumersSet; + } + + @Override public synchronized Map getGroups() { return new HashMap<>(groups); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 48e2f06bf31..077d4e01431 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.FutureLatch; @@ -1544,4 +1545,8 @@ public String getConnectionLocalAddress() { public String getConnectionRemoteAddress() { return this.session.getRemotingConnection().getTransportConnection().getRemoteAddress(); } + + public RemotingConnection getRemotingConnection() { + return this.session.getRemotingConnection(); + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 56c60f53dab..883e90d5b74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -540,6 +540,16 @@ public ServerConsumer createConsumer(final long consumerID, props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); } + String protocolName = remotingConnection.getProtocolName(); + if (protocolName != null) { + props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, SimpleString.toSimpleString(protocolName)); + } + + String clientId = remotingConnection.getClientID(); + if (clientId != null) { + props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, SimpleString.toSimpleString(clientId)); + } + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); if (logger.isDebugEnabled()) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2e3b691dca8..2d8e3666f56 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; @@ -1301,6 +1302,11 @@ public Collection getConsumers() { return null; } + @Override + public Set getConsumers(Predicate predicate) { + return null; + } + @Override public Map getGroups() { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 9fc6cfd4d35..72eb624ded5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -82,9 +82,6 @@ public class MQTTTest extends MQTTTestSupport { @Override @Before public void setUp() throws Exception { - Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); - sessions.setAccessible(true); - sessions.set(null, new ConcurrentHashMap<>()); super.setUp(); } @@ -1100,7 +1097,7 @@ public void testCleanSessionForSubscriptions() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST receive message from existing subscription from previous not clean session notClean = mqttNotClean.blockingConnection(); @@ -1112,7 +1109,7 @@ public void testCleanSessionForSubscriptions() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session as existing subscription should be gone final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1124,7 +1121,7 @@ public void testCleanSessionForSubscriptions() throws Exception { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); // MUST NOT receive message from previous clean session as existing subscription should be gone notClean = mqttNotClean.blockingConnection(); @@ -1133,7 +1130,7 @@ public void testCleanSessionForSubscriptions() throws Exception { assertNull(msg); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); } @Test(timeout = 60 * 1000) @@ -1147,7 +1144,7 @@ public void testCleanSessionForMessages() throws Exception { notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); notClean.disconnect(); - assertEquals(1, MQTTSession.getSessions().size()); + assertEquals(1, getSessions().size()); // MUST NOT receive message from previous not clean session even when creating a new subscription final MQTT mqttClean = createMQTTConnection(CLIENTID, true); @@ -1159,7 +1156,7 @@ public void testCleanSessionForMessages() throws Exception { clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false); clean.disconnect(); - assertEquals(0, MQTTSession.getSessions().size()); + assertEquals(0, getSessions().size()); } @Test(timeout = 60 * 1000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index e49ec92ed66..bd12a86d387 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -17,6 +17,12 @@ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; import javax.jms.ConnectionFactory; import javax.net.ssl.KeyManager; import javax.net.ssl.SSLContext; @@ -28,11 +34,6 @@ import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import io.netty.handler.codec.mqtt.MqttMessage; @@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.fusesource.mqtt.client.MQTT; @@ -383,6 +389,19 @@ private MQTT createMQTTSslConnection(String clientId, boolean clean) throws Exce return mqtt; } + public Map getSessions() { + Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT"); + if (acceptor instanceof AbstractAcceptor) { + ProtocolManager protocolManager = ((AbstractAcceptor) acceptor).getProtocolMap().get("MQTT"); + if (protocolManager instanceof MQTTProtocolManager) { + MQTTProtocolManager mqttProtocolManager = (MQTTProtocolManager) protocolManager; + return ((MQTTProtocolManager) protocolManager).getSessionStates(); + } + + } + return Collections.emptyMap(); + } + protected Tracer createTracer() { return new Tracer() { @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java index 8caba17d148..2f64892f756 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java @@ -43,10 +43,112 @@ public boolean isNetty() { } @Test - public void unsubscribeRemoteQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -54,18 +156,28 @@ public void unsubscribeRemoteQueue() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); - connection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages @@ -73,9 +185,9 @@ public void unsubscribeRemoteQueue() throws Exception { String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); @@ -89,12 +201,19 @@ public void unsubscribeRemoteQueue() throws Exception { assertEquals(payload2, new String(message2.getPayload())); assertEquals(payload3, new String(message3.getPayload())); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection2.unsubscribe(new String[]{TOPIC}); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -103,7 +222,6 @@ public void unsubscribeRemoteQueue() throws Exception { Message message31 = connection1.receive(5, TimeUnit.SECONDS); message31.ack(); - String message11String = new String(message11.getPayload()); String message21String = new String(message21.getPayload()); String message31String = new String(message31.getPayload()); @@ -111,14 +229,13 @@ public void unsubscribeRemoteQueue() throws Exception { assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); - } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -127,10 +244,114 @@ public void unsubscribeRemoteQueue() throws Exception { } @Test - public void unsubscribeRemoteQueueWildCard() throws Exception { - final String TOPIC = "test/+/some/#"; + public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/1/some/la"; + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -138,18 +359,141 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + connection2.subscribe(topics); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); + + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + + } + + @Test + public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); // Publish Messages @@ -157,9 +501,105 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { String payload2 = "This is message 2"; String payload3 = "This is message 3"; - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); @@ -174,11 +614,17 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { assertEquals(payload3, new String(message3.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - connection1.publish("test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish("test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish("test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish("anycast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -187,22 +633,232 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { Message message31 = connection1.receive(5, TimeUnit.SECONDS); message31.ack(); + String message11String = new String(message11.getPayload()); String message21String = new String(message21.getPayload()); String message31String = new String(message31.getPayload()); + assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) ); + assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) ); + assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) ); - assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String)); - assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String)); - assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(topics); + connection1.disconnect(); + } + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(topics); + connection2.disconnect(); + } + } + } + + @Test + public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + // Subscribe to topics + subConnection1.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + + subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 0, 0, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, false); + + pubConnection.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard() throws Exception { + final String MULTICAST_TOPIC = "multicast/test/+/some/#"; + final String ANYCAST_TOPIC = "anycast/test/+/some/#"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection connection1 = null; + BlockingConnection connection2 = null; + try { + + Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)}; + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + // Subscribe to topics + connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection2.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, true); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + + Message message11 = connection1.receive(5, TimeUnit.SECONDS); + message11.ack(); + Message message12 = connection1.receive(5, TimeUnit.SECONDS); + message12.ack(); + Message message13 = connection1.receive(5, TimeUnit.SECONDS); + message13.ack(); + + assertEquals(payload1, new String(message11.getPayload())); + assertEquals(payload2, new String(message12.getPayload())); + assertEquals(payload3, new String(message13.getPayload())); + + Message message21 = connection2.receive(5, TimeUnit.SECONDS); + message21.ack(); + Message message22 = connection2.receive(5, TimeUnit.SECONDS); + message22.ack(); + Message message23 = connection2.receive(5, TimeUnit.SECONDS); + message23.ack(); + + assertEquals(payload1, new String(message21.getPayload())); + assertEquals(payload2, new String(message22.getPayload())); + assertEquals(payload3, new String(message23.getPayload())); + + + connection2.unsubscribe(new String[]{MULTICAST_TOPIC}); + + waitForBindings(0, MULTICAST_TOPIC, 1, 1, true); + waitForBindings(1, MULTICAST_TOPIC, 0, 0, true); + + waitForBindings(0, MULTICAST_TOPIC, 0, 0, false); + waitForBindings(1, MULTICAST_TOPIC, 1, 1, false); + + connection1.publish("multicast/test/1/some/la", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish("multicast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message31 = connection1.receive(5, TimeUnit.SECONDS); + message31.ack(); + Message message32 = connection1.receive(5, TimeUnit.SECONDS); + message32.ack(); + Message message33 = connection1.receive(5, TimeUnit.SECONDS); + message33.ack(); + + assertEquals(payload1, new String(message31.getPayload())); + assertEquals(payload2, new String(message32.getPayload())); + assertEquals(payload3, new String(message33.getPayload())); + + + } finally { + String[] topics = new String[]{MULTICAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } @@ -211,11 +867,13 @@ public void unsubscribeRemoteQueueWildCard() throws Exception { } @Test - public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; final String TOPIC2 = "sample"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); startServers(0, 1); @@ -223,19 +881,28 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { BlockingConnection connection2 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); // Subscribe to topics - connection1.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE)}); - connection2.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + connection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - waitForBindings(0, TOPIC, 1, 1, false); - waitForBindings(1, TOPIC, 1, 1, false); + connection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages String payload1 = "This is message 1"; @@ -243,9 +910,9 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); @@ -263,11 +930,19 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { assertEquals(payload3, new String(message3.getPayload())); assertEquals(payload4, new String(message4.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); @@ -289,12 +964,12 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { } finally { - if (connection1 != null) { - connection1.unsubscribe(new String[]{TOPIC}); + if (connection1 != null && connection1.isConnected()) { + connection1.unsubscribe(new String[]{ANYCAST_TOPIC}); connection1.disconnect(); } - if (connection2 != null) { - connection2.unsubscribe(new String[]{TOPIC, TOPIC2}); + if (connection2 != null && connection2.isConnected()) { + connection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); connection2.disconnect(); } } @@ -302,10 +977,248 @@ public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception { } @Test - public void unsubscribeExistingQueue() throws Exception { - final String TOPIC = "test/1/some/la"; + public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String TOPIC2 = "sample"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; - setupServers(TOPIC); + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection pubConnection = null; + try { + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + // Subscribe to topics + subConnection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + subConnection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection2.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection2.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection2.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection2.receive(5, TimeUnit.SECONDS); + message4.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload4, new String(message4.getPayload())); + + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection2.receive(5, TimeUnit.SECONDS); + message11.ack(); + assertEquals(payload4, new String(message11.getPayload())); + + Message message21 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message31); + Message message41 = subConnection2.receive(5, TimeUnit.SECONDS); + assertNull(message41); + + } finally { + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(new String[]{ANYCAST_TOPIC}); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2}); + subConnection2.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.disconnect(); + } + } + + } + + @Test + public void useSameClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String subClientId = "subClientId"; + final String pubClientId = "pubClientId"; + + setupServers(ANYCAST_TOPIC); + + startServers(0, 1); + + BlockingConnection subConnection1 = null; + BlockingConnection subConnection2 = null; + BlockingConnection subConnection3 = null; + BlockingConnection pubConnection = null; + try { + pubConnection = retrieveMQTTConnection("tcp://localhost:61616", pubClientId); + + subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", subClientId); + subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + subConnection3 = retrieveMQTTConnection("tcp://localhost:61617", subClientId); + + //the second sub connection will be closed, when the third sub connection is connected + assertFalse(subConnection2.isConnected()); + + // Subscribe to topics + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; + + subConnection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + + + subConnection3.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + //the first sub connection also will be closed + assertFalse(subConnection1.isConnected()); + + // Publish Messages + String payload1 = "This is message 1"; + String payload2 = "This is message 2"; + String payload3 = "This is message 3"; + String payload4 = "This is message 4"; + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message1 = subConnection3.receive(5, TimeUnit.SECONDS); + message1.ack(); + Message message2 = subConnection3.receive(5, TimeUnit.SECONDS); + message2.ack(); + Message message3 = subConnection3.receive(5, TimeUnit.SECONDS); + message3.ack(); + Message message4 = subConnection3.receive(5, TimeUnit.SECONDS); + message4.ack(); + + assertEquals(payload1, new String(message1.getPayload())); + assertEquals(payload2, new String(message2.getPayload())); + assertEquals(payload3, new String(message3.getPayload())); + assertEquals(payload4, new String(message4.getPayload())); + + + subConnection3.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, false); + + pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + + Message message11 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message11); + Message message21 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message21); + Message message31 = subConnection3.receive(5, TimeUnit.SECONDS); + assertNull(message31); + + + } finally { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (subConnection1 != null && subConnection1.isConnected()) { + subConnection1.unsubscribe(topics); + subConnection1.disconnect(); + } + if (subConnection2 != null && subConnection2.isConnected()) { + subConnection2.unsubscribe(topics); + subConnection2.disconnect(); + } + if (subConnection3 != null && subConnection3.isConnected()) { + subConnection3.unsubscribe(topics); + subConnection3.disconnect(); + } + if (pubConnection != null && pubConnection.isConnected()) { + pubConnection.unsubscribe(topics); + pubConnection.disconnect(); + } + } + + } + + @Test + public void useDiffClientIdSubscribeExistingQueue() throws Exception { + final String ANYCAST_TOPIC = "anycast/test/1/some/la"; + final String clientId1 = "clientId1"; + final String clientId2 = "clientId2"; + final String clientId3 = "clientId3"; + + setupServers(ANYCAST_TOPIC); startServers(0, 1); BlockingConnection connection1 = null; @@ -313,22 +1226,43 @@ public void unsubscribeExistingQueue() throws Exception { BlockingConnection connection3 = null; try { - connection1 = retrieveMQTTConnection("tcp://localhost:61616"); - connection2 = retrieveMQTTConnection("tcp://localhost:61617"); - connection3 = retrieveMQTTConnection("tcp://localhost:61617"); + connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1); + connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2); + connection3 = retrieveMQTTConnection("tcp://localhost:61617", clientId3); // Subscribe to topics - Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)}; + Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)}; connection1.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 0, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 0, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); + connection2.subscribe(topics); - connection3.subscribe(topics); + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); - waitForBindings(0, TOPIC, 1, 1, true); - waitForBindings(1, TOPIC, 1, 2, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - waitForBindings(0, TOPIC, 1, 2, false); - waitForBindings(1, TOPIC, 1, 1, false); + connection3.subscribe(topics); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 2, true); + waitForBindings(0, ANYCAST_TOPIC, 1, 2, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); // Publish Messages String payload1 = "This is message 1"; @@ -336,10 +1270,10 @@ public void unsubscribeExistingQueue() throws Exception { String payload3 = "This is message 3"; String payload4 = "This is message 4"; - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, false); Message message1 = connection1.receive(5, TimeUnit.SECONDS); message1.ack(); @@ -356,11 +1290,20 @@ public void unsubscribeExistingQueue() throws Exception { assertEquals(payload4, new String(message4.getPayload())); - connection2.unsubscribe(new String[]{TOPIC}); + connection2.unsubscribe(new String[]{ANYCAST_TOPIC}); + + //Waiting for the notification processing between cluster nodes to be completed + Thread.sleep(2000); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, true); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, true); + + waitForBindings(0, ANYCAST_TOPIC, 1, 1, false); + waitForBindings(1, ANYCAST_TOPIC, 1, 1, false); - connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); - connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); - connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false); + connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, false); Message message11 = connection1.receive(5, TimeUnit.SECONDS); message11.ack(); @@ -377,16 +1320,16 @@ public void unsubscribeExistingQueue() throws Exception { assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String)); } finally { - String[] topics = new String[]{TOPIC}; - if (connection1 != null) { + String[] topics = new String[]{ANYCAST_TOPIC}; + if (connection1 != null && connection1.isConnected()) { connection1.unsubscribe(topics); connection1.disconnect(); } - if (connection2 != null) { + if (connection2 != null && connection2.isConnected()) { connection2.unsubscribe(topics); connection2.disconnect(); } - if (connection3 != null) { + if (connection3 != null && connection3.isConnected()) { connection3.unsubscribe(topics); connection3.disconnect(); } @@ -395,9 +1338,12 @@ public void unsubscribeExistingQueue() throws Exception { } - private static BlockingConnection retrieveMQTTConnection(String host) throws Exception { + private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception { MQTT mqtt = new MQTT(); mqtt.setHost(host); + mqtt.setClientId(clientId); + mqtt.setConnectAttemptsMax(0); + mqtt.setReconnectAttemptsMax(0); BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); return connection; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 15ac691f6c8..baff378a2d8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.concurrent.Executor; +import java.util.function.Predicate; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; @@ -429,6 +430,12 @@ public Set getConsumers() { return null; } + @Override + public Set getConsumers(Predicate predicate) { + // no-op + return null; + } + @Override public Map getGroups() { return null;