Commit 57312c9
Clarify fileBufferSize units
JoshRosen committed May 12, 2015
1 parent 2d4e4f4 commit 57312c9
Showing 4 changed files with 10 additions and 8 deletions.
File 1: UnsafeShuffleExternalSorter

@@ -74,7 +74,7 @@ final class UnsafeShuffleExternalSorter {
   private final ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
-  private final int fileBufferSize;
+  private final int fileBufferSizeBytes;
 
   /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
@@ -108,8 +108,9 @@ public UnsafeShuffleExternalSorter(
     this.initialSize = initialSize;
     this.numPartitions = numPartitions;
     this.spillingEnabled = conf.getBoolean("spark.shuffle.spill", true);
-    // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
-    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+
     this.writeMetrics = writeMetrics;
     initializeForWriting();
   }
@@ -182,7 +183,7 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
     // around this, we pass a dummy no-op serializer.
     final SerializerInstance ser = DummySerializerInstance.INSTANCE;
 
-    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);
+    writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
 
     int currentPartition = -1;
     while (sortedRecords.hasNext()) {
@@ -196,7 +197,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
           spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
         }
         currentPartition = partition;
-        writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetricsToUse);
+        writer =
+          blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
       }
 
       final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
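The rename from fileBufferSize to fileBufferSizeBytes makes the units explicit: getSizeAsKb returns a value in KiB, and the * 1024 converts it to the byte count that getDiskWriter expects. Below is a minimal sketch of that conversion, assuming Spark 1.4+ on the classpath; the object name and the printed values are illustrative, not part of the commit.

import org.apache.spark.SparkConf

// Illustrative only: shows how the fileBufferSizeBytes value in the diff is
// derived from spark.shuffle.file.buffer. Assumes Spark 1.4+, where
// getSizeAsKb was introduced.
object FileBufferSizeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)

    // No value set: the "32k" default applies.
    val defaultBytes = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
    println(defaultBytes) // 32768

    // A bare number is read as KiB, so legacy unitless configs keep working.
    conf.set("spark.shuffle.file.buffer", "64")
    val legacyBytes = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
    println(legacyBytes) // 65536, i.e. 64 KiB
  }
}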
File 2: FileShuffleBlockResolver

@@ -76,7 +76,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
   private val consolidateShuffleFiles =
     conf.getBoolean("spark.shuffle.consolidateFiles", false)
 
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
   /**
File 3: ExternalAppendOnlyMap

@@ -90,7 +90,7 @@ class ExternalAppendOnlyMap[K, V, C](
   // Number of bytes spilled in total
   private var _diskBytesSpilled = 0L
 
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val fileBufferSize =
     sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
File 4: ExternalSorter

@@ -110,7 +110,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val conf = SparkEnv.get.conf
   private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
 
-  // Use getSizeAsKb (not bytes) to maintain backwards compatibility of on units are provided
+  // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
   private val transferToEnabled = conf.getBoolean("spark.file.transferTo", true)
 
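The comment corrected across these files ("if no units are provided") is the heart of the change: getSizeAsKb interprets a unitless value as KiB, matching the legacy meaning of this setting, whereas getSizeAsBytes would interpret it as bytes. A hedged sketch of the difference follows, again assuming Spark 1.4+; the object name is illustrative, and the legacy-units framing reflects the comment's stated rationale rather than anything shown in this diff.

import org.apache.spark.SparkConf

// Illustrative only: contrasts the two accessors the repeated comment refers
// to. The legacy configuration expressed this buffer as a plain number of
// KiB, so a bare value must keep meaning KiB after the units-aware parsing
// was introduced.
object WhyGetSizeAsKb {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.shuffle.file.buffer", "32")

    // getSizeAsKb treats the bare "32" as 32 KiB: the legacy meaning.
    println(conf.getSizeAsKb("spark.shuffle.file.buffer", "32k")) // 32

    // getSizeAsBytes would treat the same "32" as 32 bytes, silently
    // shrinking the buffer by a factor of 1024.
    println(conf.getSizeAsBytes("spark.shuffle.file.buffer", "32k")) // 32
  }
}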
