Skip to content

Commit

Permalink
Merge pull request #435 from artembilan/INT-2275
Browse files Browse the repository at this point in the history
* INT-2275:
  INT-2275: any outbound-channel-adapter in <chain>
  • Loading branch information
olegz committed May 14, 2012
2 parents 4d5b8d5 + 45c429e commit 6c2f3d8
Show file tree
Hide file tree
Showing 58 changed files with 1,301 additions and 317 deletions.
Expand Up @@ -22,5 +22,10 @@
mapped-request-headers="foo*"/>

<int:channel id="requestChannel"/>


<int:chain id="chainWithRabbitOutbound" input-channel="amqpOutboundChannelAdapterWithinChain">
<amqp:outbound-channel-adapter exchange-name="outboundchanneladapter.test.1"/>
</int:chain>


</beans>
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.integration.amqp.config;

import java.lang.reflect.Field;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -33,6 +34,7 @@
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.test.context.ContextConfiguration;
Expand All @@ -47,6 +49,7 @@
* @author Mark Fisher
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
* @since 2.1
*/
@ContextConfiguration
Expand Down Expand Up @@ -99,4 +102,35 @@ public Object answer(InvocationOnMock invocation) {
Mockito.verify(amqpTemplate, Mockito.times(1)).send(Mockito.any(String.class), Mockito.any(String.class), Mockito.any(org.springframework.amqp.core.Message.class));
}

@SuppressWarnings("rawtypes")
@Test
public void amqpOutboundChannelAdapterWithinChain() {
Object eventDrivernConsumer = context.getBean("chainWithRabbitOutbound");

List chainHandlers = TestUtils.getPropertyValue(eventDrivernConsumer, "handler.handlers", List.class);

AmqpOutboundEndpoint endpoint = (AmqpOutboundEndpoint) chainHandlers.get(0);

Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "amqpTemplate");
amqpTemplateField.setAccessible(true);
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "amqpTemplate", RabbitTemplate.class);
amqpTemplate = Mockito.spy(amqpTemplate);

Mockito.doAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
org.springframework.amqp.core.Message amqpReplyMessage = (org.springframework.amqp.core.Message) args[2];
assertEquals("hello", new String(amqpReplyMessage.getBody()));
return null;
}})
.when(amqpTemplate).send(Mockito.any(String.class), Mockito.any(String.class), Mockito.any(org.springframework.amqp.core.Message.class));
ReflectionUtils.setField(amqpTemplateField, endpoint, amqpTemplate);


MessageChannel requestChannel = context.getBean("amqpOutboundChannelAdapterWithinChain", MessageChannel.class);
Message<?> message = MessageBuilder.withPayload("hello").build();
requestChannel.send(message);
Mockito.verify(amqpTemplate, Mockito.times(1)).send(Mockito.any(String.class), Mockito.any(String.class), Mockito.any(org.springframework.amqp.core.Message.class));
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,18 +25,28 @@
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.util.StringUtils;

/**
* Base parser for Channel Adapters.
*
*
* Includes logic to determine {@link org.springframework.integration.MessageChannel}:
* if 'channel' attribute is defined - uses its value as 'channelName';
* if 'id' attribute is defined - creates {@link DirectChannel} at runtime and uses id's value as 'channelName';
* if current component is defined as nested element inside any other components e.g. &lt;chain&gt;
* 'id' and 'channel' attributes will be ignored and this component will not be parsed as
* {@link org.springframework.integration.endpoint.AbstractEndpoint}.
*
* @author Mark Fisher
* @author Artem Bilan
*/
public abstract class AbstractChannelAdapterParser extends AbstractBeanDefinitionParser {

@Override
protected final String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) throws BeanDefinitionStoreException {
String id = element.getAttribute("id");
protected final String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext)
throws BeanDefinitionStoreException {
String id = element.getAttribute(ID_ATTRIBUTE);
if (!element.hasAttribute("channel")) {
// the created channel will get the 'id', so the adapter's bean name includes a suffix
id = id + ".adapter";
Expand All @@ -57,13 +67,15 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont
}

private String createDirectChannel(Element element, ParserContext parserContext) {
String channelId = element.getAttribute("id");
if (parserContext.isNested()) {
return null;
}
String channelId = element.getAttribute(ID_ATTRIBUTE);
if (!StringUtils.hasText(channelId)) {
parserContext.getReaderContext().error("The channel-adapter's 'id' attribute is required when no 'channel' "
+ "reference has been provided, because that 'id' would be used for the created channel.", element);
}
BeanDefinitionBuilder channelBuilder = BeanDefinitionBuilder.genericBeanDefinition(
IntegrationNamespaceUtils.BASE_PACKAGE + ".channel.DirectChannel");
BeanDefinitionBuilder channelBuilder = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class);
BeanDefinitionHolder holder = new BeanDefinitionHolder(channelBuilder.getBeanDefinition(), channelId);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry());
return channelId;
Expand All @@ -72,7 +84,7 @@ private String createDirectChannel(Element element, ParserContext parserContext)
/**
* Subclasses must implement this method to parse the adapter element.
* The name of the MessageChannel bean is provided.
*/
*/
protected abstract AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName);

}
Expand Up @@ -23,22 +23,31 @@
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;

