diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java index 832f3a6fa9..8931c5754f 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java @@ -6,11 +6,11 @@ import java.util.concurrent.TimeUnit; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.utility.Utility; /** * Variation on QueueingConsumer in RabbitMQ, uses 'put' instead of 'add' and stored a reference to the consumerTag that @@ -20,8 +20,6 @@ */ public class BlockingQueueConsumer extends DefaultConsumer { - private String consumerTag; - private final BlockingQueue queue; // When this is non-null the queue is in shutdown mode and nextDelivery should @@ -43,18 +41,7 @@ public BlockingQueueConsumer(Channel ch, BlockingQueue q) super(ch); this.queue = q; } - - - - public String getConsumerTag() { - return consumerTag; - } - - public void setConsumerTag(String consumerTag) - { - this.consumerTag = consumerTag; - } - + @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { shutdown = sig; try { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java index 73a05e0c26..a503440a01 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java @@ -260,8 +260,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer(final Channel channe String[] queue = StringUtils.commaDelimitedListToStringArray(queueNames); for (int i = 0; i < queue.length; i++) { channel.queueDeclarePassive(queue[i]); - String consumerTag = channel.basicConsume(queue[i], !isChannelTransacted(), consumer); - consumer.setConsumerTag(consumerTag); + channel.basicConsume(queue[i], !isChannelTransacted(), consumer); } return consumer; } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java index db0ba65ebc..dc3942fa01 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitBindingIntegrationTests.java @@ -3,10 +3,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; @@ -23,27 +21,12 @@ public class RabbitBindingIntegrationTests { - private Queue queue; + private Queue queue = new Queue("test.queue"); - private RabbitTemplate template = new RabbitTemplate( - new CachingConnectionFactory()); + private RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory()); @Rule - public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning(); - - @Before - public void declareQueue() { - RabbitAdmin admin = new RabbitAdmin(template); - try { - admin.deleteQueue("test.queue"); - } catch (AmqpIOException e) { - // Ignore (queue didn't exist) - } - queue = new Queue("test.queue"); - // Idempotent, so no problem to do this for every test - admin.declareQueue(queue); - admin.purgeQueue("test.queue", false); - } + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); @Test public void testSendAndReceiveWithTopicSingleCallback() throws Exception { @@ -53,25 +36,29 @@ public void testSendAndReceiveWithTopicSingleCallback() throws Exception { admin.declareExchange(exchange); template.setExchange(exchange.getName()); - admin.declareBinding(BindingBuilder.from(queue).to(exchange) - .with("*.end")); + admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); template.execute(new ChannelCallback() { public Void doInRabbit(Channel channel) throws Exception { - BlockingQueueConsumer consumer = new BlockingQueueConsumer( - channel); - String tag = channel.basicConsume(queue.getName(), true, - consumer); + BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); + String tag = channel.basicConsume(queue.getName(), true, consumer); assertNotNull(tag); template.convertAndSend("foo", "message"); - String result = getResult(consumer); - assertEquals(null, result); - template.convertAndSend("foo.end", "message"); - result = getResult(consumer); - assertEquals("message", result); + try { + + String result = getResult(consumer); + assertEquals(null, result); + + template.convertAndSend("foo.end", "message"); + result = getResult(consumer); + assertEquals("message", result); + + } finally { + channel.basicCancel(tag); + } return null; @@ -82,36 +69,29 @@ public Void doInRabbit(Channel channel) throws Exception { @Test // @Ignore("Not sure yet if we need to support a use case like this") - public void testSendAndReceiveWithTopicConsumeInBackground() - throws Exception { + public void testSendAndReceiveWithTopicConsumeInBackground() throws Exception { RabbitAdmin admin = new RabbitAdmin(template); TopicExchange exchange = new TopicExchange("topic"); admin.declareExchange(exchange); template.setExchange(exchange.getName()); - admin.declareBinding(BindingBuilder.from(queue).to(exchange) - .with("*.end")); + admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); - final RabbitTemplate template = new RabbitTemplate( - new CachingConnectionFactory()); + final RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory()); template.setExchange(exchange.getName()); - BlockingQueueConsumer consumer = template - .execute(new ChannelCallback() { - public BlockingQueueConsumer doInRabbit(Channel channel) - throws Exception { + BlockingQueueConsumer consumer = template.execute(new ChannelCallback() { + public BlockingQueueConsumer doInRabbit(Channel channel) throws Exception { - BlockingQueueConsumer consumer = new BlockingQueueConsumer( - channel); - String tag = channel.basicConsume(queue.getName(), - true, consumer); - assertNotNull(tag); + BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); + String tag = channel.basicConsume(queue.getName(), true, consumer); + assertNotNull(tag); - return consumer; + return consumer; - } - }); + } + }); template.convertAndSend("foo", "message"); String result = getResult(consumer); @@ -121,6 +101,8 @@ public BlockingQueueConsumer doInRabbit(Channel channel) result = getResult(consumer); assertEquals("message", result); + consumer.getChannel().basicCancel(consumer.getConsumerTag()); + } @Test @@ -131,16 +113,13 @@ public void testSendAndReceiveWithTopicTwoCallbacks() throws Exception { admin.declareExchange(exchange); template.setExchange(exchange.getName()); - admin.declareBinding(BindingBuilder.from(queue).to(exchange) - .with("*.end")); + admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); template.execute(new ChannelCallback() { public Void doInRabbit(Channel channel) throws Exception { - BlockingQueueConsumer consumer = new BlockingQueueConsumer( - channel); - String tag = channel.basicConsume(queue.getName(), true, - consumer); + BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); + String tag = channel.basicConsume(queue.getName(), true, consumer); assertNotNull(tag); try { @@ -159,10 +138,8 @@ public Void doInRabbit(Channel channel) throws Exception { template.execute(new ChannelCallback() { public Void doInRabbit(Channel channel) throws Exception { - BlockingQueueConsumer consumer = new BlockingQueueConsumer( - channel); - String tag = channel.basicConsume(queue.getName(), true, - consumer); + BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); + String tag = channel.basicConsume(queue.getName(), true, consumer); assertNotNull(tag); try { @@ -180,15 +157,13 @@ public Void doInRabbit(Channel channel) throws Exception { } - private String getResult(final BlockingQueueConsumer consumer) - throws InterruptedException { + private String getResult(final BlockingQueueConsumer consumer) throws InterruptedException { Delivery response = consumer.nextDelivery(200L); if (response == null) { return null; } - MessageProperties messageProps = RabbitUtils.createMessageProperties( - response.getProperties(), response.getEnvelope(), "UTF-8"); - return (String) new SimpleMessageConverter().fromMessage(new Message( - response.getBody(), messageProps)); + MessageProperties messageProps = RabbitUtils.createMessageProperties(response.getProperties(), + response.getEnvelope(), "UTF-8"); + return (String) new SimpleMessageConverter().fromMessage(new Message(response.getBody(), messageProps)); } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java index 891cc8d900..fb02d90c5c 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java @@ -11,14 +11,11 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.UncategorizedAmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; import org.springframework.amqp.rabbit.support.RabbitUtils; @@ -42,20 +39,7 @@ public class RabbitTemplateIntegrationTests { private RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory()); @Rule - public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning(); - - @Before - public void declareQueue() { - RabbitAdmin admin = new RabbitAdmin(template); - try { - admin.deleteQueue(ROUTE); - } - catch (AmqpIOException e) { - // Ignore (queue didn't exist) - } - admin.declareQueue(new Queue(ROUTE)); - admin.purgeQueue(ROUTE, false); - } + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE); @Test public void testSendAndReceive() throws Exception { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java index 58e410d95f..5b20d19697 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePerformanceIntegrationTests.java @@ -7,8 +7,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.springframework.amqp.AmqpIOException; -import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.test.BrokerRunning; import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster; @@ -30,7 +28,7 @@ public class RabbitTemplatePerformanceIntegrationTests { @Rule // After the repeat processor, so it only runs once - public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning(); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE); private CachingConnectionFactory connectionFactory; @@ -44,17 +42,6 @@ public void declareQueue() { connectionFactory.setChannelCacheSize(repeat.getConcurrency()); // connectionFactory.setPort(5673); template.setConnectionFactory(connectionFactory); - // TODO: investigate the effects of these flags... - // template.setMandatoryPublish(true); - // template.setImmediatePublish(true); - RabbitAdmin admin = new RabbitAdmin(template); - try { - admin.deleteQueue(ROUTE); - } catch (AmqpIOException e) { - // Ignore (queue didn't exist) - } - admin.declareQueue(new Queue(ROUTE)); - admin.purgeQueue(ROUTE, false); } @After diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java index 438882c0b9..8726a37289 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java @@ -18,10 +18,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.test.BrokerRunning; @@ -31,7 +29,7 @@ public class MessageListenerContainerLifecycleIntegrationTests { private static Log logger = LogFactory.getLog(MessageListenerContainerLifecycleIntegrationTests.class); - private Queue queue; + private Queue queue = new Queue("test.queue"); private RabbitTemplate template = new RabbitTemplate(); @@ -40,7 +38,7 @@ public class MessageListenerContainerLifecycleIntegrationTests { private final boolean transactional; @Rule - public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning(); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); private final int messageCount; @@ -62,17 +60,6 @@ public void declareQueue() { connectionFactory.setChannelCacheSize(concurrentConsumers); connectionFactory.setPort(5673); template.setConnectionFactory(connectionFactory); - RabbitAdmin admin = new RabbitAdmin(template); - try { - admin.deleteQueue("test.queue"); - } - catch (AmqpIOException e) { - // Ignore (queue didn't exist) - } - queue = new Queue("test.queue"); - // Idempotent, so no problem to do this for every test - admin.declareQueue(queue); - admin.purgeQueue("test.queue", false); } @Test diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java index c63d5e713e..9fc3b24ae5 100755 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java @@ -20,10 +20,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.springframework.amqp.AmqpIOException; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.test.BrokerRunning; @@ -45,7 +43,7 @@ public boolean isTransactional() { } } - private Queue queue; + private Queue queue = new Queue("test.queue"); private RabbitTemplate template = new RabbitTemplate(); @@ -58,7 +56,7 @@ public boolean isTransactional() { SimpleMessageListenerContainer.class); @Rule - public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning(); + public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue); private final int messageCount; @@ -98,16 +96,6 @@ public void declareQueue() { connectionFactory.setChannelCacheSize(concurrentConsumers); // connectionFactory.setPort(5673); template.setConnectionFactory(connectionFactory); - RabbitAdmin admin = new RabbitAdmin(template); - try { - admin.deleteQueue("test.queue"); - } catch (AmqpIOException e) { - // Ignore (queue didn't exist) - } - queue = new Queue("test.queue"); - // Idempotent, so no problem to do this for every test - admin.declareQueue(queue); - admin.purgeQueue("test.queue", false); } @After diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java index 53c8e9517e..a0751b24a8 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/test/BrokerRunning.java @@ -24,12 +24,12 @@ * * @Test * public void testSendAndReceive() throws Exception { - * // ... test using RabbitTemplate etc. + * // ... test using RabbitTemplate etc. * } * *

- * It is recommended to declare the rule as static so that it only has to check once for all tests in the enclosing test - * case. + * The rule can be declared as static so that it only has to check once for all tests in the enclosing test case, but + * there isn't a lot of overhead in making it non-static. *

* * @see Assume @@ -40,6 +40,8 @@ */ public class BrokerRunning extends TestWatchman { + private static final String DEFAULT_QUEUE_NAME = BrokerRunning.class.getName(); + private static Log logger = LogFactory.getLog(BrokerRunning.class); private boolean brokerOnline = true; @@ -48,6 +50,28 @@ public class BrokerRunning extends TestWatchman { private final boolean assumeOnline; + private final boolean purge; + + private Queue queue; + + /** + * Ensure the broker is running and has an empty queue with the specified name in the default exchange. + * + * @return a new rule that assumes an existing running broker + */ + public static BrokerRunning isRunningWithEmptyQueue(String queue) { + return new BrokerRunning(true, new Queue(queue), true); + } + + /** + * Ensure the broker is running and has an empty queue in the default exchange. + * + * @return a new rule that assumes an existing running broker + */ + public static BrokerRunning isRunningWithEmptyQueue(Queue queue) { + return new BrokerRunning(true, queue, true); + } + /** * @return a new rule that assumes an existing running broker */ @@ -62,13 +86,24 @@ public static BrokerRunning isNotRunning() { return new BrokerRunning(false); } - private BrokerRunning(boolean assumeOnline) { + private BrokerRunning(boolean assumeOnline, Queue queue, boolean purge) { this.assumeOnline = assumeOnline; + this.queue = queue; + this.purge = purge; + } + + private BrokerRunning(boolean assumeOnline, Queue queue) { + this(assumeOnline, queue, false); + } + + private BrokerRunning(boolean assumeOnline) { + this(assumeOnline, new Queue(DEFAULT_QUEUE_NAME)); } @Override public Statement apply(Statement base, FrameworkMethod method, Object target) { + // Check at the beginning, so this can be used as a static field if (assumeOnline) { Assume.assumeTrue(brokerOnline); } else { @@ -76,14 +111,30 @@ public Statement apply(Statement base, FrameworkMethod method, Object target) { } try { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); RabbitAdmin admin = new RabbitAdmin(connectionFactory); - admin.declareQueue(new Queue("test.broker.running")); - admin.deleteQueue("test.broker.running"); + + String queueName = queue.getName(); + + if (purge) { + logger.debug("Deleting queue: " + queueName); + // Delete completely - gets rid of consumers and bindings as well + admin.deleteQueue(queueName); + } + + admin.declareQueue(queue); + + if (isDefaultQueue(queueName)) { + // Just for test probe. + admin.deleteQueue(queueName); + queue = null; + } brokerOffline = false; if (!assumeOnline) { Assume.assumeTrue(brokerOffline); } + } catch (Exception e) { logger.warn("Not executing tests because basic connectivity test failed", e); brokerOnline = false; @@ -92,9 +143,12 @@ public Statement apply(Statement base, FrameworkMethod method, Object target) { } } - return super.apply(base, method, target); } + private boolean isDefaultQueue(String queue) { + return DEFAULT_QUEUE_NAME.equals(queue); + } + }