Permalink
Browse files

Polish and updated a test to validate the conditions

  • Loading branch information...
mminella committed May 25, 2018
1 parent ee56a23 commit 98070e4a1cff56af307bcd060962e009efcf4f1d
@@ -434,7 +434,7 @@ protected Tasklet createTasklet() {
SkipPolicy readSkipPolicy = createSkipPolicy();
readSkipPolicy = getFatalExceptionAwareProxy(readSkipPolicy);
- FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<I>(getReader(),
+ FaultTolerantChunkProvider<I> chunkProvider = new FaultTolerantChunkProvider<>(getReader(),
createChunkOperations());
chunkProvider.setMaxSkipsOnRead(Math.max(getChunkSize(), FaultTolerantChunkProvider.DEFAULT_MAX_SKIPS_ON_READ));
chunkProvider.setSkipPolicy(readSkipPolicy);
@@ -939,7 +939,7 @@ For more information, see the section of the "Scalability" chapter on
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].
Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration`
-annotation that can be used to simplify remote chunking setup. This annotation provides
+annotation that can be used to simplify a remote chunking setup. This annotation provides
two beans that can be autowired in the application context:
* `RemoteChunkingMasterStepBuilderFactory`: used to configure the master step
@@ -957,8 +957,8 @@ configure a master step by declaring:
* the output channel ("Outgoing requests") to send requests to workers
* the input channel ("Incoming replies") to receive replies from workers
-There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
-and the `MessagingTemplate` (Those can still be explicitly configured if required).
+A `ChunkMessageChannelItemWriter` and the `MessagingTemplate` are not needed to be explicitly configured
+(Those can still be explicitly configured if required).
On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a worker to:
@@ -967,14 +967,15 @@ On the worker side, the `RemoteChunkingWorkerBuilder` allows you to configure a
with the configured `ItemProcessor` and `ItemWriter`
* send replies on the output channel ("Outgoing replies") to the master
-There is no need anymore to explicitly configure the `SimpleChunkProcessor`
-and the `ChunkProcessorChunkHandler` (Those can still be explicitly configured if required).
+There is no need to explicitly configure the `SimpleChunkProcessor`
+and the `ChunkProcessorChunkHandler` (Those can be explicitly configured if required).
The following example shows how to use these APIs:
[source, java]
----
@EnableBatchIntegration
+@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {
@Configuration
@@ -26,6 +26,7 @@
import org.springframework.batch.core.step.item.KeyGenerator;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
@@ -400,4 +401,10 @@ public TaskletStep build() {
super.allowStartIfComplete(allowStartIfComplete);
return this;
}
+
+ @Override
+ public RemoteChunkingMasterStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> itemProcessor) {
+ super.processor(itemProcessor);
+ return this;
+ }
}
@@ -45,6 +45,7 @@
* <pre class="code">
* &#064;Configuration
* &#064;EnableBatchIntegration
+ * &#064;EnableBatchProcessing
* public class RemoteChunkingAppConfig {
*
* &#064;Autowired
@@ -15,40 +15,57 @@
*/
package org.springframework.batch.integration.chunk;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
+import org.springframework.batch.core.ChunkListener;
+import org.springframework.batch.core.ItemReadListener;
+import org.springframework.batch.core.ItemWriteListener;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.SkipListener;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
-import org.springframework.batch.core.listener.ChunkListenerSupport;
-import org.springframework.batch.core.listener.CompositeItemReadListener;
-import org.springframework.batch.core.listener.CompositeItemWriteListener;
-import org.springframework.batch.core.listener.SkipListenerSupport;
-import org.springframework.batch.core.listener.StepExecutionListenerSupport;
import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
+import org.springframework.batch.core.step.item.SimpleChunkProcessor;
+import org.springframework.batch.core.step.item.SimpleChunkProvider;
import org.springframework.batch.core.step.tasklet.TaskletStep;
+import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamSupport;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.item.support.CompositeItemStream;
import org.springframework.batch.item.support.ListItemReader;
-import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
import org.springframework.batch.repeat.support.RepeatTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
+import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.PollableChannel;
+import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.NoBackOffPolicy;
-import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.MapRetryContextCache;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* @author Mahmoud Ben Hassine
*/
@@ -216,44 +233,136 @@ public void testMasterStepCreation() {
* The following test is to cover setters that override those from parent builders.
*/
@Test
- public void testSetters() {
+ public void testSetters() throws Exception {
// when
+ DefaultTransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
+
+ Object annotatedListener = new Object();
+ MapRetryContextCache retryCache = new MapRetryContextCache();
+ RepeatTemplate stepOperations = new RepeatTemplate();
+ NoBackOffPolicy backOffPolicy = new NoBackOffPolicy();
+ ItemStreamSupport stream = new ItemStreamSupport() {
+ };
+ StepExecutionListener stepExecutionListener = mock(StepExecutionListener.class);
+ ItemReadListener<String> itemReadListener = mock(ItemReadListener.class);
+ ItemWriteListener<String> itemWriteListener = mock(ItemWriteListener.class);
+ ChunkListener chunkListener = mock(ChunkListener.class);
+ SkipListener<String, String> skipListener = mock(SkipListener.class);
+ RetryListener retryListener = mock(RetryListener.class);
+
+ when(retryListener.open(any(), any())).thenReturn(true);
+
+ ItemProcessor<String, String> itemProcessor = item -> {
+ System.out.println("processing item " + item);
+ if(item.equals("b")) {
+ throw new Exception("b was found");
+ }
+ else {
+ return item;
+ }
+ };
+
+ ItemReader<String> itemReader = new ItemReader<String>() {
+
+ int count = 0;
+ List<String> items = Arrays.asList("a", "b", "c", "d", "d", "e", "f", "g", "h", "i");
+
+ @Override
+ public String read() throws Exception {
+ System.out.println(">> count == " + count);
+ if(count == 6) {
+ count++;
+ throw new IOException("6th item");
+ }
+ else if(count == 7) {
+ count++;
+ throw new RuntimeException("7th item");
+ }
+ else if(count < items.size()){
+ String item = items.get(count++);
+ System.out.println(">> item read was " + item);
+ return item;
+ }
+ else {
+ return null;
+ }
+ }
+ };
+
TaskletStep taskletStep = new RemoteChunkingMasterStepBuilder<String, String>("step")
- .reader(this.itemReader)
+ .reader(itemReader)
.readerIsTransactionalQueue()
+ .processor(itemProcessor)
.repository(this.jobRepository)
.transactionManager(this.transactionManager)
- .transactionAttribute(new DefaultTransactionAttribute())
+ .transactionAttribute(transactionAttribute)
.inputChannel(this.inputChannel)
.outputChannel(this.outputChannel)
- .listener(new Object())
- .listener(new SkipListenerSupport<>())
- .listener(new ChunkListenerSupport())
- .listener(new StepExecutionListenerSupport())
- .listener(new CompositeItemReadListener<>())
- .listener(new CompositeItemWriteListener<>())
- .listener(new RetryListenerSupport())
+ .listener(annotatedListener)
+ .listener(skipListener)
+ .listener(chunkListener)
+ .listener(stepExecutionListener)
+ .listener(itemReadListener)
+ .listener(itemWriteListener)
+ .listener(retryListener)
.skip(Exception.class)
.noSkip(RuntimeException.class)
.skipLimit(10)
- .retry(Exception.class)
+ .retry(IOException.class)
.noRetry(RuntimeException.class)
.retryLimit(10)
- .retryContextCache(new MapRetryContextCache())
+ .retryContextCache(retryCache)
.noRollback(Exception.class)
- .chunk(10)
.startLimit(3)
.allowStartIfComplete(true)
- .exceptionHandler(new DefaultExceptionHandler())
- .stepOperations(new RepeatTemplate())
- .chunkOperations(new RepeatTemplate())
- .backOffPolicy(new NoBackOffPolicy())
- .stream(new ItemStreamSupport() {})
+ .stepOperations(stepOperations)
+ .chunk(3)
+ .backOffPolicy(backOffPolicy)
+ .stream(stream)
.keyGenerator(Object::hashCode)
.build();
+ JobExecution jobExecution = this.jobRepository.createJobExecution("job1", new JobParameters());
+ StepExecution stepExecution = new StepExecution("step1", jobExecution);
+ this.jobRepository.add(stepExecution);
+
+ taskletStep.execute(stepExecution);
+
// then
Assert.assertNotNull(taskletStep);
+ ChunkOrientedTasklet tasklet = (ChunkOrientedTasklet) ReflectionTestUtils.getField(taskletStep, "tasklet");
+ SimpleChunkProvider provider = (SimpleChunkProvider) ReflectionTestUtils.getField(tasklet, "chunkProvider");
+ SimpleChunkProcessor processor = (SimpleChunkProcessor) ReflectionTestUtils.getField(tasklet, "chunkProcessor");
+ ItemWriter itemWriter = (ItemWriter) ReflectionTestUtils.getField(processor, "itemWriter");
+ MessagingTemplate messagingTemplate = (MessagingTemplate) ReflectionTestUtils.getField(itemWriter, "messagingGateway");
+ CompositeItemStream compositeItemStream = (CompositeItemStream) ReflectionTestUtils.getField(taskletStep, "stream");
+
+ Assert.assertEquals(ReflectionTestUtils.getField(provider, "itemReader"), itemReader);
+ Assert.assertFalse((Boolean) ReflectionTestUtils.getField(tasklet, "buffering"));
+ Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "jobRepository"), this.jobRepository);
+ Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "transactionManager"), this.transactionManager);
+ Assert.assertEquals(ReflectionTestUtils.getField(taskletStep, "transactionAttribute"), transactionAttribute);
+ Assert.assertEquals(ReflectionTestUtils.getField(itemWriter, "replyChannel"), this.inputChannel);
+ Assert.assertEquals(ReflectionTestUtils.getField(messagingTemplate, "defaultDestination"), this.outputChannel);
+ Assert.assertEquals(ReflectionTestUtils.getField(processor, "itemProcessor"), itemProcessor);
+
+ Assert.assertEquals((int) ReflectionTestUtils.getField(taskletStep, "startLimit"), 3);
+ Assert.assertTrue((Boolean) ReflectionTestUtils.getField(taskletStep, "allowStartIfComplete"));
+ Object stepOperationsUsed = ReflectionTestUtils.getField(taskletStep, "stepOperations");
+ Assert.assertEquals(stepOperationsUsed, stepOperations);
+
+ Assert.assertEquals(((List)ReflectionTestUtils.getField(compositeItemStream, "streams")).size(), 2);
+ Assert.assertNotNull(ReflectionTestUtils.getField(processor, "keyGenerator"));
+
+ verify(skipListener, atLeastOnce()).onSkipInProcess(any(), any());
+ verify(retryListener, atLeastOnce()).open(any(), any());
+ verify(stepExecutionListener, atLeastOnce()).beforeStep(any());
+ verify(chunkListener, atLeastOnce()).beforeChunk(any());
+ verify(itemReadListener, atLeastOnce()).beforeRead();
+ verify(itemWriteListener, atLeastOnce()).beforeWrite(any());
+
+ Assert.assertEquals(stepExecution.getSkipCount(), 2);
+ Assert.assertEquals(stepExecution.getRollbackCount(), 3);
}
@Configuration

0 comments on commit 98070e4

Please sign in to comment.