Navigation Menu

Skip to content

Commit

Permalink
Consolidate test set up code in the BrokerRunning rule
Browse files Browse the repository at this point in the history
  • Loading branch information
dsyer committed Nov 11, 2010
1 parent 3e13076 commit 59fa5c6
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 149 deletions.
Expand Up @@ -6,11 +6,11 @@
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.utility.Utility;
/**
* Variation on QueueingConsumer in RabbitMQ, uses 'put' instead of 'add' and stored a reference to the consumerTag that
Expand All @@ -20,8 +20,6 @@
*/
public class BlockingQueueConsumer extends DefaultConsumer {

private String consumerTag;

private final BlockingQueue<Delivery> queue;

// When this is non-null the queue is in shutdown mode and nextDelivery should
Expand All @@ -43,18 +41,7 @@ public BlockingQueueConsumer(Channel ch, BlockingQueue<Delivery> q)
super(ch);
this.queue = q;
}



public String getConsumerTag() {
return consumerTag;
}

public void setConsumerTag(String consumerTag)
{
this.consumerTag = consumerTag;
}


@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
shutdown = sig;
try {
Expand Down
Expand Up @@ -260,8 +260,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer(final Channel channe
String[] queue = StringUtils.commaDelimitedListToStringArray(queueNames);
for (int i = 0; i < queue.length; i++) {
channel.queueDeclarePassive(queue[i]);
String consumerTag = channel.basicConsume(queue[i], !isChannelTransacted(), consumer);
consumer.setConsumerTag(consumerTag);
channel.basicConsume(queue[i], !isChannelTransacted(), consumer);
}
return consumer;
}
Expand Down
Expand Up @@ -3,10 +3,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
Expand All @@ -23,27 +21,12 @@

public class RabbitBindingIntegrationTests {

private Queue queue;
private Queue queue = new Queue("test.queue");

private RabbitTemplate template = new RabbitTemplate(
new CachingConnectionFactory());
private RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory());

@Rule
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning();

@Before
public void declareQueue() {
RabbitAdmin admin = new RabbitAdmin(template);
try {
admin.deleteQueue("test.queue");
} catch (AmqpIOException e) {
// Ignore (queue didn't exist)
}
queue = new Queue("test.queue");
// Idempotent, so no problem to do this for every test
admin.declareQueue(queue);
admin.purgeQueue("test.queue", false);
}
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);

