diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSpillWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSpillWriter.java
index 05cf2e7d0d3cc..68f5b080572ea 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSpillWriter.java
@@ -67,7 +67,6 @@ public final class UnsafeShuffleSpillWriter {
   private final BlockManager blockManager;
   private final TaskContext taskContext;
   private final boolean spillingEnabled;
-  private ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSize;
@@ -107,15 +106,11 @@ public UnsafeShuffleSpillWriter(
     openSorter();
   }
 
-  // TODO: metrics tracking + integration with shuffle write metrics
-
   /**
    * Allocates a new sorter. Called when opening the spill writer for the first time and after
    * each spill.
    */
   private void openSorter() throws IOException {
-    this.writeMetrics = new ShuffleWriteMetrics();
-    // TODO: connect write metrics to task metrics?
     // TODO: move this sizing calculation logic into a static method of sorter:
     final long memoryRequested = initialSize * 8L;
     if (spillingEnabled) {
@@ -130,8 +125,8 @@ private void openSorter() throws IOException {
   }
 
   /**
-   * Sorts the in-memory records, writes the sorted records to a spill file, and frees the in-memory
-   * data structures associated with this sort. New data structures are not automatically allocated.
+   * Sorts the in-memory records and writes the sorted records to a spill file.
+   * This method does not free the sort data structures.
    */
   private SpillInfo writeSpillFile() throws IOException {
     // This call performs the actual sort.
@@ -161,7 +156,17 @@ private SpillInfo writeSpillFile() throws IOException {
     // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = new DummySerializerInstance();
-    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
+    // TODO: audit the metrics-related code and ensure proper metrics integration:
+    // It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
+    // doesn't report IO time spent writing spill files (see SPARK-7413). This method,
+    // writeSpillFile(), is called both when writing spill files and when writing the single output
+    // file in cases where we didn't spill. As a result, we don't necessarily know whether this
+    // should be reported as bytes spilled or as shuffle bytes written. We could defer the updating
+    // of these metrics until the end of the shuffle write, but that would mean that users
+    // wouldn't get useful metrics updates in the UI from long-running tasks. Given this complexity,
+    // I'm deferring these decisions to a separate follow-up commit or patch.
+    writer =
+      blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
 
     int currentPartition = -1;
     while (sortedRecords.hasNext()) {
@@ -175,7 +180,8 @@ private SpillInfo writeSpillFile() throws IOException {
          spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
        }
        currentPartition = partition;
-        writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
+        writer =
+          blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
      }
 
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
@@ -295,7 +301,6 @@ private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
       currentPage = memoryManager.allocatePage(PAGE_SIZE);
       currentPagePosition = currentPage.getBaseOffset();
       allocatedPages.add(currentPage);
-      logger.info("Acquired new page! " + allocatedPages.size() * PAGE_SIZE);
     }
   }
 
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index ad842502bf24f..80e01109eabd4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -125,8 +125,7 @@ private SpillInfo[] insertRecordsIntoSorter(
       taskContext,
       4096, // Initial size (TODO: tune this!)
       partitioner.numPartitions(),
-      sparkConf
-    );
+      sparkConf);
 
     final byte[] serArray = new byte[SER_BUFFER_SIZE];
     final ByteBuffer serByteBuffer = ByteBuffer.wrap(serArray);
@@ -182,10 +181,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
 
     for (int partition = 0; partition < numPartitions; partition++) {
       for (int i = 0; i < spills.length; i++) {
-        System.out.println("In partition " + partition + " and spill " + i );
         final long partitionLengthInSpill = spills[i].partitionLengths[partition];
-        System.out.println("Partition length in spill is " + partitionLengthInSpill);
-        System.out.println("input channel position is " + spillInputChannels[i].position());
         long bytesRemainingToBeTransferred = partitionLengthInSpill;
         final FileChannel spillInputChannel = spillInputChannels[i];
         while (bytesRemainingToBeTransferred > 0) {
@@ -228,7 +224,6 @@ public Option<MapStatus> stop(boolean success) {
       }
     } finally {
       freeMemory();
-      // TODO: increment the shuffle write time metrics
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 0a84fdc0e4ca2..e9b4e2b955dc8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -34,7 +34,7 @@ import org.apache.spark.storage._
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
 import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
 
-/** A group of writers for ShuffleMapTask, one writer per reducer. */
+/** A group of writers for a ShuffleMapTask, one writer per reducer. */
 private[spark] trait ShuffleWriterGroup {
   val writers: Array[BlockObjectWriter]
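Note on the metrics change in UnsafeShuffleSpillWriter above: each DiskBlockObjectWriter now gets a throwaway ShuffleWriteMetrics instance, so per-spill byte counts are measured but, per the TODO, not yet reported anywhere. A minimal sketch of one way the deferred aggregation described in that TODO could look; SpillMetricsAccumulator and its methods are hypothetical names invented for illustration, not part of Spark:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper, not Spark's API: collects the byte count measured by
    // each spill's scratch ShuffleWriteMetrics so the caller can decide at the
    // end of the shuffle write whether the bytes count as "spilled" or as
    // "shuffle bytes written", as the TODO in the patch discusses.
    final class SpillMetricsAccumulator {
      private final List<Long> bytesPerSpill = new ArrayList<>();

      // Called after each writeSpillFile() with that spill's measured bytes.
      void recordSpill(long bytesWritten) {
        bytesPerSpill.add(bytesWritten);
      }

      // If writeSpillFile() ran exactly once (the no-spill path), its output is
      // really the single shuffle output file; otherwise every entry is spill IO.
      boolean wasSingleOutputFile() {
        return bytesPerSpill.size() == 1;
      }

      long totalBytes() {
        long total = 0L;
        for (long bytes : bytesPerSpill) {
          total += bytes;
        }
        return total;
      }
    }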
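The ensureSpaceInDataPage() hunk above only removes a noisy log statement, but the surrounding pattern is worth spelling out: records are bump-allocated out of fixed-size pages, and a fresh page is acquired whenever the current one cannot fit the next record. A self-contained sketch of that pattern, assuming plain byte[] pages instead of Spark's memory-manager pages; PagedAllocator and reserve() are illustrative names, not Spark APIs:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative bump allocator over fixed-size pages; not Spark code.
    final class PagedAllocator {
      private static final int PAGE_SIZE = 1 << 20; // fixed page size, as in the patch
      private final List<byte[]> allocatedPages = new ArrayList<>();
      private byte[] currentPage = null;
      private int currentPagePosition = PAGE_SIZE; // forces allocation on first use

      // Returns the offset in currentPage at which requiredSpace bytes may be written.
      int reserve(int requiredSpace) {
        if (requiredSpace > PAGE_SIZE) {
          throw new IllegalArgumentException("record does not fit in a single page");
        }
        if (currentPagePosition + requiredSpace > PAGE_SIZE) {
          // Current page is full (or absent): acquire a new page and reset the cursor.
          currentPage = new byte[PAGE_SIZE];
          allocatedPages.add(currentPage);
          currentPagePosition = 0;
        }
        final int offset = currentPagePosition;
        currentPagePosition += requiredSpace;
        return offset;
      }
    }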
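The mergeSpills() hunk above drops the debug println calls but leaves the transfer loop intact. Since FileChannel.transferTo() may move fewer bytes than requested, the loop must retry until the whole partition segment has been copied, advancing the source position manually. A minimal standalone sketch of that loop; copyPartition and the parameter names are illustrative, only the java.nio calls are real:

    import java.io.IOException;
    import java.nio.channels.FileChannel;

    final class SpillCopy {
      // Copies one partition's segment from a spill file's channel into the
      // merged output channel, mirroring the retry loop in mergeSpills().
      static void copyPartition(
          FileChannel spillInputChannel,
          FileChannel mergedOutputChannel,
          long partitionLengthInSpill) throws IOException {
        long bytesRemainingToBeTransferred = partitionLengthInSpill;
        while (bytesRemainingToBeTransferred > 0) {
          // transferTo() returns the number of bytes actually moved, which can
          // be short; it also does not advance the source channel's position,
          // so the position must be bumped explicitly.
          final long transferred = spillInputChannel.transferTo(
              spillInputChannel.position(),
              bytesRemainingToBeTransferred,
              mergedOutputChannel);
          spillInputChannel.position(spillInputChannel.position() + transferred);
          bytesRemainingToBeTransferred -= transferred;
        }
      }
    }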