Skip to content

Commit

Permalink
BATCH-2686: Remove usage of BeanDefinitionRegistry in RemoteChunkingM…
Browse files Browse the repository at this point in the history
…asterStepBuilder

There is no need to register the RemoteChunkHandlerFactoryBean as a bean
in the bean definition registry. The fix uses this factory bean to create
a ChunkProcessorChunkHandler which is already a message endpoint configured
as a service activator
  • Loading branch information
fmbenhassine committed May 4, 2018
1 parent f721b85 commit 94bd4d1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 93 deletions.
Expand Up @@ -36,9 +36,7 @@
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.exception.ExceptionHandler;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.PollableChannel;
Expand All @@ -54,7 +52,7 @@
*
* <ul>
* <li>creates and sets a {@link ChunkMessageChannelItemWriter} on the master step</li>
* <li>registers a {@link RemoteChunkHandlerFactoryBean} in the given bean definition registry</li>
* <li>creates and configures a {@link ChunkProcessorChunkHandler} as a service activator</li>
* </ul>
*
* @param <I> type of input items
Expand All @@ -66,7 +64,6 @@
public class RemoteChunkingMasterStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> {

private MessagingTemplate messagingTemplate;
private BeanDefinitionRegistry beanDefinitionRegistry;
private PollableChannel inputChannel;
private DirectChannel outputChannel;

Expand All @@ -84,18 +81,6 @@ public RemoteChunkingMasterStepBuilder(String stepName) {
super(new StepBuilder(stepName));
}

/**
* Set the bean definition registry in which required beans will be registered.
*
* @param beanDefinitionRegistry the bean definition registry to use.
* @return this builder instance for fluent chaining
*/
public RemoteChunkingMasterStepBuilder<I, O> beanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) {
Assert.notNull(beanDefinitionRegistry, "beanDefinitionRegistry must not be null");
this.beanDefinitionRegistry = beanDefinitionRegistry;
return this;
}

/**
* Set the input channel on which replies from workers will be received.
* The provided input channel will be set as a reply channel on the
Expand Down Expand Up @@ -175,11 +160,14 @@ public RemoteChunkingMasterStepBuilder<I, O> throttleLimit(long throttleLimit) {
/**
* Build a master {@link TaskletStep}.
*
* <p>Any item writer set using {@link RemoteChunkingMasterStepBuilder#writer(ItemWriter)}
* will be replaced with a {@link ChunkMessageChannelItemWriter}.</p>
*
* @return the configured master step
* @see RemoteChunkHandlerFactoryBean
*/
public TaskletStep build() {

Assert.notNull(this.beanDefinitionRegistry, "A BeanDefinitionRegistry must be provided");
Assert.notNull(this.inputChannel, "An InputChannel must be provided");
Assert.notNull(this.outputChannel, "An OutputChannel must be provided");

Expand All @@ -195,15 +183,22 @@ public TaskletStep build() {
chunkMessageChannelItemWriter.setMaxWaitTimeouts(this.maxWaitTimeouts);
chunkMessageChannelItemWriter.setThrottleLimit(this.throttleLimit);
chunkMessageChannelItemWriter.setReplyChannel(this.inputChannel);
super.writer(chunkMessageChannelItemWriter);

// register remote chunk handler factory bean
// the item writer will be replaced anyway with a
// chunkMessageChannelItemWriter by the RemoteChunkHandlerFactoryBean
super.writer(items -> { });
TaskletStep step = super.build();
BeanDefinition remoteChunkHandlerFactoryBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RemoteChunkHandlerFactoryBean.class)
.addPropertyValue("chunkWriter", chunkMessageChannelItemWriter)
.addPropertyValue("step", step)
.getBeanDefinition();
this.beanDefinitionRegistry.registerBeanDefinition(step.getName() + "_remoteChunkHandler", remoteChunkHandlerFactoryBeanDefinition);

RemoteChunkHandlerFactoryBean<O> remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean<>();
remoteChunkHandlerFactoryBean.setChunkWriter(chunkMessageChannelItemWriter);
remoteChunkHandlerFactoryBean.setStep(step);
try {
// the ChunkProcessorChunkHandler created here is already a message
// endpoint that is automatically configured as a service activator
remoteChunkHandlerFactoryBean.getObject();
} catch (Exception e) {
throw new BeanCreationException("Unable to create a ChunkProcessorChunkHandler bean.", e);
}

return step;
}
Expand Down Expand Up @@ -359,6 +354,14 @@ public RemoteChunkingMasterStepBuilder<I, O> chunk(CompletionPolicy completionPo
return this;
}

