From 16d9f17c46764d624da6359d5739309cfa9e4829 Mon Sep 17 00:00:00 2001 From: Mahmoud Ben Hassine Date: Mon, 2 Sep 2019 14:02:38 +0200 Subject: [PATCH] Improve the performance of step partitioning This commit improves the performance of splitting a step execution. It moves the logic of finding the last step execution of a job instance to the database (instead of doing it in memory). Resolves BATCH-2716 --- .../repository/dao/JdbcStepExecutionDao.java | 35 ++++++++++++++++++ .../repository/dao/MapStepExecutionDao.java | 26 +++++++++++++- .../core/repository/dao/StepExecutionDao.java | 16 ++++++++- .../support/SimpleJobRepository.java | 30 ++-------------- .../dao/AbstractStepExecutionDaoTests.java | 36 ++++++++++++++++++- 5 files changed, 112 insertions(+), 31 deletions(-) diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java index 22fbba91fe..71cc31257c 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java @@ -33,6 +33,7 @@ import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.StepExecution; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.OptimisticLockingFailureException; @@ -83,6 +84,16 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?"; + private static final String GET_LAST_STEP_EXECUTION = "SELECT " + + " SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION," + + " JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION" + + " from %PREFIX%JOB_EXECUTION JE, %PREFIX%STEP_EXECUTION SE" + + " where " + + " SE.JOB_EXECUTION_ID in (SELECT JOB_EXECUTION_ID from %PREFIX%JOB_EXECUTION where JE.JOB_INSTANCE_ID = ?)" + + " and SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID " + + " and SE.STEP_NAME = ?" + + " order by SE.START_TIME desc, SE.STEP_EXECUTION_ID desc"; + private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?"; private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH; @@ -297,6 +308,30 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut } } + @Override + public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { + List executions = getJdbcTemplate().query( + getQuery(GET_LAST_STEP_EXECUTION), + (rs, rowNum) -> { + Long jobExecutionId = rs.getLong(18); + JobExecution jobExecution = new JobExecution(jobExecutionId); + jobExecution.setStartTime(rs.getTimestamp(19)); + jobExecution.setEndTime(rs.getTimestamp(20)); + jobExecution.setStatus(BatchStatus.valueOf(rs.getString(21))); + jobExecution.setExitStatus(new ExitStatus(rs.getString(22), rs.getString(23))); + jobExecution.setCreateTime(rs.getTimestamp(24)); + jobExecution.setLastUpdated(rs.getTimestamp(25)); + jobExecution.setVersion(rs.getInt(26)); + return new StepExecutionRowMapper(jobExecution).mapRow(rs, rowNum); + }, + jobInstance.getInstanceId(), stepName); + if (executions.isEmpty()) { + return null; + } else { + return executions.get(0); + } + } + @Override public void addStepExecutions(JobExecution jobExecution) { getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution), diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapStepExecutionDao.java index ed3bc18c19..5b3154a5fb 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/MapStepExecutionDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import org.springframework.batch.core.Entity; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.StepExecution; import org.springframework.dao.OptimisticLockingFailureException; import org.springframework.lang.Nullable; @@ -119,6 +120,29 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut return executionsByStepExecutionId.get(stepExecutionId); } + @Override + public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { + StepExecution latest = null; + for (StepExecution stepExecution : executionsByStepExecutionId.values()) { + if (!stepExecution.getStepName().equals(stepName) + || stepExecution.getJobExecution().getJobInstance().getInstanceId() != jobInstance.getInstanceId()) { + continue; + } + if (latest == null) { + latest = stepExecution; + } + if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) { + latest = stepExecution; + } + // Use step execution ID as the tie breaker if start time is identical + if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() && + latest.getId() < stepExecution.getId()) { + latest = stepExecution; + } + } + return latest; + } + @Override public void addStepExecutions(JobExecution jobExecution) { Map executions = executionsByJobExecutionId.get(jobExecution.getId()); diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java index e109e000d2..cedcb82372 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import java.util.Collection; import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobInstance; import org.springframework.batch.core.StepExecution; import org.springframework.lang.Nullable; @@ -65,6 +66,19 @@ public interface StepExecutionDao { @Nullable StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId); + /** + * Retrieve the last {@link StepExecution} for a given {@link JobInstance} + * ordered by starting time and then id. + * + * @param jobInstance the parent {@link JobInstance} + * @param stepName the name of the step + * @return a {@link StepExecution} + */ + @Nullable + default StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { + throw new UnsupportedOperationException(); + } + /** * Retrieve all the {@link StepExecution} for the parent {@link JobExecution}. * diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java index 05ef5127f9..be7effa317 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/SimpleJobRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2018 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,7 +35,6 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; -import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.List; @@ -219,32 +218,7 @@ public void updateExecutionContext(JobExecution jobExecution) { @Override @Nullable public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { - List jobExecutions = jobExecutionDao.findJobExecutions(jobInstance); - List stepExecutions = new ArrayList<>(jobExecutions.size()); - - for (JobExecution jobExecution : jobExecutions) { - stepExecutionDao.addStepExecutions(jobExecution); - for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - if (stepName.equals(stepExecution.getStepName())) { - stepExecutions.add(stepExecution); - } - } - } - - StepExecution latest = null; - for (StepExecution stepExecution : stepExecutions) { - if (latest == null) { - latest = stepExecution; - } - if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) { - latest = stepExecution; - } - // Use step execution ID as the tie breaker if start time is identical - if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() && - latest.getId() < stepExecution.getId()) { - latest = stepExecution; - } - } + StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName); if (latest != null) { ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest); diff --git a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractStepExecutionDaoTests.java b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractStepExecutionDaoTests.java index 28dbb75a4b..fbcaba3d80 100644 --- a/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractStepExecutionDaoTests.java +++ b/spring-batch-core/src/test/java/org/springframework/batch/core/repository/dao/AbstractStepExecutionDaoTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2007 the original author or authors. + * Copyright 2006-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; +import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.List; @@ -152,6 +154,38 @@ public void testSaveAndGetExecutions() { } } + @Transactional + @Test + public void testSaveAndGetLastExecution() { + Instant now = Instant.now(); + StepExecution stepExecution1 = new StepExecution("step1", jobExecution); + stepExecution1.setStartTime(Date.from(now)); + StepExecution stepExecution2 = new StepExecution("step1", jobExecution); + stepExecution2.setStartTime(Date.from(now.plusMillis(500))); + + dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2)); + + StepExecution lastStepExecution = dao.getLastStepExecution(jobInstance, "step1"); + assertNotNull(lastStepExecution); + assertEquals(stepExecution2.getId(), lastStepExecution.getId()); + } + + @Transactional + @Test + public void testSaveAndGetLastExecutionWhenSameStartTime() { + Instant now = Instant.now(); + StepExecution stepExecution1 = new StepExecution("step1", jobExecution); + stepExecution1.setStartTime(Date.from(now)); + StepExecution stepExecution2 = new StepExecution("step1", jobExecution); + stepExecution2.setStartTime(Date.from(now)); + + dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2)); + StepExecution lastStepExecution = stepExecution1.getId() > stepExecution2.getId() ? stepExecution1 : stepExecution2; + StepExecution retrieved = dao.getLastStepExecution(jobInstance, "step1"); + assertNotNull(retrieved); + assertEquals(lastStepExecution.getId(), retrieved.getId()); + } + @Transactional @Test(expected = IllegalArgumentException.class) public void testSaveNullCollectionThrowsException() {