-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Description
Bug description
Running a job through Scheduler, that checks the status of a field in the database, Postgres, runs the first time, but all jobs afterwards, find no changes in the database, even if new records are added to the monitored table.
Environment
Spring Batch 6.0.0-RC1
Spring Framework 7.0.0-RC2
Java 25
Postgresql 16
Steps to reproduce
Create a new spring project, a job that reads from a table with status = U, process, and update the table to status P. Run a scheduler. The first time the scheduler runs, picks up the unprocessed records, and updates them successfully. The next run, the reader doesn't see any changes, if the status for existing records has changed to U, or if new records are added to the table.
Expected behavior
The job should pick up any changes once committed.
Minimal Complete Reproducible example
@Scheduled(cron = "${batch.schedule.cron:0 * * * * *}") // Every minute at second 0
public void runBatchJob() {
String triggerType = "cron";
if (!schedulingEnabled) {
logger.debug("Batch scheduling is disabled");
return;
}
if (jobRunning) {
logger.warn("Previous batch job is still running. Skipping this execution.");
return;
}
try {
jobRunning = true;
LocalDateTime startTime = LocalDateTime.now();
logger.info("Starting scheduled batch job execution at: {}", java.time.LocalDateTime.now());
logger.info("Starting scheduled batch job execution at: {} (trigger: {})",
LocalDateTime.now(), triggerType);
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.addString("runId", UUID.randomUUID().toString())
.addString("scheduledTime", startTime.toString())
.toJobParameters();
JobExecution execution = jobOperator.start(processRecordsJob, params);
logger.info("Completed scheduled batch job execution at: {}", java.time.LocalDateTime.now());
} catch (Exception e) {
logger.error("Error executing scheduled batch job: {}", e.getMessage(), e);
} finally {
jobRunning = false;
}
}
public class BatchConfig {
@Autowired
@Qualifier("batchDataSource") // Inject app datasource
private DataSource batchDataSource;
@Autowired
@Qualifier("appDataSource") // Inject app datasource
private DataSource appDataSource;
@Bean
public DataSource dataSource() {
return appDataSource;
}
private static final String READ_SQL =
"SELECT id, name, created_date, updated_date, status, process_count, last_processed_date " +
"FROM process_table " +
"WHERE status IN ('U', 'R') " +
"ORDER BY id";
private static final String WRITE_SQL =
"UPDATE process_table " +
"SET status = :status, " +
" updated_date = :updatedDate, " +
" process_count = :processCount, " +
" last_processed_date = :lastProcessedDate " +
"WHERE id = :id";
@Bean
@StepScope
public JdbcCursorItemReader<ProcessRecord> reader( @Qualifier("appDataSource") DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<ProcessRecord>()
.name("processRecordReader")
.dataSource(dataSource)
.sql(READ_SQL)
.rowMapper(new BeanPropertyRowMapper<>(ProcessRecord.class))
.fetchSize(100)
.build();
}
@Bean
@StepScope
public JdbcBatchItemWriter<ProcessRecord> writer( @Qualifier("appDataSource") DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<ProcessRecord>()
.dataSource(dataSource)
.sql(WRITE_SQL)
.beanMapped()
.assertUpdates(false) // Allow no updates if record was changed by another process
.build();
}
@Bean
public Job processRecordsJob(JobRepository jobRepository,
Step processStep,
JobExecutionListener jobCompletionListener) {
return new JobBuilder("processRecordsJob", jobRepository)
.start(processStep)
.listener(jobCompletionListener)
.preventRestart()
.build();
}
@Bean
public Step processStep(JobRepository jobRepository,
@Qualifier("appTransactionManager") PlatformTransactionManager transactionManager,
JdbcCursorItemReader<ProcessRecord> reader,
ProcessRecordProcessor processor,
JdbcBatchItemWriter<ProcessRecord> writer,
ProcessorStateResetListener processorStateResetListener,
MyItemProcessListener myItemProcessListener) {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
attribute.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
return new StepBuilder("processStep", jobRepository)
.<ProcessRecord, ProcessRecord>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.transactionAttribute(attribute)
.listener(myItemProcessListener)
// .listener(processorStateResetListener)
.allowStartIfComplete(true)
.build();
}
@Bean
@StepScope
public CachingItemProcessor processor() {
return new CachingItemProcessor();
}
}
Output for the first run:
=== Starting Scheduled Batch Job ===
Job ID: 1445
Job Parameters: {JobParameter{name='runId', value=76789662-a579-4893-91e1-ae908cc13f23, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908820009, type=class java.lang.Long, identifying=true}}
Start Time: 2025-10-31T13:07:00.086289
2025-10-31T13:07:00.102+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [processStep]
***MyItemProcessListener*** Processing item: ProcessRecord{id=4, name='Record 4', createdDate=2025-10-30T15:56:42.185616, updatedDate=2025-10-31T11:51:00.220401, status='U', processCount=7, lastProcessedDate=2025-10-31T11:51:00.220417}
2025-10-31T13:07:00.119+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.ProcessRecordProcessor : Processing record: ID=4, Name=Record 4, Current Status=U, Previous Process Count=7
2025-10-31T13:07:00.174+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.ProcessRecordProcessor : Successfully processed record: ID=4, New Status=V, Process Count=8
***MyItemProcessListener*** Finished processing item. Result is null? false
***MyItemProcessListener*** Processing item: ProcessRecord{id=7, name='Record 7', createdDate=2025-10-30T15:56:42.185616, updatedDate=null, status='U', processCount=2, lastProcessedDate=null}
2025-10-31T13:07:00.174+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.ProcessRecordProcessor : Processing record: ID=7, Name=Record 7, Current Status=U, Previous Process Count=2
2025-10-31T13:07:00.229+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.ProcessRecordProcessor : Successfully processed record: ID=7, New Status=V, Process Count=3
***MyItemProcessListener*** Finished processing item. Result is null? false
2025-10-31T13:07:00.242+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [processStep] executed in 138ms
=== Scheduled Batch Job Completed ===
Status: COMPLETED
Exit Status: exitCode=COMPLETED;exitDescription=
Read Count: 2
Write Count: 2
Skip Count: 0
End Time: 2025-10-31T13:07:00.246310
Duration: 160ms
2025-10-31T13:07:00.249+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=processRecordsJob]] completed with the following parameters: [{JobParameter{name='runId', value=76789662-a579-4893-91e1-ae908cc13f23, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908820009, type=class java.lang.Long, identifying=true}}] and the following status: [COMPLETED] in 160ms
2025-10-31T13:07:00.251+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Completed scheduled batch job execution at: 2025-10-31T13:07:00.251505
Second and third run, after two records have been changed in the database.
2025-10-31T13:08:00.003+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Starting scheduled batch job execution at: 2025-10-31T13:08:00.003126
2025-10-31T13:08:00.003+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Starting scheduled batch job execution at: 2025-10-31T13:08:00.003320 (trigger: cron)
2025-10-31T13:08:00.010+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=processRecordsJob]] launched with the following parameters: [{JobParameter{name='runId', value=ac8554ad-8b73-4393-bd0d-97139219d70f, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908880003, type=class java.lang.Long, identifying=true}}]
=== Starting Scheduled Batch Job ===
Job ID: 1446
Job Parameters: {JobParameter{name='runId', value=ac8554ad-8b73-4393-bd0d-97139219d70f, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908880003, type=class java.lang.Long, identifying=true}}
Start Time: 2025-10-31T13:08:00.010333
2025-10-31T13:08:00.015+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [processStep]
2025-10-31T13:08:00.017+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [processStep] executed in 1ms
=== Scheduled Batch Job Completed ===
Status: COMPLETED
Exit Status: exitCode=COMPLETED;exitDescription=
Read Count: 0
Write Count: 0
Skip Count: 0
End Time: 2025-10-31T13:08:00.020466
Duration: 10ms
2025-10-31T13:08:00.022+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=processRecordsJob]] completed with the following parameters: [{JobParameter{name='runId', value=ac8554ad-8b73-4393-bd0d-97139219d70f, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908880003, type=class java.lang.Long, identifying=true}}] and the following status: [COMPLETED] in 10ms
2025-10-31T13:08:00.023+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Completed scheduled batch job execution at: 2025-10-31T13:08:00.023521
2025-10-31T13:09:00.003+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Starting scheduled batch job execution at: 2025-10-31T13:09:00.003535
2025-10-31T13:09:00.003+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Starting scheduled batch job execution at: 2025-10-31T13:09:00.003707 (trigger: cron)
2025-10-31T13:09:00.010+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=processRecordsJob]] launched with the following parameters: [{JobParameter{name='runId', value=27d1ab5e-4c55-4561-8d1f-e9b8fa622da4, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908940003, type=class java.lang.Long, identifying=true}}]
=== Starting Scheduled Batch Job ===
Job ID: 1447
Job Parameters: {JobParameter{name='runId', value=27d1ab5e-4c55-4561-8d1f-e9b8fa622da4, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908940003, type=class java.lang.Long, identifying=true}}
Start Time: 2025-10-31T13:09:00.010389
2025-10-31T13:09:00.015+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [processStep]
2025-10-31T13:09:00.017+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.batch.core.step.AbstractStep : Step: [processStep] executed in 1ms
=== Scheduled Batch Job Completed ===
Status: COMPLETED
Exit Status: exitCode=COMPLETED;exitDescription=
Read Count: 0
Write Count: 0
Skip Count: 0
End Time: 2025-10-31T13:09:00.021035
Duration: 10ms
2025-10-31T13:09:00.022+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] o.s.b.c.l.s.TaskExecutorJobLauncher : Job: [SimpleJob: [name=processRecordsJob]] completed with the following parameters: [{JobParameter{name='runId', value=27d1ab5e-4c55-4561-8d1f-e9b8fa622da4, type=class java.lang.String, identifying=true},JobParameter{name='timestamp', value=1761908940003, type=class java.lang.Long, identifying=true}}] and the following status: [COMPLETED] in 10ms
2025-10-31T13:09:00.023+02:00 INFO 339 --- [demo_ts01] [ scheduling-1] a.c.a.i.d.config.BatchJobScheduler : Completed scheduled batch job execution at: 2025-10-31T13:09:00.023265