Skip to content

Commit 52875e7

Browse files
committed
Improve listener registration in chunk-oriented steps
This commit updates the chunk-oriented step builder to automatically register batch artifacts as step listeners if they implement the StepExecutionListener interface.
1 parent 97dd465 commit 52875e7

File tree

1 file changed

+21
-8
lines changed

1 file changed

+21
-8
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.batch.core.listener.ItemReadListener;
4242
import org.springframework.batch.core.listener.ItemWriteListener;
4343
import org.springframework.batch.core.listener.SkipListener;
44+
import org.springframework.batch.core.listener.StepExecutionListener;
4445
import org.springframework.batch.core.listener.StepListener;
4546
import org.springframework.batch.core.listener.StepListenerFactoryBean;
4647
import org.springframework.batch.core.repository.JobRepository;
@@ -380,11 +381,20 @@ public ChunkOrientedStepBuilder<I, O> observationRegistry(ObservationRegistry ob
380381
public ChunkOrientedStep<I, O> build() {
381382
Assert.notNull(this.reader, "Item reader must not be null");
382383
Assert.notNull(this.writer, "Item writer must not be null");
384+
if (this.reader instanceof StepExecutionListener listener) {
385+
this.stepListeners.add(listener);
386+
}
387+
if (this.writer instanceof StepExecutionListener listener) {
388+
this.stepListeners.add(listener);
389+
}
383390
ChunkOrientedStep<I, O> chunkOrientedStep = new ChunkOrientedStep<>(this.getName(), this.chunkSize, this.reader,
384391
this.writer, this.getJobRepository());
385392
if (this.processor != null) {
386393
chunkOrientedStep.setItemProcessor(this.processor);
387394
}
395+
if (this.processor instanceof StepExecutionListener listener) {
396+
this.stepListeners.add(listener);
397+
}
388398
chunkOrientedStep.setTransactionManager(this.transactionManager);
389399
chunkOrientedStep.setTransactionAttribute(this.transactionAttribute);
390400
chunkOrientedStep.setInterruptionPolicy(this.interruptionPolicy);
@@ -416,17 +426,20 @@ public ChunkOrientedStep<I, O> build() {
416426
}
417427
streams.forEach(chunkOrientedStep::registerItemStream);
418428
stepListeners.forEach(stepListener -> {
419-
if (stepListener instanceof ItemReadListener) {
420-
chunkOrientedStep.registerItemReadListener((ItemReadListener<I>) stepListener);
429+
if (stepListener instanceof ItemReadListener listener) {
430+
chunkOrientedStep.registerItemReadListener(listener);
431+
}
432+
if (stepListener instanceof ItemProcessListener listener) {
433+
chunkOrientedStep.registerItemProcessListener(listener);
421434
}
422-
if (stepListener instanceof ItemProcessListener) {
423-
chunkOrientedStep.registerItemProcessListener((ItemProcessListener<I, O>) stepListener);
435+
if (stepListener instanceof ItemWriteListener listener) {
436+
chunkOrientedStep.registerItemWriteListener(listener);
424437
}
425-
if (stepListener instanceof ItemWriteListener) {
426-
chunkOrientedStep.registerItemWriteListener((ItemWriteListener<O>) stepListener);
438+
if (stepListener instanceof ChunkListener listener) {
439+
chunkOrientedStep.registerChunkListener(listener);
427440
}
428-
if (stepListener instanceof ChunkListener) {
429-
chunkOrientedStep.registerChunkListener((ChunkListener<I, O>) stepListener);
441+
if (stepListener instanceof StepExecutionListener listener) {
442+
chunkOrientedStep.registerStepExecutionListener(listener);
430443
}
431444
});
432445
retryListeners.forEach(chunkOrientedStep::registerRetryListener);

0 commit comments

Comments
 (0)