Skip to content

Commit

Permalink
ARTEMIS-2854 Non-durable subscribers stop receiving after failover
Browse files Browse the repository at this point in the history
In a cluster scenario where non-durable subscribers fail over to a
backup while another live node is forwarding messages to it,
there is a chance that the live node keeps the old remote
bindings for the subscriptions. Messages routed to those
old remote bindings will then result in "binding not found".

(cherry picked from commit fe5b81f)

downstream: ENTMQBR-3916
  • Loading branch information
howardgao authored and brusdev committed Sep 23, 2020
1 parent 9a04593 commit 7a5da3f
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 9 deletions.
Expand Up @@ -1281,17 +1281,25 @@ private synchronized void doBindingAdded(final ClientMessage message) throws Exc
RemoteQueueBinding existingBinding = (RemoteQueueBinding) postOffice.getBinding(clusterName);

if (existingBinding != null) {
if (!existingBinding.isConnected()) {
existingBinding.connect();
if (queueID.equals(existingBinding.getRemoteQueueID())) {
if (!existingBinding.isConnected()) {
existingBinding.connect();
return;
}
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
// or there are multiple cluster connections for the same address

ActiveMQServerLogger.LOGGER.remoteQueueAlreadyBoundOnClusterConnection(this, clusterName);
return;
}
// Sanity check - this means the binding has already been added via another bridge, probably max
// hops is too high
// or there are multiple cluster connections for the same address

ActiveMQServerLogger.LOGGER.remoteQueueAlreadyBoundOnClusterConnection(this, clusterName);

return;
//This can happen during JMS non-durable failover: the queue name doesn't change, but the queue ID
//is re-generated on the backup. In that case the old binding is removed here, and a new remote
//binding is created below and put into the map.
if (logger.isTraceEnabled()) {
logger.trace("Removing binding because qid changed " + queueID + " old: " + existingBinding.getRemoteQueueID());
}
removeBinding(clusterName);
}

RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, queueID, filterString, queue, bridge.getName(), distance + 1, messageLoadBalancingType);
Expand Down
Expand Up @@ -17,8 +17,10 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;

import java.util.ArrayList;
import java.util.UUID;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
Expand Down Expand Up @@ -156,6 +158,104 @@ public void testReconnectBridge() throws Exception {

}

/**
 * Checks that messages produced on node 1 keep reaching a consumer on node 0
 * after the queue on node 0 has been destroyed and re-created under the same name.
 */
@Test
public void testClusterBridgeAddRemoteBinding() throws Exception {
   final String address = "queues.testaddress";
   final String queueName = UUID.randomUUID().toString();
   final int messageCount = 10;

   // Two servers, each with a cluster connection pointing at the other.
   setupServer(0, isFileStorage(), isNetty());
   setupServer(1, isFileStorage(), isNetty());
   setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
   setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
   startServers(0, 1);

   setupSessionFactory(0, isNetty());
   setupSessionFactory(1, isNetty());

   // Queue and consumer live on node 0 only.
   createQueue(0, address, queueName, null, false);
   addConsumer(0, 0, queueName, null);

   // Wait until both nodes report the expected bindings before sending anything.
   waitForBindings(0, address, 1, 1, true);
   waitForBindings(1, address, 0, 0, true);
   waitForBindings(0, address, 0, 0, false);
   waitForBindings(1, address, 1, 1, false);

   ClientSession node0Session = sfs[0].createSession();
   ClientSession node1Session = sfs[1].createSession();
   node0Session.start();
   node1Session.start();

   ClientProducer node1Producer = node1Session.createProducer(address);

   // Phase 1: send to node 1 and receive from node 0.
   for (int sent = 0; sent < messageCount; sent++) {
      node1Producer.send(node1Session.createMessage(true));
      node1Session.commit();
   }

   int receivedBefore = 0;
   ClientMessage incoming;
   // Drain until a receive times out (1 second) without a message.
   while ((incoming = consumers[0].getConsumer().receive(1000)) != null) {
      receivedBefore++;
      incoming.acknowledge();
      node0Session.commit();
   }
   assertEquals(messageCount, receivedBefore);

   // The following code simulates the issue where a JMS non-durable subscriber
   // fails over to a backup. In the process the temp queue is recreated
   // on the backup with a new id while its remote binding
   // is created on the other node.
   // NOTE(review): notifications are switched off around destroyQueue, presumably so
   // that node 1 is not told about the removal and keeps its old remote binding - confirm.
   removeConsumer(0);
   servers[0].getManagementService().enableNotifications(false);
   servers[0].destroyQueue(new SimpleString(queueName));
   servers[0].getManagementService().enableNotifications(true);

   // Re-create the queue and consumer under the same name.
   createQueue(0, address, queueName, null, false);
   addConsumer(0, 0, queueName, null);

   waitForBindings(0, address, 1, 1, true);
   waitForBindings(1, address, 0, 0, true);
   waitForBindings(0, address, 0, 0, false);
   waitForBindings(1, address, 1, 1, false);

   // Phase 2: the same traffic must still get through after the queue was re-created.
   for (int sent = 0; sent < messageCount; sent++) {
      node1Producer.send(node1Session.createMessage(true));
      node1Session.commit();
   }

   int receivedAfter = 0;
   // This drain uses a longer receive timeout (2 seconds).
   while ((incoming = consumers[0].getConsumer().receive(2000)) != null) {
      receivedAfter++;
      incoming.acknowledge();
      node0Session.commit();
   }
   assertEquals(messageCount, receivedAfter);

   stopServers(0, 1);
}


@Override
Expand Down

0 comments on commit 7a5da3f

Please sign in to comment.