From d983f71da9cf8fa014d5cb2657174a84e966c17c Mon Sep 17 00:00:00 2001 From: Yanming Zhou Date: Mon, 8 Dec 2025 17:32:48 +0800 Subject: [PATCH] Fix step execution context is not persisted and restored 1. Step execution context is not persisted in `SimpleStepExecutionSplitter::split` 2. Step execution context is not restored in `SimpleJobRepository::getStepExecution` Closes GH-5138 Signed-off-by: Yanming Zhou --- .../support/SimpleStepExecutionSplitter.java | 3 ++ .../explore/support/SimpleJobExplorer.java | 34 +++++++++---------- .../support/SimpleJobRepository.java | 9 ++++- .../core/partition/PartitionStepTests.java | 22 ++++++++++-- .../SimpleJobRepositoryIntegrationTests.java | 7 ++++ 5 files changed, 54 insertions(+), 21 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java index d46840ede0..4b9a079d47 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/support/SimpleStepExecutionSplitter.java @@ -45,6 +45,7 @@ * * @author Dave Syer * @author Mahmoud Ben Hassine + * @author Yanming Zhou * @since 2.0 */ public class SimpleStepExecutionSplitter implements StepExecutionSplitter { @@ -138,6 +139,7 @@ public Set split(StepExecution stepExecution, int gridSize) throw if (lastStepExecution == null) { // fresh start StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution); currentStepExecution.setExecutionContext(context.getValue()); + jobRepository.updateExecutionContext(currentStepExecution); set.add(currentStepExecution); } else { // restart @@ -145,6 +147,7 @@ public Set split(StepExecution stepExecution, int gridSize) throw && shouldStart(allowStartIfComplete, stepExecution, lastStepExecution)) { StepExecution currentStepExecution = jobRepository.createStepExecution(stepName, jobExecution); currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); + jobRepository.updateExecutionContext(currentStepExecution); set.add(currentStepExecution); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/SimpleJobExplorer.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/SimpleJobExplorer.java index 22f7f7ec9d..cf0de06da5 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/SimpleJobExplorer.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/explore/support/SimpleJobExplorer.java @@ -44,6 +44,7 @@ * @author Mahmoud Ben Hassine * @author Parikshit Dutta * @author Glenn Renfro + * @author Yanming Zhou * @see JobExplorer * @see JobInstanceDao * @see JobExecutionDao @@ -157,9 +158,9 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException { public List getJobExecutions(JobInstance jobInstance) { List executions = jobExecutionDao.findJobExecutions(jobInstance); for (JobExecution jobExecution : executions) { - getJobExecutionDependencies(jobExecution); + fillJobExecutionDependencies(jobExecution); for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - getStepExecutionDependencies(stepExecution); + fillStepExecutionDependencies(stepExecution); } } return executions; @@ -170,9 +171,9 @@ public List getJobExecutions(JobInstance jobInstance) { public JobExecution getLastJobExecution(JobInstance jobInstance) { JobExecution lastJobExecution = jobExecutionDao.getLastJobExecution(jobInstance); if (lastJobExecution != null) { - getJobExecutionDependencies(lastJobExecution); + fillJobExecutionDependencies(lastJobExecution); for (StepExecution stepExecution : lastJobExecution.getStepExecutions()) { - getStepExecutionDependencies(stepExecution); + fillStepExecutionDependencies(stepExecution); } } return lastJobExecution; @@ -198,7 +199,7 @@ public List findJobExecutions(JobInstance jobInstance) { JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance); if (jobExecution != null) { - getJobExecutionDependencies(jobExecution); + fillJobExecutionDependencies(jobExecution); } return jobExecution; } @@ -207,9 +208,9 @@ public List findJobExecutions(JobInstance jobInstance) { public Set findRunningJobExecutions(@Nullable String jobName) { Set executions = jobExecutionDao.findRunningJobExecutions(jobName); for (JobExecution jobExecution : executions) { - getJobExecutionDependencies(jobExecution); + fillJobExecutionDependencies(jobExecution); for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - getStepExecutionDependencies(stepExecution); + fillStepExecutionDependencies(stepExecution); } } return executions; @@ -222,20 +223,18 @@ public JobExecution getJobExecution(long executionId) { if (jobExecution == null) { return null; } - getJobExecutionDependencies(jobExecution); + fillJobExecutionDependencies(jobExecution); for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - getStepExecutionDependencies(stepExecution); + fillStepExecutionDependencies(stepExecution); } return jobExecution; } /* - * Find all dependencies for a JobExecution, including JobInstance (which requires + * Fill all dependencies for a JobExecution, including JobInstance (which requires * JobParameters) plus StepExecutions */ - // TODO rename to something more representative of what it does (side effect on the - // parameter) - private void getJobExecutionDependencies(JobExecution jobExecution) { + protected void fillJobExecutionDependencies(JobExecution jobExecution) { JobInstance jobInstance = jobInstanceDao.getJobInstance(jobExecution); jobExecution.setJobInstance(jobInstance); jobExecution.addStepExecutions(stepExecutionDao.getStepExecutions(jobExecution)); @@ -257,9 +256,9 @@ public StepExecution getStepExecution(long jobExecutionId, long executionId) { if (jobExecution == null) { return null; } - getJobExecutionDependencies(jobExecution); + fillJobExecutionDependencies(jobExecution); StepExecution stepExecution = stepExecutionDao.getStepExecution(jobExecution, executionId); - getStepExecutionDependencies(stepExecution); + fillStepExecutionDependencies(stepExecution); return stepExecution; } @@ -268,8 +267,7 @@ public StepExecution getStepExecution(long jobExecutionId, long executionId) { StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName); if (latest != null) { - ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest); - latest.setExecutionContext(stepExecutionContext); + fillStepExecutionDependencies(latest); ExecutionContext jobExecutionContext = ecDao.getExecutionContext(latest.getJobExecution()); latest.getJobExecution().setExecutionContext(jobExecutionContext); } @@ -287,7 +285,7 @@ public long getStepExecutionCount(JobInstance jobInstance, String stepName) thro return stepExecutionDao.countStepExecutions(jobInstance, stepName); } - private void getStepExecutionDependencies(StepExecution stepExecution) { + protected void fillStepExecutionDependencies(StepExecution stepExecution) { if (stepExecution != null) { stepExecution.setExecutionContext(ecDao.getExecutionContext(stepExecution)); } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java index 21f3c84d13..1c043e79f5 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java @@ -52,6 +52,7 @@ * @author Baris Cubukcuoglu * @author Parikshit Dutta * @author Mark John Moreno + * @author Yanming Zhou * @see JobRepository * @see JobInstanceDao * @see JobExecutionDao @@ -82,7 +83,13 @@ public List findJobInstances(String jobName) { @Nullable @Override public StepExecution getStepExecution(long executionId) { - return this.stepExecutionDao.getStepExecution(executionId); + StepExecution stepExecution = this.stepExecutionDao.getStepExecution(executionId); + if (stepExecution != null) { + fillStepExecutionDependencies(stepExecution); + ExecutionContext jobExecutionContext = this.ecDao.getExecutionContext(stepExecution.getJobExecution()); + stepExecution.getJobExecution().setExecutionContext(jobExecutionContext); + } + return stepExecution; } /** diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java index e54ea4e136..4643321594 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/partition/PartitionStepTests.java @@ -18,6 +18,8 @@ import java.time.LocalDateTime; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,10 +43,12 @@ import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; /** * @author Dave Syer * @author Mahmoud Ben Hassine + * @author Yanming Zhou * */ class PartitionStepTests { @@ -71,12 +75,24 @@ void setUp() throws Exception { @Test void testVanillaStepExecution() throws Exception { SimpleStepExecutionSplitter stepExecutionSplitter = new SimpleStepExecutionSplitter(jobRepository, - step.getName(), new SimplePartitioner()); + step.getName(), gridSize -> { + Map map = new HashMap<>(gridSize); + for (int i = 0; i < gridSize; i++) { + ExecutionContext context = new ExecutionContext(); + context.putString("foo", "foo" + i); + map.put("partition" + i, context); + } + return map; + }); stepExecutionSplitter.setAllowStartIfComplete(true); step.setStepExecutionSplitter(stepExecutionSplitter); step.setPartitionHandler((stepSplitter, stepExecution) -> { Set executions = stepSplitter.split(stepExecution, 2); for (StepExecution execution : executions) { + // Query from repository to ensure it's persisted + ExecutionContext context = jobRepository.getStepExecution(execution.getId()).getExecutionContext(); + assertNotNull(context.getString("foo")); + execution.setStatus(BatchStatus.COMPLETED); execution.setExitStatus(ExitStatus.COMPLETED); jobRepository.update(execution); @@ -144,7 +160,9 @@ void testRestartStepExecution() throws Exception { else { for (StepExecution execution : executions) { // On restart the execution context should have been restored - assertEquals(execution.getStepName(), execution.getExecutionContext().getString("foo")); + // Query from repository to ensure it's persisted + ExecutionContext context = jobRepository.getStepExecution(execution.getId()).getExecutionContext(); + assertEquals(execution.getStepName(), context.getString("foo")); } } for (StepExecution execution : executions) { diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/SimpleJobRepositoryIntegrationTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/SimpleJobRepositoryIntegrationTests.java index 7488945bce..7320363334 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/SimpleJobRepositoryIntegrationTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/support/SimpleJobRepositoryIntegrationTests.java @@ -48,6 +48,7 @@ * @author Robert Kasanicky * @author Dimitrios Liapis * @author Mahmoud Ben Hassine + * @author Yanming Zhou */ // TODO rename to JdbcJobRepositoryIntegrationTests and update to new domain model // TODO should add a mongodb similar test suite @@ -168,13 +169,19 @@ void testSaveExecutionContext() throws Exception { JobExecution jobExec = jobRepository.createJobExecution(jobInstance, jobParameters, new ExecutionContext()); jobExec.setStartTime(LocalDateTime.now()); jobExec.setExecutionContext(ctx); + jobRepository.updateExecutionContext(jobExec); Step step = new StepSupport("step1"); StepExecution stepExec = jobRepository.createStepExecution(step.getName(), jobExec); stepExec.setExecutionContext(ctx); + jobRepository.updateExecutionContext(stepExec); StepExecution retrievedStepExec = jobRepository.getLastStepExecution(jobExec.getJobInstance(), step.getName()); assertEquals(stepExec, retrievedStepExec); assertEquals(ctx, retrievedStepExec.getExecutionContext()); + + retrievedStepExec = jobRepository.getStepExecution(stepExec.getId()); + assertEquals(stepExec, retrievedStepExec); + assertEquals(ctx, retrievedStepExec.getExecutionContext()); } /*