Skip to content

Commit

Permalink
onWriteError is called when items fail in recovery mode
Browse files Browse the repository at this point in the history
  • Loading branch information
magott committed Nov 23, 2011
1 parent 7fb5c64 commit a654017
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ private void scan(final StepContribution contribution, final Chunk<I> inputs, fi
outputIterator.remove();
}
catch (Exception e) {
doOnWriteError(e, items);
if (!shouldSkip(itemWriteSkipPolicy, e, -1) && !rollbackClassifier.classify(e)) {
inputIterator.remove();
outputIterator.remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected final void doWrite(List<O> items) throws Exception {
doAfterWrite(items);
}
catch (Exception e) {
listener.onWriteError(e, items);
doOnWriteError(e, items);
throw e;
}

Expand All @@ -165,6 +165,9 @@ protected final void doWrite(List<O> items) throws Exception {
protected final void doAfterWrite(List<O> items) {
listener.afterWrite(items);
}
protected final void doOnWriteError(Exception e, List<O> items) {
listener.onWriteError(e, items);
}

protected void writeItems(List<O> items) throws Exception {
if (itemWriter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class FaultTolerantChunkProcessorTests {
private List<String> list = new ArrayList<String>();

private List<String> after = new ArrayList<String>();

private List<String> writeError = new ArrayList<String>();

private FaultTolerantChunkProcessor<String, String> processor;

Expand Down Expand Up @@ -213,22 +215,10 @@ public void afterWrite(List<? extends String> item) {
}
}));
processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy());
try {
processor.process(contribution, chunk);
fail();
}
catch (RuntimeException e) {
assertEquals("Planned failure!", e.getMessage());
}
processAndExpectPlannedRuntimeException(chunk);
processor.process(contribution, chunk);
assertEquals(2, chunk.getItems().size());
try {
processor.process(contribution, chunk);
fail();
}
catch (RuntimeException e) {
assertEquals("Planned failure!", e.getMessage());
}
processAndExpectPlannedRuntimeException(chunk);
assertEquals(1, chunk.getItems().size());
processor.process(contribution, chunk);
assertEquals(0, chunk.getItems().size());
Expand Down Expand Up @@ -260,17 +250,65 @@ public void afterWrite(List<? extends String> item) {
}));
processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy());

processAndExpectPlannedRuntimeException(chunk);
processor.process(contribution, chunk);
processor.process(contribution, chunk);

assertEquals("[foo, bar]", list.toString());
assertEquals("[foo, bar]", after.toString());
}

@Test
public void testOnErrorInWrite() throws Exception{
Chunk<String> chunk = new Chunk<String>(Arrays.asList("foo", "fail"));
processor.setListeners(Arrays.asList(new ItemListenerSupport<String, String>() {
@Override
public void onWriteError(Exception e, List<? extends String> item) {
writeError.addAll(item);
}
}));
processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy());

processAndExpectPlannedRuntimeException(chunk);//Process foo, fail
processor.process(contribution, chunk);;//Process foo
processAndExpectPlannedRuntimeException(chunk);//Process fail

assertEquals("[foo, fail, fail]", writeError.toString());
}

@Test
public void testOnErrorInWriteAllItemsFail() throws Exception{
Chunk<String> chunk = new Chunk<String>(Arrays.asList("foo", "bar"));
processor = new FaultTolerantChunkProcessor<String, String>(new PassThroughItemProcessor<String>(),
new ItemWriter<String>() {
public void write(List<? extends String> items) throws Exception {
//Always fail in writer
throw new RuntimeException("Planned failure!");
}
}, batchRetryTemplate);
processor.setListeners(Arrays.asList(new ItemListenerSupport<String, String>() {
@Override
public void onWriteError(Exception e, List<? extends String> item) {
writeError.addAll(item);
}
}));
processor.setWriteSkipPolicy(new AlwaysSkipItemSkipPolicy());

processAndExpectPlannedRuntimeException(chunk);//Process foo, bar
processAndExpectPlannedRuntimeException(chunk);//Process foo
processAndExpectPlannedRuntimeException(chunk);//Process bar

assertEquals("[foo, bar, foo, bar]", writeError.toString());
}

protected void processAndExpectPlannedRuntimeException(Chunk<String> chunk)
throws Exception {
try {
processor.process(contribution, chunk);
fail();
}
catch (RuntimeException e) {
assertEquals("Planned failure!", e.getMessage());
}
processor.process(contribution, chunk);
processor.process(contribution, chunk);

assertEquals("[foo, bar]", list.toString());
assertEquals("[foo, bar]", after.toString());
}
}

0 comments on commit a654017

Please sign in to comment.