Skip to content
Browse files

First pass at Camel RabbitMQ Component

  • Loading branch information...
1 parent b32abe6 commit 161dd3d2704f21c78009cb1ce93a871a61084dc7 @ljcoomber ljcoomber committed Feb 4, 2011
View
76 pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>net.lshift.net.camel</groupId>
+ <artifactId>camel-rabbitmq</artifactId>
+ <version>1.0-SNAPSHOT</version>
+
+ <packaging>jar</packaging>
+ <name>Camel RabbitMQ</name>
+ <description>Camel RabbitMQ support</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>rabbitmq-messagepatterns</artifactId>
+ <version>0.1.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <version>2.5.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ <version>1.2.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ <version>4.8.1</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
View
55 src/main/java/net/lshift/camel/component/rabbitmq/RabbitMqComponent.java
@@ -0,0 +1,55 @@
+package net.lshift.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.messagepatterns.unicast.ConnectionBuilder;
+import com.rabbitmq.messagepatterns.unicast.Connector;
+import com.rabbitmq.messagepatterns.unicast.Factory;
+
+
+public class RabbitMqComponent extends DefaultComponent {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(RabbitMqComponent.class);
+
+ private ConnectionFactory connectionFactory;
+
+ public RabbitMqComponent() {
+ }
+
+ public RabbitMqComponent(CamelContext context) {
+ super(context);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ assert LOG.isDebugEnabled();
+ LOG.debug("Creating new RabbitMQ endpoint for {}", uri);
+ return new RabbitMqEndpoint(uri, this);
+ }
+
+ public ConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+
+ public void setConnectionFactory(ConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ }
+
+ protected Connector getConnector() {
+ return Factory.createConnector(new ConnectionBuilder() {
+ public Connection createConnection() throws IOException {
+ return (connectionFactory == null) ?
+ new ConnectionFactory().newConnection() : connectionFactory.newConnection();
+ }
+ });
+ }
+}
View
96 src/main/java/net/lshift/camel/component/rabbitmq/RabbitMqConsumer.java
@@ -0,0 +1,96 @@
+package net.lshift.camel.component.rabbitmq;
+
+import net.lshift.rabbitmq.MessageReceiver;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.impl.DefaultMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.messagepatterns.unicast.ReceivedMessage;
+
+public class RabbitMqConsumer extends DefaultConsumer implements Runnable {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(RabbitMqConsumer.class);
+
+ private String queueName;
+
+ private String exchangeName;
+
+ private MessageReceiver receiver;
+
+ private java.util.concurrent.ExecutorService executor;
+
+ public RabbitMqConsumer(RabbitMqEndpoint endpoint, Processor processor,
+ String uri) throws Exception {
+ super(endpoint, processor);
+ parseUri(uri);
+ receiver = new MessageReceiver(endpoint.getConnection(), queueName, exchangeName);
+ }
+
+ public void parseUri(String uri) {
+ String[] split = uri.split("[:|/]");
+ if(split.length != 5) {
+ throw new IllegalArgumentException("Invalid URI - must contain a queue and an exchange: " + uri);
+ }
+ this.queueName = split[3];
+ this.exchangeName = split[4];
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.debug("Starting RabbitMQConsumer on queue {} bound to exchange {}...", queueName, exchangeName);
+
+ super.start();
+ receiver.start();
+
+ // TODO: Is this the right strategy?
+ executor = getEndpoint().getCamelContext().getExecutorServiceStrategy()
+ .newSingleThreadExecutor(this, getEndpoint().getEndpointUri());
+ executor.execute(this);
+
+ LOG.debug("RabbitMQConsumer stopped.");
+ }
+
+ @Override
+ public void stop() throws Exception {
+ LOG.debug("Stopping RabbitMQConsumer...");
+
+ if (executor != null) {
+ executor.shutdownNow();
+ executor = null;
+ }
+
+ super.doStop();
+
+ LOG.debug("RabbitMQConsumer stopped.");
+ }
+
+ public void run() {
+ try {
+ readFromQueue();
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
+ }
+ }
+
+ private void readFromQueue() throws Exception {
+ while (executor != null && !executor.isShutdown()) {
+ ReceivedMessage message = receiver.receive();
+ if(message != null) {
+ LOG.debug("Recieved message from queue {}", queueName);
+
+ Exchange exchange = getEndpoint().createExchange();
+
+ Message msg = new DefaultMessage();
+ msg.setBody(message.getBody());
+ exchange.setIn(msg);
+ getProcessor().process(exchange);
+ receiver.ack(message);
+ }
+ }
+ }
+}
View
60 src/main/java/net/lshift/camel/component/rabbitmq/RabbitMqEndpoint.java
@@ -0,0 +1,60 @@
+package net.lshift.camel.component.rabbitmq;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.messagepatterns.unicast.Connector;
+
+public class RabbitMqEndpoint extends DefaultEndpoint {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(RabbitMqEndpoint.class);
+
+ private Connector conn;
+
+ public RabbitMqEndpoint(String uri, RabbitMqComponent component) {
+ super(uri, component);
+ }
+
+ public RabbitMqEndpoint(String uri) {
+ super(uri);
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.info("Starting RabbitMQ Endpoint...");
+ super.start();
+ conn = ((RabbitMqComponent) getComponent()).getConnector();
+ LOG.info("RabbitMQ Endpoint started.");
+ }
+
+ public void stop() throws Exception {
+ LOG.info("Stopping RabbitMQ Endpoint...");
+
+ if(conn != null) {
+ conn.close();
+ }
+
+ super.stop();
+ LOG.info("RabbitMQ Endpoint stopped.");
+ }
+
+ public boolean isSingleton() {
+ return true;
+ }
+
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new RabbitMqConsumer(this, processor, getEndpointUri());
+ }
+
+ public Producer createProducer() throws Exception {
+ return new RabbitMqProducer(this, getEndpointUri());
+ }
+
+ public Connector getConnection() {
+ return conn;
+ }
+}
View
60 src/main/java/net/lshift/camel/component/rabbitmq/RabbitMqProducer.java
@@ -0,0 +1,60 @@
+package net.lshift.camel.component.rabbitmq;
+
+import java.io.IOException;
+
+import net.lshift.rabbitmq.MessageSender;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.messagepatterns.unicast.Message;
+
+public class RabbitMqProducer extends DefaultProducer {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(RabbitMqProducer.class);
+
+ private String exchangeName;
+
+ private MessageSender sender;
+
+ public RabbitMqProducer(RabbitMqEndpoint endpoint, String uri) throws Exception {
+ super(endpoint);
+ parseUri(uri);
+ sender = new MessageSender(endpoint.getConnection(), exchangeName);
+ }
+
+ protected void parseUri(String uri) {
+ String[] split = uri.split("[:|/]");
+ if(split.length != 4) {
+ throw new IllegalArgumentException("Invalid URI - must contain an exchange: " + uri);
+ }
+
+ exchangeName = split[3];
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.info("Starting RabbitMqProducer on exchange {}...", exchangeName);
+ super.start();
+ sender.start();
+ LOG.info("RabbitMqProducer stopped.");
+ }
+
+ @Override
+ public void stop() throws IOException {
+ LOG.info("Stopping RabbitMqProducer...");
+ if(sender != null) {
+ sender.stop();
+ }
+ LOG.info("RabbitMqProducer stopped.");
+ }
+
+ public void process(Exchange exchange) throws Exception {
+ Message message = sender.createMessage();
+ message.setBody(exchange.getIn().getBody(byte[].class));
+ LOG.debug("Sending message to exchange {}", exchangeName);
+ sender.send(message);
+ }
+}
View
122 src/main/java/net/lshift/rabbitmq/MessageReceiver.java
@@ -0,0 +1,122 @@
+package net.lshift.rabbitmq;
+
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.messagepatterns.unicast.ChannelSetupListener;
+import com.rabbitmq.messagepatterns.unicast.Connector;
+import com.rabbitmq.messagepatterns.unicast.ReceivedMessage;
+
+/**
+ * Thin wrapper around the Java Message Patterns receiver implementation so that
+ * we benefit from the channel / connection reliability, whilst hiding the
+ * Unicast abstractions that are not needed here.
+ */
+public class MessageReceiver {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
+
+ public static final boolean EXCLUSIVE = true;
+
+ public static final boolean AUTO_DELETE = false;
+
+ public static final boolean DURABLE = false;
+
+ protected ReceiverImpl receiver = new ReceiverImpl();
+
+ public MessageReceiver(Connector connector, final String queueName, final String exchangeName) throws Exception {
+ receiver.setConnector(connector);
+ receiver.setQueueName(queueName);
+
+ addChannelSetupListener(new ChannelSetupListener() {
+ public void channelSetup(Channel channel) throws IOException {
+ channel.queueDeclare(queueName, DURABLE, EXCLUSIVE, AUTO_DELETE, null);
+
+ // TODO: Error handling
+ channel.exchangeDeclarePassive(exchangeName);
+ channel.queueBind(queueName, exchangeName, "");
+ }
+ });
+ }
+
+ public Connector getConnector() {
+ return receiver.getConnector();
+ }
+
+ public String getQueueName() {
+ return receiver.getQueueName();
+ }
+
+ public void setQueueName(String queueName) {
+ receiver.setQueueName(queueName);
+ }
+
+ public void addChannelSetupListener(ChannelSetupListener channelSetup) {
+ receiver.addSetupListener(channelSetup);
+ }
+
+ public void removeChannelSetupListener(ChannelSetupListener channelSetup) {
+ receiver.removeSetupListener(channelSetup);
+ }
+
+ public void addChannelReceiverChannelSetupListener(ChannelSetupListener channelSetup) {
+ receiver.addSetupListener(channelSetup);
+ }
+
+ public void removeReceiverChannelSetupListener(ChannelSetupListener channelSetup) {
+ receiver.removeSetupListener(channelSetup);
+ }
+
+ public void start() throws Exception {
+ LOG.debug("Starting MessageReceiver on queue {}...", getQueueName());
+ receiver.init();
+ LOG.debug("MessageReceiver started.");
+ }
+
+ public void stop() throws IOException {
+ LOG.debug("Stopping MessageReceiver on queue {}...", getQueueName());
+ try {
+ receiver.deleteQueue();
+ } catch(Exception e) {
+ if(e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+
+ receiver.close();
+
+ LOG.debug("MessageReceiver stopped.");
+ }
+
+ public ReceivedMessage receive() throws Exception {
+ try {
+ return receiver.receive();
+ } catch(InterruptedException e) {
+ // Interrupt on blocking queue - ignore and return null
+ LOG.warn("Interrupt on blocking queue");
+ return null;
+ }
+ }
+
+ public ReceivedMessage receive(long timeout) throws Exception {
+ return receiver.receive(timeout);
+ }
+
+ public ReceivedMessage receiveNoWait() throws Exception {
+ return receiver.receiveNoWait();
+ }
+
+ public void ack(ReceivedMessage m) throws Exception {
+ receiver.ack(m);
+ }
+
+ public void cancel() throws IOException {
+ receiver.cancel();
+ }
+}
View
117 src/main/java/net/lshift/rabbitmq/MessageSender.java
@@ -0,0 +1,117 @@
+package net.lshift.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.messagepatterns.unicast.*;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thin wrapper around the Java Message Patterns sender implementation so that
+ * we benefit from the channel / connection reliability, whilst hiding the
+ * Unicast abstractions that are not needed here.
+ */
+public class MessageSender {
+
+ private static transient Logger LOG = LoggerFactory.getLogger(MessageSender.class);
+
+ public static final String EXCHANGE_TYPE = "fanout";
+
+ public static final boolean AUTO_DELETE = false;
+
+ public static final boolean DURABLE = false;
+
+
+ protected SenderImpl sender = new SenderImpl();
+
+ public MessageSender(Connector connector, final String exchangeName) throws Exception {
+ sender.setConnector(connector);
+ sender.setExchangeName(exchangeName);
+
+ addSenderSetupListener(new ChannelSetupListener() {
+ public void channelSetup(Channel channel) throws IOException {
+ channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE, DURABLE, AUTO_DELETE, null);
+ }
+ });
+ }
+
+ public Connector getConnector() {
+ return sender.getConnector();
+ }
+
+ public String getExchangeName() {
+ return sender.getExchangeName();
+ }
+
+ public void setExchangeName(String exchangeName) {
+ sender.setExchangeName(exchangeName);
+ }
+
+ public boolean isTransactional() {
+ return sender.isTransactional();
+ }
+
+ public void setTransactional(boolean transactional) {
+ sender.setTransactional(transactional);
+ }
+
+ public void addChannelSetupListener(ChannelSetupListener channelSetup) {
+ sender.addSetupListener(channelSetup);
+ }
+
+ public void removeChannelSetupListener(ChannelSetupListener channelSetup) {
+ sender.removeSetupListener(channelSetup);
+ }
+
+ public void addSenderSetupListener(ChannelSetupListener channelSetup) {
+ sender.addSetupListener(channelSetup);
+ }
+
+ public void removeSenderSetupListener(ChannelSetupListener channelSetup) {
+ sender.removeSetupListener(channelSetup);
+ }
+
+ public void addMessageSentListener(MessageSentListener listener) {
+ sender.addMessageSentListener(listener);
+ }
+
+ public void removeMessageSentListener(MessageSentListener listener) {
+ sender.removeMessageSentListener(listener);
+ }
+
+ public void start() throws Exception {
+ LOG.debug("Starting MessageSender for exchange {}...", getExchangeName());
+ sender.init();
+ LOG.debug("MessageSender started.");
+ }
+
+ public void stop() throws IOException {
+ LOG.debug("Stopping MessageSender for exchange {}...", getExchangeName());
+ try {
+ sender.deleteExchange();
+ } catch(Exception e) {
+ if(e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+
+ sender.close();
+ LOG.debug("MessageSender stoppped.");
+ }
+
+ public Message createMessage() {
+ Message msg = sender.createMessage();
+
+ // Set empty routing key as we want 1:N fanout delivery by default
+ msg.setTo("");
+ return msg;
+ }
+
+ public void send(Message m) throws Exception {
+ sender.send(m);
+ }
+}
View
26 src/main/java/net/lshift/rabbitmq/ReceiverImpl.java
@@ -0,0 +1,26 @@
+package net.lshift.rabbitmq;
+
+import java.io.IOException;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.messagepatterns.unicast.ConnectionListener;
+import com.rabbitmq.messagepatterns.unicast.Thunk;
+
+/**
+ * Extension of the MP implementation to support Queue deletion.
+ */
+public class ReceiverImpl extends com.rabbitmq.messagepatterns.unicast.impl.ReceiverImpl {
+
+ public void deleteQueue() throws Exception {
+ while (true) {
+ if (connector.attempt(new Thunk() {
+ public void run() throws IOException {
+ channel.queueDelete(getQueueName(), false, false);
+ }
+ }, new ConnectionListener() {
+ public void connected(Connection conn) throws IOException {
+ connect(conn);
+ }})) break;
+ }
+ }
+}
View
27 src/main/java/net/lshift/rabbitmq/SenderImpl.java
@@ -0,0 +1,27 @@
+package net.lshift.rabbitmq;
+
+import java.io.IOException;
+
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.messagepatterns.unicast.ConnectionListener;
+import com.rabbitmq.messagepatterns.unicast.Thunk;
+
+/**
+ * Extension of the MP implementation to support Exchange deletion.
+ */
+public class SenderImpl extends com.rabbitmq.messagepatterns.unicast.impl.SenderImpl {
+
+ public void deleteExchange() throws Exception {
+ while (true) {
+ if (connector.attempt(new Thunk() {
+ public void run() throws IOException {
+ channel.exchangeDelete(getExchangeName(), false);
+ if (transactional) channel.txCommit();
+ }
+ }, new ConnectionListener() {
+ public void connected(Connection conn) throws IOException {
+ connect(conn);
+ }})) break;
+ }
+ }
+}
View
18 src/main/resources/META-INF/services/org/apache/camel/component/rabbitmq
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=net.lshift.camel.component.rabbitmq.RabbitMqComponent
View
60 src/test/java/net/lshift/camel/component/rabbitmq/ConsumerTest.java
@@ -0,0 +1,60 @@
+package net.lshift.camel.component.rabbitmq;
+
+import java.io.IOException;
+
+import net.lshift.rabbitmq.MessageSender;
+import net.lshift.rabbitmq.MessageTest;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Test;
+
+import com.rabbitmq.messagepatterns.unicast.Message;
+
+// TODO: Test what happens with no exchange
+public class ConsumerTest extends CamelTestSupport {
+
+ private String MESSAGE = "TEST_MESSAGE_CONTENT";
+
+ private MessageSender sender;
+
+ @Test
+ public void consumerConsumes() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived(MESSAGE);
+ sendMessage(MESSAGE);
+ assertMockEndpointsSatisfied();
+ }
+
+ @After
+ public void shutdownSender() throws IOException {
+ sender.stop();
+ }
+
+ protected RouteBuilder routeBuilder = new RouteBuilder() {
+ public void configure() {
+ from("rabbitmq://TESTQ:TESTX").to("mock:result");
+ }
+ };
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ // Ensure sender is created before consumer
+ try {
+ sender = new MessageSender(MessageTest.createConnector(), "TESTX");
+ sender.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return routeBuilder;
+ }
+
+ protected void sendMessage(String msg) throws Exception {
+ Message message = sender.createMessage();
+ message.setBody(msg.getBytes());
+ sender.send(message);
+ }
+}
View
56 src/test/java/net/lshift/camel/component/rabbitmq/ProducerTest.java
@@ -0,0 +1,56 @@
+package net.lshift.camel.component.rabbitmq;
+
+import net.lshift.rabbitmq.MessageReceiver;
+import net.lshift.rabbitmq.MessageTest;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.rabbitmq.messagepatterns.unicast.ReceivedMessage;
+
+public class ProducerTest extends CamelTestSupport {
+
+ private String MESSAGE = "TEST_MESSAGE_CONTENT";
+
+ private MessageReceiver receiver;
+
+
+ @Before
+ public void setupReceiver() throws Exception {
+ receiver = new MessageReceiver(MessageTest.createConnector(), "TESTQ", "TESTX");
+ receiver.start();
+ }
+
+ @After
+ public void shutdownReceiver() throws Exception {
+ receiver.stop();
+ }
+
+ @Test
+ public void testSendStringContent() throws Exception {
+ template.sendBody("direct:in", MESSAGE);
+ assertMessageReceived(MESSAGE);
+ }
+
+ @Test
+ public void testSendBinaryContent() throws Exception {
+ template.sendBody("direct:in", MESSAGE.getBytes());
+ assertMessageReceived(MESSAGE);
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:in").to("rabbitmq://TESTX");
+ }
+ };
+ }
+
+ protected void assertMessageReceived(String msg) throws Exception {
+ ReceivedMessage message = receiver.receive(1000);
+ assertEquals(msg, new String(message.getBody()));
+ }
+}
View
90 src/test/java/net/lshift/rabbitmq/MessageTest.java
@@ -0,0 +1,90 @@
+package net.lshift.rabbitmq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.messagepatterns.unicast.ConnectionBuilder;
+import com.rabbitmq.messagepatterns.unicast.Connector;
+import com.rabbitmq.messagepatterns.unicast.Factory;
+import com.rabbitmq.messagepatterns.unicast.Message;
+import com.rabbitmq.messagepatterns.unicast.ReceivedMessage;
+
+public class MessageTest {
+
+ protected static final String QUEUE_NAME = "TEST_QUEUE";
+
+ protected static final String EXCHANGE_NAME = "TEST_EXCHANGE";
+
+ protected static final String MESSAGE_BODY = "TEST_MESSAGE_BODY";
+
+ protected MessageSender sender;
+
+ protected MessageReceiver receiver;
+
+
+ public static Connector createConnector() {
+ return Factory.createConnector(new ConnectionBuilder() {
+ public Connection createConnection() throws IOException {
+ return new ConnectionFactory().newConnection();
+ }
+ });
+ }
+
+ @Before
+ public void initMessaging() throws Exception {
+ sender = new MessageSender(createConnector(), EXCHANGE_NAME);
+ sender.setTransactional(true);
+ sender.start();
+
+ receiver = new MessageReceiver(createConnector(), QUEUE_NAME, EXCHANGE_NAME);
+ receiver.start();
+ }
+
+ @Test
+ public void senderSendsMessagesAndReceiverReceivesThem() throws Exception {
+ Message msg = sender.createMessage();
+ msg.setBody(MESSAGE_BODY.getBytes());
+ sender.send(msg);
+
+ ReceivedMessage msg2 = receiver.receive(1000);
+ receiver.ack(msg2);
+ assertEquals(MESSAGE_BODY, new String(msg2.getBody()));
+
+ sender.stop();
+ receiver.stop();
+ }
+
+ @Test
+ public void receiverCanBeStoppedAndStarted() throws Exception {
+ receiver.stop();
+ receiver.start();
+
+ senderSendsMessagesAndReceiverReceivesThem();
+ }
+
+ @Test
+ public void senderCanBeStoppedAndStarted() throws Exception {
+ Message msg = sender.createMessage();
+ msg.setBody("HELLO".getBytes());
+
+ sender.stop();
+
+ try {
+ sender.send(msg);
+ fail("Expected exception sending to closed connection");
+ } catch(AlreadyClosedException e) {
+ // success
+ }
+
+ sender.start();
+ sender.send(msg);
+ }
+}
View
8 src/test/resources/log4j.properties
@@ -0,0 +1,8 @@
+# Test log config
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+log4j.logger.net.lshift=DEBUG

0 comments on commit 161dd3d

Please sign in to comment.
Something went wrong with that request. Please try again.