diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/FaultTolerantJobLauncher.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/FaultTolerantJobLauncher.java
new file mode 100644
index 0000000000..f1b267bea1
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/FaultTolerantJobLauncher.java
@@ -0,0 +1,269 @@
+package org.springframework.batch.core.launch.support;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.batch.core.BatchStatus;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobInstance;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersInvalidException;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.repository.JobRestartException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.core.task.SyncTaskExecutor;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.core.task.TaskRejectedException;
+import org.springframework.retry.RetryCallback;
+import org.springframework.retry.RetryContext;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Fault-tolerant implementation of the {@link JobLauncher} interface that
+ * automatically retries (restarts) a failed job through a {@link RetryTemplate}.
+ * The Spring Core {@link TaskExecutor} interface is used to launch a {@link Job}.
+ * This means that the type of executor set is very important. If a
+ * {@link SyncTaskExecutor} is used, then the job will be processed within the
+ * same thread that called the launcher. Care should be taken to ensure any users
+ * of this class understand fully whether or not the implementation of
+ * TaskExecutor used will start tasks synchronously or asynchronously. The
+ * default setting uses a synchronous task executor.
+ *
+ * The {@link JobExecution} is created by the task handed to the
+ * {@link TaskExecutor}, so with an asynchronous executor
+ * {@link #run(Job, JobParameters)} may return before it exists.
+ *
+ * There are two required dependencies of this launcher, a {@link JobRepository}
+ * and a {@link RetryTemplate}. The JobRepository is used to obtain a valid
+ * JobExecution. The repository must be used because the provided {@link Job}
+ * could be a restart of an existing {@link JobInstance}, and only the repository
+ * can reliably recreate it. The RetryTemplate is used to automatically restart
+ * and retry the failed job.
+ *
+ * @see JobRepository
+ * @see TaskExecutor
+ * @see RetryTemplate
+ */
+public class FaultTolerantJobLauncher implements JobLauncher, InitializingBean {
+
+    protected static final Log logger = LogFactory.getLog(FaultTolerantJobLauncher.class);
+
+    private JobRepository jobRepository;
+
+    private RetryTemplate retryTemplate;
+
+    private TaskExecutor taskExecutor;
+
+    /**
+     * Run the provided job with the given {@link JobParameters}. The
+     * {@link JobParameters} will be used to determine if this is an execution
+     * of an existing job instance, or if a new one should be created. The job is
+     * executed through the {@link RetryTemplate}, so a restartable job that ends
+     * with {@link BatchStatus#FAILED} is handed back to the template for another
+     * attempt.
+     *
+     * @param job the job to be run.
+     * @param jobParameters the {@link JobParameters} for this particular
+     * execution.
+     * @return the {@link JobExecution} created by the most recent attempt, or
+     * {@code null} if none has been created when this method returns (possible
+     * with an asynchronous {@link TaskExecutor}).
+     * @throws JobExecutionAlreadyRunningException if the JobInstance already
+     * exists and has an execution already running.
+     * @throws JobRestartException if the execution would be a re-start, but a
+     * re-start is either not allowed or not needed.
+     * @throws JobInstanceAlreadyCompleteException if this instance has already
+     * completed successfully
+     * @throws JobParametersInvalidException if the parameters are rejected by
+     * the job's validator
+     */
+    @Override
+    public JobExecution run(final Job job, final JobParameters jobParameters)
+            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
+            JobParametersInvalidException {
+
+        Assert.notNull(job, "The Job must not be null.");
+        Assert.notNull(jobParameters, "The JobParameters must not be null.");
+
+        final AtomicReference<JobExecution> executionReference = new AtomicReference<>();
+        JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
+        if (lastExecution != null) {
+            if (!job.isRestartable()) {
+                throw new JobRestartException("JobInstance already exists and is not restartable");
+            }
+            /*
+             * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
+             * retrieve the previous execution and check
+             */
+            for (StepExecution execution : lastExecution.getStepExecutions()) {
+                BatchStatus status = execution.getStatus();
+                if (status.isRunning() || status == BatchStatus.STOPPING) {
+                    throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
+                            + lastExecution);
+                } else if (status == BatchStatus.UNKNOWN) {
+                    throw new JobRestartException(
+                            "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
+                                    + "The last execution ended with a failure that could not be rolled back, "
+                                    + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
+                }
+            }
+        }
+
+        // Check the validity of the parameters before creating anything
+        // in the repository...
+        job.getJobParametersValidator().validate(jobParameters);
+
+        taskExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    retryTemplate.execute(new FaultTolerantJobRetryCallback(executionReference, job, jobParameters));
+                }
+                catch (TaskRejectedException e) {
+                    // The callback signals a failed restartable job with this exception,
+                    // so reaching this point means the RetryTemplate gave up on restarting it.
+                    JobExecution failedExecution = executionReference.get();
+                    if (failedExecution == null) {
+                        // No execution was created, so there is nothing to mark as failed.
+                        throw e;
+                    }
+                    failedExecution.upgradeStatus(BatchStatus.FAILED);
+                    if (failedExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
+                        failedExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
+                    }
+                    jobRepository.update(failedExecution);
+                }
+            }
+        });
+
+        return executionReference.get();
+    }
+
+    /**
+     * Set the JobRepository.
+     *
+     * @param jobRepository the repository used to look up and create job executions
+     */
+    public void setJobRepository(JobRepository jobRepository) {
+        this.jobRepository = jobRepository;
+    }
+
+    /**
+     * Set the RetryTemplate.
+     *
+     * @param retryTemplate the template that drives the retries of a failed job
+     */
+    public void setRetryTemplate(RetryTemplate retryTemplate) {
+        this.retryTemplate = retryTemplate;
+    }
+
+    /**
+     * Set the TaskExecutor. (Optional)
+     *
+     * @param taskExecutor the executor used to launch the job
+     */
+    public void setTaskExecutor(TaskExecutor taskExecutor) {
+        this.taskExecutor = taskExecutor;
+    }
+
+    /**
+     * Ensure the required dependencies of a {@link JobRepository} and a
+     * {@link RetryTemplate} have been set, and default to a
+     * {@link SyncTaskExecutor} when no {@link TaskExecutor} was provided.
+     */
+    @Override
+    public void afterPropertiesSet() throws Exception {
+        Assert.state(jobRepository != null, "A JobRepository has not been set.");
+        Assert.state(retryTemplate != null, "A RetryTemplate has not been set.");
+        if (taskExecutor == null) {
+            logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
+            taskExecutor = new SyncTaskExecutor();
+        }
+    }
+
+    /**
+     * Retry callback that creates a new {@link JobExecution} and runs the job on
+     * every attempt, publishing the execution through the shared reference.
+     */
+    private class FaultTolerantJobRetryCallback implements RetryCallback<Object, RuntimeException> {
+
+        private final AtomicReference<JobExecution> executionReference;
+        private final Job job;
+        private final JobParameters jobParameters;
+
+        FaultTolerantJobRetryCallback(AtomicReference<JobExecution> executionReference, Job job,
+                JobParameters jobParameters) {
+            this.executionReference = executionReference;
+            this.job = job;
+            this.jobParameters = jobParameters;
+        }
+
+        @Override
+        public Object doWithRetry(RetryContext retryContext) {
+            if (!job.isRestartable()) {
+                // will be set only once and in case that job can not be restarted we don't retry
+                retryContext.setExhaustedOnly();
+            }
+
+            if (retryContext.getRetryCount() > 0) {
+                logger.info("Job: [" + job + "] retrying/restarting with the following parameters: [" + jobParameters
+                        + "]");
+            }
+
+            try {
+                /*
+                 * There is a very small probability that a non-restartable job can be
+                 * restarted, but only if another process or thread manages to launch
+                 * and fail a job execution for this instance between the last
+                 * assertion and the next method returning successfully.
+                 */
+                executionReference.set(jobRepository.createJobExecution(job.getName(), jobParameters));
+                logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+                        + "]");
+                job.execute(executionReference.get());
+                logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
+                        + "] and the following status: [" + executionReference.get().getStatus() + "]");
+            }
+            catch (JobInstanceAlreadyCompleteException | JobExecutionAlreadyRunningException
+                    | JobRestartException e) {
+                // Retrying cannot change any of these repository outcomes, so stop immediately.
+                retryContext.setExhaustedOnly();
+                rethrow(e);
+            }
+            catch (Throwable t) {
+                logger.info("Job: [" + job
+                        + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
+                        + "]", t);
+                rethrow(t);
+            }
+
+            if (job.isRestartable() && executionReference.get().getStatus() == BatchStatus.FAILED) {
+                // if job failed and can be restarted, use retry template to restart the job
+                throw new TaskRejectedException("RetryTemplate failed too many times");
+            }
+
+            return null;
+        }
+
+        /**
+         * Rethrow unchecked throwables as-is and wrap checked ones, keeping the cause.
+         */
+        private void rethrow(Throwable t) {
+            if (t instanceof RuntimeException) {
+                throw (RuntimeException) t;
+            }
+            else if (t instanceof Error) {
+                throw (Error) t;
+            }
+            throw new IllegalStateException(t);
+        }
+    }
+}
diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/launch/FaultTolerantJobLauncherTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/FaultTolerantJobLauncherTests.java
new file mode 100644
index 0000000000..03fd86e55c
--- /dev/null
+++ b/spring-batch-core/src/test/java/org/springframework/batch/core/launch/FaultTolerantJobLauncherTests.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2006-2014 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.batch.core.launch;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.batch.core.*;
+import org.springframework.batch.core.job.DefaultJobParametersValidator;
+import org.springframework.batch.core.job.JobSupport;
+import org.springframework.batch.core.launch.support.FaultTolerantJobLauncher;
+import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
+import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.core.repository.JobRestartException;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.core.task.TaskRejectedException;
+import org.springframework.retry.backoff.FixedBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link FaultTolerantJobLauncher}, driven by a mocked {@link JobRepository}.
+ * @author Lucas Ward
+ * @author Will Schipp
+ */
+public class FaultTolerantJobLauncherTests {
+
+    private FaultTolerantJobLauncher jobLauncher;
+
+    private RetryTemplate retryTemplate;
+
+    private JobSupport job = new JobSupport("foo") {
+        @Override
+        public void execute(JobExecution execution) {
+            execution.setExitStatus(ExitStatus.COMPLETED);
+            return;
+        }
+    };
+
+    private JobParameters jobParameters = new JobParameters();
+
+    private JobRepository jobRepository;
+
+    @Before
+    public void setUp() throws Exception {
+
+        retryTemplate = new RetryTemplate();
+        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
+        backOffPolicy.setBackOffPeriod(1);
+        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+        retryPolicy.setMaxAttempts(2);
+        retryTemplate.setRetryPolicy(retryPolicy);
+        retryTemplate.setBackOffPolicy(backOffPolicy);
+
+        jobLauncher = new FaultTolerantJobLauncher();
+        jobLauncher.setRetryTemplate(retryTemplate);
+        jobRepository = mock(JobRepository.class);
+        jobLauncher.setJobRepository(jobRepository);
+
+    }
+
+    @Test
+    public void testRun() throws Exception {
+        run(ExitStatus.COMPLETED);
+    }
+
+    @Test(expected = JobParametersInvalidException.class)
+    public void testRunWithValidator() throws Exception {
+
+        job.setJobParametersValidator(new DefaultJobParametersValidator(new String[] { "missing-and-required" },
+                new String[0]));
+
+        when(jobRepository.getLastJobExecution(job.getName(), jobParameters)).thenReturn(null);
+
+        jobLauncher.afterPropertiesSet();
+        jobLauncher.run(job, jobParameters);
+
+    }
+
+    @Test
+    public void testRunRestartableJobInstanceTwice() throws Exception {
+        job = new JobSupport("foo") {
+            @Override
+            public boolean isRestartable() {
+                return true;
+            }
+
+            @Override
+            public void execute(JobExecution execution) {
+                execution.setExitStatus(ExitStatus.COMPLETED);
+                return;
+            }
+        };
+
+        testRun();
+        when(jobRepository.getLastJobExecution(job.getName(), jobParameters)).thenReturn(
+                new JobExecution(new JobInstance(1L, job.getName()), jobParameters));
+        when(jobRepository.createJobExecution(job.getName(), jobParameters)).thenReturn(
+                new JobExecution(new JobInstance(1L, job.getName()), jobParameters));
+        jobLauncher.run(job, jobParameters);
+    }
+
+    /*
+     * Non-restartable JobInstance can be run only once - attempt to run
+     * existing non-restartable JobInstance causes error.
+     */
+    @Test
+    public void testRunNonRestartableJobInstanceTwice() throws Exception {
+        job = new JobSupport("foo") {
+            @Override
+            public boolean isRestartable() {
+                return false;
+            }
+
+            @Override
+            public void execute(JobExecution execution) {
+                execution.setExitStatus(ExitStatus.COMPLETED);
+                return;
+            }
+        };
+
+        testRun();
+        try {
+            when(jobRepository.getLastJobExecution(job.getName(), jobParameters)).thenReturn(
+                    new JobExecution(new JobInstance(1L, job.getName()), jobParameters));
+            jobLauncher.run(job, jobParameters);
+            fail("Expected JobRestartException");
+        }
+        catch (JobRestartException e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testTaskExecutor() throws Exception {
+        final List<String> list = new ArrayList<>();
+        jobLauncher.setTaskExecutor(new TaskExecutor() {
+            @Override
+            public void execute(Runnable task) {
+                list.add("execute");
+                task.run();
+            }
+        });
+        testRun();
+        assertEquals(1, list.size());
+    }
+
+    @Test
+    public void testTaskExecutorRejects() throws Exception {
+
+        JobExecution jobExecution = new JobExecution((JobInstance) null, (JobParameters) null);
+
+        final List<String> list = new ArrayList<>();
+        // This executor never runs the task; it records the rejection on the execution itself.
+        jobLauncher.setTaskExecutor(new TaskExecutor() {
+            @Override
+            public void execute(Runnable task) {
+                try {
+                    list.add("execute");
+                    throw new TaskRejectedException("Planned failure");
+                } catch (TaskRejectedException e) {
+                    jobExecution.upgradeStatus(BatchStatus.FAILED);
+                    if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
+                        jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
+                    }
+                    jobRepository.update(jobExecution);
+                }
+            }
+        });
+
+        when(jobRepository.getLastJobExecution(job.getName(), jobParameters)).thenReturn(null);
+        when(jobRepository.createJobExecution(job.getName(), jobParameters)).thenReturn(jobExecution);
+
+        jobLauncher.afterPropertiesSet();
+        try {
+            jobLauncher.run(job, jobParameters);
+        }
+        finally {
+            assertEquals(BatchStatus.FAILED, jobExecution.getStatus());
+            assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
+        }
+
+        assertEquals(1, list.size());
+
+    }
+
+    @Test
+    public void testRunWithException() throws Exception {
+        job = new JobSupport() {
+            @Override
+            public void execute(JobExecution execution) {
+                execution.setExitStatus(ExitStatus.FAILED);
+                throw new RuntimeException("foo");
+            }
+        };
+        try {
+            run(ExitStatus.FAILED);
+            fail("Expected RuntimeException");
+        }
+        catch (RuntimeException e) {
+            assertEquals("Retry exhausted after last attempt with no recovery path.; nested exception is java.lang.RuntimeException: foo", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRunWithError() throws Exception {
+        job = new JobSupport() {
+            @Override
+            public void execute(JobExecution execution) {
+                execution.setExitStatus(ExitStatus.FAILED);
+                throw new Error("foo");
+            }
+        };
+        try {
+            run(ExitStatus.FAILED);
+            fail("Expected RuntimeException wrapping the Error");
+        }
+        catch (RuntimeException e) {
+            assertEquals("Retry exhausted after last attempt with no recovery path.; nested exception is java.lang.Error: foo", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testInitialiseWithoutRepository() throws Exception {
+        try {
+            new FaultTolerantJobLauncher().afterPropertiesSet();
+            fail("Expected IllegalStateException");
+        }
+        catch (IllegalStateException e) {
+            // expected
+            assertTrue("Message did not contain repository: " + e.getMessage(), contains(e.getMessage().toLowerCase(),
+                    "repository"));
+        }
+    }
+
+    @Test
+    public void testInitialiseWithRepository() throws Exception {
+        jobLauncher = new FaultTolerantJobLauncher();
+        jobLauncher.setRetryTemplate(retryTemplate);
+        jobLauncher.setJobRepository(jobRepository);
+        jobLauncher.afterPropertiesSet(); // no error
+    }
+
+    private void run(ExitStatus exitStatus) throws Exception {
+        JobExecution jobExecution = new JobExecution((JobInstance) null, (JobParameters) null);
+
+        when(jobRepository.getLastJobExecution(job.getName(), jobParameters)).thenReturn(null);
+        when(jobRepository.createJobExecution(job.getName(), jobParameters)).thenReturn(jobExecution);
+
+        jobLauncher.afterPropertiesSet();
+        try {
+            jobLauncher.run(job, jobParameters);
+        }
+        finally {
+            assertEquals(exitStatus, jobExecution.getExitStatus());
+        }
+    }
+
+    private boolean contains(String str, String searchStr) {
+        return str.indexOf(searchStr) != -1;
+    }
+
+    /**
+     * Test to support BATCH-1770 -> throw in parent thread JobRestartException when
+     * a stepExecution is UNKNOWN
+     */
+    @Test(expected=JobRestartException.class)
+    public void testRunStepStatusUnknown() throws Exception {
+        testRestartStepExecutionInvalidStatus(BatchStatus.UNKNOWN);
+    }
+
+    @Test(expected = JobExecutionAlreadyRunningException.class)
+    public void testRunStepStatusStarting() throws Exception {
+        testRestartStepExecutionInvalidStatus(BatchStatus.STARTING);
+    }
+
+    @Test(expected = JobExecutionAlreadyRunningException.class)
+    public void testRunStepStatusStarted() throws Exception {
+        testRestartStepExecutionInvalidStatus(BatchStatus.STARTED);
+    }
+
+    @Test(expected = JobExecutionAlreadyRunningException.class)
+    public void testRunStepStatusStopping() throws Exception {
+        testRestartStepExecutionInvalidStatus(BatchStatus.STOPPING);
+    }
+
+    private void testRestartStepExecutionInvalidStatus(BatchStatus status) throws Exception {
+        String jobName = "test_job";
+        JobRepository jobRepository = mock(JobRepository.class);
+        JobParameters parameters = new JobParametersBuilder().addLong("runtime", System.currentTimeMillis()).toJobParameters();
+        JobExecution jobExecution = mock(JobExecution.class);
+        Job job = mock(Job.class);
+        JobParametersValidator validator = mock(JobParametersValidator.class);
+        StepExecution stepExecution = mock(StepExecution.class);
+
+        when(job.getName()).thenReturn(jobName);
+        when(job.isRestartable()).thenReturn(true);
+        when(job.getJobParametersValidator()).thenReturn(validator);
+        when(jobRepository.getLastJobExecution(jobName, parameters)).thenReturn(jobExecution);
+        when(stepExecution.getStatus()).thenReturn(status);
+        when(jobExecution.getStepExecutions()).thenReturn(Arrays.asList(stepExecution));
+
+        //setup launcher
+        // afterPropertiesSet() is skipped on purpose: run() must fail before the TaskExecutor is used
+        jobLauncher = new FaultTolerantJobLauncher();
+        jobLauncher.setRetryTemplate(retryTemplate);
+        jobLauncher.setJobRepository(jobRepository);
+
+        //run
+        jobLauncher.run(job, parameters);
+    }
+}