diff --git a/spring-batch-docs/asciidoc/images/remote-chunking-config.png b/spring-batch-docs/asciidoc/images/remote-chunking-config.png new file mode 100644 index 0000000000..53d3ac6a5d Binary files /dev/null and b/spring-batch-docs/asciidoc/images/remote-chunking-config.png differ diff --git a/spring-batch-docs/asciidoc/spring-batch-integration.adoc b/spring-batch-docs/asciidoc/spring-batch-integration.adoc index 5eba5b80f3..d3ae66f8e7 100644 --- a/spring-batch-docs/asciidoc/spring-batch-integration.adoc +++ b/spring-batch-docs/asciidoc/spring-batch-integration.adoc @@ -771,7 +771,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { /* * Configure outbound flow (requests going to workers) */ - @Bean public DirectChannel requests() { return new DirectChannel(); @@ -788,7 +787,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) /* * Configure inbound flow (replies coming from workers) */ - @Bean public QueueChannel replies() { return new QueueChannel(); @@ -805,7 +803,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) /* * Configure the ChunkMessageChannelItemWriter */ - @Bean public ItemWriter itemWriter() { MessagingTemplate messagingTemplate = new MessagingTemplate(); @@ -883,7 +880,6 @@ public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { /* * Configure inbound flow (requests coming from the master) */ - @Bean public DirectChannel requests() { return new DirectChannel(); @@ -900,7 +896,6 @@ public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) /* * Configure outbound flow (replies going to the master) */ - @Bean public DirectChannel replies() { return new DirectChannel(); @@ -917,7 +912,6 @@ public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) /* * Configure the ChunkProcessorChunkHandler */ - @Bean @ServiceActivator(inputChannel = "requests", outputChannel = "replies") public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { @@ -944,6 +938,88 @@ when it receives chunks from the master. For more information, see the section of the "Scalability" chapter on link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking]. +Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration` +annotation that can be used to simplify remote chunking setup. This annotation provides +two beans that can be autowired in the application context: + +* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step +* `RemoteChunkingWorkerBuilder`: used to configure the remote worker integration flow + +These APIs will take care of configuring a number of components as described in the following diagram: + +.Remote Chunking Configuration +image::{batch-asciidoc}images/remote-chunking-config.png[Remote Chunking Configuration, scaledwidth="80%"] + +On the master side, the `RemoteChunkingMasterStepBuilderFactory` allows you to +configure a master step by declaring: + +* the item reader to read items and send them to workers +* the output channel ("Outgoing requests") to send requests to workers +* the input channel ("Incoming replies") to receive replies from workers + +There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter` +and the `MessagingTemplate` (Those can still be explicitly configured if required). + +On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to: + +* listen to requests sent by the master on the input channel ("Incoming requests") +* call the `handleChunk` method of `ChunkProcessorChunkHandler` for each request +with the configured `ItemProcessor` and `ItemWriter` +* send replies on the output channel ("Outgoing replies") to the master + +There is no need anymore to explicitly configure the `SimpleChunkProcessor` +and the `ChunkProcessorChunkHandler` (Those can still be explicitly configured if required). + +The following example shows how to use these APIs: + +[source, java] +---- +@EnableBatchIntegration +public class RemoteChunkingJobConfiguration { + + @Configuration + public static class MasterConfiguration { + + @Autowired + private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory; + + @Bean + public TaskletStep masterStep() { + return this.masterStepBuilderFactory.get("masterStep") + .chunk(100) + .reader(itemReader()) + .outputChannel(requests()) // requests sent to workers + .inputChannel(replies()) // replies received from workers + .build(); + } + + // Middleware beans setup omitted + + } + + @Configuration + public static class WorkerConfiguration { + + @Autowired + private RemoteChunkingWorkerBuilder workerBuilder; + + @Bean + public IntegrationFlow workerFlow() { + return this.workerBuilder + .itemProcessor(itemProcessor()) + .itemWriter(itemWriter()) + .inputChannel(requests()) // requests received from the master + .outputChannel(replies()) // replies sent to the master + .build(); + } + + // Middleware beans setup omitted + + } + +} +---- + You can find a complete example of a remote chunking job link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample$$[here]. diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilder.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilder.java new file mode 100644 index 0000000000..a87f9182a5 --- /dev/null +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilder.java @@ -0,0 +1,403 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.chunk; + +import org.springframework.batch.core.ChunkListener; +import org.springframework.batch.core.ItemReadListener; +import org.springframework.batch.core.ItemWriteListener; +import org.springframework.batch.core.SkipListener; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.core.step.item.KeyGenerator; +import org.springframework.batch.core.step.skip.SkipPolicy; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemStream; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.CompletionPolicy; +import org.springframework.batch.repeat.RepeatOperations; +import org.springframework.batch.repeat.exception.ExceptionHandler; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessagingTemplate; +import org.springframework.messaging.PollableChannel; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.policy.RetryContextCache; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.interceptor.TransactionAttribute; +import org.springframework.util.Assert; + +/** + * Builder for a master step in a remote chunking setup. This builder creates and + * sets a {@link ChunkMessageChannelItemWriter} on the master step. + * + * If no messagingTemplate is provided through + * {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)}, + * this builder will create one. The outputChannel set with + * {@link RemoteChunkingMasterStepBuilder#outputChannel(DirectChannel)} will be + * used as a default channel of the messaging template and will override any default + * channel already set on the messaging template. + * + * @param type of input items + * @param type of output items + * + * @since 4.1 + * @author Mahmoud Ben Hassine + */ +public class RemoteChunkingMasterStepBuilder extends FaultTolerantStepBuilder { + + private MessagingTemplate messagingTemplate; + private PollableChannel inputChannel; + private DirectChannel outputChannel; + + private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40; + private static final long DEFAULT_THROTTLE_LIMIT = 6; + private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS; + private long throttleLimit = DEFAULT_THROTTLE_LIMIT; + + /** + * Create a new {@link RemoteChunkingMasterStepBuilder}. + * + * @param stepName name of the master step + */ + public RemoteChunkingMasterStepBuilder(String stepName) { + super(new StepBuilder(stepName)); + } + + /** + * Set the input channel on which replies from workers will be received. + * The provided input channel will be set as a reply channel on the + * {@link ChunkMessageChannelItemWriter} created by this builder. + * + * @param inputChannel the input channel + * @return this builder instance for fluent chaining + * + * @see ChunkMessageChannelItemWriter#setReplyChannel + */ + public RemoteChunkingMasterStepBuilder inputChannel(PollableChannel inputChannel) { + Assert.notNull(inputChannel, "inputChannel must not be null"); + this.inputChannel = inputChannel; + return this; + } + + /** + * Set the output channel on which requests to workers will be sent. + * The output channel will be set as a default channel on the provided + * {@link MessagingTemplate} trough {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)} + * (or the one created by this builder if no messaging template is provided). + * + * @param outputChannel the output channel. + * @return this builder instance for fluent chaining + * + * @see RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate) + */ + public RemoteChunkingMasterStepBuilder outputChannel(DirectChannel outputChannel) { + Assert.notNull(outputChannel, "outputChannel must not be null"); + this.outputChannel = outputChannel; + return this; + } + + /** + * Set the {@link MessagingTemplate} to use to send data to workers. + * + *

The default destination of the messaging template will be + * overridden by the output channel provided through + * {@link RemoteChunkingMasterStepBuilder#outputChannel(DirectChannel)}.

+ * + * @param messagingTemplate the messaging template to use + * @return this builder instance for fluent chaining + */ + public RemoteChunkingMasterStepBuilder messagingTemplate(MessagingTemplate messagingTemplate) { + Assert.notNull(messagingTemplate, "messagingTemplate must not be null"); + this.messagingTemplate = messagingTemplate; + return this; + } + + /** + * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a + * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing + * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40. + * + * @param maxWaitTimeouts the maximum number of wait timeouts + * @see ChunkMessageChannelItemWriter#setMaxWaitTimeouts(int) + */ + public RemoteChunkingMasterStepBuilder maxWaitTimeouts(int maxWaitTimeouts) { + Assert.isTrue(maxWaitTimeouts > 0, "maxWaitTimeouts must be greater than zero"); + this.maxWaitTimeouts = maxWaitTimeouts; + return this; + } + + /** + * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid + * overwhelming the receivers. + * + * @param throttleLimit the throttle limit to set + * @see ChunkMessageChannelItemWriter#setThrottleLimit(long) + */ + public RemoteChunkingMasterStepBuilder throttleLimit(long throttleLimit) { + Assert.isTrue(throttleLimit > 0, "throttleLimit must be greater than zero"); + this.throttleLimit = throttleLimit; + return this; + } + + /** + * Build a master {@link TaskletStep}. + * + * @return the configured master step + * @see RemoteChunkHandlerFactoryBean + */ + public TaskletStep build() { + Assert.notNull(this.inputChannel, "An InputChannel must be provided"); + Assert.notNull(this.outputChannel, "An OutputChannel must be provided"); + + // configure messaging template + if (this.messagingTemplate == null) { + this.messagingTemplate = new MessagingTemplate(); + } + this.messagingTemplate.setDefaultChannel(this.outputChannel); + + // configure item writer + ChunkMessageChannelItemWriter chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); + chunkMessageChannelItemWriter.setMessagingOperations(this.messagingTemplate); + chunkMessageChannelItemWriter.setMaxWaitTimeouts(this.maxWaitTimeouts); + chunkMessageChannelItemWriter.setThrottleLimit(this.throttleLimit); + chunkMessageChannelItemWriter.setReplyChannel(this.inputChannel); + super.writer(chunkMessageChannelItemWriter); + + return super.build(); + } + + /* + * The following methods override those from parent builders and return + * the current builder type. + * FIXME: Change parent builders to be generic and return current builder + * type in each method. + */ + + @Override + public RemoteChunkingMasterStepBuilder reader(ItemReader reader) { + super.reader(reader); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder repository(JobRepository jobRepository) { + super.repository(jobRepository); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder transactionManager(PlatformTransactionManager transactionManager) { + super.transactionManager(transactionManager); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(Object listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(SkipListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(ChunkListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder transactionAttribute(TransactionAttribute transactionAttribute) { + super.transactionAttribute(transactionAttribute); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(org.springframework.retry.RetryListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder keyGenerator(KeyGenerator keyGenerator) { + super.keyGenerator(keyGenerator); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder retryLimit(int retryLimit) { + super.retryLimit(retryLimit); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder retryPolicy(RetryPolicy retryPolicy) { + super.retryPolicy(retryPolicy); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder backOffPolicy(BackOffPolicy backOffPolicy) { + super.backOffPolicy(backOffPolicy); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder retryContextCache(RetryContextCache retryContextCache) { + super.retryContextCache(retryContextCache); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder skipLimit(int skipLimit) { + super.skipLimit(skipLimit); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder noSkip(Class type) { + super.noSkip(type); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder skip(Class type) { + super.skip(type); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder skipPolicy(SkipPolicy skipPolicy) { + super.skipPolicy(skipPolicy); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder noRollback(Class type) { + super.noRollback(type); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder noRetry(Class type) { + super.noRetry(type); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder retry(Class type) { + super.retry(type); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder stream(ItemStream stream) { + super.stream(stream); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder chunk(int chunkSize) { + super.chunk(chunkSize); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder chunk(CompletionPolicy completionPolicy) { + super.chunk(completionPolicy); + return this; + } + + /** + * This method will throw a {@link UnsupportedOperationException} since + * the item writer of the master step in a remote chunking setup will be + * automatically set to an instance of {@link ChunkMessageChannelItemWriter}. + * + * When building a master step for remote chunking, no item writer must be + * provided. + * + * @throws UnsupportedOperationException if an item writer is provided + * @see ChunkMessageChannelItemWriter + * @see RemoteChunkHandlerFactoryBean#setChunkWriter(ItemWriter) + */ + @Override + public RemoteChunkingMasterStepBuilder writer(ItemWriter writer) throws UnsupportedOperationException { + throw new UnsupportedOperationException("When configuring a master step " + + "for remote chunking, the item writer will be automatically set " + + "to an instance of ChunkMessageChannelItemWriter. The item writer " + + "must not be provided in this case."); + } + + @Override + public RemoteChunkingMasterStepBuilder readerIsTransactionalQueue() { + super.readerIsTransactionalQueue(); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(ItemReadListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(ItemWriteListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder chunkOperations(RepeatOperations repeatTemplate) { + super.chunkOperations(repeatTemplate); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder exceptionHandler(ExceptionHandler exceptionHandler) { + super.exceptionHandler(exceptionHandler); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder stepOperations(RepeatOperations repeatTemplate) { + super.stepOperations(repeatTemplate); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder startLimit(int startLimit) { + super.startLimit(startLimit); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder listener(StepExecutionListener listener) { + super.listener(listener); + return this; + } + + @Override + public RemoteChunkingMasterStepBuilder allowStartIfComplete(boolean allowStartIfComplete) { + super.allowStartIfComplete(allowStartIfComplete); + return this; + } +} diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderFactory.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderFactory.java new file mode 100644 index 0000000000..a8dca82b3c --- /dev/null +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderFactory.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.chunk; + +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * Convenient factory for a {@link RemoteChunkingMasterStepBuilder} which sets + * the {@link JobRepository} and {@link PlatformTransactionManager} automatically. + * + * @since 4.1 + * @author Mahmoud Ben Hassine + */ +public class RemoteChunkingMasterStepBuilderFactory { + + private JobRepository jobRepository; + + private PlatformTransactionManager transactionManager; + + /** + * Create a new {@link RemoteChunkingMasterStepBuilderFactory}. + * + * @param jobRepository the job repository to use + * @param transactionManager the transaction manager to use + */ + public RemoteChunkingMasterStepBuilderFactory( + JobRepository jobRepository, + PlatformTransactionManager transactionManager) { + + this.jobRepository = jobRepository; + this.transactionManager = transactionManager; + } + + /** + * Creates a {@link RemoteChunkingMasterStepBuilder} and initializes its job + * repository and transaction manager. + * + * @param name the name of the step + * @return a {@link RemoteChunkingMasterStepBuilder} + */ + public RemoteChunkingMasterStepBuilder get(String name) { + return new RemoteChunkingMasterStepBuilder(name) + .repository(this.jobRepository) + .transactionManager(this.transactionManager); + } + +} diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilder.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilder.java new file mode 100644 index 0000000000..c19e3f2857 --- /dev/null +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilder.java @@ -0,0 +1,131 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.chunk; + +import org.springframework.batch.core.step.item.SimpleChunkProcessor; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.PassThroughItemProcessor; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.util.Assert; + +/** + * Builder for a worker in a remote chunking setup. This builder: + * + *
    + *
  • creates a {@link ChunkProcessorChunkHandler} with the provided + * item processor and writer. If no item processor is provided, a + * {@link PassThroughItemProcessor} will be used
  • + *
  • creates an {@link IntegrationFlow} with the + * {@link ChunkProcessorChunkHandler} as a service activator which listens + * to incoming requests on inputChannel and sends replies + * on outputChannel
  • + *
+ * + * @param type of input items + * @param type of output items + * + * @since 4.1 + * @author Mahmoud Ben Hassine + */ +public class RemoteChunkingWorkerBuilder { + + private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handleChunk"; + + private ItemProcessor itemProcessor; + private ItemWriter itemWriter; + private DirectChannel inputChannel; + private DirectChannel outputChannel; + + /** + * Set the {@link ItemProcessor} to use to process items sent by the master + * step. + * + * @param itemProcessor to use + * @return this builder instance for fluent chaining + */ + public RemoteChunkingWorkerBuilder itemProcessor(ItemProcessor itemProcessor) { + Assert.notNull(itemProcessor, "itemProcessor must not be null"); + this.itemProcessor = itemProcessor; + return this; + } + + /** + * Set the {@link ItemWriter} to use to write items sent by the master step. + * + * @param itemWriter to use + * @return this builder instance for fluent chaining + */ + public RemoteChunkingWorkerBuilder itemWriter(ItemWriter itemWriter) { + Assert.notNull(itemWriter, "itemWriter must not be null"); + this.itemWriter = itemWriter; + return this; + } + + /** + * Set the input channel on which items sent by the master are received. + * + * @param inputChannel the input channel + * @return this builder instance for fluent chaining + */ + public RemoteChunkingWorkerBuilder inputChannel(DirectChannel inputChannel) { + Assert.notNull(inputChannel, "inputChannel must not be null"); + this.inputChannel = inputChannel; + return this; + } + + /** + * Set the output channel on which replies will be sent to the master step. + * + * @param outputChannel the output channel + * @return this builder instance for fluent chaining + */ + public RemoteChunkingWorkerBuilder outputChannel(DirectChannel outputChannel) { + Assert.notNull(outputChannel, "outputChannel must not be null"); + this.outputChannel = outputChannel; + return this; + } + + /** + * Create an {@link IntegrationFlow} with a {@link ChunkProcessorChunkHandler} + * configured as a service activator listening to the input channel and replying + * on the output channel. + * + * @return the integration flow + */ + public IntegrationFlow build() { + Assert.notNull(this.itemWriter, "An ItemWriter must be provided"); + Assert.notNull(this.inputChannel, "An InputChannel must be provided"); + Assert.notNull(this.outputChannel, "An OutputChannel must be provided"); + + if(this.itemProcessor == null) { + this.itemProcessor = new PassThroughItemProcessor(); + } + SimpleChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(this.itemProcessor, this.itemWriter); + + ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); + chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); + + return IntegrationFlows + .from(this.inputChannel) + .handle(chunkProcessorChunkHandler, SERVICE_ACTIVATOR_METHOD_NAME) + .channel(this.outputChannel) + .get(); + } + +} diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/BatchIntegrationConfiguration.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/BatchIntegrationConfiguration.java new file mode 100644 index 0000000000..087e6401d6 --- /dev/null +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/BatchIntegrationConfiguration.java @@ -0,0 +1,59 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.config.annotation; + +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory; +import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * Base configuration class for Spring Batch Integration factory beans. + * + * @since 4.1 + * @author Mahmoud Ben Hassine + */ +@Configuration +public class BatchIntegrationConfiguration { + + private JobRepository jobRepository; + + private PlatformTransactionManager transactionManager; + + @Autowired + public BatchIntegrationConfiguration( + JobRepository jobRepository, + PlatformTransactionManager transactionManager) { + + this.jobRepository = jobRepository; + this.transactionManager = transactionManager; + } + + @Bean + public RemoteChunkingMasterStepBuilderFactory remoteChunkingMasterStepBuilderFactory() { + return new RemoteChunkingMasterStepBuilderFactory(this.jobRepository, + this.transactionManager); + } + + @Bean + public RemoteChunkingWorkerBuilder remoteChunkingWorkerBuilder() { + return new RemoteChunkingWorkerBuilder<>(); + } + +} diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/EnableBatchIntegration.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/EnableBatchIntegration.java new file mode 100644 index 0000000000..d7c7e8f386 --- /dev/null +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/config/annotation/EnableBatchIntegration.java @@ -0,0 +1,88 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.config.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder; +import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory; +import org.springframework.context.annotation.Import; + +/** + * Enable Spring Batch Integration features and provide a base configuration for + * setting up remote chunking infrastructure beans. + * + * By adding this annotation on a {@link org.springframework.context.annotation.Configuration} + * class, it will be possible to autowire the following beans: + * + *
    + *
  • {@link RemoteChunkingMasterStepBuilderFactory}: + * used to create a master step by automatically setting up the job repository + * and transaction manager.
  • + *
  • {@link RemoteChunkingWorkerBuilder}: used to create the integration + * flow on the worker side.
  • + *
+ * + * For example: + * + *
+ * @Configuration
+ * @EnableBatchIntegration
+ * public class RemoteChunkingAppConfig {
+ *
+ * 	@Autowired
+ * 	private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;
+ *
+ * 	@Autowired
+ * 	private RemoteChunkingWorkerBuilder workerBuilder;
+ *
+ * 	@Bean
+ * 	public TaskletStep masterStep() {
+ *       	 return this.masterStepBuilderFactory
+ *       		.get("masterStep")
+ *       		.chunk(100)
+ *       		.reader(itemReader())
+ *       		.outputChannel(outgoingRequestsToWorkers())
+ *       		.inputChannel(incomingRepliesFromWorkers())
+ *       		.build();
+ * 	}
+ *
+ * 	@Bean
+ * 	public IntegrationFlow worker() {
+ *       	 return this.workerBuilder
+ *       		.itemProcessor(itemProcessor())
+ *       		.itemWriter(itemWriter())
+ *       		.inputChannel(incomingRequestsFromMaster())
+ *       		.outputChannel(outgoingRepliesToMaster())
+ *       		.build();
+ * 	}
+ *
+ * }
+ * 
+ * + * @since 4.1 + * @author Mahmoud Ben Hassine + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Import(BatchIntegrationConfiguration.class) +public @interface EnableBatchIntegration { +} diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderTest.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderTest.java new file mode 100644 index 0000000000..058173c7d7 --- /dev/null +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingMasterStepBuilderTest.java @@ -0,0 +1,264 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.chunk; + +import java.util.Arrays; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; + +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.listener.ChunkListenerSupport; +import org.springframework.batch.core.listener.CompositeItemReadListener; +import org.springframework.batch.core.listener.CompositeItemWriteListener; +import org.springframework.batch.core.listener.SkipListenerSupport; +import org.springframework.batch.core.listener.StepExecutionListenerSupport; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemStreamSupport; +import org.springframework.batch.item.support.ListItemReader; +import org.springframework.batch.repeat.exception.DefaultExceptionHandler; +import org.springframework.batch.repeat.support.RepeatTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.messaging.PollableChannel; +import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.listener.RetryListenerSupport; +import org.springframework.retry.policy.MapRetryContextCache; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.interceptor.DefaultTransactionAttribute; + +/** + * @author Mahmoud Ben Hassine + */ +@RunWith(SpringRunner.class) +@ContextConfiguration(classes = {RemoteChunkingMasterStepBuilderTest.BatchConfiguration.class}) +public class RemoteChunkingMasterStepBuilderTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Autowired + private JobRepository jobRepository; + @Autowired + private PlatformTransactionManager transactionManager; + + private PollableChannel inputChannel = new QueueChannel(); + private DirectChannel outputChannel = new DirectChannel(); + private ItemReader itemReader = new ListItemReader<>(Arrays.asList("a", "b", "c")); + + @Test + public void inputChannelMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("inputChannel must not be null"); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .inputChannel(null) + .build(); + + // then + // expected exception + } + + @Test + public void outputChannelMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("outputChannel must not be null"); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .outputChannel(null) + .build(); + + // then + // expected exception + } + + @Test + public void messagingTemplateMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("messagingTemplate must not be null"); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .messagingTemplate(null) + .build(); + + // then + // expected exception + } + + @Test + public void maxWaitTimeoutsMustBeGreaterThanZero() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("maxWaitTimeouts must be greater than zero"); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .maxWaitTimeouts(-1) + .build(); + + // then + // expected exception + } + + @Test + public void throttleLimitMustNotBeGreaterThanZero() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("throttleLimit must be greater than zero"); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .throttleLimit(-1L) + .build(); + + // then + // expected exception + } + + @Test + public void testMandatoryInputChannel() { + // given + RemoteChunkingMasterStepBuilder builder = new RemoteChunkingMasterStepBuilder<>("step"); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("An InputChannel must be provided"); + + // when + TaskletStep step = builder.build(); + + // then + // expected exception + } + + @Test + public void testMandatoryOutputChannel() { + // given + RemoteChunkingMasterStepBuilder builder = new RemoteChunkingMasterStepBuilder("step") + .inputChannel(this.inputChannel); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("An OutputChannel must be provided"); + + // when + TaskletStep step = builder.build(); + + // then + // expected exception + } + + @Test + public void testUnsupportedOperationExceptionWhenSpecifyingAnItemWriter() { + // given + this.expectedException.expect(UnsupportedOperationException.class); + this.expectedException.expectMessage("When configuring a master " + + "step for remote chunking, the item writer will be automatically " + + "set to an instance of ChunkMessageChannelItemWriter. " + + "The item writer must not be provided in this case."); + + // when + TaskletStep step = new RemoteChunkingMasterStepBuilder("step") + .reader(this.itemReader) + .writer(items -> { }) + .repository(this.jobRepository) + .transactionManager(this.transactionManager) + .inputChannel(this.inputChannel) + .outputChannel(this.outputChannel) + .build(); + + // then + // expected exception + } + + @Test + public void testMasterStepCreation() { + // when + TaskletStep taskletStep = new RemoteChunkingMasterStepBuilder("step") + .reader(this.itemReader) + .repository(this.jobRepository) + .transactionManager(this.transactionManager) + .inputChannel(this.inputChannel) + .outputChannel(this.outputChannel) + .build(); + + // then + Assert.assertNotNull(taskletStep); + } + + /* + * The following test is to cover setters that override those from parent builders. + */ + @Test + public void testSetters() { + // when + TaskletStep taskletStep = new RemoteChunkingMasterStepBuilder("step") + .reader(this.itemReader) + .readerIsTransactionalQueue() + .repository(this.jobRepository) + .transactionManager(this.transactionManager) + .transactionAttribute(new DefaultTransactionAttribute()) + .inputChannel(this.inputChannel) + .outputChannel(this.outputChannel) + .listener(new Object()) + .listener(new SkipListenerSupport<>()) + .listener(new ChunkListenerSupport()) + .listener(new StepExecutionListenerSupport()) + .listener(new CompositeItemReadListener<>()) + .listener(new CompositeItemWriteListener<>()) + .listener(new RetryListenerSupport()) + .skip(Exception.class) + .noSkip(RuntimeException.class) + .skipLimit(10) + .retry(Exception.class) + .noRetry(RuntimeException.class) + .retryLimit(10) + .retryContextCache(new MapRetryContextCache()) + .noRollback(Exception.class) + .chunk(10) + .startLimit(3) + .allowStartIfComplete(true) + .exceptionHandler(new DefaultExceptionHandler()) + .stepOperations(new RepeatTemplate()) + .chunkOperations(new RepeatTemplate()) + .backOffPolicy(new NoBackOffPolicy()) + .stream(new ItemStreamSupport() {}) + .keyGenerator(Object::hashCode) + .build(); + + // then + Assert.assertNotNull(taskletStep); + } + + @Configuration + @EnableBatchProcessing + public static class BatchConfiguration { + + } +} diff --git a/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTest.java b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTest.java new file mode 100644 index 0000000000..b1e0272c2c --- /dev/null +++ b/spring-batch-integration/src/test/java/org/springframework/batch/integration/chunk/RemoteChunkingWorkerBuilderTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.integration.chunk; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.item.support.PassThroughItemProcessor; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.dsl.IntegrationFlow; + +/** + * @author Mahmoud Ben Hassine + */ +public class RemoteChunkingWorkerBuilderTest { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private ItemProcessor itemProcessor = new PassThroughItemProcessor<>(); + private ItemWriter itemWriter = items -> { }; + + @Test + public void itemProcessorMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("itemProcessor must not be null"); + + // when + IntegrationFlow integrationFlow = new RemoteChunkingWorkerBuilder() + .itemProcessor(null) + .build(); + + // then + // expected exception + } + + @Test + public void itemWriterMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("itemWriter must not be null"); + + // when + IntegrationFlow integrationFlow = new RemoteChunkingWorkerBuilder() + .itemWriter(null) + .build(); + + // then + // expected exception + } + + @Test + public void inputChannelMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("inputChannel must not be null"); + + // when + IntegrationFlow integrationFlow = new RemoteChunkingWorkerBuilder() + .inputChannel(null) + .build(); + + // then + // expected exception + } + + @Test + public void outputChannelMustNotBeNull() { + // given + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("outputChannel must not be null"); + + // when + IntegrationFlow integrationFlow = new RemoteChunkingWorkerBuilder() + .outputChannel(null) + .build(); + + // then + // expected exception + } + + @Test + public void testMandatoryItemWriter() { + // given + RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder<>(); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("An ItemWriter must be provided"); + + // when + builder.build(); + + // then + // expected exception + } + + @Test + public void testMandatoryInputChannel() { + // given + RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder() + .itemWriter(items -> { }); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("An InputChannel must be provided"); + + // when + builder.build(); + + // then + // expected exception + } + + @Test + public void testMandatoryOutputChannel() { + // given + RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder() + .itemWriter(items -> { }) + .inputChannel(new DirectChannel()); + + this.expectedException.expect(IllegalArgumentException.class); + this.expectedException.expectMessage("An OutputChannel must be provided"); + + // when + builder.build(); + + // then + // expected exception + } + + @Test + public void testIntegrationFlowCreation() { + // given + DirectChannel inputChannel = new DirectChannel(); + DirectChannel outputChannel = new DirectChannel(); + RemoteChunkingWorkerBuilder builder = new RemoteChunkingWorkerBuilder() + .itemProcessor(this.itemProcessor) + .itemWriter(this.itemWriter) + .inputChannel(inputChannel) + .outputChannel(outputChannel); + + // when + IntegrationFlow integrationFlow = builder.build(); + + // then + Assert.assertNotNull(integrationFlow); + } + +} diff --git a/spring-batch-samples/README.md b/spring-batch-samples/README.md index 9daf02c456..b9f013fdff 100644 --- a/spring-batch-samples/README.md +++ b/spring-batch-samples/README.md @@ -651,11 +651,10 @@ This sample shows how to configure a remote chunking job. The master step will read numbers from 1 to 6 and send two chunks ({1, 2, 3} and {4, 5, 6}) to workers for processing and writing. -This example shows how to: +This example shows how to use: -* configure a `ChunkMessageChannelItemWriter` on the master side to send chunks to workers -* configure a `ChunkProcessorChunkHandler` on the worker side to process chunks and -send replies back to the master +* the `RemoteChunkingMasterStepBuilderFactory` to create a master step +* the `RemoteChunkingWorkerBuilder` to configure an integration flow on the worker side. The sample uses an embedded JMS broker as a communication middleware between the master and workers. The usage of an embedded broker is only for simplicity's sake, diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java index 4441033773..2d8eebfd19 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/MasterConfiguration.java @@ -22,9 +22,9 @@ import org.springframework.batch.core.Job; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; -import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.step.tasklet.TaskletStep; -import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter; +import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory; +import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; import org.springframework.batch.item.support.ListItemReader; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -34,7 +34,6 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.jms.dsl.Jms; @@ -48,6 +47,7 @@ */ @Configuration @EnableBatchProcessing +@EnableBatchIntegration @EnableIntegration @PropertySource("classpath:remote-chunking.properties") public class MasterConfiguration { @@ -59,7 +59,7 @@ public class MasterConfiguration { private JobBuilderFactory jobBuilderFactory; @Autowired - private StepBuilderFactory stepBuilderFactory; + private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory; @Bean public ActiveMQConnectionFactory connectionFactory() { @@ -109,23 +109,13 @@ public ListItemReader itemReader() { return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)); } - @Bean - public ChunkMessageChannelItemWriter itemWriter() { - MessagingTemplate messagingTemplate = new MessagingTemplate(); - messagingTemplate.setDefaultChannel(requests()); - ChunkMessageChannelItemWriter chunkMessageChannelItemWriter - = new ChunkMessageChannelItemWriter<>(); - chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); - chunkMessageChannelItemWriter.setReplyChannel(replies()); - return chunkMessageChannelItemWriter; - } - @Bean public TaskletStep masterStep() { - return this.stepBuilderFactory.get("masterStep") + return this.masterStepBuilderFactory.get("masterStep") .chunk(3) .reader(itemReader()) - .writer(itemWriter()) + .outputChannel(requests()) + .inputChannel(replies()) .build(); } diff --git a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java index 84363e7b68..e0ba2af20e 100644 --- a/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java +++ b/spring-batch-samples/src/main/java/org/springframework/batch/sample/remotechunking/WorkerConfiguration.java @@ -18,16 +18,15 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; -import org.springframework.batch.core.step.item.ChunkProcessor; -import org.springframework.batch.core.step.item.SimpleChunkProcessor; -import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler; +import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder; +import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; -import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.dsl.IntegrationFlow; @@ -36,7 +35,8 @@ /** * This configuration class is for the worker side of the remote chunking sample. - * It configures a {@link ChunkProcessorChunkHandler} as a service activator to: + * It uses the {@link RemoteChunkingWorkerBuilder} to configure an + * {@link IntegrationFlow} in order to: *
    *
  • receive requests from the master
  • *
  • process chunks with the configured item processor and writer
  • @@ -47,6 +47,7 @@ */ @Configuration @EnableBatchProcessing +@EnableBatchIntegration @EnableIntegration @PropertySource("classpath:remote-chunking.properties") public class WorkerConfiguration { @@ -54,6 +55,9 @@ public class WorkerConfiguration { @Value("${broker.url}") private String brokerUrl; + @Autowired + private RemoteChunkingWorkerBuilder remoteChunkingWorkerBuilder; + @Bean public ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); @@ -115,12 +119,13 @@ public ItemWriter itemWriter() { } @Bean - @ServiceActivator(inputChannel = "requests", outputChannel = "replies") - public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { - ChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); - ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); - chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); - return chunkProcessorChunkHandler; + public IntegrationFlow workerIntegrationFlow() { + return this.remoteChunkingWorkerBuilder + .itemProcessor(itemProcessor()) + .itemWriter(itemWriter()) + .inputChannel(requests()) + .outputChannel(replies()) + .build(); } }