@Test
public void testSendAndReceiveWithTopicSingleCallback() throws Exception {
Expand All @@ -53,25 +36,29 @@ public void testSendAndReceiveWithTopicSingleCallback() throws Exception {
admin.declareExchange(exchange);
template.setExchange(exchange.getName());

admin.declareBinding(BindingBuilder.from(queue).to(exchange)
.with("*.end"));
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end"));

template.execute(new ChannelCallback<Void>() {
public Void doInRabbit(Channel channel) throws Exception {

BlockingQueueConsumer consumer = new BlockingQueueConsumer(
channel);
String tag = channel.basicConsume(queue.getName(), true,
consumer);
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
String tag = channel.basicConsume(queue.getName(), true, consumer);
assertNotNull(tag);

template.convertAndSend("foo", "message");
String result = getResult(consumer);
assertEquals(null, result);

template.convertAndSend("foo.end", "message");
result = getResult(consumer);
assertEquals("message", result);
try {

String result = getResult(consumer);
assertEquals(null, result);

template.convertAndSend("foo.end", "message");
result = getResult(consumer);
assertEquals("message", result);

} finally {
channel.basicCancel(tag);
}

return null;

Expand All @@ -82,36 +69,29 @@ public Void doInRabbit(Channel channel) throws Exception {

@Test
// @Ignore("Not sure yet if we need to support a use case like this")
public void testSendAndReceiveWithTopicConsumeInBackground()
throws Exception {
public void testSendAndReceiveWithTopicConsumeInBackground() throws Exception {

RabbitAdmin admin = new RabbitAdmin(template);
TopicExchange exchange = new TopicExchange("topic");
admin.declareExchange(exchange);
template.setExchange(exchange.getName());

admin.declareBinding(BindingBuilder.from(queue).to(exchange)
.with("*.end"));
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end"));

final RabbitTemplate template = new RabbitTemplate(
new CachingConnectionFactory());
final RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory());
template.setExchange(exchange.getName());

BlockingQueueConsumer consumer = template
.execute(new ChannelCallback<BlockingQueueConsumer>() {
public BlockingQueueConsumer doInRabbit(Channel channel)
throws Exception {
BlockingQueueConsumer consumer = template.execute(new ChannelCallback<BlockingQueueConsumer>() {
public BlockingQueueConsumer doInRabbit(Channel channel) throws Exception {

BlockingQueueConsumer consumer = new BlockingQueueConsumer(
channel);
String tag = channel.basicConsume(queue.getName(),
true, consumer);
assertNotNull(tag);
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
String tag = channel.basicConsume(queue.getName(), true, consumer);
assertNotNull(tag);

return consumer;
return consumer;

}
});
}
});

template.convertAndSend("foo", "message");
String result = getResult(consumer);
Expand All @@ -121,6 +101,8 @@ public BlockingQueueConsumer doInRabbit(Channel channel)
result = getResult(consumer);
assertEquals("message", result);

consumer.getChannel().basicCancel(consumer.getConsumerTag());

}

@Test
Expand All @@ -131,16 +113,13 @@ public void testSendAndReceiveWithTopicTwoCallbacks() throws Exception {
admin.declareExchange(exchange);
template.setExchange(exchange.getName());

admin.declareBinding(BindingBuilder.from(queue).to(exchange)
.with("*.end"));
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end"));

template.execute(new ChannelCallback<Void>() {
public Void doInRabbit(Channel channel) throws Exception {

BlockingQueueConsumer consumer = new BlockingQueueConsumer(
channel);
String tag = channel.basicConsume(queue.getName(), true,
consumer);
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
String tag = channel.basicConsume(queue.getName(), true, consumer);
assertNotNull(tag);

try {
Expand All @@ -159,10 +138,8 @@ public Void doInRabbit(Channel channel) throws Exception {
template.execute(new ChannelCallback<Void>() {
public Void doInRabbit(Channel channel) throws Exception {

BlockingQueueConsumer consumer = new BlockingQueueConsumer(
channel);
String tag = channel.basicConsume(queue.getName(), true,
consumer);
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
String tag = channel.basicConsume(queue.getName(), true, consumer);
assertNotNull(tag);

try {
Expand All @@ -180,15 +157,13 @@ public Void doInRabbit(Channel channel) throws Exception {

}

private String getResult(final BlockingQueueConsumer consumer)
throws InterruptedException {
private String getResult(final BlockingQueueConsumer consumer) throws InterruptedException {
Delivery response = consumer.nextDelivery(200L);
if (response == null) {
return null;
}
MessageProperties messageProps = RabbitUtils.createMessageProperties(
response.getProperties(), response.getEnvelope(), "UTF-8");
return (String) new SimpleMessageConverter().fromMessage(new Message(
response.getBody(), messageProps));
MessageProperties messageProps = RabbitUtils.createMessageProperties(response.getProperties(),
response.getEnvelope(), "UTF-8");
return (String) new SimpleMessageConverter().fromMessage(new Message(response.getBody(), messageProps));
}
}
Expand Up @@ -11,14 +11,11 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.UncategorizedAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.support.RabbitUtils;
Expand All @@ -42,20 +39,7 @@ public class RabbitTemplateIntegrationTests {
private RabbitTemplate template = new RabbitTemplate(new CachingConnectionFactory());

@Rule
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning();

@Before
public void declareQueue() {
RabbitAdmin admin = new RabbitAdmin(template);
try {
admin.deleteQueue(ROUTE);
}
catch (AmqpIOException e) {
// Ignore (queue didn't exist)
}
admin.declareQueue(new Queue(ROUTE));
admin.purgeQueue(ROUTE, false);
}
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE);

@Test
public void testSendAndReceive() throws Exception {
Expand Down
Expand Up @@ -7,8 +7,6 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.test.BrokerRunning;
import org.springframework.amqp.rabbit.test.Log4jLevelAdjuster;
Expand All @@ -30,7 +28,7 @@ public class RabbitTemplatePerformanceIntegrationTests {

@Rule
// After the repeat processor, so it only runs once
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning();
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(ROUTE);

private CachingConnectionFactory connectionFactory;

Expand All @@ -44,17 +42,6 @@ public void declareQueue() {
connectionFactory.setChannelCacheSize(repeat.getConcurrency());
// connectionFactory.setPort(5673);
template.setConnectionFactory(connectionFactory);
// TODO: investigate the effects of these flags...
// template.setMandatoryPublish(true);
// template.setImmediatePublish(true);
RabbitAdmin admin = new RabbitAdmin(template);
try {
admin.deleteQueue(ROUTE);
} catch (AmqpIOException e) {
// Ignore (queue didn't exist)
}
admin.declareQueue(new Queue(ROUTE));
admin.purgeQueue(ROUTE, false);
}

@After
Expand Down
Expand Up @@ -18,10 +18,8 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.springframework.amqp.AmqpIOException;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.test.BrokerRunning;
Expand All @@ -31,7 +29,7 @@ public class MessageListenerContainerLifecycleIntegrationTests {

private static Log logger = LogFactory.getLog(MessageListenerContainerLifecycleIntegrationTests.class);

private Queue queue;
private Queue queue = new Queue("test.queue");

private RabbitTemplate template = new RabbitTemplate();

Expand All @@ -40,7 +38,7 @@ public class MessageListenerContainerLifecycleIntegrationTests {
private final boolean transactional;

@Rule
public static BrokerRunning brokerIsRunning = BrokerRunning.isRunning();
public BrokerRunning brokerIsRunning = BrokerRunning.isRunningWithEmptyQueue(queue);

private final int messageCount;

Expand All @@ -62,17 +60,6 @@ public void declareQueue() {
connectionFactory.setChannelCacheSize(concurrentConsumers);
connectionFactory.setPort(5673);
template.setConnectionFactory(connectionFactory);
RabbitAdmin admin = new RabbitAdmin(template);
try {
admin.deleteQueue("test.queue");
}
catch (AmqpIOException e) {
// Ignore (queue didn't exist)
}
queue = new Queue("test.queue");
// Idempotent, so no problem to do this for every test
admin.declareQueue(queue);
admin.purgeQueue("test.queue", false);
}

@Test
Expand Down

0 comments on commit 59fa5c6

Please sign in to comment.