From 2d4e4f42f917aa78dacbc0d58e818f5014401e32 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Tue, 12 May 2015 13:36:21 -0700 Subject: [PATCH] Address some minor comments in UnsafeShuffleExternalSorter. --- .../unsafe/UnsafeShuffleExternalSorter.java | 52 ++++++++++--------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java index 36f05e3df8753..d4db1d298e833 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java @@ -36,6 +36,7 @@ import org.apache.spark.unsafe.PlatformDependent; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.unsafe.memory.TaskMemoryManager; +import org.apache.spark.util.Utils; /** * An external sorter that is specialized for sort-based shuffle. @@ -85,7 +86,7 @@ final class UnsafeShuffleExternalSorter { private final LinkedList spills = new LinkedList(); - // All three of these variables are reset after spilling: + // These variables are reset after spilling: private UnsafeShuffleInMemorySorter sorter; private MemoryBlock currentPage = null; private long currentPagePosition = -1; @@ -110,21 +111,20 @@ public UnsafeShuffleExternalSorter( // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; this.writeMetrics = writeMetrics; - openSorter(); + initializeForWriting(); } /** - * Allocates a new sorter. Called when opening the spill writer for the first time and after - * each spill. + * Allocates new sort data structures. Called when creating the sorter and after each spill. 
*/ - private void openSorter() throws IOException { + private void initializeForWriting() throws IOException { // TODO: move this sizing calculation logic into a static method of sorter: final long memoryRequested = initialSize * 8L; if (spillingEnabled) { final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested); if (memoryAcquired != memoryRequested) { shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire memory!"); + throw new IOException("Could not acquire " + memoryRequested + " bytes of memory"); } } @@ -132,24 +132,25 @@ private void openSorter() throws IOException { } /** - * Sorts the in-memory records and writes the sorted records to a spill file. + * Sorts the in-memory records and writes the sorted records to an on-disk file. * This method does not free the sort data structures. * - * @param isSpill if true, this indicates that we're writing a spill and that bytes written should - * be counted towards shuffle spill metrics rather than shuffle write metrics. + * @param isLastFile if true, this indicates that we're writing the final output file and that the + * bytes written should be counted towards shuffle write metrics rather than + * shuffle spill metrics. */ - private void writeSpillFile(boolean isSpill) throws IOException { + private void writeSortedFile(boolean isLastFile) throws IOException { final ShuffleWriteMetrics writeMetricsToUse; - if (isSpill) { + if (isLastFile) { + // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes. + writeMetricsToUse = writeMetrics; + } else { // We're spilling, so bytes written should be counted towards spill rather than write. // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count // them towards shuffle bytes written. writeMetricsToUse = new ShuffleWriteMetrics(); - } else { - // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes. 
- writeMetricsToUse = writeMetrics; } // This call performs the actual sort. @@ -221,16 +222,16 @@ private void writeSpillFile(boolean isSpill) throws IOException { if (writer != null) { writer.commitAndClose(); - // If `writeSpillFile()` was called from `closeAndGetSpills()` and no records were inserted, - // then the spill file might be empty. Note that it might be better to avoid calling - // writeSpillFile() in that case. + // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted, + // then the file might be empty. Note that it might be better to avoid calling + // writeSortedFile() in that case. if (currentPartition != -1) { spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length(); spills.add(spillInfo); } } - if (isSpill) { + if (!isLastFile) { writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten()); // Consistent with ExternalSorter, we do not count this IO towards shuffle write time. // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this. @@ -244,19 +245,20 @@ private void writeSpillFile(boolean isSpill) throws IOException { */ @VisibleForTesting void spill() throws IOException { - final long threadId = Thread.currentThread().getId(); - logger.info("Thread " + threadId + " spilling sort data of " + - org.apache.spark.util.Utils.bytesToString(getMemoryUsage()) + " to disk (" + - (spills.size() + (spills.size() > 1 ? " times" : " time")) + " so far)"); + logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", + Thread.currentThread().getId(), + Utils.bytesToString(getMemoryUsage()), + spills.size(), + spills.size() > 1 ? 
" times" : " time"); - writeSpillFile(true); + writeSortedFile(false); final long sorterMemoryUsage = sorter.getMemoryUsage(); sorter = null; shuffleMemoryManager.release(sorterMemoryUsage); final long spillSize = freeMemory(); taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - openSorter(); + initializeForWriting(); } private long getMemoryUsage() { @@ -405,7 +407,7 @@ public SpillInfo[] closeAndGetSpills() throws IOException { try { if (sorter != null) { // Do not count the final file towards the spill count. - writeSpillFile(false); + writeSortedFile(true); freeMemory(); } return spills.toArray(new SpillInfo[spills.size()]);