Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* Batch in a declarative way through {@link EnableBatchProcessing}.
*
* @author Mahmoud Ben Hassine
* @author Myeongha Shin
* @since 5.0
* @see EnableBatchProcessing
*/
Expand Down Expand Up @@ -185,6 +186,11 @@ private void registerMongoJobRepository(BeanDefinitionRegistry registry,
beanDefinitionBuilder.addPropertyValue("isolationLevelForCreate", isolationLevelForCreate);
}

String collectionPrefix = mongoJobRepositoryAnnotation.collectionPrefix();
if (collectionPrefix != null) {
beanDefinitionBuilder.addPropertyValue("collectionPrefix", collectionPrefix);
}

String jobKeyGeneratorRef = mongoJobRepositoryAnnotation.jobKeyGeneratorRef();
if (registry.containsBeanDefinition(jobKeyGeneratorRef)) {
beanDefinitionBuilder.addPropertyReference("jobKeyGenerator", jobKeyGeneratorRef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@
*/
String stepExecutionIncrementerRef() default "stepExecutionIncrementer";

/**
* Set the prefix for MongoDB collection names. Defaults to {@literal BATCH_}.
* @return the collection prefix to use
*/
String collectionPrefix() default "BATCH_";

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,42 @@
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.util.Assert;

import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;

/**
* @author Mahmoud Ben Hassine
* @author Myeongha Shin
* @since 5.2.0
*/
public class MongoExecutionContextDao implements ExecutionContextDao {

private static final String STEP_EXECUTIONS_COLLECTION_NAME = "BATCH_STEP_EXECUTION";
private static final String STEP_EXECUTIONS_COLLECTION_NAME = "STEP_EXECUTION";

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";

private final MongoOperations mongoOperations;

public MongoExecutionContextDao(MongoOperations mongoOperations) {
private final String stepExecutionCollectionName;

private final String jobExecutionCollectionName;

public MongoExecutionContextDao(MongoOperations mongoOperations, String collectionPrefix) {
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
this.mongoOperations = mongoOperations;
this.stepExecutionCollectionName = collectionPrefix + STEP_EXECUTIONS_COLLECTION_NAME;
this.jobExecutionCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
}

@Override
public ExecutionContext getExecutionContext(JobExecution jobExecution) {
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
org.springframework.batch.core.repository.persistence.JobExecution execution = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
jobExecutionCollectionName);
if (execution == null) {
return new ExecutionContext();
}
Expand All @@ -61,7 +71,7 @@ public ExecutionContext getExecutionContext(StepExecution stepExecution) {
Query query = query(where("stepExecutionId").is(stepExecution.getId()));
org.springframework.batch.core.repository.persistence.StepExecution execution = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
stepExecutionCollectionName);
if (execution == null) {
return new ExecutionContext();
}
Expand All @@ -77,8 +87,7 @@ public void saveExecutionContext(JobExecution jobExecution) {
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
executionContext.isDirty()));
this.mongoOperations.updateFirst(query, update,
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionCollectionName);
}

@Override
Expand All @@ -90,8 +99,7 @@ public void saveExecutionContext(StepExecution stepExecution) {
new org.springframework.batch.core.repository.persistence.ExecutionContext(executionContext.toMap(),
executionContext.isDirty()));
this.mongoOperations.updateFirst(query, update,
org.springframework.batch.core.repository.persistence.StepExecution.class,
STEP_EXECUTIONS_COLLECTION_NAME);
org.springframework.batch.core.repository.persistence.StepExecution.class, stepExecutionCollectionName);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,30 @@

/**
* @author Mahmoud Ben Hassine
* @author Myeongha Shin
* @since 5.2.0
*/
public class MongoJobExecutionDao implements JobExecutionDao {

private static final String JOB_EXECUTIONS_COLLECTION_NAME = "BATCH_JOB_EXECUTION";
private static final String JOB_EXECUTIONS_COLLECTION_NAME = "JOB_EXECUTION";

private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "BATCH_JOB_EXECUTION_SEQ";
private static final String JOB_EXECUTIONS_SEQUENCE_NAME = "JOB_EXECUTION_SEQ";

private final MongoOperations mongoOperations;

private final String jobExecutionsCollectionName;

private final JobExecutionConverter jobExecutionConverter = new JobExecutionConverter();

private DataFieldMaxValueIncrementer jobExecutionIncrementer;

private MongoJobInstanceDao jobInstanceDao;

public MongoJobExecutionDao(MongoOperations mongoOperations) {
public MongoJobExecutionDao(MongoOperations mongoOperations, String collectionPrefix) {
this.mongoOperations = mongoOperations;
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations, JOB_EXECUTIONS_SEQUENCE_NAME);
this.jobExecutionsCollectionName = collectionPrefix + JOB_EXECUTIONS_COLLECTION_NAME;
this.jobExecutionIncrementer = new MongoSequenceIncrementer(mongoOperations,
collectionPrefix + JOB_EXECUTIONS_SEQUENCE_NAME);
}

public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
Expand All @@ -69,7 +74,7 @@ public JobExecution createJobExecution(JobInstance jobInstance, JobParameters jo

org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToSave = this.jobExecutionConverter
.fromJobExecution(jobExecution);
this.mongoOperations.insert(jobExecutionToSave, JOB_EXECUTIONS_COLLECTION_NAME);
this.mongoOperations.insert(jobExecutionToSave, jobExecutionsCollectionName);

return jobExecution;
}
Expand All @@ -79,15 +84,15 @@ public void updateJobExecution(JobExecution jobExecution) {
Query query = query(where("jobExecutionId").is(jobExecution.getId()));
org.springframework.batch.core.repository.persistence.JobExecution jobExecutionToUpdate = this.jobExecutionConverter
.fromJobExecution(jobExecution);
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, JOB_EXECUTIONS_COLLECTION_NAME);
this.mongoOperations.findAndReplace(query, jobExecutionToUpdate, jobExecutionsCollectionName);
}

@Override
public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
Query query = query(where("jobInstanceId").is(jobInstance.getId()));
List<org.springframework.batch.core.repository.persistence.JobExecution> jobExecutions = this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
jobExecutionsCollectionName);
return jobExecutions.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.toList();
Expand All @@ -99,8 +104,7 @@ public JobExecution getLastJobExecution(JobInstance jobInstance) {
Sort.Order sortOrder = Sort.Order.desc("jobExecutionId");
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
query.with(Sort.by(sortOrder)),
org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
org.springframework.batch.core.repository.persistence.JobExecution.class, jobExecutionsCollectionName);
return jobExecution != null ? this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance) : null;
}

