Skip to content

Commit

Permalink
BATCH-2686: update samples/docs
Browse files Browse the repository at this point in the history
  • Loading branch information
fmbenhassine committed Apr 30, 2018
1 parent 3e5bb95 commit 3a9d8d9
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 1 deletion.
4 changes: 3 additions & 1 deletion build.gradle
Expand Up @@ -553,7 +553,7 @@ project('spring-batch-samples') {

dependencies {

compile project(":spring-batch-core")
compile project(":spring-batch-integration")
compile "org.aspectj:aspectjrt:$aspectjVersion"
compile "org.aspectj:aspectjweaver:$aspectjVersion"
compile "org.quartz-scheduler:quartz:$quartzVersion"
Expand Down Expand Up @@ -609,6 +609,8 @@ project('spring-batch-samples') {
optional "org.springframework:spring-web:$springVersion"
optional "org.springframework.data:spring-data-commons:$springDataCommonsVersion"
optional "org.springframework.amqp:spring-amqp:$springAmqpVersion"
optional "org.springframework.integration:spring-integration-amqp:$springIntegrationVersion"

optional ("org.springframework.amqp:spring-rabbit:$springAmqpVersion") {
exclude group: "org.springframework", module: "spring-messaging"
exclude group: "org.springframework", module: "spring-web"
Expand Down
71 changes: 71 additions & 0 deletions spring-batch-docs/asciidoc/spring-batch-integration.adoc
Expand Up @@ -948,6 +948,77 @@ when it receives chunks from the master.
For more information, see the section of the "Scalability" chapter on
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].

Starting from version 4.1, Spring Batch Integration provides the `@EnableBatchIntegration`
annotation that can be used to simplify remote chunking setup. This annotation provides
two beans that can be autowired in the application context:

* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
* `RemoteChunkingWorkerFlowBuilder`: used to configure the remote worker flow

The following example shows how to use these APIs:

[source, java]
----
@EnableBatchIntegration
public class RemoteChunkingConfiguration {
@Configuration
public static class MasterConfiguration {
@Autowired
private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;
@Bean
public TaskletStep masterStep() {
return masterStepBuilderFactory.get("masterStep")
.chunk(100)
.reader(itemReader())
.outputChannel(requests()) // requests sent to workers
.inputChannel(replies()) // replies received from workers
.build();
}
// Middleware beans setup omitted
}
@Configuration
public static class WorkerConfiguration {
@Autowired
private RemoteChunkingWorkerFlowBuilder workerFlowBuilder;
@Bean
public IntegrationFlow workerFlow(DirectChannel requests, DirectChannel replies) {
return workerFlowBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests) // requests received from the master
.outputChannel(replies) // replies sent to the master
.build();
}
// Middleware beans setup omitted
}
}
----

On the master side, the `RemoteChunkingMasterStepBuilderFactory` makes it possible
to configure a master step by declaring the item reader, the output channel
(to send requests to workers) and the input channel (to receive replies from workers).
There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
and `RemoteChunkHandlerFactoryBean`.

On the worker side, the `RemoteChunkingWorkerFlowBuilder` allows you to configure
the worker flow to listens to requests sent by the master on the input channel,
call the `ChunkProcessorChunkHandler` for each request with the configured `ItemProcessor`
and `ItemWriter` and finally send replies on the output channel to the master.
There is no need anymore to explicitly configure the `ChunkProcessorChunkHandler`
and `AggregatorFactoryBean`.

You can find a complete example of a remote chunking job
link:$$https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples#remote-chunking-sample[here].

[[remote-partitioning]]

Expand Down
9 changes: 9 additions & 0 deletions spring-batch-samples/README.md
Expand Up @@ -39,6 +39,7 @@ Job/Feature | skip | retry | restart | aut
[multilineOrder](#multilineOrder) | | | | | | | X | | | |
[parallel](#parallel) | | | | | | | | | | X |
[partition](#partition) | | | | | | | | | | X |
[remoteChunking](#remoteChunking) | | | | | | | | | | X |
[quartz](#quartz) | | | | | X | | | | | |
[restart](#restart) | | | X | | | | | | | |
[retry](#retry) | | X | | | | | | | | |
Expand Down Expand Up @@ -644,6 +645,14 @@ the work. Notice that the readers and writers in the `Step`
that is being partitioned are step-scoped, so that their state does
not get shared across threads of execution.

### [Remote Chunking Sample](id:remoteChunking)

This sample shows how to configure a remote chunking job. The sample uses
the `RemoteChunkingMasterStepBuilderFactory` to create a master step and
the `RemoteChunkingWorkerFlowBuilder` to configure an integration flow on
the worker side. This example also shows how to use rabbitmq as a communication
middleware between the master and workers.

### [Quartz Sample](id:quartz)

The goal is to demonstrate how to schedule job execution using
Expand Down
@@ -0,0 +1,168 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.sample.remotechunking;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.integration.chunk.RemoteChunkingWorkerFlowBuilder;
import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

/**
* This class is used to start a worker for
* {@code org.springframework.batch.sample.RemoteChunkingJobFunctionalTests#testLaunchJob()}.
*/

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
@PropertySource("classpath:default.amqp.properties")
public class WorkerConfiguration {

@Value("${rabbitmq.host}")
private String host;

@Value("${rabbitmq.port}")
private int port;

public static void main(String[] args) {
new AnnotationConfigApplicationContext(WorkerConfiguration.class);
}

@Autowired
private RemoteChunkingWorkerFlowBuilder<Integer, Integer> workerFlowBuilder;

@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
return cachingConnectionFactory;
}

@Bean
public AmqpTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(rabbitConnectionFactory());
return rabbitTemplate;
}

@Bean
public Queue requestQueue() {
return new Queue("requests", false);
}

@Bean
public Queue repliesQueue() {
return new Queue("replies", false);
}

@Bean
public TopicExchange exchange() {
return new TopicExchange("remote-chunking-exchange");
}

@Bean
Binding repliesBinding(TopicExchange exchange) {
return BindingBuilder
.bind(repliesQueue())
.to(exchange)
.with("replies");
}

@Bean
Binding requestBinding(TopicExchange exchange) {
return BindingBuilder
.bind(requestQueue())
.to(exchange)
.with("requests");
}

@Bean
public DirectChannel requests() {
return new DirectChannel();
}

@Bean
public DirectChannel replies() {
return new DirectChannel();
}

@Bean
public IntegrationFlow incomingRequests(ConnectionFactory rabbitConnectionFactory) {
return IntegrationFlows
.from(Amqp.inboundAdapter(rabbitConnectionFactory, "requests"))
.channel(requests())
.get();
}

@Bean
public IntegrationFlow outgoingReplies(AmqpTemplate rabbitTemplate) {
return IntegrationFlows
.from("replies")
.handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("replies"))
.get();
}

@Bean
public IntegrationFlow integrationFlow(DirectChannel requests, DirectChannel replies) {
return workerFlowBuilder
.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests)
.outputChannel(replies)
.build();
}

@Bean
public ItemWriter<Integer> itemWriter() {
return items -> {
for (Integer item : items) {
System.out.println("writing item " + item);
}
};
}

@Bean
public ItemProcessor<Integer, Integer> itemProcessor() {
return item -> {
System.out.println("processing item " + item);
return item;
};
}

}

0 comments on commit 3a9d8d9

Please sign in to comment.