Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,6 @@ public void interceptor() {
TestUtils.getPropertyValue(channel, "dispatcher"), "maxSubscribers", Integer.class).intValue());
channel = context.getBean("pubSub", MessageChannel.class);
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
assertSame(mbf, TestUtils.getPropertyValue(channel, "dispatcher.messageBuilderFactory"));
assertSame(mbf, TestUtils.getPropertyValue(channel, "container.messageListener.messageBuilderFactory"));
assertTrue(TestUtils.getPropertyValue(channel, "container.missingQueuesFatal", Boolean.class));
assertFalse(TestUtils.getPropertyValue(channel, "container.transactional", Boolean.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* @author Mark Fisher
* @author Dave Syer
* @author Gary Russell
* @author Artem Bilan
* @since 2.0
*/
public abstract class AbstractAggregatingMessageGroupProcessor implements MessageGroupProcessor,
Expand All @@ -52,9 +53,23 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private BeanFactory beanFactory;

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
this.beanFactory = beanFactory;
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand All @@ -65,10 +80,10 @@ public final Object processMessageGroup(MessageGroup group) {
Object payload = this.aggregatePayloads(group, headers);
AbstractIntegrationMessageBuilder<?> builder;
if (payload instanceof Message<?>) {
builder = this.messageBuilderFactory.fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
builder = getMessageBuilderFactory().fromMessage((Message<?>) payload).copyHeadersIfAbsent(headers);
}
else {
builder = this.messageBuilderFactory.withPayload(payload).copyHeadersIfAbsent(headers);
builder = getMessageBuilderFactory().withPayload(payload).copyHeadersIfAbsent(headers);
}

return builder.popSequenceDetails().build();
Expand All @@ -89,7 +104,8 @@ protected Map<String, Object> aggregateHeaders(MessageGroup group) {
for (Entry<String, Object> entry : message.getHeaders().entrySet()) {
String key = entry.getKey();
if (MessageHeaders.ID.equals(key) || MessageHeaders.TIMESTAMP.equals(key)
|| IntegrationMessageHeaderAccessor.SEQUENCE_SIZE.equals(key) || IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER.equals(key)) {
|| IntegrationMessageHeaderAccessor.SEQUENCE_SIZE.equals(key)
|| IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER.equals(key)) {
continue;
}
Object value = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,14 +22,16 @@
import java.util.Collection;
import java.util.Map;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.Message;
import org.springframework.integration.util.AbstractExpressionEvaluator;
import org.springframework.integration.util.MessagingMethodInvokerHelper;

/**
* A MessageListProcessor implementation that invokes a method on a target POJO.
*
*
* @author Dave Syer
* @author Artem Bilan
* @since 2.0
*/
public class MethodInvokingMessageListProcessor<T> extends AbstractExpressionEvaluator {
Expand Down Expand Up @@ -57,6 +59,12 @@ public MethodInvokingMessageListProcessor(Object targetObject, Class<? extends A
delegate = new MessagingMethodInvokerHelper<T>(targetObject, annotationType, Object.class, true);
}

@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
this.delegate.setBeanFactory(beanFactory);
}

public String toString() {
return delegate.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,6 @@
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.ParseException;
import org.springframework.expression.spel.SpelParserConfiguration;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.core.MessagingTemplate;
Expand Down Expand Up @@ -63,9 +62,9 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

private volatile PublisherMetadataSource metadataSource;
private final ExpressionParser parser = new SpelExpressionParser();

private final ExpressionParser parser = new SpelExpressionParser(new SpelParserConfiguration(true, true));
private volatile PublisherMetadataSource metadataSource;

private volatile DestinationResolver<MessageChannel> channelResolver;

Expand All @@ -75,6 +74,8 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private volatile String defaultChannelName;

public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) {
Expand Down Expand Up @@ -114,7 +115,16 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
this.messagingTemplate.setBeanFactory(beanFactory);
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand Down Expand Up @@ -164,8 +174,8 @@ private void publishMessage(Method method, StandardEvaluationContext context) th
Object result = expression.getValue(context);
if (result != null) {
AbstractIntegrationMessageBuilder<?> builder = (result instanceof Message<?>)
? this.messageBuilderFactory.fromMessage((Message<?>) result)
: this.messageBuilderFactory.withPayload(result);
? getMessageBuilderFactory().fromMessage((Message<?>) result)
: getMessageBuilderFactory().withPayload(result);
Map<String, Object> headers = this.evaluateHeaders(method, context);
if (headers != null) {
builder.copyHeaders(headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ public String getComponentType() {
}

@Override
public final void setBeanFactory(BeanFactory beanFactory) {
public void setBeanFactory(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "'beanFactory' must not be null");
this.beanFactory = beanFactory;
this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
}

@Override
Expand All @@ -143,9 +142,15 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv

@Override
public final void afterPropertiesSet() {
this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
try {
if (this.messageBuilderFactory == null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
if (this.beanFactory != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we just move this to getMessageBuilderFactory() to make it really lazy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be done

this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
else {
this.messageBuilderFactory = new DefaultMessageBuilderFactory();
}
}
this.onInit();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.core;

import java.util.Properties;
Expand All @@ -22,6 +23,7 @@
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.GenericMessagingTemplate;

Expand All @@ -35,6 +37,10 @@
*/
public class MessagingTemplate extends GenericMessagingTemplate {

private BeanFactory beanFactory;

private volatile boolean throwExceptionOnLateReplySet;

/**
* Create a MessagingTemplate with no default channel. Note, that one
* may be provided by invoking {@link #setDefaultChannel(MessageChannel)}.
Expand All @@ -55,10 +61,14 @@ public MessagingTemplate(MessageChannel defaultChannel) {
*/
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
super.setDestinationResolver(new BeanFactoryChannelResolver(beanFactory));
Properties integrationProperties = IntegrationContextUtils.getIntegrationProperties(beanFactory);
Boolean throwExceptionOnLateReply = Boolean.valueOf(integrationProperties.getProperty(IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY));
this.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
}

@Override
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
super.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
this.throwExceptionOnLateReplySet = true;
}

/**
Expand All @@ -70,4 +80,21 @@ public void setDefaultChannel(MessageChannel channel) {
super.setDefaultDestination(channel);
}

@Override
public Message<?> sendAndReceive(MessageChannel destination, Message<?> requestMessage) {
if (!this.throwExceptionOnLateReplySet) {
synchronized (this) {
if (!this.throwExceptionOnLateReplySet) {
Properties integrationProperties =
IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
Boolean throwExceptionOnLateReply = Boolean.valueOf(integrationProperties
.getProperty(IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY));
super.setThrowExceptionOnLateReply(throwExceptionOnLateReply);
this.throwExceptionOnLateReplySet = true;
}
}
}
return super.sendAndReceive(destination, requestMessage);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,7 @@
* @author Iwein Fuld
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFactoryAware {

Expand All @@ -59,6 +60,10 @@ public class BroadcastingDispatcher extends AbstractDispatcher implements BeanFa

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;

private BeanFactory beanFactory;


public BroadcastingDispatcher() {
this(null, false);
Expand Down Expand Up @@ -114,7 +119,17 @@ public void setMinSubscribers(int minSubscribers) {

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(beanFactory);
this.beanFactory = beanFactory;
}

protected MessageBuilderFactory getMessageBuilderFactory() {
if (!this.messageBuilderFactorySet) {
if (this.beanFactory != null) {
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
}
this.messageBuilderFactorySet = true;
}
return this.messageBuilderFactory;
}

@Override
Expand All @@ -127,7 +142,7 @@ public boolean dispatch(Message<?> message) {
}
int sequenceSize = handlers.size();
for (final MessageHandler handler : handlers) {
final Message<?> messageToSend = (!this.applySequence) ? message : this.messageBuilderFactory.fromMessage(message)
final Message<?> messageToSend = (!this.applySequence) ? message : getMessageBuilderFactory().fromMessage(message)
.pushSequenceDetails(message.getHeaders().getId(), sequenceNumber++, sequenceSize).build();
if (this.executor != null) {
this.executor.execute(new Runnable() {
Expand Down Expand Up @@ -174,5 +189,4 @@ else if (this.logger.isWarnEnabled()) {
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,21 @@
import org.springframework.expression.EvaluationContext;

/**
* Interface to be implemented by beans that wish to be aware of their
* owning integration {@link EvaluationContext}, which is the result of
* {@link org.springframework.integration.config.IntegrationEvaluationContextFactoryBean}
* <p>
* The {@link #setIntegrationEvaluationContext} is invoked from
* the {@link IntegrationEvaluationContextAwareBeanPostProcessor#afterSingletonsInstantiated()},
* not during standard {@code postProcessBefore(After)Initialization} to avoid any
* {@code BeanFactory} early access during integration {@link EvaluationContext} retrieval.
* Therefore, if it is necessary to use {@link EvaluationContext} in the {@code afterPropertiesSet()},
* the {@code IntegrationContextUtils.getEvaluationContext(this.beanFactory)} should be used instead
* of this interface implementation.
*
* @author Artem Bilan
* @since 3.0
* @see IntegrationEvaluationContextAwareBeanPostProcessor
*/
public interface IntegrationEvaluationContextAware {

Expand Down
Loading