More defensive programming RE: cleaning up spill files and memory after errors
JoshRosen committed May 9, 2015
1 parent 7cd013b commit 9b7ebed
Showing 3 changed files with 66 additions and 39 deletions.
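The changes apply one defensive-cleanup idea across the three files: if anything fails while writing or merging shuffle output, release the sorter's memory and delete any spill files before propagating the error. A minimal, self-contained sketch of that catch-clean-rethrow shape (class and field names below are invented for illustration, not taken from the commit):

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustration only: on failure, delete temporary spill files and clear state,
// then rethrow the original exception so the caller still sees the failure.
public class DefensiveCleanupSketch {

  private final List<File> spillFiles = new ArrayList<File>();

  public long[] writeAndMerge() throws IOException {
    try {
      // ... write spill files, then merge them into the final output ...
      return new long[0];
    } catch (IOException e) {
      cleanupAfterError();  // best-effort cleanup; must not mask the original error
      throw e;
    }
  }

  private void cleanupAfterError() {
    for (File spill : spillFiles) {
      if (spill.exists() && !spill.delete()) {
        System.err.println("Unable to delete spill file " + spill.getPath());
      }
    }
    spillFiles.clear();
  }
}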
@@ -252,6 +252,22 @@ private long freeMemory() {
     return memoryFreed;
   }
 
+  /**
+   * Force all memory and spill files to be deleted; called by shuffle error-handling code.
+   */
+  public void cleanupAfterError() {
+    freeMemory();
+    for (SpillInfo spill : spills) {
+      if (spill.file.exists() && !spill.file.delete()) {
+        logger.error("Unable to delete spill file {}", spill.file.getPath());
+      }
+    }
+    if (spillingEnabled && sorter != null) {
+      shuffleMemoryManager.release(sorter.getMemoryUsage());
+      sorter = null;
+    }
+  }
+
   /**
    * Checks whether there is enough space to insert a new record into the sorter.
    *
@@ -362,11 +378,16 @@ public void insertRecord(
    * @throws IOException
    */
   public SpillInfo[] closeAndGetSpills() throws IOException {
-    if (sorter != null) {
-      writeSpillFile();
-      freeMemory();
+    try {
+      if (sorter != null) {
+        writeSpillFile();
+        freeMemory();
+      }
+      return spills.toArray(new SpillInfo[spills.size()]);
+    } catch (IOException e) {
+      cleanupAfterError();
+      throw e;
     }
-    return spills.toArray(new SpillInfo[0]);
   }
 
 }
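As the javadoc above notes, the new cleanupAfterError() is intended to be called by error-handling code in the shuffle writer. A hedged sketch of such a caller, against a stand-in sorter interface (all names here are hypothetical):

import java.io.IOException;
import java.util.Iterator;

// Illustration only: a caller that guarantees cleanupAfterError() runs whenever
// the write path fails part-way through, mirroring the intent of the method added above.
public class CleanupCallerSketch {

  interface ExternalSorter {
    void insertRecord(int record) throws IOException;
    void cleanupAfterError();
  }

  static void write(ExternalSorter sorter, Iterator<Integer> records) throws IOException {
    boolean success = false;
    try {
      while (records.hasNext()) {
        sorter.insertRecord(records.next());
      }
      success = true;
    } finally {
      if (!success) {
        // Free memory and delete spill files so a failed task does not leak disk space.
        sorter.cleanupAfterError();
      }
    }
  }
}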
@@ -17,10 +17,7 @@
 
 package org.apache.spark.shuffle.unsafe;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Iterator;
@@ -34,6 +31,7 @@
 import com.esotericsoftware.kryo.io.ByteBufferOutputStream;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
 import com.google.common.io.Files;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -152,16 +150,22 @@ void closeAndWriteOutput() throws IOException {
     serArray = null;
     serByteBuffer = null;
     serOutputStream = null;
-    final long[] partitionLengths = mergeSpills(sorter.closeAndGetSpills());
+    final SpillInfo[] spills = sorter.closeAndGetSpills();
     sorter = null;
+    final long[] partitionLengths;
+    try {
+      partitionLengths = mergeSpills(spills);
+    } finally {
+      for (SpillInfo spill : spills) {
+        if (spill.file.exists() && ! spill.file.delete()) {
+          logger.error("Error while deleting spill file {}", spill.file.getPath());
+        }
+      }
+    }
     shuffleBlockManager.writeIndexFile(shuffleId, mapId, partitionLengths);
     mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
   }
 
-  private void freeMemory() {
-    // TODO
-  }
-
   @VisibleForTesting
   void insertRecordIntoSorter(Product2<K, V> record) throws IOException{
     if (sorter == null) {
@@ -241,17 +245,10 @@ private long[] mergeSpillsWithFileStream(SpillInfo[] spills, File outputFile) th
         }
       }
     } finally {
-      for (int i = 0; i < spills.length; i++) {
-        if (spillInputStreams[i] != null) {
-          spillInputStreams[i].close();
-          if (!spills[i].file.delete()) {
-            logger.error("Error while deleting spill file {}", spills[i]);
-          }
-        }
-      }
-      if (mergedFileOutputStream != null) {
-        mergedFileOutputStream.close();
+      for (InputStream stream : spillInputStreams) {
+        Closeables.close(stream, false);
       }
+      Closeables.close(mergedFileOutputStream, false);
     }
     return partitionLengths;
   }
@@ -305,16 +302,9 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
     } finally {
       for (int i = 0; i < spills.length; i++) {
         assert(spillInputChannelPositions[i] == spills[i].file.length());
-        if (spillInputChannels[i] != null) {
-          spillInputChannels[i].close();
-          if (!spills[i].file.delete()) {
-            logger.error("Error while deleting spill file {}", spills[i]);
-          }
-        }
-      }
-      if (mergedFileOutputChannel != null) {
-        mergedFileOutputChannel.close();
+        Closeables.close(spillInputChannels[i], false);
       }
+      Closeables.close(mergedFileOutputChannel, false);
     }
     return partitionLengths;
   }
@@ -326,7 +316,6 @@ public Option<MapStatus> stop(boolean success) {
         return Option.apply(null);
       } else {
         stopping = true;
-        freeMemory();
         if (success) {
           if (mapStatus == null) {
             throw new IllegalStateException("Cannot call stop(true) without having called write()");
@@ -339,7 +328,11 @@ }
         }
       }
     } finally {
-      freeMemory();
+      if (sorter != null) {
+        // If sorter is non-null, then this implies that we called stop() in response to an error,
+        // so we need to clean up memory and spill files created by the sorter
+        sorter.cleanupAfterError();
+      }
     }
   }
 }
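Both merge paths now close their spill streams and channels through Guava's Closeables.close(closeable, swallowIOException), which is a no-op for a null reference and, with false as the second argument, rethrows any IOException raised by close(). A small sketch of that behaviour, independent of the shuffle code:

import com.google.common.io.Closeables;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustration only: Closeables.close(closeable, swallowIOException) is null-safe, and with
// swallowIOException = false it propagates any IOException from close(). That is what lets
// the finally blocks above drop their explicit "if (x != null) x.close()" checks.
public class CloseablesSketch {
  public static void main(String[] args) throws IOException {
    InputStream maybeNull = null;
    Closeables.close(maybeNull, false);  // no-op for null, so no null check is needed

    InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
    try {
      in.read();
    } finally {
      Closeables.close(in, false);  // closes and propagates close() failures to the caller
    }
  }
}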
@@ -57,7 +57,7 @@
 public class UnsafeShuffleWriterSuite {
 
   static final int NUM_PARTITITONS = 4;
-  final TaskMemoryManager memoryManager =
+  final TaskMemoryManager taskMemoryManager =
     new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
   final HashPartitioner hashPartitioner = new HashPartitioner(NUM_PARTITITONS);
   File mergedOutputFile;
@@ -82,6 +82,10 @@ public OutputStream apply(OutputStream stream) {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
+    final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
+    if (leakedMemory != 0) {
+      Assert.fail("Test leaked " + leakedMemory + " bytes of managed memory");
+    }
   }
 
   @Before
@@ -154,7 +158,7 @@ private UnsafeShuffleWriter<Object, Object> createWriter(boolean transferToEnabl
     return new UnsafeShuffleWriter<Object, Object>(
       blockManager,
       shuffleBlockManager,
-      memoryManager,
+      taskMemoryManager,
       shuffleMemoryManager,
       new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
       0, // map id
@@ -216,7 +220,7 @@ public void writeWithoutSpilling() throws Exception {
   }
 
   private void testMergingSpills(boolean transferToEnabled) throws IOException {
-    final UnsafeShuffleWriter<Object, Object> writer = createWriter(true);
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(transferToEnabled);
     writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
     writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
     writer.insertRecordIntoSorter(new Tuple2<Object, Object>(3, 3));
@@ -249,8 +253,17 @@ public void mergeSpillsWithFileStream() throws Exception {
     testMergingSpills(false);
   }
 
+  @Test
+  public void spillFilesAreDeletedWhenStoppingAfterError() throws IOException {
+    final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
+    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
+    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.forceSorterToSpill();
+    writer.insertRecordIntoSorter(new Tuple2<Object, Object>(2, 2));
+    writer.stop(false);
+    assertSpillFilesWereCleanedUp();
+  }
+
   // TODO: actually try to read the shuffle output?
-  // TODO: add a test that manually triggers spills in order to exercise the merging.
-// }
 
 }
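The tearDown() change turns leaked managed memory into a test failure, and the new test forces a spill and then calls stop(false) to check that spill files are removed on the error path. A compact sketch of the leak-check pattern against an invented allocator (the real suite uses TaskMemoryManager.cleanUpAllAllocatedMemory()):

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

// Illustration only: fail any test that leaves managed memory allocated when it finishes.
public class LeakCheckPatternSuite {

  // Hypothetical stand-in for a managed-memory allocator.
  static class FakeAllocator {
    private long outstandingBytes = 0;
    long allocate(long bytes) { outstandingBytes += bytes; return bytes; }
    void free(long bytes) { outstandingBytes -= bytes; }
    long cleanUpAllAllocatedMemory() {
      final long leaked = outstandingBytes;
      outstandingBytes = 0;
      return leaked;
    }
  }

  private final FakeAllocator allocator = new FakeAllocator();

  @After
  public void tearDown() {
    final long leakedMemory = allocator.cleanUpAllAllocatedMemory();
    if (leakedMemory != 0) {
      Assert.fail("Test leaked " + leakedMemory + " bytes of managed memory");
    }
  }

  @Test
  public void allocationsAreFreed() {
    final long bytes = allocator.allocate(64);
    allocator.free(bytes);
  }
}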
