Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2014 the original author or authors.
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;

import java.util.Collection;
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
Expand All @@ -31,6 +32,7 @@
* Provides JSR-352 specific behavior for the splitting of {@link StepExecution}s.
*
* @author Michael Minella
* @author Mahmoud Ben Hassine
* @since 3.0
*/
public class JsrStepExecutionSplitter extends SimpleStepExecutionSplitter {
Expand Down Expand Up @@ -78,13 +80,18 @@ public int compare(StepExecution arg0, StepExecution arg1) {
}
});
JobExecution jobExecution = stepExecution.getJobExecution();
Collection<StepExecution> allStepExecutions = jobRepository.getStepExecutions(jobExecution.getJobInstance());

for(int i = 0; i < gridSize; i++) {
String stepName = this.stepName + ":partition" + i;
JobExecution curJobExecution = new JobExecution(jobExecution);
StepExecution curStepExecution = new StepExecution(stepName, curJobExecution);

if(!restoreState || isStartable(curStepExecution, new ExecutionContext())) {
StepExecution lastStepExecution = getLastStepExecution(allStepExecutions, curStepExecution.getStepName());
if (lastStepExecution != null) {
lastStepExecution.getJobExecution().setExecutionContext(jobRepository.getJobExecutionContext(lastStepExecution.getJobExecution()));
lastStepExecution.setExecutionContext(jobRepository.getStepExecutionContext(lastStepExecution));
}
if(!restoreState || isStartable(curStepExecution, lastStepExecution, new ExecutionContext())) {
executions.add(curStepExecution);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.batch.core.partition.support;

import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -178,14 +180,21 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw
Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
Set<StepExecution> set = new HashSet<>(contexts.size());

Collection<StepExecution> allStepExecutions = jobRepository.getStepExecutions(jobExecution.getJobInstance());

for (Entry<String, ExecutionContext> context : contexts.entrySet()) {

// Make the step execution name unique and repeatable
String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();

StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
StepExecution lastStepExecution = getLastStepExecution(allStepExecutions, currentStepExecution.getStepName());
if (lastStepExecution != null) {
lastStepExecution.getJobExecution().setExecutionContext(jobRepository.getJobExecutionContext(lastStepExecution.getJobExecution()));
lastStepExecution.setExecutionContext(jobRepository.getStepExecutionContext(lastStepExecution));
}

boolean startable = isStartable(currentStepExecution, context.getValue());
boolean startable = isStartable(currentStepExecution, lastStepExecution, context.getValue());

if (startable) {
set.add(currentStepExecution);
Expand All @@ -201,6 +210,14 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw

}

@Nullable
protected StepExecution getLastStepExecution(Collection<StepExecution> stepExecutions, String stepName) {
return stepExecutions.parallelStream()
.filter(stepExecution -> stepExecution.getStepName().equals(stepName))
.min(new StepExecutionComparator())
.orElse(null);
}

private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {

ExecutionContext context = stepExecution.getExecutionContext();
Expand Down Expand Up @@ -239,17 +256,58 @@ private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, i
return result;
}

/**
* Check if a step execution is startable.
* @param stepExecution the step execution to check
* @param lastStepExecution the last step execution of the same step
* @param context the execution context of the step
* @return true if the step execution is startable, false otherwise
* @throws JobExecutionException if unable to check if the step execution is startable
*/
protected boolean isStartable(StepExecution stepExecution, StepExecution lastStepExecution, ExecutionContext context) throws JobExecutionException {
return getStartable(stepExecution, lastStepExecution, context);
}

/**
* Check if a step execution is startable.
* @param stepExecution the step execution to check
* @param context the execution context of the step
* @return true if the step execution is startable, false otherwise
* @throws JobExecutionException if unable to check if the step execution is startable
* @deprecated This method is less performant than and deprecated in favor of
* {@link SimpleStepExecutionSplitter#isStartable(StepExecution, StepExecution, ExecutionContext)}
*/
@Deprecated
protected boolean isStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
return getStartable(stepExecution, context);
}

/**
* Check if a step execution is startable.
* @param stepExecution the step execution to check
* @param context the execution context of the step
* @return true if the step execution is startable, false otherwise
* @throws JobExecutionException if unable to check if the step execution is startable
* @deprecated This method is deprecated in favor of
* {@link SimpleStepExecutionSplitter#isStartable} and will be removed in a
* future version.
*/
@Deprecated
protected boolean getStartable(StepExecution stepExecution, StepExecution lastStepExecution, ExecutionContext context) throws JobExecutionException {

boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);

if (isRestart) {
stepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
}
else {
stepExecution.setExecutionContext(context);
}

return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;

}

/**
* Check if a step execution is startable.
* @param stepExecution the step execution to check
Expand Down Expand Up @@ -335,4 +393,25 @@ private boolean isSameJobExecution(StepExecution stepExecution, StepExecution la
return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
}

private class StepExecutionComparator implements Comparator<StepExecution> {

@Override
public int compare(StepExecution stepExecution1, StepExecution stepExecution2) {
long startTime1 = stepExecution1.getStartTime().getTime();
long startTime2 = stepExecution2.getStartTime().getTime();
if (startTime1 < startTime2) {
return -1;
}
else {
if (startTime1 > startTime2) {
return 1;
}
else {
// Use step execution ID as the tie breaker if start time is identical
return stepExecution1.getId().compareTo(stepExecution2.getId());
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -179,13 +179,23 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters)
void updateExecutionContext(JobExecution jobExecution);

/**
* Get the last step execution by step name for a given job instance.
* @param jobInstance {@link JobInstance} instance containing the step executions.
* @param stepName the name of the step execution that might have run.
* @return the last execution of step for the given job instance.
*/
@Nullable
StepExecution getLastStepExecution(JobInstance jobInstance, String stepName);

/**
* Get all step executions for a given job instance.
* @param jobInstance {@link JobInstance} instance containing the step executions.
* @return all step executions for the given job instance.
*/
default Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
throw new UnsupportedOperationException();
}

/**
* @param jobInstance {@link JobInstance} instance containing the step executions.
* @param stepName the name of the step execution that might have run.
Expand All @@ -201,4 +211,22 @@ JobExecution createJobExecution(String jobName, JobParameters jobParameters)
@Nullable
JobExecution getLastJobExecution(String jobName, JobParameters jobParameters);

/**
* Get the execution context of a given job execution.
* @param jobExecution for which the execution context should be returned
* @return the execution context of the given job execution
*/
default ExecutionContext getJobExecutionContext(JobExecution jobExecution) {
throw new UnsupportedOperationException();
}

/**
* Get the execution context of a given step execution.
* @param stepExecution for which the execution context should be returned
* @return the execution context of the given step execution
*/
default ExecutionContext getStepExecutionContext(StepExecution stepExecution) {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -256,6 +256,29 @@ public StepExecution getLastStepExecution(JobInstance jobInstance, String stepNa
return latest;
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
List<StepExecution> stepExecutions = new ArrayList<>();

for (JobExecution jobExecution : jobExecutions) {
stepExecutionDao.addStepExecutions(jobExecution);
stepExecutions.addAll(jobExecution.getStepExecutions());
}

return stepExecutions;
}

@Override
public ExecutionContext getJobExecutionContext(JobExecution jobExecution) {
return ecDao.getExecutionContext(jobExecution);
}

@Override
public ExecutionContext getStepExecutionContext(StepExecution stepExecution) {
return ecDao.getExecutionContext(stepExecution);
}

/**
* @return number of executions of the step within given job instance
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2013 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,16 +16,19 @@
package org.springframework.batch.core.step;

import java.util.Collection;
import java.util.Collections;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;

/**
* @author Dave Syer
* @author David Turanski
* @author Mahmoud Ben Hassine
*
*/
public class JobRepositorySupport implements JobRepository {
Expand Down Expand Up @@ -114,4 +117,19 @@ public JobExecution createJobExecution(JobInstance jobInstance,
JobParameters jobParameters, String jobConfigurationLocation) {
return null;
}

@Override
public Collection<StepExecution> getStepExecutions(JobInstance jobInstance) {
return Collections.emptyList();
}

@Override
public ExecutionContext getJobExecutionContext(JobExecution jobExecution) {
return new ExecutionContext();
}

@Override
public ExecutionContext getStepExecutionContext(StepExecution stepExecution) {
return new ExecutionContext();
}
}