Skip to content
Browse files

AMQP-274 Local Transaction Issues

When running a listener container with local transactions
(channelTransacted, and no external transaction manager), the
consumer's channel is bound to the thread for use by downstream
RabbitTemplates.

However, the syncronizedWithTransaction boolean was not set so
the RabbitTemplate closed the channel after its operation.

We should never close the consumer's channel.

The solution is to set the boolean when binding the resource.

In addition, when using a RabbitTransactionManager, the
RabbitResourceHolder.closeAll() method would close the consumer's
channel.

Previously, the consumer's channel was registered with a
ThreadLocal in the ConnectionFactoryUtils. This enabled
the doGetTransactionalResourceHolder method to bind the
consumer's channel.

The RabbitResourceHolder.closeAll() now examines the channels
is it closing and skips the close for the consumer's channel.

Added tests to the Local and External transaction test cases
to ensure the appropriate channel.close() calls are executed,
depending on the scenario. e.g. a local transaction with
exposeListenerChannel=false should close() the exposed channel
but not the consumer's channel.
  • Loading branch information...
1 parent dd54e66 commit 9c29a74addcf8ebefb2a2fcbe26b62014181bbd9 @garyrussell garyrussell committed Oct 25, 2012
View
8 ...bbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java
@@ -71,6 +71,14 @@ public static void unRegisterConsumerChannel() {
}
/**
+ * See registerConsumerChannel. This method is called to retrieve the
+ * channel for this consumer.
+ */
+ public static Channel getConsumerChannel() {
+ return consumerChannel.get();
+ }
+
+ /**
* Determine whether the given RabbitMQ Channel is transactional, that is, bound to the current thread by Spring's
* transaction facilities.
* @param channel the RabbitMQ Channel to check
View
9 ...rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitResourceHolder.java
@@ -154,7 +154,14 @@ public void commitAll() throws AmqpException {
public void closeAll() {
for (Channel channel : this.channels) {
try {
- channel.close();
+ if (channel != ConnectionFactoryUtils.getConsumerChannel()) {
+ channel.close();
+ }
+ else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Skipping close of consumer channel: " + channel.toString());
+ }
+ }
} catch (Throwable ex) {
logger.debug("Could not close synchronized Rabbit Channel after transaction", ex);
}
View
13 .../main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java
@@ -473,8 +473,10 @@ protected void invokeListener(Channel channel, Message message) throws Exception
} else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted(channel);
if (bindChannel) {
+ RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
+ resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
- new RabbitResourceHolder(channel, false));
+ resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
@@ -523,6 +525,7 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch
*/
if (isChannelLocallyTransacted(channelToUse) &&
!TransactionSynchronizationManager.isActualTransactionActive()) {
+ resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
boundHere = true;
@@ -531,8 +534,10 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch
else {
// if locally transacted, bind the current channel to make it available to RabbitTemplate
if (isChannelLocallyTransacted(channel)) {
+ RabbitResourceHolder localResourceHolder = new RabbitResourceHolder(channelToUse, false);
+ localResourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
- new RabbitResourceHolder(channelToUse, false));
+ localResourceHolder);
boundHere = true;
}
}
@@ -543,6 +548,10 @@ protected void doInvokeListener(ChannelAwareMessageListener listener, Channel ch
throw wrapToListenerExecutionFailedExceptionIfNeeded(e);
}
} finally {
+ if (resourceHolder != null && boundHere) {
+ // so the channel exposed (because exposeListenerChannel is false) will be closed
+ resourceHolder.setSynchronizedWithTransaction(false);
+ }
ConnectionFactoryUtils.releaseResources(resourceHolder);
if (boundHere) {
// unbind if we bound
View
35 ...rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ExternalTxManagerTests.java
@@ -15,13 +15,15 @@
*/
package org.springframework.amqp.rabbit.listener;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -38,6 +40,7 @@
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
+import org.springframework.beans.DirectFieldAccessor;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
@@ -144,6 +147,12 @@ public void onMessage(Message message) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
+ List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
+ assertEquals(0, channels.size());
+
container.stop();
}
@@ -159,7 +168,7 @@ public void testChannelAwareMessageListener() throws Exception {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
- final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
+ final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -205,11 +214,11 @@ public String answer(InvocationOnMock invocation) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
- RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -238,6 +247,10 @@ public void onMessage(Message message, Channel channel) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ verify(onlyChannel, Mockito.never()).close();
+
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -255,7 +268,7 @@ public void testChannelAwareMessageListenerDontExpose() throws Exception {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
- final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
+ final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -301,11 +314,11 @@ public String answer(InvocationOnMock invocation) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
- RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -335,6 +348,10 @@ public void onMessage(Message message, Channel channel) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ verify(onlyChannel, Mockito.never()).close();
+
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -428,6 +445,12 @@ public void onMessage(Message message) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
+ List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
+ assertEquals(0, channels.size());
+
container.stop();
}
View
28 ...rabbit/src/test/java/org/springframework/amqp/rabbit/listener/LocallyTransactedTests.java
@@ -15,13 +15,15 @@
*/
package org.springframework.amqp.rabbit.listener;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -37,6 +39,7 @@
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.DirectFieldAccessor;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
@@ -138,6 +141,12 @@ public void onMessage(Message message) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ DirectFieldAccessor dfa = new DirectFieldAccessor(cachingConnectionFactory);
+ List<?> channels = (List<?>) dfa.getPropertyValue("cachedChannelsTransactional");
+ assertEquals(0, channels.size());
+
container.stop();
}
@@ -153,7 +162,7 @@ public void testChannelAwareMessageListener() throws Exception {
final Channel onlyChannel = mock(Channel.class);
when(onlyChannel.isOpen()).thenReturn(true);
- final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
+ final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -199,11 +208,11 @@ public String answer(InvocationOnMock invocation) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
- RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -231,6 +240,10 @@ public void onMessage(Message message, Channel channel) {
verify(onlyChannel).txCommit();
verify(onlyChannel).basicPublish(Mockito.anyString(), Mockito.anyString(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(BasicProperties.class), Mockito.any(byte[].class));
+
+ // verify close() was never called on the channel
+ verify(onlyChannel, Mockito.never()).close();
+
container.stop();
assertSame(onlyChannel, exposed.get());
@@ -250,7 +263,7 @@ public void testChannelAwareMessageListenerDontExpose() throws Exception {
final Channel secondChannel = mock(Channel.class);
when(secondChannel.isOpen()).thenReturn(true);
- final SingleConnectionFactory cachingConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
+ final SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mockConnectionFactory);
when(mockConnectionFactory.newConnection((ExecutorService) null)).thenReturn(mockConnection);
when(mockConnection.isOpen()).thenReturn(true);
@@ -293,11 +306,11 @@ public String answer(InvocationOnMock invocation) throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Channel> exposed = new AtomicReference<Channel>();
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(singleConnectionFactory);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, Channel channel) {
exposed.set(channel);
- RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
+ RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory);
rabbitTemplate.setChannelTransacted(true);
// should use same channel as container
rabbitTemplate.convertAndSend("foo", "bar", "baz");
@@ -331,5 +344,8 @@ public void onMessage(Message message, Channel channel) {
container.stop();
assertSame(secondChannel, exposed.get());
+
+ verify(firstChannel, Mockito.never()).close();
+ verify(secondChannel, Mockito.times(1)).close();
}
}

0 comments on commit 9c29a74

Please sign in to comment.
Something went wrong with that request. Please try again.