Skip to content

Commit

Permalink
BATCH-2686: Address Michael's remarks:
Browse files Browse the repository at this point in the history
- Add missing assertions on input channel and output channel
- Use this keyword for local fields
- Rename RemoteChunkingWorkerFlowBuilder to RemoteChunkingWorkerBuilder
  • Loading branch information
fmbenhassine committed May 2, 2018
1 parent 429f567 commit 0b6d812
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 36 deletions.
14 changes: 7 additions & 7 deletions spring-batch-docs/asciidoc/spring-batch-integration.adoc
Expand Up @@ -953,7 +953,7 @@ annotation that can be used to simplify remote chunking setup. This annotation p
two beans that can be autowired in the application context:

* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
* `RemoteChunkingWorkerFlowBuilder`: used to configure the remote worker flow
* `RemoteChunkingWorkerBuilder`: used to configure the remote worker integration flow

The following example shows how to use these APIs:

Expand All @@ -970,7 +970,7 @@ public class RemoteChunkingConfiguration {
@Bean
public TaskletStep masterStep() {
return masterStepBuilderFactory.get("masterStep")
return this.masterStepBuilderFactory.get("masterStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
Expand All @@ -986,11 +986,11 @@ public class RemoteChunkingConfiguration {
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerFlowBuilder workerFlowBuilder;
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow(DirectChannel requests, DirectChannel replies){
return workerFlowBuilder
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests) // requests received from the master
Expand All @@ -1004,13 +1004,13 @@ public class RemoteChunkingConfiguration {
}
----

On the master side, the `RemoteChunkingMasterStepBuilderFactory` makes it possible
to configure a master step by declaring the item reader, the output channel
On the master side, the `RemoteChunkingMasterStepBuilderFactory` allows you to
configure a master step by declaring the item reader, the output channel
(to send requests to workers) and the input channel (to receive replies from workers).
There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
and `RemoteChunkHandlerFactoryBean`.

On the worker side, the `RemoteChunkingWorkerFlowBuilder` allows you to configure
On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure
the worker flow to listens to requests sent by the master on the input channel,
call the `ChunkProcessorChunkHandler` for each request with the configured `ItemProcessor`
and `ItemWriter` and finally send replies on the output channel to the master.
Expand Down
Expand Up @@ -163,12 +163,14 @@ public void throttleLimit(long throttleLimit) {
public TaskletStep build() {

Assert.notNull(applicationContext, "A GenericApplicationContext must be provided");
Assert.notNull(inputChannel, "inputChannel must not be null");
Assert.notNull(outputChannel, "outputChannel must not be null");

// configure messaging template
if (messagingTemplate == null) {
messagingTemplate = new MessagingTemplate();
if (this.messagingTemplate == null) {
this.messagingTemplate = new MessagingTemplate();
}
messagingTemplate.setDefaultChannel(outputChannel);
this.messagingTemplate.setDefaultChannel(outputChannel);

// configure item writer
ChunkMessageChannelItemWriter<O> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
Expand All @@ -184,7 +186,7 @@ public TaskletStep build() {
.addPropertyValue("chunkWriter", chunkMessageChannelItemWriter)
.addPropertyValue("step", step)
.getBeanDefinition();
applicationContext.registerBeanDefinition(step.getName() + "_remoteChunkHandler", remoteChunkHandlerFactoryBeanDefinition);
this.applicationContext.registerBeanDefinition(step.getName() + "_remoteChunkHandler", remoteChunkHandlerFactoryBeanDefinition);

return step;
}
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.springframework.util.Assert;

/**
* Builder for a worker flow in a remote chunking setup. This builder:
* Builder for a worker in a remote chunking setup. This builder:
*
* <ul>
* <li>creates a {@link ChunkProcessorChunkHandler} with the provided
Expand All @@ -44,7 +44,7 @@
* @author Mahmoud Ben Hassine
*/

public class RemoteChunkingWorkerFlowBuilder<I, O> {
public class RemoteChunkingWorkerBuilder<I, O> {

private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handleChunk";

Expand All @@ -60,7 +60,7 @@ public class RemoteChunkingWorkerFlowBuilder<I, O> {
* @param itemProcessor to use
* @return this builder instance for fluent chaining
*/
public RemoteChunkingWorkerFlowBuilder<I, O> itemProcessor(ItemProcessor<I, O> itemProcessor) {
public RemoteChunkingWorkerBuilder<I, O> itemProcessor(ItemProcessor<I, O> itemProcessor) {
Assert.notNull(itemProcessor, "itemProcessor must not be null");
this.itemProcessor = itemProcessor;
return this;
Expand All @@ -72,7 +72,7 @@ public RemoteChunkingWorkerFlowBuilder<I, O> itemProcessor(ItemProcessor<I, O> i
* @param itemWriter to use
* @return this builder instance for fluent chaining
*/
public RemoteChunkingWorkerFlowBuilder<I, O> itemWriter(ItemWriter<O> itemWriter) {
public RemoteChunkingWorkerBuilder<I, O> itemWriter(ItemWriter<O> itemWriter) {
Assert.notNull(itemWriter, "itemWriter must not be null");
this.itemWriter = itemWriter;
return this;
Expand All @@ -84,7 +84,7 @@ public RemoteChunkingWorkerFlowBuilder<I, O> itemWriter(ItemWriter<O> itemWriter
* @param inputChannel the input channel
* @return this builder instance for fluent chaining
*/
public RemoteChunkingWorkerFlowBuilder<I, O> inputChannel(DirectChannel inputChannel) {
public RemoteChunkingWorkerBuilder<I, O> inputChannel(DirectChannel inputChannel) {
Assert.notNull(inputChannel, "inputChannel must not be null");
this.inputChannel = inputChannel;
return this;
Expand All @@ -96,7 +96,7 @@ public RemoteChunkingWorkerFlowBuilder<I, O> inputChannel(DirectChannel inputCha
* @param outputChannel the output channel
* @return this builder instance for fluent chaining
*/
public RemoteChunkingWorkerFlowBuilder<I, O> outputChannel(DirectChannel outputChannel) {
public RemoteChunkingWorkerBuilder<I, O> outputChannel(DirectChannel outputChannel) {
Assert.notNull(outputChannel, "outputChannel must not be null");
this.outputChannel = outputChannel;
return this;
Expand All @@ -113,8 +113,8 @@ public IntegrationFlow build() {

Assert.notNull(itemWriter, "An ItemWriter must be provided");

if(itemProcessor == null) {
itemProcessor = new PassThroughItemProcessor();
if(this.itemProcessor == null) {
this.itemProcessor = new PassThroughItemProcessor();
}
SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(itemProcessor, itemWriter);

Expand Down
Expand Up @@ -17,7 +17,7 @@

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerFlowBuilder;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand Down Expand Up @@ -55,8 +55,8 @@ public RemoteChunkingMasterStepBuilderFactory remoteChunkingMasterStepBuilderFac
}

@Bean
public <I,O> RemoteChunkingWorkerFlowBuilder<I, O> remoteChunkingWorkerFlowBuilder() {
return new RemoteChunkingWorkerFlowBuilder<>();
public <I,O> RemoteChunkingWorkerBuilder<I, O> remoteChunkingWorkerBuilder() {
return new RemoteChunkingWorkerBuilder<>();
}

}
Expand Up @@ -15,7 +15,7 @@
*/
package org.springframework.batch.integration.config.annotation;

import org.springframework.batch.integration.chunk.RemoteChunkingWorkerFlowBuilder;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
import org.springframework.context.annotation.Import;

import java.lang.annotation.Documented;
Expand All @@ -35,7 +35,7 @@
* <li>{@link org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory}:
* used to create a master step by automatically setting up the job repository,
* transaction manager and application context.</li>
* <li>{@link RemoteChunkingWorkerFlowBuilder}:
* <li>{@link RemoteChunkingWorkerBuilder}:
* used to create the integration flow on the worker side.</li>
* </ul>
*
Expand All @@ -50,11 +50,11 @@
* private RemoteChunkingMasterStepBuilderFactory masterBuilderFactory;
*
* &#064;Autowired
* private RemoteChunkingWorkerFlowBuilder workerBuilder;
* private RemoteChunkingWorkerBuilder workerBuilder;
*
* &#064;Bean
* public Step master() {
* masterBuilderFactory
* this.masterBuilderFactory
* .get("masterStep")
* .reader(itemReader())
* .outputChannel(outgoingRequestsToWorkers())
Expand All @@ -64,7 +64,7 @@
*
* &#064;Bean
* public IntegrationFlow worker() {
* workerBuilder
* this.workerBuilder
* .itemProcessor(itemProcessor())
* .itemWriter(itemWriter())
* .inputChannel(incomingRequestsFromMaster())
Expand Down
Expand Up @@ -28,7 +28,7 @@
/**
* @author Mahmoud Ben Hassine
*/
public class RemoteChunkingWorkerFlowBuilderTest {
public class RemoteChunkingWorkerBuilderTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();
Expand All @@ -39,7 +39,7 @@ public class RemoteChunkingWorkerFlowBuilderTest {
@Test
public void testMandatoryItemWriter() {
// given
RemoteChunkingWorkerFlowBuilder<String, String> builder = new RemoteChunkingWorkerFlowBuilder<>();
RemoteChunkingWorkerBuilder<String, String> builder = new RemoteChunkingWorkerBuilder<>();

expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("An ItemWriter must be provided");
Expand All @@ -56,7 +56,7 @@ public void testIntegrationFlowCreation() {
// given
DirectChannel inputChannel = new DirectChannel();
DirectChannel outputChannel = new DirectChannel();
RemoteChunkingWorkerFlowBuilder<String, String> builder = new RemoteChunkingWorkerFlowBuilder<String, String>()
RemoteChunkingWorkerBuilder<String, String> builder = new RemoteChunkingWorkerBuilder<String, String>()
.itemProcessor(itemProcessor)
.itemWriter(itemWriter)
.inputChannel(inputChannel)
Expand Down
2 changes: 1 addition & 1 deletion spring-batch-samples/README.md
Expand Up @@ -649,7 +649,7 @@ not get shared across threads of execution.

This sample shows how to configure a remote chunking job. The sample uses
the `RemoteChunkingMasterStepBuilderFactory` to create a master step and
the `RemoteChunkingWorkerFlowBuilder` to configure an integration flow on
the `RemoteChunkingWorkerBuilder` to configure an integration flow on
the worker side. This example also shows how to use rabbitmq as a communication
middleware between the master and workers.

Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerFlowBuilder;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
Expand Down Expand Up @@ -64,7 +64,7 @@ public static void main(String[] args) {
}

@Autowired
private RemoteChunkingWorkerFlowBuilder<Integer, Integer> workerFlowBuilder;
private RemoteChunkingWorkerBuilder<Integer, Integer> workerBuilder;

@Bean
public ConnectionFactory rabbitConnectionFactory() {
Expand Down Expand Up @@ -139,8 +139,8 @@ public IntegrationFlow outgoingReplies(AmqpTemplate rabbitTemplate) {
}

@Bean
public IntegrationFlow integrationFlow(DirectChannel requests, DirectChannel replies) {
return workerFlowBuilder
public IntegrationFlow workerIntegrationFlow(DirectChannel requests, DirectChannel replies) {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests)
Expand Down
Expand Up @@ -148,7 +148,7 @@ public IntegrationFlow inboundFlow(ConnectionFactory rabbitConnectionFactory, Po

@Bean
public TaskletStep masterStep() {
return masterStepBuilderFactory.get("masterStep")
return this.masterStepBuilderFactory.get("masterStep")
.<Integer, Integer>chunk(3)
.reader(itemReader())
.outputChannel(requests())
Expand Down

0 comments on commit 0b6d812

Please sign in to comment.