Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.jsr.launch.JsrJobOperator;
import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;

import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -78,13 +80,15 @@ public int compare(StepExecution arg0, StepExecution arg1) {
}
});
JobExecution jobExecution = stepExecution.getJobExecution();
JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
Collection<StepExecution> allPriorStepExecutions = jobRepository.getStepExecutions(jobInstance);

for(int i = 0; i < gridSize; i++) {
String stepName = this.stepName + ":partition" + i;
JobExecution curJobExecution = new JobExecution(jobExecution);
StepExecution curStepExecution = new StepExecution(stepName, curJobExecution);

if(!restoreState || isStartable(curStepExecution, new ExecutionContext())) {
if(!restoreState || isStartable(curStepExecution, new ExecutionContext(), allPriorStepExecutions)) {
executions.add(curStepExecution);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,17 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw
Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
Set<StepExecution> set = new HashSet<>(contexts.size());

JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
Collection<StepExecution> allPriorStepExecutions = jobRepository.getStepExecutions(jobInstance);

for (Entry<String, ExecutionContext> context : contexts.entrySet()) {

// Make the step execution name unique and repeatable
String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();

StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);

boolean startable = isStartable(currentStepExecution, context.getValue());
boolean startable = isStartable(currentStepExecution, context.getValue(), allPriorStepExecutions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isStartable requires the current step execution as well as the latest one with the same name. So there is no need to pass the entire collection of step executions here. You can get the last one with the protected utility method getLastStepExecution and pass it to isStartable.


if (startable) {
set.add(currentStepExecution);
Expand Down Expand Up @@ -245,9 +248,29 @@ private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, i
* @param context the execution context of the step
* @return true if the step execution is startable, false otherwise
* @throws JobExecutionException if unable to check if the step execution is startable
* @deprecated This method is less performant than and deprecated in favor of
* {@link SimpleStepExecutionSplitter#isStartable(StepExecution, ExecutionContext, Collection)}
*/
@Deprecated
protected boolean isStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
return getStartable(stepExecution, context);
JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
String stepName = stepExecution.getStepName();
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);
return isStartable(stepExecution, context, lastStepExecution);
}

/**
* Check if a step execution is startable.
* @param stepExecution the step execution to check
* @param context the execution context of the step
* @param allPriorStepExecutions all the step executions in the job repository at this moment to compare against
* @return true if the step execution is startable, false otherwise
* @throws JobExecutionException if unable to check if the step execution is startable
*/
protected boolean isStartable(StepExecution stepExecution, ExecutionContext context, Collection<StepExecution> allPriorStepExecutions) throws JobExecutionException {
String stepName = stepExecution.getStepName();
StepExecution lastStepExecution = jobRepository.getLastStepExecution(allPriorStepExecutions, stepName);
return isStartable(stepExecution, context, lastStepExecution);
}

/**
Expand All @@ -262,11 +285,10 @@ protected boolean isStartable(StepExecution stepExecution, ExecutionContext cont
*/
@Deprecated
protected boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
return isStartable(stepExecution, context);
}

JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
String stepName = stepExecution.getStepName();
StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);

private boolean isStartable(StepExecution stepExecution, ExecutionContext context, StepExecution lastStepExecution) throws JobExecutionException {
boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);

if (isRestart) {
Expand All @@ -277,7 +299,6 @@ protected boolean getStartable(StepExecution stepExecution, ExecutionContext con
}

return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;

}

private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution, StepExecution lastStepExecution)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,28 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters)
*/
void updateExecutionContext(JobExecution jobExecution);

/**
* @return all step executions, from all job executions, for the given job instance.
*/
Collection<StepExecution> getStepExecutions(JobInstance jobInstance);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use Java 8 "default method in interface" for backward compatibility here. For example:

default Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
   throw new UnsupportedOperationException();
}


/**
* @param jobInstance {@link JobInstance} instance containing the step executions.
* @param stepName the name of the step execution that might have run.
* @return the last execution of step for the given job instance.
* @return the last execution of step for the given job instance, null otherwise.
*/
@Nullable
StepExecution getLastStepExecution(JobInstance jobInstance, String stepName);

/**
*
* @param stepExecutions all step executions to filter through
* @param stepName the name of the step execution that might have run.
* @return the last execution of step inside the list, null otherwise.
*/
@Nullable
StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filtering a collection of objects by name is not the job of JobRepository. This method could be in a utility class or private/protected to a given implementation, but not part of the JobRepository contract. Please remove the method from the interface and update the implementations accordingly.

Here is how I would do it in SimpleStepExecutionSplitter:

@Nullable
protected StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
   return stepExecutions.stream()
      .filter(stepExecution -> stepExecution.getStepName().equals(stepName))
      .min(new StepExecutionComparator())
      .orElse(null);
}

The code of the comparator is what is done here: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java#L236-L246


/**
* @param jobInstance {@link JobInstance} instance containing the step executions.
* @param stepName the name of the step execution that might have run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,31 @@ public void updateExecutionContext(JobExecution jobExecution) {
}

@Override
@Nullable
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
List<StepExecution> stepExecutions = new ArrayList<>(jobExecutions.size());

for (JobExecution jobExecution : jobExecutions) {
stepExecutionDao.addStepExecutions(jobExecution);
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepName.equals(stepExecution.getStepName())) {
stepExecutions.add(stepExecution);
}
stepExecutions.addAll(jobExecution.getStepExecutions());
}
return stepExecutions;
}

@Override
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
Collection<StepExecution> stepExecutions = getStepExecutions(jobInstance);
return getLastStepExecution(stepExecutions, stepName);
}

@Override
public StepExecution getLastStepExecution(Collection<StepExecution> allStepExecutions, String stepName) {

List<StepExecution> stepExecutions = new ArrayList<StepExecution>();

for (StepExecution stepExecution : allStepExecutions) {
if (stepName.equals(stepExecution.getStepName())) {
stepExecutions.add(stepExecution);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
return null;
}

@Override
public StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
return null;
}

@Override
public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
return 0;
Expand All @@ -91,6 +96,11 @@ public void updateExecutionContext(StepExecution stepExecution) {
public void updateExecutionContext(JobExecution jobExecution) {
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be removed since we will use the default method in interface.

}

@Override
public void addAll(Collection<StepExecution> stepExecutions) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
return null;
}

@Override
public StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
return null;
}

@Override
public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
return 0;
Expand Down Expand Up @@ -99,6 +104,11 @@ public JobExecution getLastJobExecution(String jobName, JobParameters jobParamet
public void updateExecutionContext(JobExecution jobExecution) {
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return an empty collection.

}

@Override
public void addAll(Collection<StepExecution> stepExecutions) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
return null;
}

@Override
public StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
return null;
}

@Override
public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
return 0;
Expand Down Expand Up @@ -557,6 +562,11 @@ public JobExecution getLastJobExecution(String jobName, JobParameters jobParamet
public void updateExecutionContext(JobExecution jobExecution) {
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be removed since we will use the default method in interface.

return null;
}

@Override
public void addAll(Collection<StepExecution> stepExecutions) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
return null;
}

@Override
public StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
return null;
}

/* (non-Javadoc)
* @see org.springframework.batch.core.repository.JobRepository#getStepExecutionCount(org.springframework.batch.core.JobInstance, org.springframework.batch.core.Step)
*/
Expand Down Expand Up @@ -75,6 +80,11 @@ public void updateExecutionContext(StepExecution stepExecution) {
public void updateExecutionContext(JobExecution jobExecution) {
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be removed since we will use the default method in interface.

}

public void add(StepExecution stepExecution) {
}

Expand Down