Skip to content

Commit

Permalink
Address some minor comments in UnsafeShuffleExternalSorter.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 12, 2015
1 parent fdcac08 commit 2d4e4f4
Showing 1 changed file with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

/**
* An external sorter that is specialized for sort-based shuffle.
Expand Down Expand Up @@ -85,7 +86,7 @@ final class UnsafeShuffleExternalSorter {

private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();

// All three of these variables are reset after spilling:
// These variables are reset after spilling:
private UnsafeShuffleInMemorySorter sorter;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
Expand All @@ -110,46 +111,46 @@ public UnsafeShuffleExternalSorter(
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.writeMetrics = writeMetrics;
openSorter();
initializeForWriting();
}

/**
* Allocates a new sorter. Called when opening the spill writer for the first time and after
* each spill.
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
private void openSorter() throws IOException {
private void initializeForWriting() throws IOException {
// TODO: move this sizing calculation logic into a static method of sorter:
final long memoryRequested = initialSize * 8L;
if (spillingEnabled) {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
if (memoryAcquired != memoryRequested) {
shuffleMemoryManager.release(memoryAcquired);
throw new IOException("Could not acquire memory!");
throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
}
}

this.sorter = new UnsafeShuffleInMemorySorter(initialSize);
}

/**
* Sorts the in-memory records and writes the sorted records to a spill file.
* Sorts the in-memory records and writes the sorted records to an on-disk file.
* This method does not free the sort data structures.
*
* @param isSpill if true, this indicates that we're writing a spill and that bytes written should
* be counted towards shuffle spill metrics rather than shuffle write metrics.
* @param isLastFile if true, this indicates that we're writing the final output file and that the
* bytes written should be counted towards shuffle write metrics rather than
* shuffle spill metrics.
*/
private void writeSpillFile(boolean isSpill) throws IOException {
private void writeSortedFile(boolean isLastFile) throws IOException {

final ShuffleWriteMetrics writeMetricsToUse;

if (isSpill) {
if (isLastFile) {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
} else {
// We're spilling, so bytes written should be counted towards spill rather than write.
// Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
// them towards shuffle bytes written.
writeMetricsToUse = new ShuffleWriteMetrics();
} else {
// We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
writeMetricsToUse = writeMetrics;
}

// This call performs the actual sort.
Expand Down Expand Up @@ -221,16 +222,16 @@ private void writeSpillFile(boolean isSpill) throws IOException {

if (writer != null) {
writer.commitAndClose();
// If `writeSpillFile()` was called from `closeAndGetSpills()` and no records were inserted,
// then the spill file might be empty. Note that it might be better to avoid calling
// writeSpillFile() in that case.
// If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
// then the file might be empty. Note that it might be better to avoid calling
// writeSortedFile() in that case.
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
spills.add(spillInfo);
}
}

if (isSpill) {
if (!isLastFile) {
writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
Expand All @@ -244,19 +245,20 @@ private void writeSpillFile(boolean isSpill) throws IOException {
*/
@VisibleForTesting
void spill() throws IOException {
final long threadId = Thread.currentThread().getId();
logger.info("Thread " + threadId + " spilling sort data of " +
org.apache.spark.util.Utils.bytesToString(getMemoryUsage()) + " to disk (" +
(spills.size() + (spills.size() > 1 ? " times" : " time")) + " so far)");
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
spills.size(),
spills.size() > 1 ? " times" : " time");

writeSpillFile(true);
writeSortedFile(false);
final long sorterMemoryUsage = sorter.getMemoryUsage();
sorter = null;
shuffleMemoryManager.release(sorterMemoryUsage);
final long spillSize = freeMemory();
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);

openSorter();
initializeForWriting();
}

private long getMemoryUsage() {
Expand Down Expand Up @@ -405,7 +407,7 @@ public SpillInfo[] closeAndGetSpills() throws IOException {
try {
if (sorter != null) {
// Do not count the final file towards the spill count.
writeSpillFile(false);
writeSortedFile(true);
freeMemory();
}
return spills.toArray(new SpillInfo[spills.size()]);
Expand Down

0 comments on commit 2d4e4f4

Please sign in to comment.