Skip to content

Commit

Permalink
ARTEMIS-2848 RA fails w/durable sub w/legacy prefix
Browse files Browse the repository at this point in the history
(cherry picked from commit 52d3ed1)
  • Loading branch information
jbertram authored and brusdev committed Sep 11, 2020
1 parent faffbcc commit 5316465
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 2 deletions.
Expand Up @@ -1803,7 +1803,7 @@ public ActiveMQConnectionFactory newConnectionFactory(ConnectionFactoryPropertie
cf.setUseTopologyForLoadBalancing(raProperties.isUseTopologyForLoadBalancing());

cf.setEnableSharedClientID(true);
cf.setEnable1xPrefixes(raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
cf.setEnable1xPrefixes(overrideProperties.isEnable1xPrefixes() != null ? overrideProperties.isEnable1xPrefixes() : raProperties.isEnable1xPrefixes() == null ? false : raProperties.isEnable1xPrefixes());
setParams(cf, overrideProperties);
return cf;
}
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
import org.apache.activemq.artemis.jms.client.ConnectionFactoryOptions;
Expand Down Expand Up @@ -139,7 +140,7 @@ public void setup() throws Exception {

boolean selectorChanged = selector == null && oldFilterString != null || oldFilterString == null && selector != null || (oldFilterString != null && selector != null && !oldFilterString.toString().equals(selector));

SimpleString oldTopicName = subResponse.getAddress();
SimpleString oldTopicName = (enable1XPrefix ? PacketImpl.OLD_TOPIC_PREFIX : SimpleString.toSimpleString("")).concat(subResponse.getAddress());

boolean topicChanged = !oldTopicName.equals(activation.getAddress());

Expand Down
Expand Up @@ -86,6 +86,59 @@ public void testSimpleMessageReceivedOnQueue() throws Exception {
qResourceAdapter.stop();
}

@Test
public void testDurableTopicSubscriptionWith1xPrefixesOnSpec() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(false);
}

@Test
public void testDurableTopicSubscriptionWith1xPrefixesOnRA() throws Exception {
internalTestDurableTopicSubscriptionWith1xPrefixes(true);
}

public void internalTestDurableTopicSubscriptionWith1xPrefixes(boolean ra) throws Exception {
server.getRemotingService().createAcceptor("test", "tcp://localhost:61617?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.").start();
ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
if (ra) {
qResourceAdapter.setEnable1xPrefixes(true);
}
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setSetupAttempts(1);
spec.setSetupInterval(500L);
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("jms.topic.MyTopic");
if (!ra) {
spec.setEnable1xPrefixes(true);
}
spec.setSubscriptionDurability("Durable");
spec.setClientId("myClientId");
spec.setSubscriptionName("mySubscriptionName");
qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
qResourceAdapter.setConnectionParameters("host=localhost;port=61617");
CountDownLatch latch = new CountDownLatch(1);
DummyMessageEndpoint endpoint = new DummyMessageEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
ClientProducer clientProducer = session.createProducer("MyTopic");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring");
clientProducer.send(message);
session.close();
latch.await(5, TimeUnit.SECONDS);

assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");

qResourceAdapter.endpointDeactivation(endpointFactory, spec);

qResourceAdapter.stop();
}

@Test
public void testObjectMessageReceiveSerializationControl() throws Exception {
String blackList = "org.apache.activemq.artemis.tests.integration.ra";
Expand Down

0 comments on commit 5316465

Please sign in to comment.