[SPARK-25299] Move shuffle writers back to being given specific partition ids #540

Merged · 1 commit · Apr 19, 2019
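
The substance of the change is a single API rename with a new contract: ShuffleMapOutputWriter no longer hands out writers for an implicit "next" partition; callers name the partition explicitly. A minimal before/after sketch of the method (signatures taken from the interface hunk below, other members elided; as the default writer later in the diff enforces, ids must still be requested in increasing order, but empty partitions can now be skipped):

    // Before: the writer kept its own cursor, so callers had to request a
    // writer for every partition in sequence, even for empty partitions.
    ShufflePartitionWriter getNextPartitionWriter() throws IOException;

    // After: callers pass the partition id they are about to write.
    ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOException;
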
@@ -29,7 +29,7 @@
*/
@Experimental
public interface ShuffleMapOutputWriter {
ShufflePartitionWriter getNextPartitionWriter() throws IOException;
ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOException;

void commitAllPartitions() throws IOException;

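For callers, the pattern becomes the one visible in the hunks that follow: walk your partitions in ascending order and pass each id explicitly. A hedged sketch of that caller shape (mapOutputWriter is a ShuffleMapOutputWriter; numPartitions, hasDataFor and copyDataFor are illustrative placeholders, not Spark methods):

    for (int p = 0; p < numPartitions; p++) {
      if (!hasDataFor(p)) {
        continue;  // empty partitions no longer need a writer at all
      }
      ShufflePartitionWriter writer = null;
      try {
        writer = mapOutputWriter.getPartitionWriter(p);  // explicit partition id
        copyDataFor(p, writer);
      } finally {
        if (writer != null) {
          writer.close();
        }
      }
    }
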
@@ -204,7 +204,7 @@ private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) thro
boolean copyThrewException = true;
ShufflePartitionWriter writer = null;
try {
writer = mapOutputWriter.getNextPartitionWriter();
writer = mapOutputWriter.getPartitionWriter(i);
if (!file.exists()) {
copyThrewException = false;
} else {
@@ -285,18 +285,6 @@ private long[] mergeSpills(SpillInfo[] spills,
long[] partitionLengths = new long[numPartitions];
try {
if (spills.length == 0) {
// The contract we are working under states that we will open a partition writer for
// each partition, regardless of number of spills
for (int i = 0; i < numPartitions; i++) {
ShufflePartitionWriter writer = null;
try {
writer = mapWriter.getNextPartitionWriter();
} finally {
if (writer != null) {
writer.close();
}
}
}
return partitionLengths;
} else {
// There are multiple spills to merge, so none of these spill files' lengths were counted
@@ -372,7 +360,7 @@ private long[] mergeSpillsWithFileStream(
boolean copyThrewExecption = true;
ShufflePartitionWriter writer = null;
try {
writer = mapWriter.getNextPartitionWriter();
writer = mapWriter.getPartitionWriter(partition);
OutputStream partitionOutput = null;
try {
// Shield the underlying output stream from close() calls, so that we can close the
@@ -451,7 +439,7 @@ private long[] mergeSpillsWithTransferTo(
boolean copyThrewExecption = true;
ShufflePartitionWriter writer = null;
try {
writer = mapWriter.getNextPartitionWriter();
writer = mapWriter.getPartitionWriter(partition);
WritableByteChannel channel = writer.toChannel();
for (int i = 0; i < spills.length; i++) {
long partitionLengthInSpill = 0L;
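The loop removed from the zero-spill branch above existed only to honor the old contract that every partition gets a writer. Under the new API, a partition that is never requested simply reports length 0, so the branch can return the freshly allocated lengths array untouched, roughly:

    // new long[n] is zero-initialized in Java, so when nothing was spilled and
    // no partition writers are requested, every partition length is reported as 0.
    long[] partitionLengths = new long[numPartitions];
    if (spills.length == 0) {
      return partitionLengths;   // nothing to merge; all partitions are empty
    }
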
@@ -47,7 +47,7 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
private final IndexShuffleBlockResolver blockResolver;
private final long[] partitionLengths;
private final int bufferSize;
private int currPartitionId = 0;
private int lastPartitionId = -1;
private long currChannelPosition;

private final File outputFile;
@@ -77,7 +77,11 @@ public DefaultShuffleMapOutputWriter(
}

@Override
public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
public ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOException {
if (partitionId <= lastPartitionId) {
throw new IllegalArgumentException("Partitions should be requested in increasing order.");
}
lastPartitionId = partitionId;
if (outputTempFile == null) {
outputTempFile = Utils.tempFileWith(outputFile);
}
@@ -86,7 +90,7 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
} else {
currChannelPosition = 0L;
}
return new DefaultShufflePartitionWriter(currPartitionId++);
return new DefaultShufflePartitionWriter(partitionId);
}

@Override
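DefaultShuffleMapOutputWriter trades its internal cursor (currPartitionId) for a lastPartitionId check: requests may skip partition ids, but they must be strictly increasing. A tiny standalone model of that guard, runnable on its own (not Spark code, just the check in isolation):

    // Standalone model of the ordering guard added above (not Spark's class):
    // ids may skip values but must be strictly increasing.
    public class PartitionOrderGuardDemo {
      private int lastPartitionId = -1;

      void request(int partitionId) {
        if (partitionId <= lastPartitionId) {
          throw new IllegalArgumentException(
              "Partitions should be requested in increasing order.");
        }
        lastPartitionId = partitionId;
      }

      public static void main(String[] args) {
        PartitionOrderGuardDemo guard = new PartitionOrderGuardDemo();
        guard.request(0);   // ok
        guard.request(3);   // ok: gaps (empty partitions 1 and 2) are allowed
        guard.request(2);   // throws IllegalArgumentException
      }
    }
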
@@ -721,17 +721,6 @@ private[spark] class ExternalSorter[K, V, C](
lengths
}

private def writeEmptyPartition(mapOutputWriter: ShuffleMapOutputWriter): Unit = {
var partitionWriter: ShufflePartitionWriter = null
try {
partitionWriter = mapOutputWriter.getNextPartitionWriter
} finally {
if (partitionWriter != null) {
partitionWriter.close()
}
}
}

/**
* Write all the data added into this ExternalSorter into a map output writer that pushes bytes
* to some arbitrary backing store. This is called by the SortShuffleWriter.
@@ -742,26 +731,16 @@ private[spark] class ExternalSorter[K, V, C](
shuffleId: Int, mapId: Int, mapOutputWriter: ShuffleMapOutputWriter): Array[Long] = {
// Track location of each range in the map output
val lengths = new Array[Long](numPartitions)
var nextPartitionId = 0
if (spills.isEmpty) {
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext()) {
val partitionId = it.nextPartition()
// The contract for the plugin is that we will ask for a writer for every partition
// even if it's empty. However, the external sorter will return non-contiguous
// partition ids. So this loop "backfills" the empty partitions that form the gaps.

// The algorithm as a whole is correct because the partition ids are returned by the
// iterator in ascending order.
for (emptyPartition <- nextPartitionId until partitionId) {
writeEmptyPartition(mapOutputWriter)
}
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
try {
partitionWriter = mapOutputWriter.getNextPartitionWriter
partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
partitionPairsWriter = new ShufflePartitionPairsWriter(
partitionWriter,
@@ -783,7 +762,6 @@
if (partitionWriter != null) {
lengths(partitionId) = partitionWriter.getNumBytesWritten
}
nextPartitionId = partitionId + 1
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
@@ -794,14 +772,11 @@

// The algorithm as a whole is correct because the partition ids are returned by the
// iterator in ascending order.
for (emptyPartition <- nextPartitionId until id) {
writeEmptyPartition(mapOutputWriter)
}
val blockId = ShuffleBlockId(shuffleId, mapId, id)
var partitionWriter: ShufflePartitionWriter = null
var partitionPairsWriter: ShufflePartitionPairsWriter = null
try {
partitionWriter = mapOutputWriter.getNextPartitionWriter
partitionWriter = mapOutputWriter.getPartitionWriter(id)
partitionPairsWriter = new ShufflePartitionPairsWriter(
partitionWriter,
serializerManager,
@@ -821,16 +796,9 @@
if (partitionWriter != null) {
lengths(id) = partitionWriter.getNumBytesWritten
}
nextPartitionId = id + 1
}
}

// The iterator may have stopped short of opening a writer for every partition. So fill in the
// remaining empty partitions.
for (emptyPartition <- nextPartitionId until numPartitions) {
writeEmptyPartition(mapOutputWriter)
}

context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
@@ -133,7 +133,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("writing to an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val stream = writer.toStream()
data(p).foreach { i => stream.write(i)}
stream.close()
@@ -152,7 +152,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("writing to a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val channel = writer.toChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
@@ -172,7 +172,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("copyStreams with an outputstream") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val stream = writer.toStream()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()
@@ -193,7 +193,7 @@ class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAft

test("copyStreamsWithNIO with a channel") {
(0 until NUM_PARTITIONS).foreach{ p =>
val writer = mapOutputWriter.getNextPartitionWriter
val writer = mapOutputWriter.getPartitionWriter(p)
val channel = writer.toChannel()
val byteBuffer = ByteBuffer.allocate(D_LEN * 4)
val intBuffer = byteBuffer.asIntBuffer()