/**
* The item writer set using this setter will be replaced with a {@link ChunkMessageChannelItemWriter}.
*
* @param writer an item writer
* @return this builder instance for fluent chaining
* @see ChunkMessageChannelItemWriter
* @see RemoteChunkHandlerFactoryBean#setChunkWriter(ItemWriter)
*/
@Override
public RemoteChunkingMasterStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
super.writer(writer);
Expand Down
Expand Up @@ -16,13 +16,11 @@
package org.springframework.batch.integration.chunk;

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.transaction.PlatformTransactionManager;

/**
* Convenient factory for a {@link RemoteChunkingMasterStepBuilder} which sets
* the {@link JobRepository}, {@link PlatformTransactionManager} and
* {@link BeanDefinitionRegistry} automatically.
* the {@link JobRepository} and {@link PlatformTransactionManager} automatically.
*
* @since 4.1
* @author Mahmoud Ben Hassine
Expand All @@ -33,36 +31,30 @@ public class RemoteChunkingMasterStepBuilderFactory {

private PlatformTransactionManager transactionManager;

private BeanDefinitionRegistry beanDefinitionRegistry;

/**
* Create a new {@link RemoteChunkingMasterStepBuilderFactory}.
*
* @param jobRepository the job repository to use
* @param transactionManager the transaction manager to use
* @param beanDefinitionRegistry the bean definition registry to use
*/
public RemoteChunkingMasterStepBuilderFactory(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
BeanDefinitionRegistry beanDefinitionRegistry) {
PlatformTransactionManager transactionManager) {
this.jobRepository = jobRepository;
this.transactionManager = transactionManager;
this.beanDefinitionRegistry = beanDefinitionRegistry;
}

