[SPARK-25299] Add attempt ID in shuffle API #570

Merged (3 commits) on Jun 5, 2019.
@@ -32,14 +32,16 @@ public class ShuffleBlockInfo {
   private final int mapId;
   private final int reduceId;
   private final long length;
+  private final int attemptNumber;
   private final Optional<BlockManagerId> shuffleLocation;
 
   public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length,
-      Optional<BlockManagerId> shuffleLocation) {
+      int attemptNumber, Optional<BlockManagerId> shuffleLocation) {
     this.shuffleId = shuffleId;
     this.mapId = mapId;
     this.reduceId = reduceId;
     this.length = length;
+    this.attemptNumber = attemptNumber;
     this.shuffleLocation = shuffleLocation;
   }
 
@@ -59,6 +61,10 @@ public long getLength() {
     return length;
   }
 
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+
   public Optional<BlockManagerId> getShuffleLocation() {
     return shuffleLocation;
   }
@@ -32,5 +32,6 @@ public interface ShuffleWriteSupport {
   ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
       int mapId,
-      int numPartitions) throws IOException;
+      int numPartitions,
+      int attemptNumber) throws IOException;
 }
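
To see why the write-side API needs the attempt number, consider a shuffle storage plugin that keeps map output outside the executor. Below is a minimal sketch, not part of this PR: a ShuffleWriteSupport that namespaces each map task's output by attempt, so a retried or speculative attempt never overwrites data a reducer may already be fetching. It assumes ShuffleWriteSupport and ShuffleMapOutputWriter live in org.apache.spark.api.shuffle alongside ShuffleReadSupport and that createMapOutputWriter is the only method to implement; AttemptScopedWriteSupport and the injected writerFactory are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.function.Function;

import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;

// Hypothetical plugin: one output namespace per (shuffle, map, attempt).
public class AttemptScopedWriteSupport implements ShuffleWriteSupport {

  private final Path rootDir;
  // Injected factory that opens a writer rooted at a directory; keeps the
  // sketch independent of any concrete storage backend.
  private final Function<Path, ShuffleMapOutputWriter> writerFactory;

  public AttemptScopedWriteSupport(
      Path rootDir, Function<Path, ShuffleMapOutputWriter> writerFactory) {
    this.rootDir = rootDir;
    this.writerFactory = writerFactory;
  }

  @Override
  public ShuffleMapOutputWriter createMapOutputWriter(
      int shuffleId,
      int mapId,
      int numPartitions,
      int attemptNumber) throws IOException {
    // Before this change the plugin only saw (shuffleId, mapId, numPartitions),
    // so two attempts of the same map task targeted the same location. With
    // attemptNumber, each attempt gets its own directory; numPartitions would
    // be forwarded to the concrete writer in a real implementation.
    Path attemptDir = rootDir.resolve(
        String.format("shuffle_%d/map_%d/attempt_%d", shuffleId, mapId, attemptNumber));
    return writerFactory.apply(attemptDir);
  }
}
```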
@@ -88,6 +88,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final ShuffleWriteMetricsReporter writeMetrics;
   private final int shuffleId;
   private final int mapId;
+  private final int attemptNumber;
   private final Serializer serializer;
   private final ShuffleWriteSupport shuffleWriteSupport;
 
@@ -108,6 +109,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       BlockManager blockManager,
       BypassMergeSortShuffleHandle<K, V> handle,
       int mapId,
+      int attemptNumber,
       SparkConf conf,
       ShuffleWriteMetricsReporter writeMetrics,
       ShuffleWriteSupport shuffleWriteSupport) {
@@ -118,6 +120,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     final ShuffleDependency<K, V, V> dep = handle.dependency();
     this.mapId = mapId;
     this.shuffleId = dep.shuffleId();
+    this.attemptNumber = attemptNumber;
     this.partitioner = dep.partitioner();
     this.numPartitions = partitioner.numPartitions();
     this.writeMetrics = writeMetrics;
@@ -129,12 +132,13 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   public void write(Iterator<Product2<K, V>> records) throws IOException {
     assert (partitionWriters == null);
     ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
-        .createMapOutputWriter(shuffleId, mapId, numPartitions);
+        .createMapOutputWriter(shuffleId, mapId, numPartitions, attemptNumber);
     try {
       if (!records.hasNext()) {
         partitionLengths = new long[numPartitions];
         Optional<BlockManagerId> location = mapOutputWriter.commitAllPartitions();
-        mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
+        mapStatus = MapStatus$.MODULE$.apply(
+            Option.apply(location.orNull()), partitionLengths, attemptNumber);
         return;
       }
       final SerializerInstance serInstance = serializer.newInstance();
@@ -168,7 +172,8 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
 
       partitionLengths = writePartitionedData(mapOutputWriter);
       Optional<BlockManagerId> location = mapOutputWriter.commitAllPartitions();
-      mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
+      mapStatus = MapStatus$.MODULE$.apply(
+          Option.apply(location.orNull()), partitionLengths, attemptNumber);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);
@@ -217,7 +217,7 @@ void closeAndWriteOutput() throws IOException {
     final SpillInfo[] spills = sorter.closeAndGetSpills();
     sorter = null;
     final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
-        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
+        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions(), taskContext.attemptNumber());
     final long[] partitionLengths;
     Optional<BlockManagerId> location;
     try {
@@ -239,7 +239,8 @@ void closeAndWriteOutput() throws IOException {
       }
       throw e;
     }
-    mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(
+        Option.apply(location.orNull()), partitionLengths, taskContext.attemptNumber());
   }
 
   @VisibleForTesting
@@ -43,7 +43,8 @@ public DefaultShuffleWriteSupport(
   public ShuffleMapOutputWriter createMapOutputWriter(
       int shuffleId,
       int mapId,
-      int numPartitions) {
+      int numPartitions,
+      int attemptNumber) {
     return new DefaultShuffleMapOutputWriter(
         shuffleId, mapId, numPartitions, shuffleServerId,
         TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf);
core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala (25 additions, 11 deletions)

@@ -36,6 +36,8 @@ private[spark] sealed trait MapStatus {
   /** Location where this task was run. */
   def location: Option[BlockManagerId]
 
+  def attemptNumber: Int
+
   /**
    * Estimated size for the reduce block, in bytes.
    *
@@ -56,11 +58,12 @@ private[spark] object MapStatus {
     .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
     .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
 
-  def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long]): MapStatus = {
+  def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int)
+    : MapStatus = {
     if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
-      HighlyCompressedMapStatus(maybeLoc, uncompressedSizes)
+      HighlyCompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber)
     } else {
-      new CompressedMapStatus(maybeLoc, uncompressedSizes)
+      new CompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber)
     }
   }
 
@@ -103,13 +106,15 @@ private[spark] object MapStatus {
  */
 private[spark] class CompressedMapStatus(
     private[this] var loc: Option[BlockManagerId],
-    private[this] var compressedSizes: Array[Byte])
+    private[this] var compressedSizes: Array[Byte],
+    private[this] var attemptNum: Int)
   extends MapStatus with Externalizable {
 
-  protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only
+  // For deserialization only
+  protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1)
 
-  def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) {
-    this(loc, uncompressedSizes.map(MapStatus.compressSize))
+  def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int) {
+    this(loc, uncompressedSizes.map(MapStatus.compressSize), attemptNumber)
   }
 
   override def location: Option[BlockManagerId] = loc
@@ -127,6 +132,7 @@ private[spark] class CompressedMapStatus(
     }
     out.writeInt(compressedSizes.length)
     out.write(compressedSizes)
+    out.writeInt(attemptNum)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -138,7 +144,10 @@ private[spark] class CompressedMapStatus(
     val len = in.readInt()
     compressedSizes = new Array[Byte](len)
     in.readFully(compressedSizes)
+    attemptNum = in.readInt()
   }
+
+  override def attemptNumber: Int = attemptNum
 }
 
 /**
@@ -157,14 +166,15 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte])
+    private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte],
+    private[this] var attemptNum: Int)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1, null) // For deserialization only
+  protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only
 
   override def location: Option[BlockManagerId] = loc
 
@@ -194,6 +204,7 @@ private[spark] class HighlyCompressedMapStatus private (
       out.writeInt(kv._1)
       out.writeByte(kv._2)
     }
+    out.writeInt(attemptNum)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -213,11 +224,14 @@ private[spark] class HighlyCompressedMapStatus private (
       hugeBlockSizesImpl(block) = size
     }
     hugeBlockSizes = hugeBlockSizesImpl
+    attemptNum = in.readInt()
   }
+
+  override def attemptNumber: Int = attemptNum
 }
 
 private[spark] object HighlyCompressedMapStatus {
-  def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long])
+  def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int)
     : HighlyCompressedMapStatus = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
@@ -259,6 +273,6 @@ private[spark] object HighlyCompressedMapStatus {
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
-      hugeBlockSizes)
+      hugeBlockSizes, attemptNumber)
   }
 }
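
Both CompressedMapStatus and HighlyCompressedMapStatus add the field the same way: attemptNum is written after every pre-existing field in writeExternal and read back last in readExternal, so write and read order stay aligned. A standalone toy illustration of that pattern (plain Java, not Spark code; StatusStub and its fields are made up):

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Toy example of appending one new field to an Externalizable class.
public class StatusStub implements Externalizable {
  private long[] sizes;
  private int attemptNum;

  public StatusStub() {             // no-arg constructor required by Externalizable
    this(new long[0], -1);
  }

  public StatusStub(long[] sizes, int attemptNum) {
    this.sizes = sizes;
    this.attemptNum = attemptNum;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeInt(sizes.length);
    for (long size : sizes) {
      out.writeLong(size);
    }
    out.writeInt(attemptNum);       // new field is written after the existing ones
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException {
    int len = in.readInt();
    sizes = new long[len];
    for (int i = 0; i < len; i++) {
      sizes[i] = in.readLong();
    }
    attemptNum = in.readInt();      // and read back in the same position
  }
}
```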
@@ -28,7 +28,7 @@ import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.io.DefaultShuffleReadSupport
-import org.apache.spark.storage.ShuffleBlockId
+import org.apache.spark.storage.ShuffleBlockAttemptId
 import org.apache.spark.util.CompletionIterator
 import org.apache.spark.util.collection.ExternalSorter
 
@@ -63,12 +63,13 @@ private[spark] class BlockStoreShuffleReader[K, C](
       .getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
       .flatMap { shuffleLocationInfo =>
         shuffleLocationInfo._2.map { blockInfo =>
-          val block = blockInfo._1.asInstanceOf[ShuffleBlockId]
+          val block = blockInfo._1.asInstanceOf[ShuffleBlockAttemptId]
           new ShuffleBlockInfo(
             block.shuffleId,
             block.mapId,
             block.reduceId,
             blockInfo._2,
+            block.attemptNumber,
             Optional.ofNullable(shuffleLocationInfo._1.orNull))
         }
       }
@@ -26,7 +26,7 @@ import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport}
 import org.apache.spark.internal.config
 import org.apache.spark.serializer.SerializerManager
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
-import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
+import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockAttemptId, ShuffleBlockFetcherIterator, ShuffleBlockId}
 
 class DefaultShuffleReadSupport(
     blockManager: BlockManager,
@@ -93,7 +93,12 @@ private class ShuffleBlockFetcherIterable(
       blockManager.shuffleClient,
       blockManager,
       mapOutputTracker.getMapSizesByExecutorId(shuffleId, minReduceId, maxReduceId + 1)
-        .map(loc => (loc._1.get, loc._2)),
+        .map(loc => (
+          loc._1.get,
+          loc._2.map { case(shuffleBlockAttemptId, size) =>
+            val block = shuffleBlockAttemptId.asInstanceOf[ShuffleBlockAttemptId]
+            (ShuffleBlockId(block.shuffleId, block.mapId, block.reduceId), size)
+          })),
       serializerManager.wrapStream,
       maxBytesInFlight,
       maxReqsInFlight,
@@ -158,6 +158,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
         env.blockManager,
         bypassMergeSortHandle,
         mapId,
+        context.attemptNumber(),
         env.conf,
         metrics,
         shuffleExecutorComponents.writes())
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
     // because it just opens a single file, so is typically too fast to measure accurately
     // (see SPARK-3570).
     val mapOutputWriter = writeSupport.createMapOutputWriter(
-      dep.shuffleId, mapId, dep.partitioner.numPartitions)
+      dep.shuffleId, mapId, dep.partitioner.numPartitions, context.attemptNumber())
     val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
     val location = mapOutputWriter.commitAllPartitions
-    mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths)
+    mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths, context.attemptNumber())
   }
 
   /** Close this writer, passing along whether the map completed */
core/src/main/scala/org/apache/spark/storage/BlockId.scala (8 additions, 1 deletion)

@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.util.UUID
 
 import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 
 /**
  * :: DeveloperApi ::
@@ -56,6 +56,13 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
 }
 
+@Experimental
+case class ShuffleBlockAttemptId(shuffleId: Int, mapId: Int, reduceId: Int, attemptNumber: Int)
+  extends BlockId {
+  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" +
+    reduceId + "_" + attemptNumber
+}
+
 @DeveloperApi
 case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
   override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"