Skip to content

Commit

Permalink
Be more cautious when cleaning up during failed write and re-throw us…
Browse files Browse the repository at this point in the history
…er exceptions
  • Loading branch information
holdenk committed Jun 20, 2015
1 parent 165f52f commit 2e6abf7
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,22 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
boolean success = false;
try {
while (records.hasNext()) {
insertRecordIntoSorter(records.next());
}
closeAndWriteOutput();
success = true;
} finally {
if (!success) {
sorter.cleanupAfterError();
} catch (Exception e) {
// Trigger a cleanup if we encountered an error
if (sorter != null) {
try {
sorter.cleanupAfterError();
} catch (Exception e2) {
throw new RuntimeException("Failed to perform cleanup after exception", e);
}
}
// re-throw the exception
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,20 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
createWriter(false).stop(false);
}

@Test(expected=RuntimeException.class)
public void writeFailurePropegates() throws Exception {
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
@Override public boolean hasNext() {
throw new RuntimeException("panda magic");
}
@Override public Product2<Object, Object> next() {
return null;
}
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(Iterators.<Product2<Object, Object>>emptyIterator());
}

@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
Expand Down

0 comments on commit 2e6abf7

Please sign in to comment.