diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/partition/JsrStepExecutionSplitter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/partition/JsrStepExecutionSplitter.java index 257489de85..2f700ab8fa 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/partition/JsrStepExecutionSplitter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/jsr/partition/JsrStepExecutionSplitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2014 the original author or authors. + * Copyright 2013-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.ExecutionContext; +import java.util.Collection; import java.util.Comparator; import java.util.Set; import java.util.TreeSet; @@ -31,6 +32,7 @@ * Provides JSR-352 specific behavior for the splitting of {@link StepExecution}s. * * @author Michael Minella + * @author Mahmoud Ben Hassine * @since 3.0 */ public class JsrStepExecutionSplitter extends SimpleStepExecutionSplitter { @@ -78,13 +80,18 @@ public int compare(StepExecution arg0, StepExecution arg1) { } }); JobExecution jobExecution = stepExecution.getJobExecution(); + Collection allStepExecutions = jobRepository.getStepExecutions(jobExecution.getJobInstance()); for(int i = 0; i < gridSize; i++) { String stepName = this.stepName + ":partition" + i; JobExecution curJobExecution = new JobExecution(jobExecution); StepExecution curStepExecution = new StepExecution(stepName, curJobExecution); - - if(!restoreState || isStartable(curStepExecution, new ExecutionContext())) { + StepExecution lastStepExecution = getLastStepExecution(allStepExecutions, curStepExecution.getStepName()); + if (lastStepExecution != null) { + lastStepExecution.getJobExecution().setExecutionContext(jobRepository.getJobExecutionContext(lastStepExecution.getJobExecution())); + lastStepExecution.setExecutionContext(jobRepository.getStepExecutionContext(lastStepExecution)); + } + if(!restoreState || isStartable(curStepExecution, lastStepExecution, new ExecutionContext())) { executions.add(curStepExecution); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java index 522cfb2fbb..d176870369 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.batch.core.partition.support; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -33,6 +34,7 @@ import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.item.ExecutionContext; import org.springframework.beans.factory.InitializingBean; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -178,14 +180,21 @@ public Set split(StepExecution stepExecution, int gridSize) throw Map contexts = getContexts(stepExecution, gridSize); Set set = new HashSet<>(contexts.size()); + Collection allStepExecutions = jobRepository.getStepExecutions(jobExecution.getJobInstance()); + for (Entry context : contexts.entrySet()) { // Make the step execution name unique and repeatable String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey(); StepExecution currentStepExecution = jobExecution.createStepExecution(stepName); + StepExecution lastStepExecution = getLastStepExecution(allStepExecutions, currentStepExecution.getStepName()); + if (lastStepExecution != null) { + lastStepExecution.getJobExecution().setExecutionContext(jobRepository.getJobExecutionContext(lastStepExecution.getJobExecution())); + lastStepExecution.setExecutionContext(jobRepository.getStepExecutionContext(lastStepExecution)); + } - boolean startable = isStartable(currentStepExecution, context.getValue()); + boolean startable = isStartable(currentStepExecution, lastStepExecution, context.getValue()); if (startable) { set.add(currentStepExecution); @@ -201,6 +210,14 @@ public Set split(StepExecution stepExecution, int gridSize) throw } + @Nullable + protected StepExecution getLastStepExecution(Collection stepExecutions, String stepName) { + return stepExecutions.parallelStream() + .filter(stepExecution -> stepExecution.getStepName().equals(stepName)) + .min(new StepExecutionComparator()) + .orElse(null); + } + private Map getContexts(StepExecution stepExecution, int gridSize) { ExecutionContext context = stepExecution.getExecutionContext(); @@ -239,17 +256,58 @@ private Map getContexts(StepExecution stepExecution, i return result; } + /** + * Check if a step execution is startable. + * @param stepExecution the step execution to check + * @param lastStepExecution the last step execution of the same step + * @param context the execution context of the step + * @return true if the step execution is startable, false otherwise + * @throws JobExecutionException if unable to check if the step execution is startable + */ + protected boolean isStartable(StepExecution stepExecution, StepExecution lastStepExecution, ExecutionContext context) throws JobExecutionException { + return getStartable(stepExecution, lastStepExecution, context); + } + /** * Check if a step execution is startable. * @param stepExecution the step execution to check * @param context the execution context of the step * @return true if the step execution is startable, false otherwise * @throws JobExecutionException if unable to check if the step execution is startable + * @deprecated This method is less performant than and deprecated in favor of + * {@link SimpleStepExecutionSplitter#isStartable(StepExecution, StepExecution, ExecutionContext)} */ + @Deprecated protected boolean isStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException { return getStartable(stepExecution, context); } + /** + * Check if a step execution is startable. + * @param stepExecution the step execution to check + * @param context the execution context of the step + * @return true if the step execution is startable, false otherwise + * @throws JobExecutionException if unable to check if the step execution is startable + * @deprecated This method is deprecated in favor of + * {@link SimpleStepExecutionSplitter#isStartable} and will be removed in a + * future version. + */ + @Deprecated + protected boolean getStartable(StepExecution stepExecution, StepExecution lastStepExecution, ExecutionContext context) throws JobExecutionException { + + boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED); + + if (isRestart) { + stepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); + } + else { + stepExecution.setExecutionContext(context); + } + + return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart; + + } + /** * Check if a step execution is startable. * @param stepExecution the step execution to check @@ -335,4 +393,25 @@ private boolean isSameJobExecution(StepExecution stepExecution, StepExecution la return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId()); } + private class StepExecutionComparator implements Comparator { + + @Override + public int compare(StepExecution stepExecution1, StepExecution stepExecution2) { + long startTime1 = stepExecution1.getStartTime().getTime(); + long startTime2 = stepExecution2.getStartTime().getTime(); + if (startTime1 < startTime2) { + return -1; + } + else { + if (startTime1 > startTime2) { + return 1; + } + else { + // Use step execution ID as the tie breaker if start time is identical + return stepExecution1.getId().compareTo(stepExecution2.getId()); + } + } + } + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/JobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/JobRepository.java index a65a7a51b9..0d716158b0 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/JobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/JobRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -179,6 +179,7 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters) void updateExecutionContext(JobExecution jobExecution); /** + * Get the last step execution by step name for a given job instance. * @param jobInstance {@link JobInstance} instance containing the step executions. * @param stepName the name of the step execution that might have run. * @return the last execution of step for the given job instance. @@ -186,6 +187,15 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters) @Nullable StepExecution getLastStepExecution(JobInstance jobInstance, String stepName); + /** + * Get all step executions for a given job instance. + * @param jobInstance {@link JobInstance} instance containing the step executions. + * @return all step executions for the given job instance. + */ + default Collection getStepExecutions(JobInstance jobInstance) { + throw new UnsupportedOperationException(); + } + /** * @param jobInstance {@link JobInstance} instance containing the step executions. * @param stepName the name of the step execution that might have run. @@ -201,4 +211,22 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters) @Nullable JobExecution getLastJobExecution(String jobName, JobParameters jobParameters); + /** + * Get the execution context of a given job execution. + * @param jobExecution for which the execution context should be returned + * @return the execution context of the given job execution + */ + default ExecutionContext getJobExecutionContext(JobExecution jobExecution) { + throw new UnsupportedOperationException(); + } + + /** + * Get the execution context of a given step execution. + * @param stepExecution for which the execution context should be returned + * @return the execution context of the given step execution + */ + default ExecutionContext getStepExecutionContext(StepExecution stepExecution) { + throw new UnsupportedOperationException(); + } + } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java index 05ef5127f9..5759d82e05 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -256,6 +256,29 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa return latest; } + @Override + public Collection getStepExecutions(JobInstance jobInstance) { + List jobExecutions = jobExecutionDao.findJobExecutions(jobInstance); + List stepExecutions = new ArrayList<>(); + + for (JobExecution jobExecution : jobExecutions) { + stepExecutionDao.addStepExecutions(jobExecution); + stepExecutions.addAll(jobExecution.getStepExecutions()); + } + + return stepExecutions; + } + + @Override + public ExecutionContext getJobExecutionContext(JobExecution jobExecution) { + return ecDao.getExecutionContext(jobExecution); + } + + @Override + public ExecutionContext getStepExecutionContext(StepExecution stepExecution) { + return ecDao.getExecutionContext(stepExecution); + } + /** * @return number of executions of the step within given job instance */ diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/step/JobRepositorySupport.java b/spring-batch-core/src/test/java/org/springframework/batch/core/step/JobRepositorySupport.java index 55339d9f83..aff32918d3 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/step/JobRepositorySupport.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/step/JobRepositorySupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2013 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,19 @@ package org.springframework.batch.core.step; import java.util.Collection; +import java.util.Collections; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.item.ExecutionContext; /** * @author Dave Syer * @author David Turanski + * @author Mahmoud Ben Hassine * */ public class JobRepositorySupport implements JobRepository { @@ -114,4 +117,19 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jobParameters, String jobConfigurationLocation) { return null; } + + @Override + public Collection getStepExecutions(JobInstance jobInstance) { + return Collections.emptyList(); + } + + @Override + public ExecutionContext getJobExecutionContext(JobExecution jobExecution) { + return new ExecutionContext(); + } + + @Override + public ExecutionContext getStepExecutionContext(StepExecution stepExecution) { + return new ExecutionContext(); + } }