From 39e4826310e507d1a2fa6fe1bd95bac590608df7 Mon Sep 17 00:00:00 2001 From: Yifei Huang Date: Tue, 4 Jun 2019 17:12:38 -0700 Subject: [PATCH] Revert "[SPARK-25299] Add attempt ID in shuffle API (#570)" This reverts commit a010c6effa973f6a87095a66330b8833f00ce798. --- .../spark/api/shuffle/ShuffleBlockInfo.java | 8 +--- .../api/shuffle/ShuffleWriteSupport.java | 3 +- .../sort/BypassMergeSortShuffleWriter.java | 11 ++--- .../shuffle/sort/UnsafeShuffleWriter.java | 5 +-- .../sort/io/DefaultShuffleWriteSupport.java | 3 +- .../apache/spark/scheduler/MapStatus.scala | 36 +++++---------- .../shuffle/BlockStoreShuffleReader.scala | 5 +-- .../io/DefaultShuffleReadSupport.scala | 9 +--- .../shuffle/sort/SortShuffleManager.scala | 1 - .../shuffle/sort/SortShuffleWriter.scala | 4 +- .../org/apache/spark/storage/BlockId.scala | 9 +--- .../apache/spark/MapOutputTrackerSuite.scala | 44 +++++++++---------- .../DAGSchedulerShufflePluginSuite.scala | 4 +- .../spark/scheduler/DAGSchedulerSuite.scala | 10 ++--- .../spark/scheduler/MapStatusSuite.scala | 12 ++--- .../serializer/KryoSerializerSuite.scala | 2 +- .../BlockStoreShuffleReaderSuite.scala | 5 ++- ...ypassMergeSortShuffleWriterBenchmark.scala | 1 - .../BypassMergeSortShuffleWriterSuite.scala | 5 --- 19 files changed, 65 insertions(+), 112 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java index 4bd8675e21987..45effd206f797 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java @@ -32,16 +32,14 @@ public class ShuffleBlockInfo { private final int mapId; private final int reduceId; private final long length; - private final int attemptNumber; private final Optional shuffleLocation; public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length, - int attemptNumber, Optional shuffleLocation) { + Optional shuffleLocation) { this.shuffleId = shuffleId; this.mapId = mapId; this.reduceId = reduceId; this.length = length; - this.attemptNumber = attemptNumber; this.shuffleLocation = shuffleLocation; } @@ -61,10 +59,6 @@ public long getLength() { return length; } - public int getAttemptNumber() { - return attemptNumber; - } - public Optional getShuffleLocation() { return shuffleLocation; } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java index 137282fa2ae8d..7e2b6cf4133fd 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleWriteSupport.java @@ -32,6 +32,5 @@ public interface ShuffleWriteSupport { ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, int mapId, - int numPartitions, - int attemptNumber) throws IOException; + int numPartitions) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 2ded7d581efef..3e622d00b3aaa 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -88,7 +88,6 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { private final ShuffleWriteMetricsReporter writeMetrics; private final int shuffleId; private final int mapId; - private final int attemptNumber; private final Serializer serializer; private final ShuffleWriteSupport shuffleWriteSupport; @@ -109,7 +108,6 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { BlockManager blockManager, BypassMergeSortShuffleHandle handle, int mapId, - int attemptNumber, SparkConf conf, ShuffleWriteMetricsReporter writeMetrics, ShuffleWriteSupport shuffleWriteSupport) { @@ -120,7 +118,6 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { final ShuffleDependency dep = handle.dependency(); this.mapId = mapId; this.shuffleId = dep.shuffleId(); - this.attemptNumber = attemptNumber; this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); this.writeMetrics = writeMetrics; @@ -132,13 +129,12 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { public void write(Iterator> records) throws IOException { assert (partitionWriters == null); ShuffleMapOutputWriter mapOutputWriter = shuffleWriteSupport - .createMapOutputWriter(shuffleId, mapId, numPartitions, attemptNumber); + .createMapOutputWriter(shuffleId, mapId, numPartitions); try { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; Optional location = mapOutputWriter.commitAllPartitions(); - mapStatus = MapStatus$.MODULE$.apply( - Option.apply(location.orNull()), partitionLengths, attemptNumber); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); return; } final SerializerInstance serInstance = serializer.newInstance(); @@ -172,8 +168,7 @@ public void write(Iterator> records) throws IOException { partitionLengths = writePartitionedData(mapOutputWriter); Optional location = mapOutputWriter.commitAllPartitions(); - mapStatus = MapStatus$.MODULE$.apply( - Option.apply(location.orNull()), partitionLengths, attemptNumber); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); } catch (Exception e) { try { mapOutputWriter.abort(e); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index ad514389f4bfc..b2c1b49370f4a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -217,7 +217,7 @@ void closeAndWriteOutput() throws IOException { final SpillInfo[] spills = sorter.closeAndGetSpills(); sorter = null; final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport - .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions(), taskContext.attemptNumber()); + .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); final long[] partitionLengths; Optional location; try { @@ -239,8 +239,7 @@ void closeAndWriteOutput() throws IOException { } throw e; } - mapStatus = MapStatus$.MODULE$.apply( - Option.apply(location.orNull()), partitionLengths, taskContext.attemptNumber()); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java index 9aa0fdede55f4..86f1583495689 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java @@ -43,8 +43,7 @@ public DefaultShuffleWriteSupport( public ShuffleMapOutputWriter createMapOutputWriter( int shuffleId, int mapId, - int numPartitions, - int attemptNumber) { + int numPartitions) { return new DefaultShuffleMapOutputWriter( shuffleId, mapId, numPartitions, shuffleServerId, TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf); diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index b04d67e46e174..7ec87641a8900 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -36,8 +36,6 @@ private[spark] sealed trait MapStatus { /** Location where this task was run. */ def location: Option[BlockManagerId] - def attemptNumber: Int - /** * Estimated size for the reduce block, in bytes. * @@ -58,12 +56,11 @@ private[spark] object MapStatus { .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)) .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get) - def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int) - : MapStatus = { + def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { - HighlyCompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber) + HighlyCompressedMapStatus(maybeLoc, uncompressedSizes) } else { - new CompressedMapStatus(maybeLoc, uncompressedSizes, attemptNumber) + new CompressedMapStatus(maybeLoc, uncompressedSizes) } } @@ -106,15 +103,13 @@ private[spark] object MapStatus { */ private[spark] class CompressedMapStatus( private[this] var loc: Option[BlockManagerId], - private[this] var compressedSizes: Array[Byte], - private[this] var attemptNum: Int) + private[this] var compressedSizes: Array[Byte]) extends MapStatus with Externalizable { - // For deserialization only - protected def this() = this(null, null.asInstanceOf[Array[Byte]], -1) + protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only - def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int) { - this(loc, uncompressedSizes.map(MapStatus.compressSize), attemptNumber) + def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) { + this(loc, uncompressedSizes.map(MapStatus.compressSize)) } override def location: Option[BlockManagerId] = loc @@ -132,7 +127,6 @@ private[spark] class CompressedMapStatus( } out.writeInt(compressedSizes.length) out.write(compressedSizes) - out.writeInt(attemptNum) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -144,10 +138,7 @@ private[spark] class CompressedMapStatus( val len = in.readInt() compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) - attemptNum = in.readInt() } - - override def attemptNumber: Int = attemptNum } /** @@ -166,15 +157,14 @@ private[spark] class HighlyCompressedMapStatus private ( private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, - private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte], - private[this] var attemptNum: Int) + private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte]) extends MapStatus with Externalizable { // loc could be null when the default constructor is called during deserialization require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1, null, -1) // For deserialization only + protected def this() = this(null, -1, null, -1, null) // For deserialization only override def location: Option[BlockManagerId] = loc @@ -204,7 +194,6 @@ private[spark] class HighlyCompressedMapStatus private ( out.writeInt(kv._1) out.writeByte(kv._2) } - out.writeInt(attemptNum) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { @@ -224,14 +213,11 @@ private[spark] class HighlyCompressedMapStatus private ( hugeBlockSizesImpl(block) = size } hugeBlockSizes = hugeBlockSizesImpl - attemptNum = in.readInt() } - - override def attemptNumber: Int = attemptNum } private[spark] object HighlyCompressedMapStatus { - def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long], attemptNumber: Int) + def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) : HighlyCompressedMapStatus = { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. @@ -273,6 +259,6 @@ private[spark] object HighlyCompressedMapStatus { emptyBlocks.trim() emptyBlocks.runOptimize() new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, - hugeBlockSizes, attemptNumber) + hugeBlockSizes) } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 29b1efcc404a1..8d6745ba397d3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.io.DefaultShuffleReadSupport -import org.apache.spark.storage.ShuffleBlockAttemptId +import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter @@ -63,13 +63,12 @@ private[spark] class BlockStoreShuffleReader[K, C]( .getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) .flatMap { shuffleLocationInfo => shuffleLocationInfo._2.map { blockInfo => - val block = blockInfo._1.asInstanceOf[ShuffleBlockAttemptId] + val block = blockInfo._1.asInstanceOf[ShuffleBlockId] new ShuffleBlockInfo( block.shuffleId, block.mapId, block.reduceId, blockInfo._2, - block.attemptNumber, Optional.ofNullable(shuffleLocationInfo._1.orNull)) } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala b/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala index db66eb1066168..928a6f32739fd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala @@ -26,7 +26,7 @@ import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport} import org.apache.spark.internal.config import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.ShuffleReadMetricsReporter -import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockAttemptId, ShuffleBlockFetcherIterator, ShuffleBlockId} +import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator} class DefaultShuffleReadSupport( blockManager: BlockManager, @@ -93,12 +93,7 @@ private class ShuffleBlockFetcherIterable( blockManager.shuffleClient, blockManager, mapOutputTracker.getMapSizesByExecutorId(shuffleId, minReduceId, maxReduceId + 1) - .map(loc => ( - loc._1.get, - loc._2.map { case(shuffleBlockAttemptId, size) => - val block = shuffleBlockAttemptId.asInstanceOf[ShuffleBlockAttemptId] - (ShuffleBlockId(block.shuffleId, block.mapId, block.reduceId), size) - })), + .map(loc => (loc._1.get, loc._2)), serializerManager.wrapStream, maxBytesInFlight, maxReqsInFlight, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 67c8ed5a0acc6..947753f6b40e8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -158,7 +158,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager env.blockManager, bypassMergeSortHandle, mapId, - context.attemptNumber(), env.conf, metrics, shuffleExecutorComponents.writes()) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 27f5c4078df2d..3ffafd288125d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C]( // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). val mapOutputWriter = writeSupport.createMapOutputWriter( - dep.shuffleId, mapId, dep.partitioner.numPartitions, context.attemptNumber()) + dep.shuffleId, mapId, dep.partitioner.numPartitions) val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter) val location = mapOutputWriter.commitAllPartitions - mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths, context.attemptNumber()) + mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths) } /** Close this writer, passing along whether the map completed */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 87deef6593343..7ac2c71c18eb3 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import java.util.UUID import org.apache.spark.SparkException -import org.apache.spark.annotation.{DeveloperApi, Experimental} +import org.apache.spark.annotation.DeveloperApi /** * :: DeveloperApi :: @@ -56,13 +56,6 @@ case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends Blo override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId } -@Experimental -case class ShuffleBlockAttemptId(shuffleId: Int, mapId: Int, reduceId: Int, attemptNumber: Int) - extends BlockId { - override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + - reduceId + "_" + attemptNumber -} - @DeveloperApi case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId { override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 5df674699eb49..98d999679867f 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -64,9 +64,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(10000L, 1000L), 0)) + Array(10000L, 1000L))) val statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.map(status => (status._1.get, status._2)).toSet === Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), @@ -86,9 +86,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(compressedSize1000, compressedSize10000), 0)) + Array(compressedSize1000, compressedSize10000))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(compressedSize10000, compressedSize1000), 0)) + Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty) assert(0 == tracker.getNumCachedSerializedBroadcast) @@ -109,9 +109,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(compressedSize1000, compressedSize1000, compressedSize1000), 0)) + Array(compressedSize1000, compressedSize1000, compressedSize1000))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(compressedSize10000, compressedSize1000, compressedSize1000), 0)) + Array(compressedSize10000, compressedSize1000, compressedSize1000))) assert(0 == tracker.getNumCachedSerializedBroadcast) // As if we had two simultaneous fetch failures @@ -147,7 +147,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, MapStatus( - Some(BlockManagerId("a", "hostA", 1000)), Array(1000L), 0)) + Some(BlockManagerId("a", "hostA", 1000)), Array(1000L))) slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getMapSizesByExecutorId(10, 0) .map(status => (status._1.get, status._2)).toSeq === @@ -185,7 +185,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, MapStatus( - Some(BlockManagerId("88", "mph", 1000)), Array.fill[Long](10)(0), 0)) + Some(BlockManagerId("88", "mph", 1000)), Array.fill[Long](10)(0))) val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) when(rpcCallContext.senderAddress).thenReturn(senderAddress) @@ -219,11 +219,11 @@ class MapOutputTrackerSuite extends SparkFunSuite { // on hostB with output size 3 tracker.registerShuffle(10, 3) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(2L), 0)) + Array(2L))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(2L), 0)) + Array(2L))) tracker.registerMapOutput(10, 2, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(3L), 0)) + Array(3L))) // When the threshold is 50%, only host A should be returned as a preferred location // as it has 4 out of 7 bytes of output. @@ -263,7 +263,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - Some(BlockManagerId("999", "mps", 1000)), Array.fill[Long](4000000)(0), 0)) + Some(BlockManagerId("999", "mps", 1000)), Array.fill[Long](4000000)(0))) } val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) @@ -312,9 +312,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(size0, size1000, size0, size10000), 0)) + Array(size0, size1000, size0, size10000))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(size10000, size0, size1000, size0), 0)) + Array(size10000, size0, size1000, size0))) assert(tracker.containsShuffle(10)) assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === Seq( @@ -340,9 +340,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), - Array(1000L, 10000L), 0)) - tracker.registerMapOutput(10, 1, MapStatus(None, Array(10000L, 1000L), 0)) - tracker.registerMapOutput(10, 2, MapStatus(None, Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) + tracker.registerMapOutput(10, 1, MapStatus(None, Array(10000L, 1000L))) + tracker.registerMapOutput(10, 2, MapStatus(None, Array(1000L, 10000L))) var statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === Seq( @@ -356,7 +356,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.removeOutputsOnHost("hostA") tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === Seq( @@ -369,7 +369,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.unregisterMapOutput(10, 1, null) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === Seq( @@ -395,9 +395,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId(null, "hostA", 1000)), - Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId(null, "hostB", 1000)), - Array(10000L, 1000L), 0)) + Array(10000L, 1000L))) var statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === Seq( @@ -420,7 +420,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.unregisterMapOutput(10, 1, BlockManagerId(null, "hostA", 1000)) tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), - Array(1000L, 10000L), 0)) + Array(1000L, 10000L))) statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === Seq( diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala index 98ea8f7a76c28..d0f32fbfd67a3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala @@ -155,11 +155,11 @@ class DAGSchedulerShufflePluginSuite extends DAGSchedulerSuite { } def makeMapStatus(execId: String, host: String): MapStatus = { - MapStatus(Some(BlockManagerId(execId, host, 1234)), Array.fill[Long](2)(2), 0) + MapStatus(Some(BlockManagerId(execId, host, 1234)), Array.fill[Long](2)(2)) } def makeEmptyMapStatus(): MapStatus = { - MapStatus(None, Array.fill[Long](2)(2), 0) + MapStatus(None, Array.fill[Long](2)(2)) } def assertMapShuffleLocations(shuffleId: Int, set: Seq[MapStatus]): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f47b0bdf49834..7d18e9cbb6f36 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -445,17 +445,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( (Success, - MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2), 0)), + MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, - MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2), 0)), + MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, makeMapStatus("hostB", 1)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, - MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2), 0)), + MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, - MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2), 0)), + MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, makeMapStatus("hostB", 1)) )) // make sure our test setup is correct @@ -2904,7 +2904,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi object DAGSchedulerSuite { def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus = - MapStatus(Some(makeBlockManagerId(host)), Array.fill[Long](reduces)(sizes), 0) + MapStatus(Some(makeBlockManagerId(host)), Array.fill[Long](reduces)(sizes)) def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 8f4e31d8714e7..0140bb5d25687 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -61,7 +61,7 @@ class MapStatusSuite extends SparkFunSuite { stddev <- Seq(0.0, 0.01, 0.5, 1.0) ) { val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) - val status = MapStatus(Some(BlockManagerId("a", "b", 10)), sizes, 0) + val status = MapStatus(Some(BlockManagerId("a", "b", 10)), sizes) val status1 = compressAndDecompressMapStatus(status) for (i <- 0 until numSizes) { if (sizes(i) != 0) { @@ -75,7 +75,7 @@ class MapStatusSuite extends SparkFunSuite { test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { val sizes = Array.fill[Long](2001)(150L) - val status = MapStatus(null, sizes, 0) + val status = MapStatus(null, sizes) assert(status.isInstanceOf[HighlyCompressedMapStatus]) assert(status.getSizeForBlock(10) === 150L) assert(status.getSizeForBlock(50) === 150L) @@ -87,7 +87,7 @@ class MapStatusSuite extends SparkFunSuite { val sizes = Array.tabulate[Long](3000) { i => i.toLong } val avg = sizes.sum / sizes.count(_ != 0) val loc = BlockManagerId("a", "b", 10) - val status = MapStatus(Some(loc), sizes, 0) + val status = MapStatus(Some(loc), sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) assert(status1.location.get == loc) @@ -109,7 +109,7 @@ class MapStatusSuite extends SparkFunSuite { val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold) val avg = smallBlockSizes.sum / smallBlockSizes.length val loc = BlockManagerId("a", "b", 10) - val status = MapStatus(Some(loc), sizes, 0) + val status = MapStatus(Some(loc), sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) assert(status1.location.get == loc) @@ -165,7 +165,7 @@ class MapStatusSuite extends SparkFunSuite { SparkEnv.set(env) // Value of element in sizes is equal to the corresponding index. val sizes = (0L to 2000L).toArray - val status1 = MapStatus(Some(BlockManagerId("exec-0", "host-0", 100)), sizes, 0) + val status1 = MapStatus(Some(BlockManagerId("exec-0", "host-0", 100)), sizes) val arrayStream = new ByteArrayOutputStream(102400) val objectOutputStream = new ObjectOutputStream(arrayStream) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) @@ -192,7 +192,7 @@ class MapStatusSuite extends SparkFunSuite { test("Location can be empty") { val sizes = (0L to 3000L).toArray - val status = MapStatus(None, sizes, 0) + val status = MapStatus(None, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) assert(status1.location.isEmpty) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index ac83a47ae5f1e..cd538719d536b 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -352,7 +352,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => ser.serialize(HighlyCompressedMapStatus( - Some(BlockManagerId("exec-1", "host", 1234)), blockSizes, 0)) + Some(BlockManagerId("exec-1", "host", 1234)), blockSizes)) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 64f8cbc970d54..343049ff2e0c4 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -30,7 +30,8 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.shuffle.io.DefaultShuffleReadSupport -import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockAttemptId, ShuffleBlockId} +import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId} +import org.apache.spark.storage.BlockId /** * Wrapper for a managed buffer that keeps track of how many times retain and release are called. @@ -116,7 +117,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => - val shuffleBlockId = ShuffleBlockAttemptId(shuffleId, mapId, reduceId, 0) + val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) (shuffleBlockId, byteOutputStream.size().toLong) } Seq((Some(localBlockManagerId), shuffleBlockIdsAndSizes)).toIterator diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala index 51205babe8319..72e71915c79f5 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala @@ -55,7 +55,6 @@ object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase blockManager, shuffleHandle, 0, - taskContext.attemptNumber(), conf, taskContext.taskMetrics().shuffleWriteMetrics, shuffleWriteSupport diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 55085e2946d50..e0c49f04dc309 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -160,7 +160,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId - taskContext.attemptNumber(), conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -186,7 +185,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId - taskContext.attemptNumber(), transferConf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -211,7 +209,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId - taskContext.attemptNumber(), conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -247,7 +244,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId - taskContext.attemptNumber(), conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport @@ -270,7 +266,6 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte blockManager, shuffleHandle, 0, // MapId - taskContext.attemptNumber(), conf, taskContext.taskMetrics().shuffleWriteMetrics, writeSupport