/**
* Base class for outbound Channel Adapter parsers.
*
* If this component is defined as the top-level element in the Spring application context it will produce
* an {@link org.springframework.integration.endpoint.AbstractEndpoint} depending on the channel type.
* If this component is defined as nested element (e.g., inside of the chain) it will produce
* a {@link org.springframework.integration.core.MessageHandler}.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*/
public abstract class AbstractOutboundChannelAdapterParser extends AbstractChannelAdapterParser {

@Override
protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
if (parserContext.isNested()) {
return this.parseConsumer(element, parserContext);
}
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerEndpointFactoryBean.class);
Element pollerElement = DomUtils.getChildElementByTagName(element, "poller");
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
IntegrationNamespaceUtils.BASE_PACKAGE + ".config.ConsumerEndpointFactoryBean");
builder.addPropertyReference("handler", this.parseAndRegisterConsumer(element, parserContext));
if (pollerElement != null) {
if (!StringUtils.hasText(channelName)) {
Expand All @@ -60,15 +69,13 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
protected String parseAndRegisterConsumer(Element element, ParserContext parserContext) {
AbstractBeanDefinition definition = this.parseConsumer(element, parserContext);
if (definition == null) {
parserContext.getReaderContext().error(
"Consumer parsing must return a BeanComponentDefinition.", element);
parserContext.getReaderContext().error("Consumer parsing must return an AbstractBeanDefinition.", element);
}
String order = element.getAttribute("order");
String order = element.getAttribute(IntegrationNamespaceUtils.ORDER);
if (StringUtils.hasText(order)) {
definition.getPropertyValues().addPropertyValue("order", order);
definition.getPropertyValues().addPropertyValue(IntegrationNamespaceUtils.ORDER, order);
}
String beanName = BeanDefinitionReaderUtils.generateBeanName(
definition, parserContext.getRegistry());
String beanName = BeanDefinitionReaderUtils.generateBeanName(definition, parserContext.getRegistry());
parserContext.registerBeanComponent(new BeanComponentDefinition(definition, beanName));
return beanName;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,16 +18,14 @@

import org.w3c.dom.Element;

import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.config.ExpressionFactoryBean;
import org.springframework.integration.handler.ExpressionEvaluatingMessageHandler;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
Expand All @@ -39,7 +37,8 @@
*/
public class DefaultOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {

protected String parseAndRegisterConsumer(Element element, ParserContext parserContext) {
@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
BeanComponentDefinition innerConsumerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element, parserContext);

String consumerRef = element.getAttribute(IntegrationNamespaceUtils.REF_ATTRIBUTE);
Expand All @@ -61,42 +60,27 @@ protected String parseAndRegisterConsumer(Element element, ParserContext parserC
"The 'method' attribute cannot be used with the 'expression' attribute.", element);
}

if (hasMethod | isExpression) {
BeanDefinitionBuilder consumerBuilder = null;
if (hasMethod) {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingMessageHandler.class);
if (isRef) {
consumerBuilder.addConstructorArgReference(consumerRef);
}
else {
consumerBuilder.addConstructorArgValue(innerConsumerDefinition);
}
consumerBuilder.addConstructorArgValue(methodName);
BeanDefinitionBuilder consumerBuilder = null;

if (isExpression) {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(ExpressionEvaluatingMessageHandler.class);
RootBeanDefinition expressionDef = new RootBeanDefinition(ExpressionFactoryBean.class);
expressionDef.getConstructorArgumentValues().addGenericArgumentValue(consumerExpressionString);
consumerBuilder.addConstructorArgValue(expressionDef);
}
else {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingMessageHandler.class);
if (isRef) {
consumerBuilder.addConstructorArgReference(consumerRef);
}
else {
consumerBuilder = BeanDefinitionBuilder.genericBeanDefinition(ExpressionEvaluatingMessageHandler.class);
RootBeanDefinition expressionDef = new RootBeanDefinition(ExpressionFactoryBean.class);
expressionDef.getConstructorArgumentValues().addGenericArgumentValue(consumerExpressionString);
consumerBuilder.addConstructorArgValue(expressionDef);
consumerBuilder.addConstructorArgValue(innerConsumerDefinition);
}

consumerBuilder.addPropertyValue("componentType", "outbound-channel-adapter");
String order = element.getAttribute(IntegrationNamespaceUtils.ORDER);
if (StringUtils.hasText(order)) {
consumerBuilder.addPropertyValue(IntegrationNamespaceUtils.ORDER, order);
}
consumerRef = BeanDefinitionReaderUtils.registerWithGeneratedName(consumerBuilder.getBeanDefinition(), parserContext.getRegistry());
}
else if (isInnerConsumer) {
consumerRef = innerConsumerDefinition.getBeanName();
consumerBuilder.addConstructorArgValue(hasMethod ? methodName : "handleMessage");
}
Assert.hasText(consumerRef, "cannot determine consumer for 'outbound-channel-adapter'");
return consumerRef;
}

@Override
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
throw new UnsupportedOperationException();
consumerBuilder.addPropertyValue("componentType", "outbound-channel-adapter");
return consumerBuilder.getBeanDefinition();
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2011 the original author or authors.
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,9 @@

package org.springframework.integration.handler;

import java.util.HashSet;
import java.util.List;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.Ordered;
import org.springframework.integration.Message;
Expand All @@ -34,6 +32,9 @@
import org.springframework.integration.support.channel.ChannelResolver;
import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.List;

/**
* A composite {@link MessageHandler} implementation that invokes a chain of
* MessageHandler instances in order.
Expand Down Expand Up @@ -65,8 +66,9 @@
* @author Mark Fisher
* @author Iwein Fuld
* @author Gary Russell
* @author Artem Bilan
*/
public class MessageHandlerChain extends AbstractMessageHandler implements MessageProducer, Ordered {
public class MessageHandlerChain extends AbstractMessageHandler implements MessageProducer {

private volatile List<MessageHandler> handlers;

Expand All @@ -79,8 +81,6 @@ public class MessageHandlerChain extends AbstractMessageHandler implements Messa
*/
private volatile Long sendTimeout = null;

private volatile int order = Ordered.LOWEST_PRECEDENCE;

private volatile ChannelResolver channelResolver;

private volatile boolean initialized;
Expand All @@ -100,15 +100,6 @@ public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}

public void setOrder(int order) {
this.order = order;
}

public int getOrder() {
return this.order;
}


@Override
public String getComponentType() {
return "chain";
Expand Down Expand Up @@ -156,6 +147,13 @@ public boolean send(Message<?> message) {
}
};
((MessageProducer) handler).setOutputChannel(nextChannel);

// If this 'handler' is a nested non-last &lt;chain&gt;, it is necessary
// to 'force' re-init it for check its configuration in conjunction with current MessageHandlerChain.
if (handler instanceof MessageHandlerChain) {
new DirectFieldAccessor(handler).setPropertyValue("initialized", false);
((MessageHandlerChain) handler).afterPropertiesSet();
}
}
else if (handler instanceof MessageProducer) {
MessageChannel replyChannel = new ReplyForwardingMessageChannel();
Expand Down

0 comments on commit 6c2f3d8

Please sign in to comment.