Expand All @@ -113,7 +117,7 @@ public Set<JobExecution> findRunningJobExecutions(String jobName) {
where("jobInstanceId").is(jobInstance.getId()).and("status").in("STARTING", "STARTED", "STOPPING"));
this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME)
jobExecutionsCollectionName)
.stream()
.map(jobExecution -> this.jobExecutionConverter.toJobExecution(jobExecution, jobInstance))
.forEach(runningJobExecutions::add);
Expand All @@ -126,7 +130,7 @@ public JobExecution getJobExecution(long executionId) {
Query jobExecutionQuery = query(where("jobExecutionId").is(executionId));
org.springframework.batch.core.repository.persistence.JobExecution jobExecution = this.mongoOperations.findOne(
jobExecutionQuery, org.springframework.batch.core.repository.persistence.JobExecution.class,
JOB_EXECUTIONS_COLLECTION_NAME);
jobExecutionsCollectionName);
if (jobExecution == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,31 @@

/**
* @author Mahmoud Ben Hassine
* @author Myeongha Shin
* @since 5.2.0
*/
public class MongoJobInstanceDao implements JobInstanceDao {

private static final String COLLECTION_NAME = "BATCH_JOB_INSTANCE";
private static final String COLLECTION_NAME = "JOB_INSTANCE";

private static final String SEQUENCE_NAME = "BATCH_JOB_INSTANCE_SEQ";
private static final String SEQUENCE_NAME = "JOB_INSTANCE_SEQ";

private final MongoOperations mongoOperations;

private final String collectionName;

private DataFieldMaxValueIncrementer jobInstanceIncrementer;

private JobKeyGenerator jobKeyGenerator = new DefaultJobKeyGenerator();

private final JobInstanceConverter jobInstanceConverter = new JobInstanceConverter();

public MongoJobInstanceDao(MongoOperations mongoOperations) {
public MongoJobInstanceDao(MongoOperations mongoOperations, String collectionPrefix) {
Assert.notNull(mongoOperations, "mongoOperations must not be null.");
Assert.notNull(collectionPrefix, "collectionPrefix must not be null.");
this.mongoOperations = mongoOperations;
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, SEQUENCE_NAME);
this.collectionName = collectionPrefix + COLLECTION_NAME;
this.jobInstanceIncrementer = new MongoSequenceIncrementer(mongoOperations, collectionPrefix + SEQUENCE_NAME);
}

public void setJobKeyGenerator(JobKeyGenerator jobKeyGenerator) {
Expand All @@ -79,7 +84,7 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
jobInstanceToSave.setJobKey(key);
long instanceId = jobInstanceIncrementer.nextLongValue();
jobInstanceToSave.setJobInstanceId(instanceId);
this.mongoOperations.insert(jobInstanceToSave, COLLECTION_NAME);
this.mongoOperations.insert(jobInstanceToSave, this.collectionName);

JobInstance jobInstance = new JobInstance(instanceId, jobName);
jobInstance.incrementVersion(); // TODO is this needed?
Expand All @@ -90,16 +95,16 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
String key = this.jobKeyGenerator.generateKey(jobParameters);
Query query = query(where("jobName").is(jobName).and("jobKey").is(key));
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
}

@Override
public JobInstance getJobInstance(long instanceId) {
Query query = query(where("jobInstanceId").is(instanceId));
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations
.findOne(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME);
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName);
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
}

Expand All @@ -114,7 +119,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
List<org.springframework.batch.core.repository.persistence.JobInstance> jobInstances = this.mongoOperations
.find(query.with(Sort.by(sortOrder)),
org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
.stream()
.toList();
return jobInstances.subList(start, jobInstances.size())
Expand All @@ -134,7 +139,7 @@ public List<JobInstance> getJobInstances(String jobName, int start, int count) {
public List<JobInstance> getJobInstances(String jobName) {
Query query = query(where("jobName").is(jobName));
return this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
.stream()
.map(this.jobInstanceConverter::toJobInstance)
.toList();
Expand All @@ -144,7 +149,7 @@ public List<JobInstance> getJobInstances(String jobName) {
public List<Long> getJobInstanceIds(String jobName) {
Query query = query(where("jobName").is(jobName));
return this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
.stream()
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobInstanceId)
.toList();
Expand All @@ -153,7 +158,7 @@ public List<Long> getJobInstanceIds(String jobName) {
public List<JobInstance> findJobInstancesByName(String jobName) {
Query query = query(where("jobName").is(jobName));
return this.mongoOperations
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
.find(query, org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
.stream()
.map(this.jobInstanceConverter::toJobInstance)
.toList();
Expand All @@ -165,14 +170,14 @@ public JobInstance getLastJobInstance(String jobName) {
Sort.Order sortOrder = Sort.Order.desc("jobInstanceId");
org.springframework.batch.core.repository.persistence.JobInstance jobInstance = this.mongoOperations.findOne(
query.with(Sort.by(sortOrder)), org.springframework.batch.core.repository.persistence.JobInstance.class,
COLLECTION_NAME);
this.collectionName);
return jobInstance != null ? this.jobInstanceConverter.toJobInstance(jobInstance) : null;
}

@Override
public List<String> getJobNames() {
return this.mongoOperations
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, COLLECTION_NAME)
.findAll(org.springframework.batch.core.repository.persistence.JobInstance.class, this.collectionName)
.stream()
.map(org.springframework.batch.core.repository.persistence.JobInstance::getJobName)
.toList();
Expand All @@ -195,7 +200,7 @@ public long getJobInstanceCount(String jobName) throws NoSuchJobException {
throw new NoSuchJobException("Job not found " + jobName);
}
Query query = query(where("jobName").is(jobName));
return this.mongoOperations.count(query, COLLECTION_NAME);
return this.mongoOperations.count(query, this.collectionName);
}

}
Loading