From 4afd5d813968a7afa0b27ab4e4e9a7721c222c9a Mon Sep 17 00:00:00 2001 From: Yanming Zhou Date: Tue, 16 Dec 2025 16:50:23 +0800 Subject: [PATCH] Allow reusing CompositeItemReader Before this commit, CompositeItemReader cannot be reused for open and close. Fix GH-4926 Signed-off-by: Yanming Zhou --- .../item/support/CompositeItemReader.java | 23 +++--- .../support/CompositeItemReaderTests.java | 70 ++++++++++++++++++- 2 files changed, 82 insertions(+), 11 deletions(-) diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/support/CompositeItemReader.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/support/CompositeItemReader.java index bf91cad8d7..c23465ad18 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/support/CompositeItemReader.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/support/CompositeItemReader.java @@ -31,6 +31,7 @@ * * @author Mahmoud Ben Hassine * @author Elimelec Burghelea + * @author Yanming Zhou * @param type of objects to read * @since 5.2 */ @@ -38,7 +39,7 @@ public class CompositeItemReader implements ItemStreamReader { private final List> delegates; - private final Iterator> delegatesIterator; + private @Nullable Iterator> delegatesIterator; private @Nullable ItemStreamReader currentDelegate; @@ -48,27 +49,29 @@ public class CompositeItemReader implements ItemStreamReader { */ public CompositeItemReader(List> delegates) { this.delegates = delegates; - this.delegatesIterator = this.delegates.iterator(); - this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; } // TODO: check if we need to open/close delegates on the fly in read() to avoid // opening resources early for a long time @Override public void open(ExecutionContext executionContext) throws ItemStreamException { - for (ItemStreamReader delegate : delegates) { + + this.delegatesIterator = this.delegates.iterator(); + this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; + + for (ItemStreamReader delegate : this.delegates) { delegate.open(executionContext); } } @Override public @Nullable T read() throws Exception { - if (this.currentDelegate == null) { + if (this.currentDelegate == null || this.delegatesIterator == null) { return null; } - T item = currentDelegate.read(); + T item = this.currentDelegate.read(); if (item == null) { - currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; + this.currentDelegate = this.delegatesIterator.hasNext() ? this.delegatesIterator.next() : null; return read(); } return item; @@ -89,9 +92,13 @@ public void update(ExecutionContext executionContext) throws ItemStreamException */ @Override public void close() throws ItemStreamException { + + this.delegatesIterator = null; + this.currentDelegate = null; + List exceptions = new ArrayList<>(); - for (ItemStreamReader delegate : delegates) { + for (ItemStreamReader delegate : this.delegates) { try { delegate.close(); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/support/CompositeItemReaderTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/support/CompositeItemReaderTests.java index 1630c18860..f230dbc9ab 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/support/CompositeItemReaderTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/support/CompositeItemReaderTests.java @@ -23,13 +23,13 @@ import org.springframework.batch.infrastructure.item.ExecutionContext; import org.springframework.batch.infrastructure.item.ItemStreamException; import org.springframework.batch.infrastructure.item.ItemStreamReader; -import org.springframework.batch.infrastructure.item.support.CompositeItemReader; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; /** @@ -37,6 +37,7 @@ * * @author Mahmoud Ben Hassine * @author Elimelec Burghelea + * @author Yanming Zhou */ public class CompositeItemReaderTests { @@ -62,6 +63,7 @@ void testCompositeItemReaderRead() throws Exception { ItemStreamReader reader1 = mock(); ItemStreamReader reader2 = mock(); CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); + compositeItemReader.open(new ExecutionContext()); when(reader1.read()).thenReturn("foo1", "foo2", null); when(reader2.read()).thenReturn("bar1", "bar2", null); @@ -88,13 +90,15 @@ void testCompositeItemReaderUpdate() { ItemStreamReader reader2 = mock(); CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); ExecutionContext executionContext = new ExecutionContext(); + compositeItemReader.open(executionContext); // when compositeItemReader.update(executionContext); // then verify(reader1).update(executionContext); - verifyNoInteractions(reader2); // reader1 is the current delegate in this setup + verify(reader2, times(0)).update(executionContext); // reader1 is the current + // delegate in this setup } @Test @@ -103,6 +107,7 @@ void testCompositeItemReaderClose() { ItemStreamReader reader1 = mock(); ItemStreamReader reader2 = mock(); CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); + compositeItemReader.open(new ExecutionContext()); // when compositeItemReader.close(); @@ -118,6 +123,7 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() { ItemStreamReader reader1 = mock(); ItemStreamReader reader2 = mock(); CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); + compositeItemReader.open(new ExecutionContext()); doThrow(new ItemStreamException("A failure")).when(reader1).close(); @@ -135,4 +141,62 @@ void testCompositeItemReaderCloseWithDelegateThatThrowsException() { verify(reader2).close(); } + @Test + void testCompositeItemReaderRepeatableRead() throws Exception { + // given + ItemStreamReader reader1 = new ItemStreamReader<>() { + int counter = 0; + + @Override + public String read() { + return switch (this.counter++) { + case 0 -> "a"; + case 1 -> "b"; + default -> null; + }; + } + + @Override + public void close() { + this.counter = 0; + } + }; + ItemStreamReader reader2 = new ItemStreamReader<>() { + int counter = 0; + + @Override + public String read() { + return switch (this.counter++) { + case 0 -> "c"; + case 1 -> "d"; + default -> null; + }; + } + + @Override + public void close() { + this.counter = 0; + } + }; + CompositeItemReader compositeItemReader = new CompositeItemReader<>(Arrays.asList(reader1, reader2)); + + for (int i = 0; i < 5; i++) { + verifyRead(compositeItemReader); + } + } + + private void verifyRead(CompositeItemReader compositeItemReader) throws Exception { + // when + compositeItemReader.open(new ExecutionContext()); + + // then + assertEquals("a", compositeItemReader.read()); + assertEquals("b", compositeItemReader.read()); + assertEquals("c", compositeItemReader.read()); + assertEquals("d", compositeItemReader.read()); + assertNull(compositeItemReader.read()); + + compositeItemReader.close(); + } + } \ No newline at end of file