Skip to content

Commit

Permalink
INT-1626 the JMS 'message-driven-channel-adapter' element now accepts…
Browse files Browse the repository at this point in the history
… the 'error-channel' attribute
  • Loading branch information
markfisher committed Nov 16, 2010
1 parent 6b8a0da commit ed9a8d9
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ private String parseMessageListener(Element element, ParserContext parserContext
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-timeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "extract-request-payload");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "extract-reply-payload");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
int defaults = 0;
if (StringUtils.hasText(element.getAttribute(DEFAULT_REPLY_DESTINATION_ATTRIB))) {
defaults++;
Expand Down Expand Up @@ -191,6 +190,7 @@ private String parseMessageListener(Element element, ParserContext parserContext
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout", "requestTimeout");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "extract-payload", "extractRequestPayload");
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "message-converter");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "header-mapper");
BeanDefinition beanDefinition = builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,15 @@
</xsd:attribute>
<xsd:attribute name="extract-payload" type="xsd:string" default="true"/>
<xsd:attribute name="send-timeout" type="xsd:string"/>
<xsd:attribute name="error-channel" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.integration.core.MessageChannel"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="transaction-manager" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

<int-jms:message-driven-channel-adapter channel="jmsInputChannel" container="container"/>

<int-jms:message-driven-channel-adapter channel="jmsInputChannel" destination="queueB" error-channel="testErrorChannel"/>

<int:service-activator input-channel="jmsInputChannel">
<bean id="testService" class="org.springframework.integration.jms.config.InboundOneWayErrorTests$TestService"/>
</int:service-activator>

<int:channel id="testErrorChannel">
<int:queue/>
</int:channel>

<bean id="queueA" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="oneway.a"/>
</bean>

<bean id="queueB" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="oneway.b"/>
</bean>

<bean id="container" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="queueA"/>
<property name="errorHandler" ref="testErrorHandler"/>
</bean>

<bean id="testErrorHandler" class="org.springframework.integration.jms.config.InboundOneWayErrorTests$TestErrorHandler"/>

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost"/>
</bean>
</property>
<property name="sessionCacheSize" value="10"/>
<property name="cacheProducers" value="false"/>
</bean>

</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.jms.config;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;

import org.junit.Test;

import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.core.PollableChannel;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.util.ErrorHandler;

/**
* @author Mark Fisher
*/
public class InboundOneWayErrorTests {

@Test
public void noErrorChannel() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("InboundOneWayErrorTests-context.xml", getClass());
JmsTemplate jmsTemplate = new JmsTemplate(context.getBean("connectionFactory", ConnectionFactory.class));
Destination queue = context.getBean("queueA", Destination.class);
jmsTemplate.send(queue, new MessageCreator() {
public javax.jms.Message createMessage(Session session) throws JMSException {
return session.createTextMessage("test-A");
}
});
TestErrorHandler errorHandler = context.getBean("testErrorHandler", TestErrorHandler.class);
errorHandler.latch.await(3000, TimeUnit.MILLISECONDS);
assertNotNull(errorHandler.lastError);
assertNotNull(errorHandler.lastError.getCause());
assertEquals("failed to process: test-A", errorHandler.lastError.getCause().getMessage());
PollableChannel testErrorChannel = context.getBean("testErrorChannel", PollableChannel.class);
assertNull(testErrorChannel.receive(0));
context.close();
}

@Test
public void errorChannel() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("InboundOneWayErrorTests-context.xml", getClass());
JmsTemplate jmsTemplate = new JmsTemplate(context.getBean("connectionFactory", ConnectionFactory.class));
Destination queue = context.getBean("queueB", Destination.class);
jmsTemplate.send(queue, new MessageCreator() {
public javax.jms.Message createMessage(Session session) throws JMSException {
return session.createTextMessage("test-B");
}
});
PollableChannel errorChannel = context.getBean("testErrorChannel", PollableChannel.class);
Message<?> errorMessage = errorChannel.receive(3000);
assertNotNull(errorMessage);
assertEquals(MessageHandlingException.class, errorMessage.getPayload().getClass());
MessageHandlingException exception = (MessageHandlingException) errorMessage.getPayload();
assertNotNull(exception.getCause());
assertEquals(TestException.class, exception.getCause().getClass());
assertEquals("failed to process: test-B", exception.getCause().getMessage());
TestErrorHandler errorHandler = context.getBean("testErrorHandler", TestErrorHandler.class);
assertNull(errorHandler.lastError);
context.close();
}


public static class TestService {
public void process(Object o) {
throw new TestException("failed to process: " + o);
}
}


@SuppressWarnings("serial")
private static class TestException extends RuntimeException {
public TestException(String message) {
super(message);
}
}


@SuppressWarnings("unused")
private static class TestErrorHandler implements ErrorHandler {

private final CountDownLatch latch = new CountDownLatch(1);

private volatile Throwable lastError;

public void handleError(Throwable t) {
this.lastError = t;
this.latch.countDown();
}
}

}

0 comments on commit ed9a8d9

Please sign in to comment.