Skip to content

Commit

Permalink
INT-3349 BeanFactory Propagation
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-3349

Several FactoryBeans did not propagate the BeanFactory
to their created object(s). Beans that create messages
must have access to a bean factory to get the
message builder factory.

Fix the FactoryBeans and add a mock FB to all tests that
need one.

Add a runtime environment variable to make any infractions
fatal. This should be set to `true` on CI builds and on
framework developer environments.
  • Loading branch information
garyrussell committed Apr 2, 2014
1 parent 35af66c commit 9dee131
Show file tree
Hide file tree
Showing 62 changed files with 521 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -4,4 +4,4 @@ services:
- rabbitmq
- redis-server
env:
- TERM=dumb
- TERM=dumb SI_FATAL_WHEN_NO_BEANFACTORY=true GRADLE_OPTS='-XX:MaxPermSize=512M -Xmx1024M'
Expand Up @@ -318,9 +318,6 @@ protected AbstractAmqpChannel createInstance() throws Exception {
if (this.maxSubscribers != null) {
pubsub.setMaxSubscribers(this.maxSubscribers);
}
if (this.getBeanFactory() != null) {
pubsub.setBeanFactory(this.getBeanFactory());
}
this.channel = pubsub;
}
else {
Expand Down Expand Up @@ -350,8 +347,11 @@ protected AbstractAmqpChannel createInstance() throws Exception {
if (!CollectionUtils.isEmpty(this.interceptors)) {
this.channel.setInterceptors(this.interceptors);
}
this.channel.afterPropertiesSet();
this.channel.setBeanName(this.beanName);
if (this.getBeanFactory() != null) {
this.channel.setBeanFactory(this.getBeanFactory());
}
this.channel.afterPropertiesSet();
return this.channel;
}

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageDeliveryException;

Expand Down Expand Up @@ -80,6 +81,7 @@ public Channel answer(InvocationOnMock invocation) throws Throwable {
PointToPointSubscribableAmqpChannel amqpChannel = new PointToPointSubscribableAmqpChannel("noSubscribersChannel",
container, amqpTemplate);
amqpChannel.setBeanName("noSubscribersChannel");
amqpChannel.setBeanFactory(mock(BeanFactory.class));
amqpChannel.afterPropertiesSet();

MessageListener listener = (MessageListener) container.getMessageListener();
Expand Down Expand Up @@ -115,6 +117,7 @@ protected Queue initializeQueue(AmqpAdmin admin,
return queue;
}};
amqpChannel.setBeanName("noSubscribersChannel");
amqpChannel.setBeanFactory(mock(BeanFactory.class));
amqpChannel.afterPropertiesSet();

List<String> logList = insertMockLoggerInListener(amqpChannel);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2013 the original author or authors.
* Copyright 2013-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@
import org.springframework.amqp.support.converter.JsonMessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
Expand All @@ -60,6 +61,7 @@

/**
* @author Artem Bilan
* @author Gary Russell
* @since 3.0
*/
public class InboundEndpointTests {
Expand All @@ -84,6 +86,7 @@ public Channel answer(InvocationOnMock invocation) throws Throwable {
PollableChannel channel = new QueueChannel();

adapter.setOutputChannel(channel);
adapter.setBeanFactory(mock(BeanFactory.class));
adapter.afterPropertiesSet();

Object payload = new Foo("bar1");
Expand Down Expand Up @@ -122,6 +125,7 @@ public Channel answer(InvocationOnMock invocation) throws Throwable {
PollableChannel channel = new QueueChannel();

adapter.setOutputChannel(channel);
adapter.setBeanFactory(mock(BeanFactory.class));
adapter.afterPropertiesSet();

Object payload = new Foo("bar1");
Expand Down Expand Up @@ -171,6 +175,7 @@ public Message<?> transform(Message<?> message) {
gateway.setMessageConverter(new JsonMessageConverter());

gateway.setRequestChannel(channel);
gateway.setBeanFactory(mock(BeanFactory.class));
gateway.afterPropertiesSet();

RabbitTemplate rabbitTemplate = Mockito.spy(TestUtils.getPropertyValue(gateway, "amqpTemplate", RabbitTemplate.class));
Expand Down
Expand Up @@ -86,6 +86,11 @@ public abstract class IntegrationContextUtils {

public static final String GLOBAL_CHANNEL_INTERCEPTOR_PROCESSOR_BEAN_NAME = "globalChannelInterceptorProcessor";

/**
* Should be set to TRUE on CI plans and framework developer systems.
*/
public static final boolean fatalWhenNoBeanFactory = Boolean.valueOf(System.getenv("SI_FATAL_WHEN_NO_BEANFACTORY"));

/**
* @param beanFactory BeanFactory for lookup, must not be null.
* @return The {@link MetadataStore} bean whose name is "metadataStore".
Expand Down Expand Up @@ -195,6 +200,9 @@ public static MessageBuilderFactory getMessageBuilderFactory(BeanFactory beanFac
logger.warn("No 'beanFactory' supplied; cannot find MessageBuilderFactory"
+ ", using default.");
}
if (fatalWhenNoBeanFactory) {
throw new RuntimeException("All Message creators need a BeanFactory");
}
}
if (messageBuilderFactory == null) {
messageBuilderFactory = new DefaultMessageBuilderFactory();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -18,11 +18,13 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.SimpleMessageStore;
Expand All @@ -36,6 +38,7 @@
* @author Mark Fisher
* @author Marius Bogoevici
* @author Iwein Fuld
* @author Gary Russell
*/
public class AggregatorTests {

Expand All @@ -47,6 +50,7 @@ public class AggregatorTests {
@Before
public void configureAggregator() {
this.aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), store);
this.aggregator.setBeanFactory(mock(BeanFactory.class));
this.aggregator.afterPropertiesSet();
}

Expand Down Expand Up @@ -218,6 +222,7 @@ private static Message<?> createMessage(Object payload, Object correlationId, in


private class MultiplyingProcessor implements MessageGroupProcessor {
@Override
public Object processMessageGroup(MessageGroup group) {
Integer product = 1;
for (Message<?> message : group.getMessages()) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@
import org.junit.Before;
import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
Expand All @@ -33,19 +34,20 @@

public class CorrelatingMessageHandlerIntegrationTests {

private MessageGroupStore store = new SimpleMessageStore(100);
private final MessageGroupStore store = new SimpleMessageStore(100);

private MessageChannel outputChannel = mock(MessageChannel.class);
private final MessageChannel outputChannel = mock(MessageChannel.class);

private MessageGroupProcessor processor = new PassThroughMessageGroupProcessor();
private final MessageGroupProcessor processor = new PassThroughMessageGroupProcessor();

private AggregatingMessageHandler defaultHandler = new AggregatingMessageHandler(processor, store);
private final AggregatingMessageHandler defaultHandler = new AggregatingMessageHandler(processor, store);

@Before
public void setupHandler() {
when(outputChannel.send(isA(Message.class))).thenReturn(true);
defaultHandler.setOutputChannel(outputChannel);
defaultHandler.setSendTimeout(-1);
defaultHandler.setBeanFactory(mock(BeanFactory.class));
defaultHandler.afterPropertiesSet();
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,13 +21,16 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;

import org.junit.Before;
import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.store.MessageGroupStore;
Expand All @@ -42,6 +45,7 @@
* @author Dave Syer
* @author Iwein Fuld
* @author Oleg Zhurakousky
* @author Gary Russell
*/
public class ResequencerTests {

Expand All @@ -54,6 +58,7 @@ public class ResequencerTests {
@Before
public void configureResequencer() {
this.resequencer = new ResequencingMessageHandler(processor, store, null, null);
this.resequencer.setBeanFactory(mock(BeanFactory.class));
this.resequencer.afterPropertiesSet();
}

Expand Down Expand Up @@ -82,6 +87,7 @@ public void testBasicResequencingA() throws InterruptedException {
SequenceSizeReleaseStrategy releaseStrategy = new SequenceSizeReleaseStrategy();
releaseStrategy.setReleasePartialSequences(true);
this.resequencer = new ResequencingMessageHandler(processor, store, null, releaseStrategy);
this.resequencer.setBeanFactory(mock(BeanFactory.class));
this.resequencer.afterPropertiesSet();

QueueChannel replyChannel = new QueueChannel();
Expand All @@ -102,10 +108,12 @@ public void testBasicUnboundedResequencing() throws InterruptedException {
this.resequencer = new ResequencingMessageHandler(processor, store, null, releaseStrategy);
QueueChannel replyChannel = new QueueChannel();
this.resequencer.setCorrelationStrategy(new CorrelationStrategy() {
@Override
public Object getCorrelationKey(Message<?> message) {
return "A";
}
});
this.resequencer.setBeanFactory(mock(BeanFactory.class));
this.resequencer.afterPropertiesSet();

//Message<?> message0 = MessageBuilder.withPayload("0").setSequenceNumber(0).build();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,23 +17,26 @@
package org.springframework.integration.endpoint;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;

/**
* @author Oleg Zhurakousky
* @author Mark Fisher
* @author Gary Russell
* @since 2.0.1
*/
public class MessageProducerSupportTests {
Expand All @@ -43,6 +46,7 @@ public void validateExceptionIfNoErrorChannel() {
DirectChannel outChannel = new DirectChannel();

outChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
throw new RuntimeException("problems");
}
Expand All @@ -59,12 +63,14 @@ public void handleMessage(Message<?> message) throws MessagingException {
public void validateExceptionIfSendToErrorChannelFails() {
DirectChannel outChannel = new DirectChannel();
outChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
throw new RuntimeException("problems");
}
});
PublishSubscribeChannel errorChannel = new PublishSubscribeChannel();
errorChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
throw new RuntimeException("ooops");
}
Expand All @@ -82,13 +88,15 @@ public void handleMessage(Message<?> message) throws MessagingException {
public void validateSuccessfulErrorFlowDoesNotThrowErrors() {
DirectChannel outChannel = new DirectChannel();
outChannel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
throw new RuntimeException("problems");
}
});
PublishSubscribeChannel errorChannel = new PublishSubscribeChannel();
SuccessfulErrorService errorService = new SuccessfulErrorService();
ServiceActivatingHandler handler = new ServiceActivatingHandler(errorService);
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();
errorChannel.subscribe(handler);
MessageProducerSupport mps = new MessageProducerSupport() {};
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -129,6 +129,7 @@ public void testSolicitResponse() throws Exception {
proxyFactory.setDefaultRequestChannel(new DirectChannel());
proxyFactory.setDefaultReplyChannel(replyChannel);
proxyFactory.setBeanName("testGateway");
proxyFactory.setBeanFactory(mock(BeanFactory.class));
proxyFactory.afterPropertiesSet();
TestService service = (TestService) proxyFactory.getObject();
String result = service.solicitResponse();
Expand Down

0 comments on commit 9dee131

Please sign in to comment.