Skip to content

Commit

Permalink
[SPARK-8498] [TUNGSTEN] fix npe in errorhandling path in unsafeshuffl…
Browse files Browse the repository at this point in the history
…e writer

Author: Holden Karau <holden@pigscanfly.ca>

Closes apache#6918 from holdenk/SPARK-8498-fix-npe-in-errorhandling-path-in-unsafeshuffle-writer and squashes the following commits:

f807832 [Holden Karau] Log error if we can't throw it
855f9aa [Holden Karau] Spelling - not my strongest suite. Fix Propegates to Propagates.
039d620 [Holden Karau] Add missing closeandwriteoutput
30e558d [Holden Karau] go back to try/finally
e503b8c [Holden Karau] Improve the test to ensure we aren't masking the underlying exception
ae0b7a7 [Holden Karau] Fix the test
2e6abf7 [Holden Karau] Be more cautious when cleaning up during failed write and re-throw user exceptions
  • Loading branch information
holdenk authored and JoshRosen committed Jun 23, 2015
1 parent 6ceb169 commit 0f92be5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
// Keep track of success so we know if we ecountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
try {
while (records.hasNext()) {
Expand All @@ -147,8 +150,19 @@ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOEx
closeAndWriteOutput();
success = true;
} finally {
if (!success) {
sorter.cleanupAfterError();
if (sorter != null) {
try {
sorter.cleanupAfterError();
} catch (Exception e) {
// Only throw this error if we won't be masking another
// error.
if (success) {
throw e;
} else {
logger.error("In addition to a failure during writing, we failed during " +
"cleanup.", e);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,23 @@ public void doNotNeedToCallWriteBeforeUnsuccessfulStop() throws IOException {
createWriter(false).stop(false);
}

class PandaException extends RuntimeException {
}

@Test(expected=PandaException.class)
public void writeFailurePropagates() throws Exception {
class BadRecords extends scala.collection.AbstractIterator<Product2<Object, Object>> {
@Override public boolean hasNext() {
throw new PandaException();
}
@Override public Product2<Object, Object> next() {
return null;
}
}
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
writer.write(new BadRecords());
}

@Test
public void writeEmptyIterator() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
Expand Down

0 comments on commit 0f92be5

Please sign in to comment.