Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.OptimisticLockingFailureException;
Expand Down Expand Up @@ -83,6 +84,16 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement

private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?";

private static final String GET_LAST_STEP_EXECUTION = "SELECT " +
" SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION," +
" JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION" +
" from %PREFIX%JOB_EXECUTION JE, %PREFIX%STEP_EXECUTION SE" +
" where " +
" SE.JOB_EXECUTION_ID in (SELECT JOB_EXECUTION_ID from %PREFIX%JOB_EXECUTION where JE.JOB_INSTANCE_ID = ?)" +
" and SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID " +
" and SE.STEP_NAME = ?" +
" order by SE.START_TIME desc, SE.STEP_EXECUTION_ID desc";

private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?";

private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
Expand Down Expand Up @@ -297,6 +308,30 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
}
}

@Override
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
List<StepExecution> executions = getJdbcTemplate().query(
getQuery(GET_LAST_STEP_EXECUTION),
(rs, rowNum) -> {
Long jobExecutionId = rs.getLong(18);
JobExecution jobExecution = new JobExecution(jobExecutionId);
jobExecution.setStartTime(rs.getTimestamp(19));
jobExecution.setEndTime(rs.getTimestamp(20));
jobExecution.setStatus(BatchStatus.valueOf(rs.getString(21)));
jobExecution.setExitStatus(new ExitStatus(rs.getString(22), rs.getString(23)));
jobExecution.setCreateTime(rs.getTimestamp(24));
jobExecution.setLastUpdated(rs.getTimestamp(25));
jobExecution.setVersion(rs.getInt(26));
return new StepExecutionRowMapper(jobExecution).mapRow(rs, rowNum);
},
jobInstance.getInstanceId(), stepName);
if (executions.isEmpty()) {
return null;
} else {
return executions.get(0);
}
}

@Override
public void addStepExecutions(JobExecution jobExecution) {
getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@

import org.springframework.batch.core.Entity;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -119,6 +120,29 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
return executionsByStepExecutionId.get(stepExecutionId);
}

@Override
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
StepExecution latest = null;
for (StepExecution stepExecution : executionsByStepExecutionId.values()) {
if (!stepExecution.getStepName().equals(stepName)
|| stepExecution.getJobExecution().getJobInstance().getInstanceId() != jobInstance.getInstanceId()) {
continue;
}
if (latest == null) {
latest = stepExecution;
}
if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
latest = stepExecution;
}
// Use step execution ID as the tie breaker if start time is identical
if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() &&
latest.getId() < stepExecution.getId()) {
latest = stepExecution;
}
}
return latest;
}

@Override
public void addStepExecutions(JobExecution jobExecution) {
Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Collection;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.StepExecution;
import org.springframework.lang.Nullable;

Expand Down Expand Up @@ -65,6 +66,19 @@ public interface StepExecutionDao {
@Nullable
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);

/**
* Retrieve the last {@link StepExecution} for a given {@link JobInstance}
* ordered by starting time and then id.
*
* @param jobInstance the parent {@link JobInstance}
* @param stepName the name of the step
* @return a {@link StepExecution}
*/
@Nullable
default StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
throw new UnsupportedOperationException();
}

/**
* Retrieve all the {@link StepExecution} for the parent {@link JobExecution}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2018 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,6 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -219,32 +218,7 @@ public void updateExecutionContext(JobExecution jobExecution) {
@Override
@Nullable
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
List<StepExecution> stepExecutions = new ArrayList<>(jobExecutions.size());

for (JobExecution jobExecution : jobExecutions) {
stepExecutionDao.addStepExecutions(jobExecution);
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepName.equals(stepExecution.getStepName())) {
stepExecutions.add(stepExecution);
}
}
}

StepExecution latest = null;
for (StepExecution stepExecution : stepExecutions) {
if (latest == null) {
latest = stepExecution;
}
if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
latest = stepExecution;
}
// Use step execution ID as the tie breaker if start time is identical
if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() &&
latest.getId() < stepExecution.getId()) {
latest = stepExecution;
}
}
StepExecution latest = stepExecutionDao.getLastStepExecution(jobInstance, stepName);

if (latest != null) {
ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2007 the original author or authors.
* Copyright 2006-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,9 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -152,6 +154,38 @@ public void testSaveAndGetExecutions() {
}
}

@Transactional
@Test
public void testSaveAndGetLastExecution() {
Instant now = Instant.now();
StepExecution stepExecution1 = new StepExecution("step1", jobExecution);
stepExecution1.setStartTime(Date.from(now));
StepExecution stepExecution2 = new StepExecution("step1", jobExecution);
stepExecution2.setStartTime(Date.from(now.plusMillis(500)));

dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2));

StepExecution lastStepExecution = dao.getLastStepExecution(jobInstance, "step1");
assertNotNull(lastStepExecution);
assertEquals(stepExecution2.getId(), lastStepExecution.getId());
}

@Transactional
@Test
public void testSaveAndGetLastExecutionWhenSameStartTime() {
Instant now = Instant.now();
StepExecution stepExecution1 = new StepExecution("step1", jobExecution);
stepExecution1.setStartTime(Date.from(now));
StepExecution stepExecution2 = new StepExecution("step1", jobExecution);
stepExecution2.setStartTime(Date.from(now));

dao.saveStepExecutions(Arrays.asList(stepExecution1, stepExecution2));
StepExecution lastStepExecution = stepExecution1.getId() > stepExecution2.getId() ? stepExecution1 : stepExecution2;
StepExecution retrieved = dao.getLastStepExecution(jobInstance, "step1");
assertNotNull(retrieved);
assertEquals(lastStepExecution.getId(), retrieved.getId());
}

@Transactional
@Test(expected = IllegalArgumentException.class)
public void testSaveNullCollectionThrowsException() {
Expand Down