Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
* @since 2.0
*/
public class SimpleStepExecutionSplitter implements StepExecutionSplitter {
Expand Down Expand Up @@ -138,13 +139,15 @@ public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throw
if (lastStepExecution == null) { // fresh start
StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution);
currentStepExecution.setExecutionContext(context.getValue());
jobRepository.updateExecutionContext(currentStepExecution);
set.add(currentStepExecution);
}
else { // restart
if (lastStepExecution.getStatus() != BatchStatus.COMPLETED
&& shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) {
StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution);
currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
jobRepository.updateExecutionContext(currentStepExecution);
set.add(currentStepExecution);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* @author Mahmoud Ben Hassine
* @author Parikshit Dutta
* @author Glenn Renfro
* @author Yanming Zhou
* @see JobExplorer
* @see JobInstanceDao
* @see JobExecutionDao
Expand Down Expand Up @@ -157,9 +158,9 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
public List<JobExecution> getJobExecutions(JobInstance jobInstance) {
List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
for (JobExecution jobExecution : executions) {
getJobExecutionDependencies(jobExecution);
fillJobExecutionDependencies(jobExecution);
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
getStepExecutionDependencies(stepExecution);
fillStepExecutionDependencies(stepExecution);
}
}
return executions;
Expand All @@ -170,9 +171,9 @@ public List<JobExecution> getJobExecutions(JobInstance jobInstance) {
public JobExecution getLastJobExecution(JobInstance jobInstance) {
JobExecution lastJobExecution = jobExecutionDao.getLastJobExecution(jobInstance);
if (lastJobExecution != null) {
getJobExecutionDependencies(lastJobExecution);
fillJobExecutionDependencies(lastJobExecution);
for (StepExecution stepExecution : lastJobExecution.getStepExecutions()) {
getStepExecutionDependencies(stepExecution);
fillStepExecutionDependencies(stepExecution);
}
}
return lastJobExecution;
Expand All @@ -198,7 +199,7 @@ public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);

if (jobExecution != null) {
getJobExecutionDependencies(jobExecution);
fillJobExecutionDependencies(jobExecution);
}
return jobExecution;
}
Expand All @@ -207,9 +208,9 @@ public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
public Set<JobExecution> findRunningJobExecutions(@Nullable String jobName) {
Set<JobExecution> executions = jobExecutionDao.findRunningJobExecutions(jobName);
for (JobExecution jobExecution : executions) {
getJobExecutionDependencies(jobExecution);
fillJobExecutionDependencies(jobExecution);
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
getStepExecutionDependencies(stepExecution);
fillStepExecutionDependencies(stepExecution);
}
}
return executions;
Expand All @@ -222,20 +223,18 @@ public JobExecution getJobExecution(long executionId) {
if (jobExecution == null) {
return null;
}
getJobExecutionDependencies(jobExecution);
fillJobExecutionDependencies(jobExecution);
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
getStepExecutionDependencies(stepExecution);
fillStepExecutionDependencies(stepExecution);
}
return jobExecution;
}

/*
* Find all dependencies for a JobExecution, including JobInstance (which requires
* Fill all dependencies for a JobExecution, including JobInstance (which requires
* JobParameters) plus StepExecutions
*/
// TODO rename to something more representative of what it does (side effect on the
// parameter)
private void getJobExecutionDependencies(JobExecution jobExecution) {
protected void fillJobExecutionDependencies(JobExecution jobExecution) {
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobExecution);
jobExecution.setJobInstance(jobInstance);
jobExecution.addStepExecutions(stepExecutionDao.getStepExecutions(jobExecution));
Expand All @@ -257,9 +256,9 @@ public StepExecution getStepExecution(long jobExecutionId, long executionId) {
if (jobExecution == null) {
return null;
}
getJobExecutionDependencies(jobExecution);
fillJobExecutionDependencies(jobExecution);
StepExecution stepExecution = stepExecutionDao.getStepExecution(jobExecution, executionId);
getStepExecutionDependencies(stepExecution);
fillStepExecutionDependencies(stepExecution);
return stepExecution;
}

Expand All @@ -268,8 +267,7 @@ public StepExecution getStepExecution(long jobExecutionId, long executionId) {
StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName);

if (latest != null) {
ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest);
latest.setExecutionContext(stepExecutionContext);
fillStepExecutionDependencies(latest);
ExecutionContext jobExecutionContext = ecDao.getExecutionContext(latest.getJobExecution());
latest.getJobExecution().setExecutionContext(jobExecutionContext);
}
Expand All @@ -287,7 +285,7 @@ public long getStepExecutionCount(JobInstance jobInstance, String stepName) thro
return stepExecutionDao.countStepExecutions(jobInstance, stepName);
}

