Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

clean up, added tests to support the jms integration.

  • Loading branch information...
commit ac7518392cd8c5307b9780e703dac4e20006f0c5 1 parent 1a1e7eb
Josh Long authored
View
2  .gitignore
@@ -7,4 +7,4 @@
*.iml
target
.dropbox
-
+activemq-data
View
12 obm/pom.xml
@@ -15,6 +15,13 @@
<dependencyManagement>
<dependencies>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.3.2</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.3.0</version>
@@ -129,7 +136,12 @@
</dependencies>
</dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+
+ </dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
View
61 .../main/java/org/springframework/jms/support/converter/obm/MarshallingMessageConverter.java
@@ -15,6 +15,8 @@
*/
package org.springframework.jms.support.converter.obm;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
@@ -32,15 +34,52 @@
import java.io.IOException;
/**
+ * Because the {@link Marshaller} interface requires a class to which to transform the bytes,
+ * you must specify a {@link #payloadClass payload class} when using this {@link MessageConverter}.
+ * This is inconvenient, however, in most typical configurations a {@link org.springframework.jms.core.JmsTemplate}
+ * will be tied to the workload of one {@link javax.jms.Destination}, and typically a {@link javax.jms.Destination}
+ * will only transport one type of payload.
+ *
* @author Josh Long
* @see org.springframework.jms.support.converter.MarshallingMessageConverter
*/
public class MarshallingMessageConverter implements MessageConverter, InitializingBean {
+ private Log log = LogFactory.getLog(getClass());
+
private Marshaller marshaller;
private Unmarshaller unmarshaller;
private Class<?> payloadClass;
+ public MarshallingMessageConverter() {
+ }
+
+ public MarshallingMessageConverter(Class<?> aClass, Marshaller marshaller1) {
+ this();
+ try {
+ setMarshaller(marshaller1);
+ setPayloadClass(aClass);
+ if (marshaller1 instanceof Unmarshaller) {
+ setUnmarshaller((Unmarshaller) marshaller1);
+ }
+ afterPropertiesSet();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public MarshallingMessageConverter(Class<?> cl, Marshaller marshaller, Unmarshaller unmarshaller) {
+ this();
+ try {
+ setMarshaller(marshaller);
+ setUnmarshaller(unmarshaller);
+ setPayloadClass(cl);
+ afterPropertiesSet();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void setPayloadClass(Class<?> payloadClass) {
this.payloadClass = payloadClass;
}
@@ -56,7 +95,11 @@ public void setUnmarshaller(Unmarshaller unmarshaller) {
@Override
public javax.jms.Message toMessage(Object object, javax.jms.Session session) throws JMSException, MessageConversionException {
try {
- return marshalToBytesMessage(object, session, this.marshaller);
+ javax.jms.Message msg = marshalToBytesMessage(object, session, this.marshaller);
+ if (log.isDebugEnabled()) {
+ log.debug("converted " + object + " to a message.");
+ }
+ return msg;
} catch (Exception ex) {
throw new MessageConversionException("Could not marshal [" + object + "]", ex);
}
@@ -88,20 +131,32 @@ protected Object unmarshalFromBytesMessage(Class clzz, BytesMessage message, org
byte[] bytes = new byte[(int) message.getBodyLength()];
message.readBytes(bytes);
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-
- return unmarshaller.unmarshal(clzz, bis);
+ Object result = unmarshaller.unmarshal(clzz, bis);
+ Assert.notNull(result, "the result from the queue is null");
+ if (log.isDebugEnabled()) {
+ log.debug("received: " + result );
+ }
+ return result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected BytesMessage marshalToBytesMessage(Object object, Session session, org.springframework.obm.Marshaller marshaller) throws JMSException, IOException, XmlMappingException {
+ Assert.notNull(object);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BytesMessage message;
try {
+
marshaller.marshal(object, bos);
+
message = session.createBytesMessage();
message.writeBytes(bos.toByteArray());
+
+ if (log.isDebugEnabled()) {
+ log.debug("sent:" + object);
+ }
+
} catch (Exception e) {
throw new RuntimeException(e);
}
View
6 obm/src/test/java/org/springframework/http/converter/obm/AvroHttpMessageConverterTest.java
@@ -30,7 +30,7 @@
import org.springframework.obm.avro.AvroMarshaller;
import org.springframework.obm.avro.crm.Customer;
import org.springframework.stereotype.Controller;
-import org.springframework.util.http.IntegrationTestUtils;
+import org.springframework.util.http.RestIntegrationTestUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -78,7 +78,7 @@ public void testAvroMarshaller() throws Throwable {
@Test
public void testSimpleIntegration() throws Throwable {
- IntegrationTestUtils.startServiceAndConnect(MyService.class, new IntegrationTestUtils.ServerExecutionCallback() {
+ RestIntegrationTestUtils.startServiceAndConnect(MyService.class, new RestIntegrationTestUtils.ServerExecutionCallback() {
@Override
public void doWithServer(RestTemplate restTemplate, Server server) throws Throwable {
Assert.assertNotNull(restTemplate);
@@ -104,7 +104,7 @@ public void doWithServer(RestTemplate restTemplate, Server server) throws Throwa
@Configuration
@EnableWebMvc
- static public class MyService extends IntegrationTestUtils.AbstractRestServiceConfiguration {
+ static public class MyService extends RestIntegrationTestUtils.AbstractRestServiceConfiguration {
@Bean
public CustomerController controller() {
return new CustomerController();
View
6 ...test/java/org/springframework/http/converter/obm/MessagePackHttpMessageConverterTest.java
@@ -31,7 +31,7 @@
import org.springframework.obm.messagepack.Cat;
import org.springframework.obm.messagepack.MessagePackMarshaller;
import org.springframework.stereotype.Controller;
-import org.springframework.util.http.IntegrationTestUtils;
+import org.springframework.util.http.RestIntegrationTestUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -74,7 +74,7 @@ public void testHttpReading() throws Throwable {
@Test
public void testSimpleIntegration() throws Throwable {
- IntegrationTestUtils.startServiceAndConnect(MyService.class, new IntegrationTestUtils.ServerExecutionCallback() {
+ RestIntegrationTestUtils.startServiceAndConnect(MyService.class, new RestIntegrationTestUtils.ServerExecutionCallback() {
@Override
public void doWithServer(RestTemplate clientRestTemplate, Server server) throws Throwable {
@@ -99,7 +99,7 @@ public void doWithServer(RestTemplate clientRestTemplate, Server server) throws
@Configuration
@EnableWebMvc
- static public class MyService extends IntegrationTestUtils.AbstractRestServiceConfiguration {
+ static public class MyService extends RestIntegrationTestUtils.AbstractRestServiceConfiguration {
@Bean
public CatController controller() {
return new CatController();
View
6 obm/src/test/java/org/springframework/http/converter/obm/ThriftHttpMessageConverterTest.java
@@ -32,7 +32,7 @@
import org.springframework.obm.thrift.ThriftMarshaller;
import org.springframework.obm.thrift.crm.Customer;
import org.springframework.stereotype.Controller;
-import org.springframework.util.http.IntegrationTestUtils;
+import org.springframework.util.http.RestIntegrationTestUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -75,7 +75,7 @@ public void testHttpReading() throws Throwable {
@Test
public void testSimpleIntegration() throws Throwable {
- IntegrationTestUtils.startServiceAndConnect(MyService.class, new IntegrationTestUtils.ServerExecutionCallback() {
+ RestIntegrationTestUtils.startServiceAndConnect(MyService.class, new RestIntegrationTestUtils.ServerExecutionCallback() {
@Override
public void doWithServer(RestTemplate clientRestTemplate, Server server) throws Throwable {
@@ -98,7 +98,7 @@ public void doWithServer(RestTemplate clientRestTemplate, Server server) throws
@Configuration
@EnableWebMvc
- static public class MyService extends IntegrationTestUtils.AbstractRestServiceConfiguration {
+ static public class MyService extends RestIntegrationTestUtils.AbstractRestServiceConfiguration {
@Bean
public CrmRestController controller() {
return new CrmRestController();
View
114 obm/src/test/java/org/springframework/jms/support/converter/obm/TestJmsMessageConverter.java
@@ -0,0 +1,114 @@
+package org.springframework.jms.support.converter.obm;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.obm.avro.AvroMarshaller;
+import org.springframework.obm.avro.crm.Customer;
+import org.springframework.obm.messagepack.Cat;
+import org.springframework.obm.messagepack.MessagePackMarshaller;
+import org.springframework.obm.protocolbuffers.ProtocolBuffersMarshaller;
+import org.springframework.obm.protocolbuffers.crm.Crm;
+import org.springframework.obm.thrift.ThriftMarshaller;
+import org.springframework.util.jms.JmsIntegrationTestUtils;
+
+/**
+ * Tests the {@link org.springframework.obm.avro.AvroMarshaller} with JMS (specifically, an embedded ActiveMQ broker).
+ *
+ * @author Josh Long
+ */
+public class TestJmsMessageConverter {
+
+ // thrift
+ private ThriftMarshaller thriftMarshaller = new ThriftMarshaller();
+ private org.springframework.obm.thrift.crm.Customer thriftCustomer = new org.springframework.obm.thrift.crm.Customer("John", "Doe", "email@email.com", 22);
+
+
+ // messagepack
+ private MessagePackMarshaller msgPackMarshaller = new MessagePackMarshaller();
+ private Cat msgPackCat;
+
+ // protocol buffers
+ private ProtocolBuffersMarshaller buffersMarshaller = new ProtocolBuffersMarshaller();
+ private Crm.Customer buffersCustomer;
+ // avro
+ private AvroMarshaller avroMarshaller = new AvroMarshaller();
+ private Customer avroCustomer = new Customer();
+
+ @Before
+ public void before() throws Throwable {
+ // protocol buffers
+ buffersCustomer = Crm.Customer.newBuilder().setEmail("email@e.com").setFirstName("john").setLastName("long").build();
+
+ // avro
+ avroCustomer.firstName = "josh";
+ avroCustomer.lastName = "long";
+ avroCustomer.email = "em@em.com";
+ avroCustomer.id = (int) (1000 * Math.random());
+
+ // message pack
+ msgPackCat = new Cat("Felix", (int) (1000 * Math.random()));
+ this.msgPackMarshaller.afterPropertiesSet();
+
+ // thrift
+ thriftMarshaller.afterPropertiesSet();
+ }
+
+ @Test
+ public void testThrift() throws Throwable {
+ JmsIntegrationTestUtils.startAndConnectToJmsBroker(org.springframework.obm.thrift.crm.Customer.class, thriftMarshaller, new JmsIntegrationTestUtils.JmsBrokerExecutionCallback() {
+ @Override
+ public void doWithActiveMq(BrokerService brokerService, JmsTemplate jmsTemplate) throws Throwable {
+ String avroDestination = "thrift";
+ jmsTemplate.convertAndSend(avroDestination, thriftCustomer);
+
+ org.springframework.obm.thrift.crm.Customer customerReceived =
+ (org.springframework.obm.thrift.crm.Customer) jmsTemplate.receiveAndConvert(avroDestination);
+ Assert.assertEquals(thriftCustomer, customerReceived);
+ }
+ });
+ }
+
+ @Test
+ public void testAvro() throws Throwable {
+ JmsIntegrationTestUtils.startAndConnectToJmsBroker(Customer.class, avroMarshaller, new JmsIntegrationTestUtils.JmsBrokerExecutionCallback() {
+ @Override
+ public void doWithActiveMq(BrokerService brokerService, JmsTemplate jmsTemplate) throws Throwable {
+ String avroDestination = "avro";
+ jmsTemplate.convertAndSend(avroDestination, avroCustomer);
+
+ Customer customerReceived = (Customer) jmsTemplate.receiveAndConvert(avroDestination);
+ Assert.assertEquals(avroCustomer, customerReceived);
+ }
+ });
+ }
+
+ @Test
+ public void testMessagePack() throws Throwable {
+ JmsIntegrationTestUtils.startAndConnectToJmsBroker(Cat.class, this.msgPackMarshaller, new JmsIntegrationTestUtils.JmsBrokerExecutionCallback() {
+ @Override
+ public void doWithActiveMq(BrokerService brokerService, JmsTemplate jmsTemplate) throws Throwable {
+ String pbDestination = "messagepack";
+ jmsTemplate.convertAndSend(pbDestination, msgPackCat);
+ Cat rCat = (Cat) jmsTemplate.receiveAndConvert(pbDestination);
+ Assert.assertEquals(rCat, msgPackCat);
+ }
+ });
+ }
+
+ @Test
+ public void testProtocolBuffers() throws Throwable {
+ JmsIntegrationTestUtils.startAndConnectToJmsBroker(Crm.Customer.class, this.buffersMarshaller, new JmsIntegrationTestUtils.JmsBrokerExecutionCallback() {
+ @Override
+ public void doWithActiveMq(BrokerService brokerService, JmsTemplate jmsTemplate) throws Throwable {
+ String pbDestination = "pb";
+ jmsTemplate.convertAndSend(pbDestination, buffersCustomer);
+ Crm.Customer receivedCustomer = (Crm.Customer) jmsTemplate.receiveAndConvert(pbDestination);
+ Assert.assertEquals(receivedCustomer, buffersCustomer);
+ }
+ });
+ }
+
+}
View
8 obm/src/test/java/org/springframework/obm/messagepack/Cat.java
@@ -50,6 +50,14 @@ public boolean equals(Object o) {
}
@Override
+ public String toString() {
+ return "Cat{" +
+ "name='" + name + '\'' +
+ ", id=" + id +
+ '}';
+ }
+
+ @Override
public int hashCode() {
int result = name != null ? name.hashCode() : 0;
result = 31 * result + id;
View
4 obm/src/test/java/org/springframework/remoting/thrift/TestThriftServiceExporter.java
@@ -19,7 +19,7 @@
import org.springframework.obm.thrift.crm.Customer;
import org.springframework.util.http.DispatcherServletJettyConfigurationCallback;
import org.springframework.util.http.EndpointTestUtils;
-import org.springframework.util.http.IntegrationTestUtils;
+import org.springframework.util.http.RestIntegrationTestUtils;
import org.springframework.web.servlet.handler.BeanNameUrlHandlerMapping;
import java.util.Arrays;
@@ -52,7 +52,7 @@ public void before() throws Throwable {
@After
public void after() throws Throwable {
- IntegrationTestUtils.stopServerQuietly(this.jettyServer);
+ RestIntegrationTestUtils.stopServerQuietly(this.jettyServer);
if (log.isDebugEnabled()) {
log.debug("stopped jetty server");
}
View
4 ...ework/util/http/IntegrationTestUtils.java → ...k/util/http/RestIntegrationTestUtils.java
@@ -31,14 +31,14 @@
*
* @author Josh Long
*/
-public class IntegrationTestUtils {
+public class RestIntegrationTestUtils {
public static interface
ServerExecutionCallback {
void doWithServer(RestTemplate restTemplate, Server server) throws Throwable;
}
- static Log log = LogFactory.getLog(IntegrationTestUtils.class);
+ static Log log = LogFactory.getLog(RestIntegrationTestUtils.class);
static private Map<AbstractRestServiceConfiguration, BeanFactory> beanFactoryMap = new ConcurrentHashMap<AbstractRestServiceConfiguration, BeanFactory>();
View
60 obm/src/test/java/org/springframework/util/jms/JmsIntegrationTestUtils.java
@@ -0,0 +1,60 @@
+package org.springframework.util.jms;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Assert;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.support.converter.obm.MarshallingMessageConverter;
+import org.springframework.obm.Marshaller;
+
+/**
+ * Supports integration tests with an embedded ActiveMQ instance.
+ *
+ * @author Josh Long
+ */
+abstract public class JmsIntegrationTestUtils {
+
+ static private Log log = LogFactory.getLog(JmsIntegrationTestUtils.class);
+
+ /**
+ * during the execution of this callback, a JMS broker will be available and the {@link JmsTemplate} required to
+ * connect to that JMS broker will be provided as a parameter to this
+ * class's {@link JmsBrokerExecutionCallback#doWithActiveMq(org.apache.activemq.broker.BrokerService, org.springframework.jms.core.JmsTemplate)}
+ */
+ public static interface JmsBrokerExecutionCallback {
+ void doWithActiveMq(BrokerService brokerService, JmsTemplate jmsTemplate) throws Throwable;
+ }
+
+ public static void startAndConnectToJmsBroker(Class<?> clzzForPayload, Marshaller m, JmsBrokerExecutionCallback callback) throws Throwable {
+ Assert.assertNotNull("the " + JmsBrokerExecutionCallback.class.getName() + "can not be null", callback);
+ String destinationUrl = "tcp://localhost:61617";
+ BrokerService broker = new BrokerService();
+ TransportConnector connector = broker.addConnector(destinationUrl);
+ broker.start();
+ while (!broker.isStarted()) {
+ Thread.sleep(500);
+ }
+ ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(destinationUrl);
+ MarshallingMessageConverter converter = new MarshallingMessageConverter(clzzForPayload, m);
+ JmsTemplate jmsTemplate = new JmsTemplate(activeMQConnectionFactory);
+ converter.setPayloadClass(clzzForPayload);
+ jmsTemplate.setMessageConverter(converter);
+ jmsTemplate.afterPropertiesSet();
+
+ try {
+ callback.doWithActiveMq(broker, jmsTemplate);
+ } catch (AssertionError ae) {
+ throw ae;
+ } catch (Throwable th) {
+ if (log.isErrorEnabled()) {
+ log.error("execution of jms session failed.", th);
+ }
+ } finally {
+ connector.stop();
+ broker.stop();
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.