/**
* Creates a {@link RemoteChunkingMasterStepBuilder} and initializes its job
* repository, transaction manager and bean definition registry.
* repository and transaction manager.
*
* @param name the name of the step
* @return a {@link RemoteChunkingMasterStepBuilder}
*/
public <I, O> RemoteChunkingMasterStepBuilder<I, O> get(String name) {
return new RemoteChunkingMasterStepBuilder<I, O>(name)
.repository(this.jobRepository)
.transactionManager(this.transactionManager)
.beanDefinitionRegistry(this.beanDefinitionRegistry);
.transactionManager(this.transactionManager);
}

}
Expand Up @@ -18,11 +18,7 @@
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
Expand All @@ -34,14 +30,12 @@
* @author Mahmoud Ben Hassine
*/
@Configuration
public class BatchIntegrationConfiguration implements ApplicationContextAware {
public class BatchIntegrationConfiguration {

private JobRepository jobRepository;

private PlatformTransactionManager transactionManager;

private BeanDefinitionRegistry beanDefinitionRegistry;

@Autowired
public BatchIntegrationConfiguration(
JobRepository jobRepository,
Expand All @@ -53,17 +47,12 @@ public BatchIntegrationConfiguration(
@Bean
public RemoteChunkingMasterStepBuilderFactory remoteChunkingMasterStepBuilderFactory() {
return new RemoteChunkingMasterStepBuilderFactory(this.jobRepository,
this.transactionManager, this.beanDefinitionRegistry);
this.transactionManager);
}

@Bean
public <I,O> RemoteChunkingWorkerBuilder<I, O> remoteChunkingWorkerBuilder() {
return new RemoteChunkingWorkerBuilder<>();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.beanDefinitionRegistry = (BeanDefinitionRegistry) applicationContext;
}

}
Expand Up @@ -29,20 +29,18 @@
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;

/**
* @author Mahmoud Ben Hassine
*/
@RunWith(SpringJUnit4ClassRunner.class)
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {RemoteChunkingMasterStepBuilderTest.BatchConfiguration.class})
public class RemoteChunkingMasterStepBuilderTest {

Expand All @@ -54,37 +52,6 @@ public class RemoteChunkingMasterStepBuilderTest {
@Autowired
private PlatformTransactionManager transactionManager;

private ItemReader<String> itemReader = new ListItemReader<>(Arrays.asList("a", "b", "c"));

@Test
public void testMandatoryBeanDefinitionRegistry() {
// given
RemoteChunkingMasterStepBuilder<String, String> builder = new RemoteChunkingMasterStepBuilder<>("step");
this.expectedException.expect(IllegalArgumentException.class);
this.expectedException.expectMessage("A BeanDefinitionRegistry must be provided");

// when
builder.build();

// then
// expected exception
}

@Test
public void beanDefinitionRegistryMustNotBeNull() {
// given
this.expectedException.expect(IllegalArgumentException.class);
this.expectedException.expectMessage("beanDefinitionRegistry must not be null");

// when
TaskletStep step = new RemoteChunkingMasterStepBuilder<String, String>("step")
.beanDefinitionRegistry(null)
.build();

// then
// expected exception
}

@Test
public void inputChannelMustNotBeNull() {
// given
Expand Down Expand Up @@ -163,8 +130,7 @@ public void throttleLimitMustNotBeGreaterThanZero() {
@Test
public void testMandatoryInputChannel() {
// given
RemoteChunkingMasterStepBuilder<String, String> builder = new RemoteChunkingMasterStepBuilder<String, String>("step")
.beanDefinitionRegistry(new GenericApplicationContext());
RemoteChunkingMasterStepBuilder<String, String> builder = new RemoteChunkingMasterStepBuilder<>("step");

this.expectedException.expect(IllegalArgumentException.class);
this.expectedException.expectMessage("An InputChannel must be provided");
Expand All @@ -180,7 +146,6 @@ public void testMandatoryInputChannel() {
public void testMandatoryOutputChannel() {
// given
RemoteChunkingMasterStepBuilder<String, String> builder = new RemoteChunkingMasterStepBuilder<String, String>("step")
.beanDefinitionRegistry(new GenericApplicationContext())
.inputChannel(new QueueChannel());

this.expectedException.expect(IllegalArgumentException.class);
Expand All @@ -194,25 +159,23 @@ public void testMandatoryOutputChannel() {
}

@Test
public void testRemoteChunkHandlerRegistration() {
public void testMasterStepCreation() {
// given
BeanDefinitionRegistry beanDefinitionRegistry = new GenericApplicationContext();
ItemReader<String> itemReader = new ListItemReader<>(Arrays.asList("a", "b", "c"));
PollableChannel inputChannel = new QueueChannel();
DirectChannel outputChannel = new DirectChannel();

// when
TaskletStep taskletStep = new RemoteChunkingMasterStepBuilder<String, String>("step")
.reader(this.itemReader)
.reader(itemReader)
.repository(this.jobRepository)
.transactionManager(this.transactionManager)
.beanDefinitionRegistry(beanDefinitionRegistry)
.inputChannel(inputChannel)
.outputChannel(outputChannel)
.build();

// then
Assert.assertNotNull(taskletStep);
Assert.assertTrue(beanDefinitionRegistry.containsBeanDefinition("step_remoteChunkHandler"));
}

@Configuration
Expand Down

0 comments on commit 94bd4d1

Please sign in to comment.