Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

SWITCHYARD-1397 JMSMessageComposer creates only object messages #465

Closed
wants to merge 1 commit into from

2 participants

@igarashitm
Collaborator

Add a "messageType" property on JMSProcessor in order to specify outbound message type. Default message type is still ObjectMessage.

@igarashitm igarashitm SWITCHYARD-1397 JMSMessageComposer creates only object messages
Add a "messageType" property on JMSProcessor in order to specify outbound message type. Default message type is still ObjectMessage.
cf41b74
@kcbabo
Owner

processing

@kcbabo
Owner

pushed

@kcbabo kcbabo closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 10, 2013
  1. @igarashitm

    SWITCHYARD-1397 JMSMessageComposer creates only object messages

    igarashitm authored
    Add a "messageType" property on JMSProcessor in order to specify outbound message type. Default message type is still ObjectMessage.
This page is out of date. Refresh to see the latest.
View
42 jca/src/main/java/org/switchyard/component/jca/composer/JMSMessageComposer.java
@@ -18,6 +18,7 @@
*/
package org.switchyard.component.jca.composer;
+import java.io.InputStream;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.HashMap;
@@ -98,9 +99,44 @@
public JMSBindingData decompose(Exchange exchange, JMSBindingData target) throws Exception {
getContextMapper().mapTo(exchange.getContext(), target);
Message jmsMessage = target.getMessage();
- ObjectMessage targetObj = ObjectMessage.class.cast(jmsMessage);
- // expect transformer to transform the content into Serializable ...
- targetObj.setObject(exchange.getMessage().getContent(Serializable.class));
+
+ if (jmsMessage instanceof ObjectMessage) {
+ ObjectMessage msg = ObjectMessage.class.cast(jmsMessage);
+ // expect transformer to transform the content into Serializable ...
+ msg.setObject(exchange.getMessage().getContent(Serializable.class));
+
+ } else if (jmsMessage instanceof TextMessage) {
+ TextMessage msg = TextMessage.class.cast(jmsMessage);
+ msg.setText(exchange.getMessage().getContent(String.class));
+
+ } else if (jmsMessage instanceof BytesMessage) {
+ BytesMessage msg = BytesMessage.class.cast(jmsMessage);
+ msg.writeBytes(exchange.getMessage().getContent(byte[].class));
+
+ } else if (jmsMessage instanceof StreamMessage) {
+ StreamMessage msg = StreamMessage.class.cast(jmsMessage);
+ byte[] buffer = new byte[8192];
+ int size = 0;
+ if (exchange.getMessage().getContent() instanceof StreamMessage) {
+ // in case the StreamMessage is passed through from JMS inbound
+ StreamMessage sm = exchange.getMessage().getContent(StreamMessage.class);
+ while ((size = sm.readBytes(buffer)) > 0) {
+ msg.writeBytes(buffer, 0, size);
+ }
+ } else {
+ InputStream is = exchange.getMessage().getContent(InputStream.class);
+ while ((size = is.read(buffer)) > 0) {
+ msg.writeBytes(buffer, 0, size);
+ }
+ }
+
+ } else if (jmsMessage instanceof MapMessage) {
+ MapMessage msg = MapMessage.class.cast(jmsMessage);
+ Map<?,?> map = exchange.getMessage().getContent(Map.class);
+ for (Object key : map.keySet()) {
+ msg.setObject(key.toString(), map.get(key));
+ }
+ }
return target;
}
View
38 jca/src/main/java/org/switchyard/component/jca/processor/JMSProcessor.java
@@ -54,7 +54,9 @@
public static final String KEY_ACKNOWLEDGE_MODE = "acknowledgeMode";
/** key for destination property. */
public static final String KEY_DESTINATION = "destination";
-
+ /** key for message type property. */
+ public static final String KEY_MESSAGE_TYPE = "messageType";
+
private Logger _logger = Logger.getLogger(JMSProcessor.class);
private String _userName;
private String _password;
@@ -65,6 +67,11 @@
private int _ackMode;
private ConnectionFactory _connectionFactory;
private Destination _jmsDestination;
+ private MessageType _outMessageType = MessageType.Object;
+
+ private enum MessageType {
+ Stream, Map, Text, Object, Bytes, Plain
+ }
@Override
public AbstractOutboundProcessor setConnectionSpec(String name, Properties props) {
@@ -122,7 +129,27 @@ public void uninitialize() {
session = connection.createSession(_txEnabled, _ackMode);
MessageProducer producer = session.createProducer(_jmsDestination);
- Message msg = session.createObjectMessage();
+ Message msg;
+ switch (_outMessageType) {
+ case Stream:
+ msg = session.createStreamMessage();
+ break;
+ case Map:
+ msg = session.createMapMessage();
+ break;
+ case Text:
+ msg = session.createTextMessage();
+ break;
+ case Bytes:
+ msg = session.createBytesMessage();
+ break;
+ case Plain:
+ msg = session.createMessage();
+ break;
+ default:
+ msg = session.createObjectMessage();
+ }
+
MessageComposer<JMSBindingData> composer = JCAComposition.getMessageComposer(getJCABindingModel(), JMSBindingData.class);
producer.send(composer.decompose(exchange, new JMSBindingData(msg)).getMessage());
return null;
@@ -190,4 +217,11 @@ public void setAcknowledgeMode(String ack) {
_acknowledgeMode = ack;
}
+ /**
+ * set message type.
+ * @param type message type
+ */
+ public void setMessageType(String type) {
+ _outMessageType = MessageType.valueOf(type);
+ }
}
View
1  jca/src/test/java/org/switchyard/component/jca/deploy/JCAJMSReference.java
@@ -2,4 +2,5 @@
public interface JCAJMSReference {
public void onMessage(String body);
+ public void onMessageText(String body);
}
View
18 jca/src/test/java/org/switchyard/component/jca/deploy/JCAJMSReferenceBindingTest.java
@@ -22,6 +22,7 @@
package org.switchyard.component.jca.deploy;
import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
import javax.transaction.UserTransaction;
import junit.framework.Assert;
@@ -55,6 +56,9 @@
@ServiceOperation("JCAJMSReferenceService.onMessage")
private Invoker _service;
+ @ServiceOperation("JCAJMSReferenceService.onMessageText")
+ private Invoker _serviceText;
+
@BeforeDeploy
public void before() {
ResourceAdapterConfig ra = new ResourceAdapterConfig(ResourceAdapterConfig.ResourceAdapterType.HORNETQ);
@@ -74,6 +78,20 @@ public void testUnmanagedOutboundJMS() throws Exception {
}
@Test
+ public void testUnmanagedOutboundJMSSpecifyingMessageType() throws Exception {
+ String payload = "Shikishima";
+ _serviceText.sendInOnly(payload);
+
+ final MessageConsumer consumer = _hqMixIn.getJMSSession().createConsumer(HornetQMixIn.getJMSQueue(OUTPUT_QUEUE));
+ javax.jms.Message msg = consumer.receive(1000);
+ Assert.assertTrue(msg instanceof TextMessage);
+ javax.jms.TextMessage txtmsg = TextMessage.class.cast(msg);
+ Assert.assertEquals(payload+"test", txtmsg.getText());
+ Assert.assertEquals(msg.getStringProperty("testProp"), "testVal");
+
+ }
+
+ @Test
public void testManagedOutboundJMS() throws Exception {
String payload = "Ashihiki";
UserTransaction tx = _jcaMixIn.getUserTransaction();
View
2  jca/src/test/java/org/switchyard/component/jca/deploy/JCAJMSReferenceService.java
@@ -2,4 +2,6 @@
public interface JCAJMSReferenceService {
public void onMessage(String body);
+
+ public void onMessageText(String body);
}
View
8 jca/src/test/java/org/switchyard/component/jca/deploy/JCAJMSReferenceServiceImpl.java
@@ -9,9 +9,17 @@
public class JCAJMSReferenceServiceImpl implements JCAJMSReferenceService {
@Inject @Reference
private JCAJMSReference service;
+
+ @Inject @Reference("JCAJMSReferenceText")
+ private JCAJMSReference serviceText;
@Override
public void onMessage(String name) {
service.onMessage(name);
}
+
+ @Override
+ public void onMessageText(String name) {
+ serviceText.onMessageText(name);
+ }
}
View
19 jca/src/test/resources/org/switchyard/component/jca/deploy/switchyard-outbound-jms-test.xml
@@ -36,6 +36,22 @@
</outboundInteraction>
</binding.jca>
</sca:reference>
+ <sca:reference name="JCAJMSReferenceText" promote="ComponentName/JCAJMSReferenceText" multiplicity="1..1">
+ <binding.jca xmlns="urn:switchyard-component-jca:config:1.0">
+ <jca:contextMapper class="org.switchyard.component.jca.deploy.MyJMSContextMapper"/>
+ <jca:messageComposer class="org.switchyard.component.jca.deploy.MyJMSMessageComposer"/>
+ <outboundConnection>
+ <resourceAdapter name="hornetq-ra.rar"/>
+ <connection jndiName="java:/JmsXA"/>
+ </outboundConnection>
+ <outboundInteraction>
+ <processor type="org.switchyard.component.jca.processor.JMSProcessor">
+ <property name="destination" value="TestQueue"/>
+ <property name="messageType" value="Text"/>
+ </processor>
+ </outboundInteraction>
+ </binding.jca>
+ </sca:reference>
<sca:component name="ComponentName">
<bean:implementation.bean class="org.switchyard.component.jca.deploy.JCAJMSReferenceServiceImpl"/>
@@ -45,6 +61,9 @@
<sca:reference name="JCAJMSReference">
<sca:interface.java interface="org.switchyard.component.jca.deploy.JCAJMSReference"/>
</sca:reference>
+ <sca:reference name="JCAJMSReferenceText">
+ <sca:interface.java interface="org.switchyard.component.jca.deploy.JCAJMSReference"/>
+ </sca:reference>
</sca:component>
</sca:composite>
Something went wrong with that request. Please try again.