Update TODOs related to shuffle write metrics.
JoshRosen committed May 7, 2015
1 parent b674412 commit 11feeb6
Showing 3 changed files with 17 additions and 17 deletions.
@@ -67,7 +67,6 @@ public final class UnsafeShuffleSpillWriter {
private final BlockManager blockManager;
private final TaskContext taskContext;
private final boolean spillingEnabled;
private ShuffleWriteMetrics writeMetrics;

/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSize;
@@ -107,15 +106,11 @@ public UnsafeShuffleSpillWriter(
openSorter();
}

// TODO: metrics tracking + integration with shuffle write metrics

/**
* Allocates a new sorter. Called when opening the spill writer for the first time and after
* each spill.
*/
private void openSorter() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
// TODO: connect write metrics to task metrics?
// TODO: move this sizing calculation logic into a static method of sorter:
final long memoryRequested = initialSize * 8L;
if (spillingEnabled) {
@@ -130,8 +125,8 @@ private void openSorter() throws IOException {
}

/**
* Sorts the in-memory records, writes the sorted records to a spill file, and frees the in-memory
* data structures associated with this sort. New data structures are not automatically allocated.
* Sorts the in-memory records and writes the sorted records to a spill file.
* This method does not free the sort data structures.
*/
private SpillInfo writeSpillFile() throws IOException {
// This call performs the actual sort.
@@ -161,7 +156,17 @@ private SpillInfo writeSpillFile() throws IOException {
// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
// around this, we pass a dummy no-op serializer.
final SerializerInstance ser = new DummySerializerInstance();
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
// TODO: audit the metrics-related code and ensure proper metrics integration:
// It's not clear how we should handle shuffle write metrics for spill files; currently, Spark
// doesn't report IO time spent writing spill files (see SPARK-7413). This method,
// writeSpillFile(), is called both when writing spill files and when writing the single output
// file in cases where we didn't spill. As a result, we don't necessarily know whether this
// should be reported as bytes spilled or as shuffle bytes written. We could defer the updating
// of these metrics until the end of the shuffle write, but that would mean that users
// wouldn't get useful metrics updates in the UI from long-running tasks. Given this complexity,
// I'm deferring these decisions to a separate follow-up commit or patch.
writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());

int currentPartition = -1;
while (sortedRecords.hasNext()) {
@@ -175,7 +180,8 @@ private SpillInfo writeSpillFile() throws IOException {
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
}
currentPartition = partition;
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
writer =
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, new ShuffleWriteMetrics());
}

final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
@@ -295,7 +301,6 @@ private void ensureSpaceInDataPage(int requiredSpace) throws IOException {
currentPage = memoryManager.allocatePage(PAGE_SIZE);
currentPagePosition = currentPage.getBaseOffset();
allocatedPages.add(currentPage);
logger.info("Acquired new page! " + allocatedPages.size() * PAGE_SIZE);
}
}

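The TODO block above frames the open question as an attribution decision: bytes written by writeSpillFile() could be counted either as bytes spilled or as shuffle bytes written, and deferring the choice until the end of the write would cost incremental UI updates. The following self-contained sketch is purely illustrative; it is not part of this commit and uses no Spark API, and every name in it is hypothetical. It only shows how a caller-supplied flag could route each completed file write to the appropriate counter while still updating metrics as the task runs.

final class WriteAttributionSketch {
  private long shuffleBytesWritten;
  private long bytesSpilled;

  // Credit a completed file write to the right counter. The caller knows whether the
  // file it just wrote is the single map output file or an intermediate spill file.
  void recordWrite(long bytes, boolean isFinalOutput) {
    if (isFinalOutput) {
      shuffleBytesWritten += bytes;  // report as shuffle bytes written
    } else {
      bytesSpilled += bytes;         // report as bytes spilled
    }
  }

  public static void main(String[] args) {
    WriteAttributionSketch metrics = new WriteAttributionSketch();
    metrics.recordWrite(4096L, false); // an intermediate spill file
    metrics.recordWrite(8192L, true);  // the final output file
    System.out.println("bytesSpilled=" + metrics.bytesSpilled
        + ", shuffleBytesWritten=" + metrics.shuffleBytesWritten);
  }
}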
@@ -125,8 +125,7 @@ private SpillInfo[] insertRecordsIntoSorter(
taskContext,
4096, // Initial size (TODO: tune this!)
partitioner.numPartitions(),
sparkConf
);
sparkConf);

final byte[] serArray = new byte[SER_BUFFER_SIZE];
final ByteBuffer serByteBuffer = ByteBuffer.wrap(serArray);
@@ -182,10 +181,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {

for (int partition = 0; partition < numPartitions; partition++) {
for (int i = 0; i < spills.length; i++) {
System.out.println("In partition " + partition + " and spill " + i );
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
System.out.println("Partition length in spill is " + partitionLengthInSpill);
System.out.println("input channel position is " + spillInputChannels[i].position());
long bytesRemainingToBeTransferred = partitionLengthInSpill;
final FileChannel spillInputChannel = spillInputChannels[i];
while (bytesRemainingToBeTransferred > 0) {
@@ -228,7 +224,6 @@ public Option<MapStatus> stop(boolean success) {
}
} finally {
freeMemory();
// TODO: increment the shuffle write time metrics
}
}
}
@@ -34,7 +34,7 @@ import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}

/** A group of writers for ShuffleMapTask, one writer per reducer. */
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
val writers: Array[BlockObjectWriter]

