Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBERET-29] Switch SPI to use a TransactionManager rather than a UserTra... #8

Merged
merged 1 commit into from Feb 6, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -36,6 +36,10 @@
import javax.batch.runtime.StepExecution;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.jberet.creation.ArchiveXmlLoader;
import org.jberet.creation.ArtifactFactoryWrapper;
Expand Down Expand Up @@ -71,8 +75,19 @@ public long start(final String jobXMLName, final Properties jobParameters) throw
final ClassLoader classLoader = batchEnvironment.getClassLoader();
final Job jobDefined = ArchiveXmlLoader.loadJobXml(jobXMLName, Job.class, classLoader);
repository.addJob(jobDefined);
final JobInstanceImpl jobInstance = repository.createJobInstance(jobDefined, getApplicationName(), classLoader);
return startJobExecution(jobInstance, jobParameters, null);
try {
return invokeTransaction(new TransactionInvocation<Long>() {
@Override
public Long invoke() throws JobStartException, JobSecurityException {
final JobInstanceImpl jobInstance = repository.createJobInstance(jobDefined, getApplicationName(), classLoader);
return startJobExecution(jobInstance, jobParameters, null);
}
});
} catch (InvalidTransactionException e) {
throw new JobStartException(e);
} catch (SystemException e) {
throw new JobStartException(e);
}
}

@Override
Expand Down Expand Up @@ -194,7 +209,12 @@ public long restart(final long executionId, final Properties restartParameters)
jobInstance.setUnsubstitutedJob(repository.getJob(jobName));
}
try {
newExecutionId = startJobExecution(jobInstance, restartParameters, originalToRestart);
newExecutionId = invokeTransaction(new TransactionInvocation<Long>() {
@Override
public Long invoke() throws JobStartException, JobSecurityException {
return startJobExecution(jobInstance, restartParameters, originalToRestart);
}
});
} catch (Exception e) {
throw new JobRestartException(e);
}
Expand Down Expand Up @@ -260,4 +280,23 @@ private String getApplicationName() {
return null;
}
}

private <T> T invokeTransaction(final TransactionInvocation<T> transactionInvocation) throws SystemException, InvalidTransactionException {
final TransactionManager tm = batchEnvironment.getTransactionManager();
final Transaction tx = tm.suspend();
if (tx != null) {
try {
return transactionInvocation.invoke();
} finally {
tm.resume(tx);
}
}
// No transaction in process
return transactionInvocation.invoke();
}

private static interface TransactionInvocation<T> {

T invoke() throws JobStartException, JobSecurityException;
}
}
Expand Up @@ -35,7 +35,7 @@
import javax.batch.runtime.BatchStatus;
import javax.batch.runtime.Metric;
import javax.transaction.Status;
import javax.transaction.UserTransaction;
import javax.transaction.TransactionManager;

import org.jberet.job.model.Chunk;
import org.jberet.job.model.ExceptionClassFilter;
Expand Down Expand Up @@ -96,7 +96,7 @@ public final class ChunkRunner extends AbstractRunner<StepContextImpl> implement
private Object itemRead;
private final List<Object> outputList = new ArrayList<Object>();

private final UserTransaction ut;
private final TransactionManager tm;

public ChunkRunner(final StepContextImpl stepContext, final CompositeExecutionRunner enclosingRunner, final StepExecutionRunner stepRunner, final Chunk chunk) {
super(stepContext, enclosingRunner);
Expand Down Expand Up @@ -160,7 +160,7 @@ public ChunkRunner(final StepContextImpl stepContext, final CompositeExecutionRu
skippableExceptionClasses = chunk.getSkippableExceptionClasses();
retryableExceptionClasses = chunk.getRetryableExceptionClasses();
noRollbackExceptionClasses = chunk.getNoRollbackExceptionClasses();
this.ut = stepRunner.ut;
this.tm = stepRunner.tm;
createChunkRelatedListeners();
}

Expand All @@ -177,28 +177,28 @@ public void run() {
globalTimeout = Integer.valueOf(globalTimeoutProp);
}
}
ut.setTransactionTimeout(globalTimeout);
ut.begin();
tm.setTransactionTimeout(globalTimeout);
tm.begin();
try {
itemReader.open(stepOrPartitionExecution.getReaderCheckpointInfo());
itemWriter.open(stepOrPartitionExecution.getWriterCheckpointInfo());
ut.commit();
tm.commit();
} catch (Exception e) {
ut.rollback();
tm.rollback();
// An error occurred, safely close the reader and writer
safeClose();
throw e;
}