private void getStepExecutionDependencies(StepExecution stepExecution) {
protected void fillStepExecutionDependencies(StepExecution stepExecution) {
if (stepExecution != null) {
stepExecution.setExecutionContext(ecDao.getExecutionContext(stepExecution));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* @author Baris Cubukcuoglu
* @author Parikshit Dutta
* @author Mark John Moreno
* @author Yanming Zhou
* @see JobRepository
* @see JobInstanceDao
* @see JobExecutionDao
Expand Down Expand Up @@ -82,7 +83,13 @@ public List<JobInstance> findJobInstances(String jobName) {
@Nullable
@Override
public StepExecution getStepExecution(long executionId) {
return this.stepExecutionDao.getStepExecution(executionId);
StepExecution stepExecution = this.stepExecutionDao.getStepExecution(executionId);
if (stepExecution != null) {
fillStepExecutionDependencies(stepExecution);
ExecutionContext jobExecutionContext = this.ecDao.getExecutionContext(stepExecution.getJobExecution());
stepExecution.getJobExecution().setExecutionContext(jobExecutionContext);
}
return stepExecution;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -41,10 +43,12 @@
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
*
*/
class PartitionStepTests {
Expand All @@ -71,12 +75,24 @@ void setUp() throws Exception {
@Test
void testVanillaStepExecution() throws Exception {
SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter(jobRepository,
step.getName(), new SimplePartitioner());
step.getName(), gridSize -> {
Map<String, ExecutionContext> map = new HashMap<>(gridSize);
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putString("foo", "foo" + i);
map.put("partition" + i, context);
}
return map;
});
stepExecutionSplitter.setAllowStartIfComplete(true);
step.setStepExecutionSplitter(stepExecutionSplitter);
step.setPartitionHandler((stepSplitter, stepExecution) -> {
Set<StepExecution> executions = stepSplitter.split(stepExecution, 2);
for (StepExecution execution : executions) {
// Query from repository to ensure it's persisted
ExecutionContext context = jobRepository.getStepExecution(execution.getId()).getExecutionContext();
assertNotNull(context.getString("foo"));

execution.setStatus(BatchStatus.COMPLETED);
execution.setExitStatus(ExitStatus.COMPLETED);
jobRepository.update(execution);
Expand Down Expand Up @@ -144,7 +160,9 @@ void testRestartStepExecution() throws Exception {
else {
for (StepExecution execution : executions) {
// On restart the execution context should have been restored
assertEquals(execution.getStepName(), execution.getExecutionContext().getString("foo"));
// Query from repository to ensure it's persisted
ExecutionContext context = jobRepository.getStepExecution(execution.getId()).getExecutionContext();
assertEquals(execution.getStepName(), context.getString("foo"));
}
}
for (StepExecution execution : executions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* @author Robert Kasanicky
* @author Dimitrios Liapis
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
*/
// TODO rename to JdbcJobRepositoryIntegrationTests and update to new domain model
// TODO should add a mongodb similar test suite
Expand Down Expand Up @@ -168,13 +169,19 @@ void testSaveExecutionContext() throws Exception {
JobExecution jobExec = jobRepository.createJobExecution(jobInstance, jobParameters, new ExecutionContext());
jobExec.setStartTime(LocalDateTime.now());
jobExec.setExecutionContext(ctx);
jobRepository.updateExecutionContext(jobExec);
Step step = new StepSupport("step1");
StepExecution stepExec = jobRepository.createStepExecution(step.getName(), jobExec);
stepExec.setExecutionContext(ctx);
jobRepository.updateExecutionContext(stepExec);

StepExecution retrievedStepExec = jobRepository.getLastStepExecution(jobExec.getJobInstance(), step.getName());
assertEquals(stepExec, retrievedStepExec);
assertEquals(ctx, retrievedStepExec.getExecutionContext());

retrievedStepExec = jobRepository.getStepExecution(stepExec.getId());
assertEquals(stepExec, retrievedStepExec);
assertEquals(ctx, retrievedStepExec.getExecutionContext());
}

/*
Expand Down