Skip to content

Commit

Permalink
Add builders for master/worker beans in remote chunking setup
Browse files Browse the repository at this point in the history
This commit adds a new annotation `@EnableBatchIntegration` that makes
it possible to autowire two beans in the application context:

* RemoteChunkingMasterStepBuilderFactory: used to create a master step
* RemoteChunkingWorkerBuilder: used to create a worker integration flow

The goal of these new APIs is to simplify the setup of remote chunking
job by automatically configuring infrastructure beans.

Resolves BATCH-2686
  • Loading branch information
fmbenhassine committed May 18, 2018
1 parent 69b32e5 commit 9c45f65
Show file tree
Hide file tree
Showing 12 changed files with 1,280 additions and 38 deletions.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
88 changes: 82 additions & 6 deletions spring-batch-docs/asciidoc/spring-batch-integration.adoc
Expand Up @@ -771,7 +771,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
Expand All @@ -788,7 +787,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory)
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies() {
return new QueueChannel();
Expand All @@ -805,7 +803,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory)
/*
* Configure the ChunkMessageChannelItemWriter
*/
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
Expand Down Expand Up @@ -883,7 +880,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
/*
* Configure inbound flow (requests coming from the master)
*/
@Bean
public DirectChannel requests() {
return new DirectChannel();
Expand All @@ -900,7 +896,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory)
/*
* Configure outbound flow (replies going to the master)
*/
@Bean
public DirectChannel replies() {
return new DirectChannel();
Expand All @@ -917,7 +912,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory)
/*
* Configure the ChunkProcessorChunkHandler
*/
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
Expand All @@ -944,6 +938,88 @@ when it receives chunks from the master.
For more information, see the section of the "Scalability" chapter on
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].

Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration`
annotation that can be used to simplify remote chunking setup. This annotation provides
two beans that can be autowired in the application context:

* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
* `RemoteChunkingWorkerBuilder`: used to configure the remote worker integration flow

These APIs will take care of configuring a number of components as described in the following diagram:

.Remote Chunking Configuration
image::{batch-asciidoc}images/remote-chunking-config.png[Remote Chunking Configuration, scaledwidth="80%"]

On the master side, the `RemoteChunkingMasterStepBuilderFactory` allows you to
configure a master step by declaring:

* the item reader to read items and send them to workers
* the output channel ("Outgoing requests") to send requests to workers
* the input channel ("Incoming replies") to receive replies from workers

There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
and the `MessagingTemplate` (Those can still be explicitly configured if required).

On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to:

* listen to requests sent by the master on the input channel ("Incoming requests")
* call the `handleChunk` method of `ChunkProcessorChunkHandler` for each request
with the configured `ItemProcessor` and `ItemWriter`
* send replies on the output channel ("Outgoing replies") to the master

There is no need anymore to explicitly configure the `SimpleChunkProcessor`
and the `ChunkProcessorChunkHandler` (Those can still be explicitly configured if required).

The following example shows how to use these APIs:

[source, java]
----
@EnableBatchIntegration
public class RemoteChunkingJobConfiguration {
@Configuration
public static class MasterConfiguration {
@Autowired
private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;
@Bean
public TaskletStep masterStep() {
return this.masterStepBuilderFactory.get("masterStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerBuilder workerBuilder;
@Bean
public IntegrationFlow workerFlow() {
return this.workerBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests()) // requests received from the master
.outputChannel(replies()) // replies sent to the master
.build();
}
// Middleware beans setup omitted
}
}
----

You can find a complete example of a remote chunking job
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here].

Expand Down

0 comments on commit 9c45f65

Please sign in to comment.