Skip to content

Commit

Permalink
BATCH-1668: added check for transaction in JobRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
dsyer committed Jan 5, 2011
1 parent a8e16c6 commit ff15bd0
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
Expand All @@ -42,12 +44,6 @@
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.transaction.AfterTransaction;
import org.springframework.test.context.transaction.BeforeTransaction;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/simple-job-launcher-context.xml" })
Expand All @@ -65,9 +61,6 @@ public class JdbcJobRepositoryTests {

@Autowired
private JobRepository repository;

@Autowired
private PlatformTransactionManager transactionManager;

/** Logger */
private final Log logger = LogFactory.getLog(getClass());
Expand All @@ -77,7 +70,7 @@ public void setDataSource(DataSource dataSource) {
this.simpleJdbcTemplate = new SimpleJdbcTemplate(dataSource);
}

@BeforeTransaction
@Before
public void onSetUpInTransaction() throws Exception {
job = new JobSupport("test-job");
job.setRestartable(true);
Expand All @@ -89,7 +82,7 @@ public void onSetUpInTransaction() throws Exception {
simpleJdbcTemplate.update("DELETE FROM BATCH_JOB_INSTANCE");
}

@AfterTransaction
@After
public void onTearDownAfterTransaction() throws Exception {
for (Long id : jobExecutionIds) {
simpleJdbcTemplate.update("DELETE FROM BATCH_JOB_EXECUTION_CONTEXT where JOB_EXECUTION_ID=?", id);
Expand All @@ -105,7 +98,6 @@ public void onTearDownAfterTransaction() throws Exception {
}
}

@Transactional
@Test
public void testFindOrCreateJob() throws Exception {
job.setName("foo");
Expand All @@ -116,7 +108,6 @@ public void testFindOrCreateJob() throws Exception {
assertNotNull(execution.getId());
}

@Transactional
@Test
public void testFindOrCreateJobConcurrently() throws Exception {

Expand Down Expand Up @@ -191,24 +182,14 @@ private void cacheJobIds(JobExecution execution) {
private JobExecution doConcurrentStart() throws Exception {
new Thread(new Runnable() {
public void run() {

try {
new TransactionTemplate(transactionManager).execute(new TransactionCallback() {
public Object doInTransaction(org.springframework.transaction.TransactionStatus status) {
try {
JobExecution execution = repository.createJobExecution(job.getName(),
new JobParameters());
cacheJobIds(execution);
list.add(execution);
Thread.sleep(1000);
}
catch (Exception e) {
list.add(e);
}
return null;
}
});
JobExecution execution = repository.createJobExecution(job.getName(), new JobParameters());
cacheJobIds(execution);
list.add(execution);
Thread.sleep(1000);
}
catch (RuntimeException e) {
catch (Exception e) {
list.add(e);
}

Expand All @@ -225,7 +206,7 @@ public Object doInTransaction(org.springframework.transaction.TransactionStatus
}

assertEquals("Timed out waiting for JobExecution to be created", 1, list.size());
assertTrue("JobExecution not created in thread", list.get(0) instanceof JobExecution);
assertTrue("JobExecution not created in thread: " + list.get(0), list.get(0) instanceof JobExecution);
return (JobExecution) list.get(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.batch.core.repository.support;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.aop.support.NameMatchMethodPointcut;
Expand All @@ -29,6 +31,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;

/**
Expand All @@ -51,6 +54,8 @@ public abstract class AbstractJobRepositoryFactoryBean implements FactoryBean, I

private String isolationLevelForCreate = DEFAULT_ISOLATION_LEVEL;

private boolean validateTransactionState = true;

/**
* Default value for isolation level in create* method.
*/
Expand Down Expand Up @@ -90,6 +95,18 @@ public boolean isSingleton() {
return true;
}

/**
* Flag to determine whether to check for an existing transaction when a
* JobExecution is created. Defaults to true because it is usually a
* mistake, and leads to problems with restartability and also to deadlocks
* in multi-threaded steps.
*
* @param validateTransactionState the flag to set
*/
public void setValidateTransactionState(boolean validateTransactionState) {
this.validateTransactionState = validateTransactionState;
}

/**
* public setter for the isolation level to be used for the transaction when
* job execution entities are initially created. The default is
Expand Down Expand Up @@ -136,15 +153,27 @@ public JobRepository getJobRepository() throws Exception {
private void initializeProxy() throws Exception {
if (proxyFactory == null) {
proxyFactory = new ProxyFactory();
TransactionInterceptor advice = new TransactionInterceptor(transactionManager, PropertiesConverter
.stringToProperties("create*=PROPAGATION_REQUIRES_NEW," + isolationLevelForCreate
+ "\ngetLastJobExecution*=PROPAGATION_REQUIRES_NEW," + isolationLevelForCreate
+ "\n*=PROPAGATION_REQUIRED"));
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(advice);
NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut();
pointcut.addMethodName("*");
advisor.setPointcut(pointcut);
proxyFactory.addAdvisor(advisor);
TransactionInterceptor advice = new TransactionInterceptor(transactionManager,
PropertiesConverter.stringToProperties("create*=PROPAGATION_REQUIRES_NEW,"
+ isolationLevelForCreate + "\ngetLastJobExecution*=PROPAGATION_REQUIRES_NEW,"
+ isolationLevelForCreate + "\n*=PROPAGATION_REQUIRED"));
if (validateTransactionState) {
DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
if (TransactionSynchronizationManager.isActualTransactionActive()) {
throw new IllegalStateException(
"Existing transaction detected in JobRepository. "
+ "Please fix this and try again (e.g. remove @Transactional annotations from client).");
}
return invocation.proceed();
}
});
NameMatchMethodPointcut pointcut = new NameMatchMethodPointcut();
pointcut.addMethodName("create*");
advisor.setPointcut(pointcut);
proxyFactory.addAdvisor(advisor);
}
proxyFactory.addAdvice(advice);
proxyFactory.setProxyTargetClass(false);
proxyFactory.addInterface(JobRepository.class);
proxyFactory.setTarget(getTarget());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
*
* @author Ben Hale
* @author Lucas Ward
* @author Dave Syer
*/
public class JobRepositoryFactoryBean extends AbstractJobRepositoryFactoryBean implements InitializingBean {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.batch.core.job.JobSupport;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -35,8 +36,18 @@ public class SimpleJobRepositoryProxyTests {
private JobSupport job = new JobSupport("SimpleJobRepositoryProxyTestsJob");

@Transactional
@Test(expected=IllegalStateException.class)
@DirtiesContext
public void testCreateAndFindWithExistingTransaction() throws Exception {
assertFalse(advice.invoked);
JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), new JobParameters());
assertNotNull(jobExecution);
assertTrue(advice.invoked);
}

@Test
public void testCreateAndFind() throws Exception {
@DirtiesContext
public void testCreateAndFindNoTransaction() throws Exception {
assertFalse(advice.invoked);
JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), new JobParameters());
assertNotNull(jobExecution);
Expand Down

0 comments on commit ff15bd0

Please sign in to comment.