readProcessWriteItems();

ut.begin();
tm.begin();
try {
itemReader.close();
itemWriter.close();
ut.commit();
tm.commit();
} catch (Exception e) {
ut.rollback();
tm.rollback();
// An error occurred, safely close the reader and writer
safeClose();
throw e;
Expand Down Expand Up @@ -275,11 +275,11 @@ private void readProcessWriteItems() throws Exception {
processingInfo.reset();
}
//if during Chunk RETRYING, and an item is skipped, the ut is still active so no need to begin a new one
if (ut.getStatus() != Status.STATUS_ACTIVE) {
if (tm.getStatus() != Status.STATUS_ACTIVE) {
if (checkpointAlgorithm != null) {
ut.setTransactionTimeout(checkpointAlgorithm.checkpointTimeout());
tm.setTransactionTimeout(checkpointAlgorithm.checkpointTimeout());
}
ut.begin();
tm.begin();
}
for (final ChunkListener l : chunkListeners) {
l.beforeChunk();
Expand Down Expand Up @@ -316,19 +316,19 @@ private void readProcessWriteItems() throws Exception {
l.afterChunk();
}
} catch (Exception e) {
ut.rollback();
tm.rollback();
stepMetrics.increment(Metric.MetricType.ROLLBACK_COUNT, 1);
throw e;
}
ut.commit();
tm.commit();
stepMetrics.increment(Metric.MetricType.COMMIT_COUNT, 1);
}
} catch (Exception e) {
final int txStatus = ut.getStatus();
final int txStatus = tm.getStatus();
if (txStatus == Status.STATUS_ACTIVE || txStatus == Status.STATUS_MARKED_ROLLBACK ||
txStatus == Status.STATUS_PREPARED || txStatus == Status.STATUS_PREPARING ||
txStatus == Status.STATUS_COMMITTING || txStatus == Status.STATUS_ROLLING_BACK) {
ut.rollback();
tm.rollback();
}
for (final ChunkListener l : chunkListeners) {
l.onError(e);
Expand Down Expand Up @@ -568,7 +568,7 @@ private void doCheckpoint(final ProcessingInfo processingInfo) throws Exception
private void rollbackCheckpoint(final ProcessingInfo processingInfo) throws Exception {
outputList.clear();
processingInfo.failurePoint = itemReader.checkpointInfo();
ut.rollback();
tm.rollback();
stepMetrics.increment(Metric.MetricType.ROLLBACK_COUNT, 1);
// Close the reader and writer
try {
Expand Down
Expand Up @@ -36,7 +36,7 @@
import javax.batch.api.partition.PartitionReducer;
import javax.batch.operations.BatchRuntimeException;
import javax.batch.runtime.BatchStatus;
import javax.transaction.UserTransaction;
import javax.transaction.TransactionManager;

import org.jberet._private.BatchLogger;
import org.jberet.job.model.Chunk;
Expand Down Expand Up @@ -75,14 +75,14 @@ public final class StepExecutionRunner extends AbstractRunner<StepContextImpl> i
BlockingQueue<Serializable> collectorDataQueue;
BlockingQueue<Boolean> completedPartitionThreads;

final UserTransaction ut;
final TransactionManager tm;
final StepExecutionImpl stepExecution;

public StepExecutionRunner(final StepContextImpl stepContext, final CompositeExecutionRunner enclosingRunner) {
super(stepContext, enclosingRunner);
this.step = stepContext.getStep();
this.stepExecution = (StepExecutionImpl) stepContext.getStepExecution();
ut = jobContext.getBatchEnvironment().getUserTransaction();
tm = jobContext.getBatchEnvironment().getTransactionManager();
createStepListeners();
initPartitionConfig();
}
Expand Down Expand Up @@ -291,7 +291,7 @@ private void beginPartition() throws Exception {

BatchStatus consolidatedBatchStatus = BatchStatus.STARTED;
final List<PartitionExecutionImpl> fromAllPartitions = new ArrayList<PartitionExecutionImpl>();
ut.begin();
tm.begin();
try {
while (fromAllPartitions.size() < numOfPartitions) {
final Serializable data = collectorDataQueue.take();
Expand Down Expand Up @@ -325,12 +325,12 @@ private void beginPartition() throws Exception {
}

if (consolidatedBatchStatus == BatchStatus.FAILED || consolidatedBatchStatus == BatchStatus.STOPPED) {
ut.rollback();
tm.rollback();
} else {
if (reducer != null) {
reducer.beforePartitionedStepCompletion();
}
ut.commit();
tm.commit();
}
if (reducer != null) {
if (consolidatedBatchStatus == BatchStatus.FAILED || consolidatedBatchStatus == BatchStatus.STOPPED) {
Expand All @@ -344,7 +344,7 @@ private void beginPartition() throws Exception {
consolidatedBatchStatus = BatchStatus.FAILED;
if (reducer != null) {
reducer.rollbackPartitionedStep();
ut.rollback();
tm.rollback();
reducer.afterPartitionedStepCompletion(PartitionReducer.PartitionStatus.ROLLBACK);
}
}
Expand Down
Expand Up @@ -14,6 +14,7 @@
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import javax.transaction.TransactionManager;

public interface BatchEnvironment {
/**
Expand Down Expand Up @@ -70,10 +71,11 @@ public interface BatchEnvironment {
<T> Future<T> submitTask(Callable<T> task);

/**
* Gets the UserTransaction
* @return UserTransaction
* Returns a transaction manager to be used for executions that require a transaction.
*
* @return a transaction manager for the environment
*/
javax.transaction.UserTransaction getUserTransaction();
TransactionManager getTransactionManager();

/**
* Gets configuration data for batch container.
Expand Down
57 changes: 54 additions & 3 deletions jberet-core/src/test/java/org/jberet/test/JobRepositoryTest.java
Expand Up @@ -18,10 +18,14 @@
import java.util.concurrent.Future;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;

import org.jberet.creation.ArchiveXmlLoader;
import org.jberet.job.model.Job;
Expand All @@ -34,6 +38,39 @@
import org.junit.Test;

public class JobRepositoryTest {
static final Transaction NO_OP_TRANSACTION = new Transaction() {
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
}

@Override
public void rollback() throws IllegalStateException, SystemException {
}

@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}

@Override
public int getStatus() throws SystemException {
return 0;
}

@Override
public boolean enlistResource(final XAResource xaRes) throws RollbackException, IllegalStateException, SystemException {
return false;
}

@Override
public boolean delistResource(final XAResource xaRes, final int flag) throws IllegalStateException, SystemException {
return false;
}

@Override
public void registerSynchronization(final Synchronization sync) throws RollbackException, IllegalStateException, SystemException {
}
};

private static JobRepository repo;

@BeforeClass
Expand Down Expand Up @@ -65,8 +102,8 @@ public <T> Future<T> submitTask(final Callable<T> task) {
}

@Override
public UserTransaction getUserTransaction() {
return new UserTransaction() {
public TransactionManager getTransactionManager() {
return new TransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
}
Expand All @@ -88,9 +125,23 @@ public int getStatus() throws SystemException {
return 0;
}

@Override
public Transaction getTransaction() throws SystemException {
return NO_OP_TRANSACTION;
}

@Override
public void setTransactionTimeout(final int seconds) throws SystemException {
}

@Override
public Transaction suspend() throws SystemException {
return NO_OP_TRANSACTION;
}

@Override
public void resume(final Transaction tobj) throws InvalidTransactionException, IllegalStateException, SystemException {
}
};
}

Expand Down