Skip to content

Commit

Permalink
Add tests that automatically trigger spills.
Browse files Browse the repository at this point in the history
This bumps up line coverage to 93% in UnsafeShuffleExternalSorter; now,
the only branches that are missed are exception-handling code.
  • Loading branch information
JoshRosen committed May 11, 2015
1 parent 7c953f9 commit 8531286
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();

@VisibleForTesting
static final int INITIAL_SORT_BUFFER_SIZE = 4096;

private final BlockManager blockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
private final TaskMemoryManager memoryManager;
Expand Down Expand Up @@ -152,7 +155,7 @@ private void open() throws IOException {
shuffleMemoryManager,
blockManager,
taskContext,
4096, // Initial size (TODO: tune this!)
INITIAL_SORT_BUFFER_SIZE,
partitioner.numPartitions(),
sparkConf);
serBuffer = new MyByteArrayOutputStream(1024 * 1024);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,47 @@ public void mergeSpillsWithFileStreamAndNoCompression() throws Exception {
testMergingSpills(false, null);
}

@Test
public void writeEnoughDataToTriggerSpill() throws Exception {
when(shuffleMemoryManager.tryToAcquire(anyLong()))
.then(returnsFirstArg()) // Allocate initial sort buffer
.then(returnsFirstArg()) // Allocate initial data page
.thenReturn(0L) // Deny request to allocate new data page
.then(returnsFirstArg()); // Grant new sort buffer and data page.
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<Product2<Object, Object>>();
final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 128];
for (int i = 0; i < 128 + 1; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, bigByteArray));
}
writer.write(dataToWrite.iterator());
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
Assert.assertEquals(2, spillFilesCreated.size());
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
}

@Test
public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
when(shuffleMemoryManager.tryToAcquire(anyLong()))
.then(returnsFirstArg()) // Allocate initial sort buffer
.then(returnsFirstArg()) // Allocate initial data page
.thenReturn(0L) // Deny request to grow sort buffer
.then(returnsFirstArg()); // Grant new sort buffer and data page.
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<Product2<Object, Object>>();
for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
writer.write(dataToWrite.iterator());
verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong());
Assert.assertEquals(2, spillFilesCreated.size());
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
}

@Test
public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception {
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
Expand Down

0 comments on commit 8531286

Please sign in to comment.