BATCH-1767: fix optimistic locking exception in multi-threaded step #591

Closed
wants to merge 2 commits into from
@@ -19,6 +19,7 @@
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInterruptedException;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
@@ -339,6 +340,7 @@ private class ChunkTransactionCallback extends TransactionSynchronizationAdapter
private boolean stepExecutionUpdated = false;

private StepExecution oldVersion;
private ExecutionContext oldExecutionContext;

private boolean locked = false;

@@ -360,6 +362,7 @@ public void afterCompletion(int status) {
logger.info("Commit failed while step execution data was already updated. "
+ "Reverting to old version.");
copy(oldVersion, stepExecution);
stepExecution.setExecutionContext(oldExecutionContext);
if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
rollback(stepExecution);
}
@@ -371,7 +374,6 @@ public void afterCompletion(int status) {
logger.error("Rolling back with transaction in unknown state");
rollback(stepExecution);
stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
stepExecution.setTerminateOnly();
}
}
finally {
@@ -397,8 +399,7 @@ public RepeatStatus doInTransaction(TransactionStatus status) {

// In case we need to push it back to its old value
// after a commit fails...
oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
copy(stepExecution, oldVersion);
oldExecutionContext = new ExecutionContext(stepExecution.getExecutionContext());

try {

@@ -433,6 +434,23 @@ public RepeatStatus doInTransaction(TransactionStatus status) {
Thread.currentThread().interrupt();
}

// Refresh stepExecution to the latest correctly persisted
// state in order to apply the contribution on the latest version
String stepName = stepExecution.getStepName();
JobExecution jobExecution = stepExecution.getJobExecution();
StepExecution lastStepExecution = getJobRepository()
.getLastStepExecution(jobExecution.getJobInstance(), stepName);
if (lastStepExecution != null &&
!lastStepExecution.getVersion().equals(stepExecution.getVersion())) {
copy(lastStepExecution, stepExecution);
}

// Take a copy of the stepExecution in case we need to
// undo the current contribution to the in memory instance
// if the commit fails
oldVersion = new StepExecution(stepName, jobExecution);
copy(stepExecution, oldVersion);

// Apply the contribution to the step
// even if unsuccessful
if (logger.isDebugEnabled()) {
@@ -499,11 +517,9 @@ private void rollback(StepExecution stepExecution) {
}

private void copy(final StepExecution source, final StepExecution target) {
target.setVersion(source.getVersion());
target.setWriteCount(source.getWriteCount());
target.setFilterCount(source.getFilterCount());
target.setCommitCount(source.getCommitCount());
target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
}

}
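
The refresh-before-contribute idea in the hunks above can be illustrated with a small, self-contained model of a version-checked update. This is only a sketch under stated assumptions: `Repository`, `Snapshot`, and `applyChunk` are hypothetical names invented for illustration, and the in-memory counter stands in for the persisted step execution. It is not the `JobRepository` API and not the code in this pull request.

```java
public class OptimisticLockingSketch {

    /** Hypothetical stand-in for the in-memory StepExecution fields used here. */
    static class Snapshot {
        int version;
        int commitCount;
    }

    /** Hypothetical stand-in for the persisted step execution row. */
    static class Repository {
        private int version = 0;
        private int commitCount = 0;

        /** Version-checked update: succeeds only if the caller holds the latest version. */
        synchronized boolean update(Snapshot s) {
            if (s.version != version) {
                // Stale copy: this models an optimistic locking failure.
                return false;
            }
            version++;
            commitCount = s.commitCount;
            s.version = version;
            return true;
        }

        /** Copies the latest persisted state into the in-memory snapshot. */
        synchronized void refresh(Snapshot s) {
            s.version = version;
            s.commitCount = commitCount;
        }
    }

    /** Applies one chunk's contribution, optionally refreshing the snapshot first. */
    static boolean applyChunk(Repository repo, Snapshot s, boolean refreshFirst) {
        if (refreshFirst) {
            repo.refresh(s);
        }
        s.commitCount++;
        return repo.update(s);
    }

    public static void main(String[] args) {
        Repository repo = new Repository();
        Snapshot a = new Snapshot();
        Snapshot b = new Snapshot(); // both snapshots start at version 0

        System.out.println(applyChunk(repo, a, false)); // true: a held the latest version
        System.out.println(applyChunk(repo, b, false)); // false: b is stale (version 0 vs 1)
        System.out.println(applyChunk(repo, b, true));  // true: b refreshed before contributing
    }
}
```

In this model the second call fails only because `b` still carries the version it saw before `a` committed. Refreshing first lets the contribution apply to the latest persisted state, which is the behaviour the added `getLastStepExecution` check aims for.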
@@ -0,0 +1,264 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.core.step.item;

import org.junit.Test;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;

import javax.sql.DataSource;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
* Tests for the behavior of a multi-threaded TaskletStep.
*
* @author Mahmoud Ben Hassine
*/
public class MultiThreadedTaskletStepIntegrationTests {

@Test
public void testMultiThreadedTaskletExecutionWhenNoErrors() throws Exception {
// given
Class<?>[] configurationClasses = {JobConfiguration.class, TransactionManagerConfiguration.class};
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParameters();

// when
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

// then
assertNotNull(jobExecution);
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
assertEquals(BatchStatus.COMPLETED, stepExecution.getStatus());
assertEquals(0, stepExecution.getFailureExceptions().size());
}

@Test
public void testMultiThreadedTaskletExecutionWhenCommitFails() throws Exception {
// given
Class<?>[] configurationClasses = {JobConfiguration.class, CommitFailingTransactionManagerConfiguration.class};
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParameters();

// when
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

// then
assertNotNull(jobExecution);
assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
assertEquals(BatchStatus.FAILED, stepExecution.getStatus());
Throwable e = stepExecution.getFailureExceptions().get(0);
assertEquals("Planned commit exception!", e.getMessage());
// No assertions on execution context because it is undefined in this case
}

@Test
public void testMultiThreadedTaskletExecutionWhenRollbackFails() throws Exception {
// given
Class<?>[] configurationClasses = {JobConfiguration.class, RollbackFailingTransactionManagerConfiguration.class};
ApplicationContext context = new AnnotationConfigApplicationContext(configurationClasses);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
JobParameters jobParameters = new JobParameters();

// when
JobExecution jobExecution = jobLauncher.run(job, jobParameters);

// then
assertNotNull(jobExecution);
assertEquals(BatchStatus.UNKNOWN, jobExecution.getStatus());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
assertEquals(BatchStatus.UNKNOWN, stepExecution.getStatus());
Throwable e = stepExecution.getFailureExceptions().get(0);
assertEquals("Planned rollback exception!", e.getMessage());
// No assertions on execution context because it is undefined in this case
}

@Configuration
@EnableBatchProcessing
public static class JobConfiguration {

@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public TaskletStep step() {
return stepBuilderFactory.get("step")
.<Integer, Integer>chunk(3)
.reader(itemReader())
.writer(items -> {})
.taskExecutor(taskExecutor())
.build();
}

@Bean
public Job job(ThreadPoolTaskExecutor taskExecutor) {
return jobBuilderFactory.get("job")
.start(step())
.listener(new JobExecutionListenerSupport() {
@Override
public void afterJob(JobExecution jobExecution) {
taskExecutor.shutdown();
}
})
.build();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(3);
taskExecutor.setThreadNamePrefix("spring-batch-worker-thread-");
return taskExecutor;
}

@Bean
public ItemReader<Integer> itemReader() {
return new ItemReader<Integer>() {
private final AtomicInteger atomicInteger = new AtomicInteger();
@Override
public synchronized Integer read() {
int value = atomicInteger.incrementAndGet();
return value <= 9 ? value : null;
}
};
}

}

@Configuration
public static class DataSourceConfiguration {

@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("org/springframework/batch/core/schema-hsqldb.sql")
.build();
}

}

@Configuration
@Import(DataSourceConfiguration.class)
public static class TransactionManagerConfiguration {

@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource);
}
};
}

}

@Configuration
@Import(DataSourceConfiguration.class)
public static class CommitFailingTransactionManagerConfiguration {

@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource) {
@Override
protected void doCommit(DefaultTransactionStatus status) {
super.doCommit(status);
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
throw new RuntimeException("Planned commit exception!");
}
}
};
}
};
}

}

@Configuration
@Import(DataSourceConfiguration.class)
public static class RollbackFailingTransactionManagerConfiguration {

@Bean
public BatchConfigurer batchConfigurer(DataSource dataSource) {
return new DefaultBatchConfigurer(dataSource) {
@Override
public PlatformTransactionManager getTransactionManager() {
return new DataSourceTransactionManager(dataSource) {
@Override
protected void doCommit(DefaultTransactionStatus status) {
super.doCommit(status);
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
throw new RuntimeException("Planned commit exception!");
}
}

@Override
protected void doRollback(DefaultTransactionStatus status) {
super.doRollback(status);
if (Thread.currentThread().getName().equals("spring-batch-worker-thread-2")) {
throw new RuntimeException("Planned rollback exception!");
}
}
};
}
};
}

}

}
14 changes: 10 additions & 4 deletions spring-batch-docs/src/main/asciidoc/scalability.adoc
@@ -123,7 +123,9 @@ your step, such as a `DataSource`. Be sure to make the pool in those resources
 as large as the desired number of concurrent threads in the step.
 
 There are some practical limitations of using multi-threaded `Step` implementations for
-some common batch use cases. Many participants in a `Step` (such as readers and writers)
+some common batch use cases:
+
+* Many participants in a `Step` (such as readers and writers)
 are stateful. If the state is not segregated by thread, then those components are not
 usable in a multi-threaded `Step`. In particular, most of the off-the-shelf readers and
 writers from Spring Batch are not designed for multi-threaded use. It is, however,
@@ -132,9 +134,8 @@ possible to work with stateless or thread safe readers and writers, and there is
 https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples[Spring
 Batch Samples] that shows the use of a process indicator (see
 <<readersAndWriters.adoc#process-indicator,Preventing State Persistence>>) to keep track
-of items that have been processed in a database input table.
-
-Spring Batch provides some implementations of `ItemWriter` and `ItemReader`. Usually,
+of items that have been processed in a database input table. Spring Batch provides some
+implementations of `ItemWriter` and `ItemReader`. Usually,
 they say in the Javadoc if they are thread safe or not or what you have to do to avoid
 problems in a concurrent environment. If there is no information in the Javadoc, you can
 check the implementation to see if there is any state. If a reader is not thread safe,
@@ -143,6 +144,11 @@ synchronizing delegator. You can synchronize the call to `read()` and as long as
 processing and writing is the most expensive part of the chunk, your step may still
 complete much faster than it would in a single threaded configuration.
 
+* In a multi-threaded `Step`, each thread runs in its own transaction and the `ChunkContext`
+is shared between threads. This shared state might end up in an inconsistent state
+if one of the transactions is rolled back. Hence, we recommend avoiding `ExecutionContext`
+manipulation in a multi-threaded `Step`.
+
 [[scalabilityParallelSteps]]
 
 
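
The synchronizing delegator mentioned in the `scalability.adoc` text above can be sketched as follows. To keep the example runnable without Spring on the classpath, the `ItemReader` interface here is a hypothetical one-method stand-in for the Spring Batch interface of the same name, and `CountingReader`, `SynchronizedReader`, and `readAll` are illustrative names, not Spring Batch classes. Spring Batch also ships `SynchronizedItemStreamReader`, which is likely the better choice in a real step.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedReaderSketch {

    /** Hypothetical stand-in for Spring Batch's ItemReader (assumption, not the real interface). */
    interface ItemReader<T> {
        T read() throws Exception;
    }

    /** A stateful reader that is NOT thread safe: the cursor is plain mutable state. */
    static class CountingReader implements ItemReader<Integer> {
        private final int max;
        private int next = 0;

        CountingReader(int max) {
            this.max = max;
        }

        @Override
        public Integer read() {
            // Returns 1..max, then null to signal the end of the input.
            return next < max ? ++next : null;
        }
    }

    /** Synchronizing delegator: serializes every call to the delegate's read(). */
    static class SynchronizedReader<T> implements ItemReader<T> {
        private final ItemReader<T> delegate;

        SynchronizedReader(ItemReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T read() throws Exception {
            return delegate.read();
        }
    }

    /** Drains the reader from several worker threads and collects every item read. */
    static List<Integer> readAll(ItemReader<Integer> reader, int threads) throws InterruptedException {
        List<Integer> items = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    items.add(item);
                }
                return null; // returning a value makes this a Callable, so read() may throw
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return items;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> items = readAll(new SynchronizedReader<>(new CountingReader(9)), 3);
        // With the synchronized wrapper, each of the 9 items is handed out exactly once.
        System.out.println(items.size());
    }
}
```

Only `read()` is serialized here, so processing and writing can still overlap across threads. The order in which items arrive in the result list is not deterministic.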