From 853128684abc302a965ade6dba8df09b08345c5d Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 11 May 2015 16:42:11 -0700 Subject: [PATCH] Add tests that automatically trigger spills. This bumps up line coverage to 93% in UnsafeShuffleExternalSorter; now, the only branches that are missed are exception-handling code. --- .../shuffle/unsafe/UnsafeShuffleWriter.java | 5 ++- .../unsafe/UnsafeShuffleWriterSuite.java | 41 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index 8977517c0bcbe..02bf7e321df12 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -59,6 +59,9 @@ public class UnsafeShuffleWriter extends ShuffleWriter { private static final ClassTag OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); + @VisibleForTesting + static final int INITIAL_SORT_BUFFER_SIZE = 4096; + private final BlockManager blockManager; private final IndexShuffleBlockResolver shuffleBlockResolver; private final TaskMemoryManager memoryManager; @@ -152,7 +155,7 @@ private void open() throws IOException { shuffleMemoryManager, blockManager, taskContext, - 4096, // Initial size (TODO: tune this!) + INITIAL_SORT_BUFFER_SIZE, partitioner.numPartitions(), sparkConf); serBuffer = new MyByteArrayOutputStream(1024 * 1024); diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java index 01bf7a5095970..c53e0fcf44880 100644 --- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java @@ -363,6 +363,47 @@ public void mergeSpillsWithFileStreamAndNoCompression() throws Exception { testMergingSpills(false, null); } + @Test + public void writeEnoughDataToTriggerSpill() throws Exception { + when(shuffleMemoryManager.tryToAcquire(anyLong())) + .then(returnsFirstArg()) // Allocate initial sort buffer + .then(returnsFirstArg()) // Allocate initial data page + .thenReturn(0L) // Deny request to allocate new data page + .then(returnsFirstArg()); // Grant new sort buffer and data page. + final UnsafeShuffleWriter writer = createWriter(false); + final ArrayList> dataToWrite = new ArrayList>(); + final byte[] bigByteArray = new byte[PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 128]; + for (int i = 0; i < 128 + 1; i++) { + dataToWrite.add(new Tuple2(i, bigByteArray)); + } + writer.write(dataToWrite.iterator()); + verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong()); + Assert.assertEquals(2, spillFilesCreated.size()); + writer.stop(true); + readRecordsFromFile(); + assertSpillFilesWereCleanedUp(); + } + + @Test + public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { + when(shuffleMemoryManager.tryToAcquire(anyLong())) + .then(returnsFirstArg()) // Allocate initial sort buffer + .then(returnsFirstArg()) // Allocate initial data page + .thenReturn(0L) // Deny request to grow sort buffer + .then(returnsFirstArg()); // Grant new sort buffer and data page. + final UnsafeShuffleWriter writer = createWriter(false); + final ArrayList> dataToWrite = new ArrayList>(); + for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) { + dataToWrite.add(new Tuple2(i, i)); + } + writer.write(dataToWrite.iterator()); + verify(shuffleMemoryManager, times(5)).tryToAcquire(anyLong()); + Assert.assertEquals(2, spillFilesCreated.size()); + writer.stop(true); + readRecordsFromFile(); + assertSpillFilesWereCleanedUp(); + } + @Test public void writeRecordsThatAreBiggerThanDiskWriteBufferSize() throws Exception { final UnsafeShuffleWriter writer = createWriter(false);