Commit 39e4826
Revert "[SPARK-25299] Add attempt ID in shuffle API (#570)"
This reverts commit a010c6e.
yifeih committed Jun 5, 2019
1 parent a010c6e commit 39e4826
Showing 19 changed files with 65 additions and 112 deletions.
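
In short, this revert drops the map-task attempt number from the shuffle plugin API again: ShuffleBlockInfo loses its attemptNumber field, ShuffleWriteSupport.createMapOutputWriter and MapStatus.apply lose their attempt-number parameters, and the experimental ShuffleBlockAttemptId block ID is deleted. As a rough sketch of the post-revert shape (not part of the commit; it assumes the org.apache.spark.api.shuffle package that the read-side imports further down in this diff use, and the IDs and length are made-up example values):

```java
import java.util.Optional;

import org.apache.spark.api.shuffle.ShuffleBlockInfo;
import org.apache.spark.storage.BlockManagerId;

public class ShuffleBlockInfoSketch {
  public static void main(String[] args) {
    // Post-revert constructor: (shuffleId, mapId, reduceId, length, shuffleLocation).
    ShuffleBlockInfo block = new ShuffleBlockInfo(
        /* shuffleId = */ 0,
        /* mapId = */ 3,
        /* reduceId = */ 7,
        /* length = */ 1024L,
        Optional.<BlockManagerId>empty());

    // There is no longer a getAttemptNumber() accessor.
    System.out.println(block.getLength());
    System.out.println(block.getShuffleLocation().isPresent());
  }
}
```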
@@ -32,16 +32,14 @@ public class ShuffleBlockInfo {
private final int mapId;
private final int reduceId;
private final long length;
private final int attemptNumber;
private final Optional<BlockManagerId> shuffleLocation;

public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length,
int attemptNumber, Optional<BlockManagerId> shuffleLocation) {
Optional<BlockManagerId> shuffleLocation) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.reduceId = reduceId;
this.length = length;
this.attemptNumber = attemptNumber;
this.shuffleLocation = shuffleLocation;
}

@@ -61,10 +59,6 @@ public long getLength() {
return length;
}

public int getAttemptNumber() {
return attemptNumber;
}

public Optional<BlockManagerId> getShuffleLocation() {
return shuffleLocation;
}
@@ -32,6 +32,5 @@ public interface ShuffleWriteSupport {
ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
int numPartitions,
int attemptNumber) throws IOException;
int numPartitions) throws IOException;
}
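
For reference, a hedged sketch of how a caller obtains a ShuffleMapOutputWriter against the reverted three-argument signature. It assumes ShuffleMapOutputWriter and ShuffleWriteSupport both live in org.apache.spark.api.shuffle, which this diff only shows directly for the read-side classes:

```java
import java.io.IOException;

import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;

final class OpenMapOutputWriterSketch {
  // Post-revert call shape: no attemptNumber argument. The real callers are the
  // shuffle writers changed further down in this commit.
  static ShuffleMapOutputWriter open(
      ShuffleWriteSupport writeSupport,
      int shuffleId,
      int mapId,
      int numPartitions) throws IOException {
    return writeSupport.createMapOutputWriter(shuffleId, mapId, numPartitions);
  }
}
```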
@@ -88,7 +88,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final ShuffleWriteMetricsReporter writeMetrics;
private final int shuffleId;
private final int mapId;
private final int attemptNumber;
private final Serializer serializer;
private final ShuffleWriteSupport shuffleWriteSupport;

@@ -109,7 +108,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
BlockManager blockManager,
BypassMergeSortShuffleHandle<K, V> handle,
int mapId,
int attemptNumber,
SparkConf conf,
ShuffleWriteMetricsReporter writeMetrics,
ShuffleWriteSupport shuffleWriteSupport) {
@@ -120,7 +118,6 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final ShuffleDependency<K, V, V> dep = handle.dependency();
this.mapId = mapId;
this.shuffleId = dep.shuffleId();
this.attemptNumber = attemptNumber;
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = writeMetrics;
@@ -132,13 +129,12 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport
.createMapOutputWriter(shuffleId, mapId, numPartitions, attemptNumber);
.createMapOutputWriter(shuffleId, mapId, numPartitions);
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
Optional<BlockManagerId> location = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
Option.apply(location.orNull()), partitionLengths, attemptNumber);
mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -172,8 +168,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {

partitionLengths = writePartitionedData(mapOutputWriter);
Optional<BlockManagerId> location = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
Option.apply(location.orNull()), partitionLengths, attemptNumber);
mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
@@ -217,7 +217,7 @@ void closeAndWriteOutput() throws IOException {
final SpillInfo[] spills = sorter.closeAndGetSpills();
sorter = null;
final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions(), taskContext.attemptNumber());
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
final long[] partitionLengths;
Optional<BlockManagerId> location;
try {
@@ -239,8 +239,7 @@ void closeAndWriteOutput() throws IOException {
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(
Option.apply(location.orNull()), partitionLengths, taskContext.attemptNumber());
mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
}

@VisibleForTesting
@@ -43,8 +43,7 @@ public DefaultShuffleWriteSupport(
public ShuffleMapOutputWriter createMapOutputWriter(
int shuffleId,
int mapId,
int numPartitions,
int attemptNumber) {
int numPartitions) {
return new DefaultShuffleMapOutputWriter(
shuffleId, mapId, numPartitions, shuffleServerId,
TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf);
36 changes: 11 additions & 25 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -36,8 +36,6 @@ private[spark] sealed trait MapStatus {
/** Location where this task was run. */
def location: Option[BlockManagerId]

def attemptNumber: Int

/**
* Estimated size for the reduce block, in bytes.
*
@@ -58,12 +56,11 @@ private[spark] object MapStatus {
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int)
: MapStatus = {
def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber)
HighlyCompressedMapStatus(maybeLoc, uncompressedSizes)
} else {
new CompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber)
new CompressedMapStatus(maybeLoc, uncompressedSizes)
}
}

@@ -106,15 +103,13 @@ private[spark] object MapStatus {
*/
private[spark] class CompressedMapStatus(
private[this] var loc: Option[BlockManagerId],
private[this] var compressedSizes: Array[Byte],
private[this] var attemptNum: Int)
private[this] var compressedSizes: Array[Byte])
extends MapStatus with Externalizable {

// For deserialization only
protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1)
protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only

def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int) {
this(loc, uncompressedSizes.map(MapStatus.compressSize), attemptNumber)
def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) {
this(loc, uncompressedSizes.map(MapStatus.compressSize))
}

override def location: Option[BlockManagerId] = loc
@@ -132,7 +127,6 @@ private[spark] class CompressedMapStatus(
}
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
out.writeInt(attemptNum)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -144,10 +138,7 @@
val len = in.readInt()
compressedSizes = new Array[Byte](len)
in.readFully(compressedSizes)
attemptNum = in.readInt()
}

override def attemptNumber: Int = attemptNum
}

/**
@@ -166,15 +157,14 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte],
private[this] var attemptNum: Int)
private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte])
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, null) // For deserialization only

override def location: Option[BlockManagerId] = loc

@@ -204,7 +194,6 @@ private[spark] class HighlyCompressedMapStatus private (
out.writeInt(kv._1)
out.writeByte(kv._2)
}
out.writeInt(attemptNum)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -224,14 +213,11 @@
hugeBlockSizesImpl(block) = size
}
hugeBlockSizes = hugeBlockSizesImpl
attemptNum = in.readInt()
}

override def attemptNumber: Int = attemptNum
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int)
def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long])
: HighlyCompressedMapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
@@ -273,6 +259,6 @@ private[spark] object HighlyCompressedMapStatus {
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizes, attemptNumber)
hugeBlockSizes)
}
}
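
As a sketch of the call sites above (illustrative only; it mirrors the Java writers in this commit and assumes the caller holds an Optional<BlockManagerId> location and a long[] of uncompressed partition lengths), a MapStatus is now built without an attempt number:

```java
import java.util.Optional;

import scala.Option;

import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.storage.BlockManagerId;

final class MapStatusSketch {
  // Post-revert: MapStatus.apply takes only the optional location and the
  // partition lengths, exactly as in the shuffle writers changed above.
  static MapStatus build(Optional<BlockManagerId> location, long[] partitionLengths) {
    return MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths);
  }
}
```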
@@ -28,7 +28,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.io.DefaultShuffleReadSupport
import org.apache.spark.storage.ShuffleBlockAttemptId
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.ExternalSorter

@@ -63,13 +63,12 @@ private[spark] class BlockStoreShuffleReader[K, C](
.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition)
.flatMap { shuffleLocationInfo =>
shuffleLocationInfo._2.map { blockInfo =>
val block = blockInfo._1.asInstanceOf[ShuffleBlockAttemptId]
val block = blockInfo._1.asInstanceOf[ShuffleBlockId]
new ShuffleBlockInfo(
block.shuffleId,
block.mapId,
block.reduceId,
blockInfo._2,
block.attemptNumber,
Optional.ofNullable(shuffleLocationInfo._1.orNull))
}
}
@@ -26,7 +26,7 @@ import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport}
import org.apache.spark.internal.config
import org.apache.spark.serializer.SerializerManager
import org.apache.spark.shuffle.ShuffleReadMetricsReporter
import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockAttemptId, ShuffleBlockFetcherIterator, ShuffleBlockId}
import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}

class DefaultShuffleReadSupport(
blockManager: BlockManager,
@@ -93,12 +93,7 @@ private class ShuffleBlockFetcherIterable(
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(shuffleId, minReduceId, maxReduceId + 1)
.map(loc => (
loc._1.get,
loc._2.map { case(shuffleBlockAttemptId, size) =>
val block = shuffleBlockAttemptId.asInstanceOf[ShuffleBlockAttemptId]
(ShuffleBlockId(block.shuffleId, block.mapId, block.reduceId), size)
})),
.map(loc => (loc._1.get, loc._2)),
serializerManager.wrapStream,
maxBytesInFlight,
maxReqsInFlight,
@@ -158,7 +158,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
env.blockManager,
bypassMergeSortHandle,
mapId,
context.attemptNumber(),
env.conf,
metrics,
shuffleExecutorComponents.writes())
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val mapOutputWriter = writeSupport.createMapOutputWriter(
dep.shuffleId, mapId, dep.partitioner.numPartitions, context.attemptNumber())
dep.shuffleId, mapId, dep.partitioner.numPartitions)
val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
val location = mapOutputWriter.commitAllPartitions
mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths, context.attemptNumber())
mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths)
}

/** Close this writer, passing along whether the map completed */
9 changes: 1 addition & 8 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
@@ -56,13 +56,6 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@Experimental
case class ShuffleBlockAttemptId(shuffleId: Int, mapId: Int, reduceId: Int, attemptNumber: Int)
extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" +
reduceId + "_" + attemptNumber
}

@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
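
Finally, with ShuffleBlockAttemptId removed, shuffle blocks are again named purely by shuffle, map and reduce IDs. A tiny illustrative check (not part of the commit; the IDs are arbitrary):

```java
import org.apache.spark.storage.ShuffleBlockId;

final class BlockIdNameSketch {
  public static void main(String[] args) {
    // name() follows the surviving ShuffleBlockId format
    // shuffle_<shuffleId>_<mapId>_<reduceId>, with no trailing attempt number.
    ShuffleBlockId id = new ShuffleBlockId(0, 3, 7);
    System.out.println(id.name());  // shuffle_0_3_7
  }
}
```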
