diff --git a/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java b/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java deleted file mode 100644 index b0aed4d08d387..0000000000000 --- a/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.api.shuffle; - -import org.apache.spark.annotation.Experimental; - -import java.io.Serializable; - -/** - * Represents metadata about where shuffle blocks were written in a single map task. - *
- * This is optionally returned by shuffle writers. The inner shuffle locations may - * be accessed by shuffle readers. Shuffle locations are only necessary when the - * location of shuffle blocks needs to be managed by the driver; shuffle plugins - * may choose to use an external database or other metadata management systems to - * track the locations of shuffle blocks instead. - */ -@Experimental -public interface MapShuffleLocations extends Serializable { - - /** - * Get the location for a given shuffle block written by this map task. - */ - ShuffleLocation getLocationForBlock(int reduceId); -} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java index a312831cb6282..45effd206f797 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleBlockInfo.java @@ -18,6 +18,7 @@ package org.apache.spark.api.shuffle; import org.apache.spark.api.java.Optional; +import org.apache.spark.storage.BlockManagerId; import java.util.Objects; @@ -31,10 +32,10 @@ public class ShuffleBlockInfo { private final int mapId; private final int reduceId; private final long length; - private final Optional shuffleLocation; + private final Optional shuffleLocation; public ShuffleBlockInfo(int shuffleId, int mapId, int reduceId, long length, - Optional shuffleLocation) { + Optional shuffleLocation) { this.shuffleId = shuffleId; this.mapId = mapId; this.reduceId = reduceId; @@ -58,7 +59,7 @@ public long getLength() { return length; } - public Optional getShuffleLocation() { + public Optional getShuffleLocation() { return shuffleLocation; } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java index 6a0ec8d44fd4f..04986ad7f04f4 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleDriverComponents.java @@ -30,4 +30,8 @@ public interface ShuffleDriverComponents { void cleanupApplication() throws IOException; void removeShuffleData(int shuffleId, boolean blocking) throws IOException; + + default boolean shouldUnregisterOutputOnHostOnFetchFailure() { + return false; + } } diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java deleted file mode 100644 index d06c11b3c01ee..0000000000000 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.api.shuffle; - -/** - * Marker interface representing a location of a shuffle block. Implementations of shuffle readers - * and writers are expected to cast this down to an implementation-specific representation. - */ -public interface ShuffleLocation {} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java index 062cf4ff0fba9..025fc096faaad 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java @@ -21,6 +21,7 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.api.java.Optional; +import org.apache.spark.storage.BlockManagerId; /** * :: Experimental :: @@ -32,7 +33,7 @@ public interface ShuffleMapOutputWriter { ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOException; - Optional commitAllPartitions() throws IOException; + Optional commitAllPartitions() throws IOException; void abort(Throwable error) throws IOException; } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 128b90429209e..3e622d00b3aaa 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -25,6 +25,7 @@ import java.nio.channels.FileChannel; import javax.annotation.Nullable; +import org.apache.spark.api.java.Optional; import scala.None$; import scala.Option; import scala.Product2; @@ -39,8 +40,6 @@ import org.apache.spark.Partitioner; import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.shuffle.MapShuffleLocations; import org.apache.spark.api.shuffle.SupportsTransferTo; import org.apache.spark.api.shuffle.ShuffleMapOutputWriter; import org.apache.spark.api.shuffle.ShufflePartitionWriter; @@ -134,11 +133,8 @@ public void write(Iterator> records) throws IOException { try { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; - Optional blockLocs = mapOutputWriter.commitAllPartitions(); - mapStatus = MapStatus$.MODULE$.apply( - blockManager.shuffleServerId(), - blockLocs.orNull(), - partitionLengths); + Optional location = mapOutputWriter.commitAllPartitions(); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); return; } final SerializerInstance serInstance = serializer.newInstance(); @@ -171,11 +167,8 @@ public void write(Iterator> records) throws IOException { } partitionLengths = writePartitionedData(mapOutputWriter); - Optional mapLocations = mapOutputWriter.commitAllPartitions(); - mapStatus = MapStatus$.MODULE$.apply( - blockManager.shuffleServerId(), - mapLocations.orNull(), - partitionLengths); + Optional location = mapOutputWriter.commitAllPartitions(); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); } catch (Exception e) { try { mapOutputWriter.abort(e); diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java deleted file mode 100644 index ffd97c0f26605..0000000000000 --- 
a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.sort; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - -import org.apache.spark.api.shuffle.MapShuffleLocations; -import org.apache.spark.api.shuffle.ShuffleLocation; -import org.apache.spark.storage.BlockManagerId; - -import java.util.Objects; - -public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation { - - /** - * We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be - * feasible. - */ - private static final LoadingCache - DEFAULT_SHUFFLE_LOCATIONS_CACHE = - CacheBuilder.newBuilder() - .maximumSize(BlockManagerId.blockManagerIdCacheSize()) - .build(new CacheLoader() { - @Override - public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) { - return new DefaultMapShuffleLocations(blockManagerId); - } - }); - - private final BlockManagerId location; - - public DefaultMapShuffleLocations(BlockManagerId blockManagerId) { - this.location = blockManagerId; - } - - public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) { - return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId); - } - - @Override - public ShuffleLocation getLocationForBlock(int reduceId) { - return this; - } - - public BlockManagerId getBlockManagerId() { - return location; - } - - @Override - public boolean equals(Object other) { - return other instanceof DefaultMapShuffleLocations - && Objects.equals(((DefaultMapShuffleLocations) other).location, location); - } - - @Override - public int hashCode() { - return Objects.hashCode(location); - } -} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index f147bd79773e1..b2c1b49370f4a 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -23,6 +23,8 @@ import java.nio.channels.FileChannel; import java.util.Iterator; +import org.apache.spark.api.java.Optional; +import org.apache.spark.storage.BlockManagerId; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -37,8 +39,6 @@ import org.apache.spark.*; import org.apache.spark.annotation.Private; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.shuffle.MapShuffleLocations; import org.apache.spark.api.shuffle.TransferrableWritableByteChannel; import org.apache.spark.api.shuffle.ShuffleMapOutputWriter; import 
org.apache.spark.api.shuffle.ShufflePartitionWriter; @@ -219,7 +219,7 @@ void closeAndWriteOutput() throws IOException { final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); final long[] partitionLengths; - Optional mapLocations; + Optional location; try { try { partitionLengths = mergeSpills(spills, mapWriter); @@ -230,7 +230,7 @@ void closeAndWriteOutput() throws IOException { } } } - mapLocations = mapWriter.commitAllPartitions(); + location = mapWriter.commitAllPartitions(); } catch (Exception e) { try { mapWriter.abort(e); @@ -239,10 +239,7 @@ void closeAndWriteOutput() throws IOException { } throw e; } - mapStatus = MapStatus$.MODULE$.apply( - blockManager.shuffleServerId(), - mapLocations.orNull(), - partitionLengths); + mapStatus = MapStatus$.MODULE$.apply(Option.apply(location.orNull()), partitionLengths); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index e83db4e4bcef6..ad55b3db377f6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -24,21 +24,19 @@ import java.io.OutputStream; import java.nio.channels.FileChannel; +import org.apache.spark.api.java.Optional; +import org.apache.spark.storage.BlockManagerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.shuffle.MapShuffleLocations; import org.apache.spark.api.shuffle.ShuffleMapOutputWriter; import org.apache.spark.api.shuffle.ShufflePartitionWriter; import org.apache.spark.api.shuffle.SupportsTransferTo; import org.apache.spark.api.shuffle.TransferrableWritableByteChannel; import org.apache.spark.internal.config.package$; -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations; import org.apache.spark.shuffle.sort.DefaultTransferrableWritableByteChannel; import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; -import org.apache.spark.storage.BlockManagerId; import org.apache.spark.shuffle.IndexShuffleBlockResolver; import org.apache.spark.storage.TimeTrackingOutputStream; import org.apache.spark.util.Utils; @@ -104,11 +102,11 @@ public ShufflePartitionWriter getPartitionWriter(int partitionId) throws IOExcep } @Override - public Optional commitAllPartitions() throws IOException { + public Optional commitAllPartitions() throws IOException { cleanUp(); File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? 
outputTempFile : null; blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp); - return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId)); + return Optional.of(shuffleServerId); } @Override diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java index a3eddc8ec930e..e70369909a8f0 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/lifecycle/DefaultShuffleDriverComponents.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.spark.SparkEnv; import org.apache.spark.api.shuffle.ShuffleDriverComponents; +import org.apache.spark.internal.config.package$; import org.apache.spark.storage.BlockManagerMaster; import java.io.IOException; @@ -28,10 +29,15 @@ public class DefaultShuffleDriverComponents implements ShuffleDriverComponents { private BlockManagerMaster blockManagerMaster; + private boolean shouldUnregisterOutputOnHostOnFetchFailure; @Override public Map initializeApplication() { blockManagerMaster = SparkEnv.get().blockManager().master(); + this.shouldUnregisterOutputOnHostOnFetchFailure = + SparkEnv.get().blockManager().externalShuffleServiceEnabled() + && (boolean) SparkEnv.get().conf() + .get(package$.MODULE$.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE()); return ImmutableMap.of(); } @@ -46,6 +52,11 @@ public void removeShuffleData(int shuffleId, boolean blocking) throws IOExceptio blockManagerMaster.removeShuffle(shuffleId, blocking); } + @Override + public boolean shouldUnregisterOutputOnHostOnFetchFailure() { + return shouldUnregisterOutputOnHostOnFetchFailure; + } + private void checkInitialized() { if (blockManagerMaster == null) { throw new IllegalStateException("Driver components must be initialized before using"); diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ebddf5ff6f6e0..bc2399ea27fea 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -28,7 +28,6 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal -import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation} import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -103,7 +102,7 @@ private class ShuffleStatus(numPartitions: Int) { * different block manager. 
*/ def removeMapOutput(mapId: Int, bmAddress: BlockManagerId): Unit = synchronized { - if (mapStatuses(mapId) != null && mapStatuses(mapId).location == bmAddress) { + if (mapStatuses(mapId) != null && mapStatuses(mapId).location.orNull == bmAddress) { _numAvailableOutputs -= 1 mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() @@ -133,7 +132,8 @@ private class ShuffleStatus(numPartitions: Int) { */ def removeOutputsByFilter(f: (BlockManagerId) => Boolean): Unit = synchronized { for (mapId <- 0 until mapStatuses.length) { - if (mapStatuses(mapId) != null && f(mapStatuses(mapId).location)) { + if (mapStatuses(mapId) != null && mapStatuses(mapId).location.isDefined + && f(mapStatuses(mapId).location.get)) { _numAvailableOutputs -= 1 mapStatuses(mapId) = null invalidateSerializedMapOutputStatusCache() @@ -282,9 +282,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } // For testing - def getMapSizesByShuffleLocation(shuffleId: Int, reduceId: Int) - : Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { - getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1) + def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) + : Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] = { + getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) } /** @@ -296,8 +296,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * and the second item is a sequence of (shuffle block id, shuffle block size) tuples * describing the shuffle blocks that are stored at that block manager. */ - def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] + def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + : Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] /** * Deletes map output status information for the specified shuffle stage. @@ -579,7 +579,7 @@ private[spark] class MapOutputTrackerMaster( /** * Return a list of locations that each have fraction of map output greater than the specified - * threshold. + * threshold. Ignores shuffle blocks without location or executor id. * * @param shuffleId id of the shuffle * @param reducerId id of the reduce task @@ -608,10 +608,12 @@ private[spark] class MapOutputTrackerMaster( // array with null entries for each output, and registerMapOutputs, which populates it // with valid status entries. This is possible if one thread schedules a job which // depends on an RDD which is currently being computed by another thread. - if (status != null) { + // This also ignores locations that are not on executors. + if (status != null && status.location.isDefined + && status.location.get.executorId != null) { val blockSize = status.getSizeForBlock(reducerId) if (blockSize > 0) { - locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize + locs(status.location.get) = locs.getOrElse(status.location.get, 0L) + blockSize totalOutputSize += blockSize } } @@ -646,8 +648,8 @@ private[spark] class MapOutputTrackerMaster( // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. 
- def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { + def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + : Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -683,13 +685,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr private val fetching = new HashSet[Int] // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. - override def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { + override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + : Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { - MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses) + MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -873,9 +874,9 @@ private[spark] object MapOutputTracker extends Logging { shuffleId: Int, startPartition: Int, endPartition: Int, - statuses: Array[MapStatus]): Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { + statuses: Array[MapStatus]): Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] = { assert (statuses != null) - val splitsByAddress = new HashMap[Option[ShuffleLocation], ListBuffer[(BlockId, Long)]] + val splitsByAddress = new HashMap[Option[BlockManagerId], ListBuffer[(BlockId, Long)]] for ((status, mapId) <- statuses.iterator.zipWithIndex) { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" @@ -885,14 +886,8 @@ private[spark] object MapOutputTracker extends Logging { for (part <- startPartition until endPartition) { val size = status.getSizeForBlock(part) if (size != 0) { - if (status.mapShuffleLocations == null) { - splitsByAddress.getOrElseUpdate(Option.empty, ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, part), size)) - } else { - val shuffleLoc = status.mapShuffleLocations.getLocationForBlock(part) - splitsByAddress.getOrElseUpdate(Option.apply(shuffleLoc), ListBuffer()) += - ((ShuffleBlockId(shuffleId, mapId, part), size)) - } + splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + ((ShuffleBlockId(shuffleId, mapId, part), size)) } } } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 999f180193d84..f359022716571 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -308,6 +308,8 @@ class SparkContext(config: SparkConf) extends SafeLogging { _dagScheduler = ds } + private[spark] def shuffleDriverComponents: ShuffleDriverComponents = _shuffleDriverComponents + /** * A unique identifier for the Spark application. * Its format depends on the scheduler implementation. 
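Note on the writer-side API change above: commitAllPartitions() now returns Optional<BlockManagerId> instead of Optional<MapShuffleLocations>, and MapStatus.apply now takes an Option[BlockManagerId]. The snippet below is an illustrative sketch only and is not part of the patch; the object and method names are invented for the example, and it assumes compilation inside the org.apache.spark namespace because MapStatus is private[spark].

package org.apache.spark.shuffle

import org.apache.spark.api.java.Optional
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

// Illustrative sketch only (not part of the patch): how a shuffle writer reports
// its map output location under the new API.
object MapStatusReportingSketch {
  def reportMapOutput(
      writer: ShuffleMapOutputWriter,
      partitionLengths: Array[Long]): MapStatus = {
    // commitAllPartitions() now returns Optional<BlockManagerId> rather than
    // Optional<MapShuffleLocations>; plugins that track shuffle locations
    // elsewhere (e.g. an external metadata store) can return Optional.empty().
    val location: Optional[BlockManagerId] = writer.commitAllPartitions()
    // MapStatus.apply now takes Option[BlockManagerId]; None means the driver
    // does not track where this map task's shuffle blocks live.
    MapStatus(Option.apply(location.orNull), partitionLengths)
  }
}

The default writer (DefaultShuffleMapOutputWriter above) keeps the existing behaviour by returning Optional.of(shuffleServerId), while plugins that manage block locations outside the driver can return Optional.empty().
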
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dd1b2595461fc..06bf23a8592cf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -163,6 +163,8 @@ private[spark] class DAGScheduler( private[scheduler] val activeJobs = new HashSet[ActiveJob] + private[scheduler] val shuffleDriverComponents = sc.shuffleDriverComponents + /** * Contains the locations that each RDD's partitions are cached on. This map's keys are RDD ids * and its values are arrays indexed by partition numbers. Each array value is the set of @@ -1434,14 +1436,20 @@ private[spark] class DAGScheduler( val shuffleStage = stage.asInstanceOf[ShuffleMapStage] shuffleStage.pendingPartitions -= task.partitionId val status = event.result.asInstanceOf[MapStatus] - val execId = status.location.executorId - logDebug("ShuffleMapTask finished on " + execId) - if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { - logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + logDebug(s"ShuffleMapTask finished on ${event.taskInfo.executorId} " + + s"with shuffle files located at ${status.location.getOrElse("N/A")}") + if (status.location.isDefined && status.location.get.executorId != null) { + val execId = status.location.get.executorId + if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { + logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") + } else { + // The epoch of the task is acceptable (i.e., the task was launched after the most + // recent failure we're aware of for the executor), so mark the task's output as + // available. + mapOutputTracker.registerMapOutput( + shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) + } } else { - // The epoch of the task is acceptable (i.e., the task was launched after the most - // recent failure we're aware of for the executor), so mark the task's output as - // available. mapOutputTracker.registerMapOutput( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) } @@ -1627,21 +1635,31 @@ private[spark] class DAGScheduler( // TODO: mark the executor as failed only if there were lots of fetch failures on it if (bmAddress != null) { - val hostToUnregisterOutputs = if (env.blockManager.externalShuffleServiceEnabled && - unRegisterOutputOnHostOnFetchFailure) { - // We had a fetch failure with the external shuffle service, so we - // assume all shuffle data on the node is bad. - Some(bmAddress.host) + if (bmAddress.executorId == null) { + if (shuffleDriverComponents.shouldUnregisterOutputOnHostOnFetchFailure()) { + val currentEpoch = task.epoch + val host = bmAddress.host + logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch)) + mapOutputTracker.removeOutputsOnHost(host) + clearCacheLocs() + } } else { - // Unregister shuffle data just for one executor (we don't have any - // reason to believe shuffle data has been lost for the entire host). - None + val hostToUnregisterOutputs = + if (shuffleDriverComponents.shouldUnregisterOutputOnHostOnFetchFailure()) { + // We had a fetch failure with the external shuffle service, so we + // assume all shuffle data on the node is bad. + Some(bmAddress.host) + } else { + // Unregister shuffle data just for one executor (we don't have any + // reason to believe shuffle data has been lost for the entire host). 
+ None + } + removeExecutorAndUnregisterOutputs( + execId = bmAddress.executorId, + fileLost = true, + hostToUnregisterOutputs = hostToUnregisterOutputs, + maybeEpoch = Some(task.epoch)) } - removeExecutorAndUnregisterOutputs( - execId = bmAddress.executorId, - fileLost = true, - hostToUnregisterOutputs = hostToUnregisterOutputs, - maybeEpoch = Some(task.epoch)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index a61f9bd14ef2f..7ec87641a8900 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -24,9 +24,7 @@ import scala.collection.mutable import org.roaringbitmap.RoaringBitmap import org.apache.spark.SparkEnv -import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.internal.config -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -35,17 +33,8 @@ import org.apache.spark.util.Utils * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. */ private[spark] sealed trait MapStatus { - - /** - * Locations where this task stored shuffle blocks. - * - * May be null if the MapOutputTracker is not tracking the location of shuffle blocks, leaving it - * up to the implementation of shuffle plugins to do so. - */ - def mapShuffleLocations: MapShuffleLocations - - /** Location where the task was run. */ - def location: BlockManagerId + /** Location where this task was run. */ + def location: Option[BlockManagerId] /** * Estimated size for the reduce block, in bytes. @@ -67,31 +56,11 @@ private[spark] object MapStatus { .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)) .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get) - // A temporary concession to the fact that we only expect implementations of shuffle provided by - // Spark to be storing shuffle locations in the driver, meaning we want to introduce as little - // serialization overhead as possible in such default cases. - // - // If more similar cases arise, consider adding a serialization API for these shuffle locations. - private val DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 0 - private val NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 1 - - /** - * Visible for testing. 
- */ - def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { - apply(loc, DefaultMapShuffleLocations.get(loc), uncompressedSizes) - } - - def apply( - loc: BlockManagerId, - mapShuffleLocs: MapShuffleLocations, - uncompressedSizes: Array[Long]): MapStatus = { + def apply(maybeLoc: Option[BlockManagerId], uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { - HighlyCompressedMapStatus( - loc, mapShuffleLocs, uncompressedSizes) + HighlyCompressedMapStatus(maybeLoc, uncompressedSizes) } else { - new CompressedMapStatus( - loc, mapShuffleLocs, uncompressedSizes) + new CompressedMapStatus(maybeLoc, uncompressedSizes) } } @@ -122,89 +91,50 @@ private[spark] object MapStatus { math.pow(LOG_BASE, compressedSize & 0xFF).toLong } } - - def writeLocations( - loc: BlockManagerId, - mapShuffleLocs: MapShuffleLocations, - out: ObjectOutput): Unit = { - if (mapShuffleLocs != null) { - out.writeBoolean(true) - if (mapShuffleLocs.isInstanceOf[DefaultMapShuffleLocations] - && mapShuffleLocs.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId == loc) { - out.writeByte(MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) - } else { - out.writeByte(MapStatus.NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) - out.writeObject(mapShuffleLocs) - } - } else { - out.writeBoolean(false) - } - loc.writeExternal(out) - } - - def readLocations(in: ObjectInput): (BlockManagerId, MapShuffleLocations) = { - if (in.readBoolean()) { - val locId = in.readByte() - if (locId == MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) { - val blockManagerId = BlockManagerId(in) - (blockManagerId, DefaultMapShuffleLocations.get(blockManagerId)) - } else { - val mapShuffleLocations = in.readObject().asInstanceOf[MapShuffleLocations] - val blockManagerId = BlockManagerId(in) - (blockManagerId, mapShuffleLocations) - } - } else { - val blockManagerId = BlockManagerId(in) - (blockManagerId, null) - } - } } + /** * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is * represented using a single byte. * - * @param loc Location were the task is being executed. - * @param mapShuffleLocs locations where the task stored its shuffle blocks - may be null. + * @param loc location where the task is being executed. * @param compressedSizes size of the blocks, indexed by reduce partition id. 
*/ private[spark] class CompressedMapStatus( - private[this] var loc: BlockManagerId, - private[this] var mapShuffleLocs: MapShuffleLocations, + private[this] var loc: Option[BlockManagerId], private[this] var compressedSizes: Array[Byte]) extends MapStatus with Externalizable { - // For deserialization only - protected def this() = this(null, null, null.asInstanceOf[Array[Byte]]) + protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only - def this( - loc: BlockManagerId, - mapShuffleLocations: MapShuffleLocations, - uncompressedSizes: Array[Long]) { - this( - loc, - mapShuffleLocations, - uncompressedSizes.map(MapStatus.compressSize)) + def this(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) { + this(loc, uncompressedSizes.map(MapStatus.compressSize)) } - override def location: BlockManagerId = loc - - override def mapShuffleLocations: MapShuffleLocations = mapShuffleLocs + override def location: Option[BlockManagerId] = loc override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - MapStatus.writeLocations(loc, mapShuffleLocs, out) + if (loc.isDefined) { + out.writeBoolean(true) + loc.get.writeExternal(out) + } else { + out.writeBoolean(false) + } out.writeInt(compressedSizes.length) out.write(compressedSizes) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val (deserializedLoc, deserializedMapShuffleLocs) = MapStatus.readLocations(in) - loc = deserializedLoc - mapShuffleLocs = deserializedMapShuffleLocs + if (in.readBoolean()) { + loc = Some(BlockManagerId(in)) + } else { + loc = None + } val len = in.readInt() compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) @@ -217,15 +147,13 @@ private[spark] class CompressedMapStatus( * plus a bitmap for tracking which blocks are empty. * * @param loc location where the task is being executed - * @param mapShuffleLocs location where the task stored shuffle blocks - may be null * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty and non-huge blocks * @param hugeBlockSizes sizes of huge blocks by their reduceId. 
*/ private[spark] class HighlyCompressedMapStatus private ( - private[this] var loc: BlockManagerId, - private[this] var mapShuffleLocs: MapShuffleLocations, + private[this] var loc: Option[BlockManagerId], private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, @@ -236,11 +164,9 @@ private[spark] class HighlyCompressedMapStatus private ( require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, null, -1, null, -1, null) // For deserialization only - - override def location: BlockManagerId = loc + protected def this() = this(null, -1, null, -1, null) // For deserialization only - override def mapShuffleLocations: MapShuffleLocations = mapShuffleLocs + override def location: Option[BlockManagerId] = loc override def getSizeForBlock(reduceId: Int): Long = { assert(hugeBlockSizes != null) @@ -255,7 +181,12 @@ private[spark] class HighlyCompressedMapStatus private ( } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - MapStatus.writeLocations(loc, mapShuffleLocs, out) + if (loc.isDefined) { + out.writeBoolean(true) + loc.get.writeExternal(out) + } else { + out.writeBoolean(false) + } emptyBlocks.writeExternal(out) out.writeLong(avgSize) out.writeInt(hugeBlockSizes.size) @@ -266,9 +197,11 @@ private[spark] class HighlyCompressedMapStatus private ( } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val (deserializedLoc, deserializedMapShuffleLocs) = MapStatus.readLocations(in) - loc = deserializedLoc - mapShuffleLocs = deserializedMapShuffleLocs + if (in.readBoolean()) { + loc = Some(BlockManagerId(in)) + } else { + loc = None + } emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -284,10 +217,8 @@ private[spark] class HighlyCompressedMapStatus private ( } private[spark] object HighlyCompressedMapStatus { - def apply( - loc: BlockManagerId, - mapShuffleLocs: MapShuffleLocations, - uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + def apply(loc: Option[BlockManagerId], uncompressedSizes: Array[Long]) + : HighlyCompressedMapStatus = { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. 
var i = 0 @@ -327,12 +258,7 @@ private[spark] object HighlyCompressedMapStatus { } emptyBlocks.trim() emptyBlocks.runOptimize() - new HighlyCompressedMapStatus( - loc, - mapShuffleLocs, - numNonEmptyBlocks, - emptyBlocks, - avgSize, - hugeBlockSizes) + new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, + hugeBlockSizes) } } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index ba8c92518f019..2df133dd2b13a 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -31,7 +31,7 @@ import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSe import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} import com.esotericsoftware.kryo.pool.{KryoCallback, KryoFactory, KryoPool} -import com.esotericsoftware.kryo.serializers.{ExternalizableSerializer, JavaSerializer => KryoJavaSerializer} +import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} import org.roaringbitmap.RoaringBitmap @@ -152,8 +152,6 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer()) kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) - kryo.register(classOf[CompressedMapStatus], new ExternalizableSerializer()) - kryo.register(classOf[HighlyCompressedMapStatus], new ExternalizableSerializer()) kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) @@ -487,6 +485,8 @@ private[serializer] object KryoSerializer { private val toRegister: Seq[Class[_]] = Seq( ByteBuffer.allocate(1).getClass, classOf[StorageLevel], + classOf[CompressedMapStatus], + classOf[HighlyCompressedMapStatus], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Boolean]], diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 530c3694ad1ec..8d6745ba397d3 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.io.DefaultShuffleReadSupport -import org.apache.spark.storage.{ShuffleBlockFetcherIterator, ShuffleBlockId} +import org.apache.spark.storage.ShuffleBlockId import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter @@ -60,7 +60,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( shuffleReadSupport.getPartitionReaders(new Iterable[ShuffleBlockInfo] { override def iterator: Iterator[ShuffleBlockInfo] = { mapOutputTracker - .getMapSizesByShuffleLocation(handle.shuffleId, startPartition, endPartition) + .getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) .flatMap { shuffleLocationInfo => 
shuffleLocationInfo._2.map { blockInfo => val block = blockInfo._1.asInstanceOf[ShuffleBlockId] diff --git a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala index 265a8acfa8d61..5518264c15136 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala @@ -68,3 +68,12 @@ private[spark] class MetadataFetchFailedException( reduceId: Int, message: String) extends FetchFailedException(null, shuffleId, -1, reduceId, message) + +private[spark] class RemoteFetchFailedException( + shuffleId: Int, + mapId: Int, + reduceId: Int, + message: String, + host: String, + port: Int) + extends FetchFailedException(BlockManagerId(host, port), shuffleId, mapId, reduceId, message) diff --git a/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala b/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala index 9b9b8508e88aa..928a6f32739fd 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/io/DefaultShuffleReadSupport.scala @@ -26,7 +26,6 @@ import org.apache.spark.api.shuffle.{ShuffleBlockInfo, ShuffleReadSupport} import org.apache.spark.internal.config import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.ShuffleReadMetricsReporter -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator} class DefaultShuffleReadSupport( @@ -93,12 +92,8 @@ private class ShuffleBlockFetcherIterable( context, blockManager.shuffleClient, blockManager, - mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, minReduceId, maxReduceId + 1) - .map { shuffleLocationInfo => - val defaultShuffleLocation = shuffleLocationInfo._1 - .get.asInstanceOf[DefaultMapShuffleLocations] - (defaultShuffleLocation.getBlockManagerId, shuffleLocationInfo._2) - }, + mapOutputTracker.getMapSizesByExecutorId(shuffleId, minReduceId, maxReduceId + 1) + .map(loc => (loc._1.get, loc._2)), serializerManager.wrapStream, maxBytesInFlight, maxReqsInFlight, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 1fcae684b0052..3ffafd288125d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -67,11 +67,8 @@ private[spark] class SortShuffleWriter[K, V, C]( val mapOutputWriter = writeSupport.createMapOutputWriter( dep.shuffleId, mapId, dep.partitioner.numPartitions) val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter) - val mapLocations = mapOutputWriter.commitAllPartitions() - mapStatus = MapStatus( - blockManager.shuffleServerId, - mapLocations.orNull(), - partitionLengths) + val location = mapOutputWriter.commitAllPartitions + mapStatus = MapStatus(Option.apply(location.orNull), partitionLengths) } /** Close this writer, passing along whether the map completed */ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index d72bd6f9af6bc..8d66cbbfb7562 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -70,7 +70,12 @@ class BlockManagerId private ( } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - out.writeUTF(executorId_) + if (executorId_ != null) { + out.writeBoolean(true) + out.writeUTF(executorId_) + } else { + out.writeBoolean(false) + } out.writeUTF(host_) out.writeInt(port_) out.writeBoolean(topologyInfo_.isDefined) @@ -79,7 +84,9 @@ class BlockManagerId private ( } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - executorId_ = in.readUTF() + if (in.readBoolean()) { + executorId_ = in.readUTF() + } host_ = in.readUTF() port_ = in.readInt() val isTopologyInfoAvailable = in.readBoolean() @@ -91,8 +98,13 @@ class BlockManagerId private ( override def toString: String = s"BlockManagerId($executorId, $host, $port, $topologyInfo)" - override def hashCode: Int = - ((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + topologyInfo.hashCode + override def hashCode: Int = { + if (executorId != null) { + ((executorId.hashCode * 41 + host.hashCode) * 41 + port) * 41 + topologyInfo.hashCode + } else { + (host.hashCode * 41 + port) * 41 + topologyInfo.hashCode + } + } override def equals(that: Any): Boolean = that match { case id: BlockManagerId => @@ -127,20 +139,21 @@ private[spark] object BlockManagerId { topologyInfo: Option[String] = None): BlockManagerId = getCachedBlockManagerId(new BlockManagerId(execId, host, port, topologyInfo)) + def apply(host: String, port: Int): BlockManagerId = + getCachedBlockManagerId(new BlockManagerId(null, host, port, None)) + def apply(in: ObjectInput): BlockManagerId = { val obj = new BlockManagerId() obj.readExternal(in) getCachedBlockManagerId(obj) } - val blockManagerIdCacheSize = 10000 - /** * The max cache size is hardcoded to 10000, since the size of a BlockManagerId * object is about 48B, the total memory cost should be below 1MB which is feasible. 
*/ val blockManagerIdCache = CacheBuilder.newBuilder() - .maximumSize(blockManagerIdCacheSize) + .maximumSize(10000) .build(new CacheLoader[BlockManagerId, BlockManagerId]() { override def load(id: BlockManagerId) = id }) diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 5ea0907277ebf..4c2e6ac6474da 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -172,8 +172,6 @@ public void setUp() throws IOException { when(shuffleDep.serializer()).thenReturn(serializer); when(shuffleDep.partitioner()).thenReturn(hashPartitioner); when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); - when(blockManager.shuffleServerId()).thenReturn(BlockManagerId.apply( - "0", "localhost", 9099, Option.empty())); TaskContext$.MODULE$.setTaskContext(taskContext); } @@ -189,7 +187,8 @@ private UnsafeShuffleWriter createWriter( taskContext, conf, taskContext.taskMetrics().shuffleWriteMetrics(), - new DefaultShuffleWriteSupport(conf, shuffleBlockResolver, blockManager.shuffleServerId())); + new DefaultShuffleWriteSupport(conf, shuffleBlockResolver, blockManager.shuffleServerId()) + ); } private void assertSpillFilesWereCleanedUp() { diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 8fcbc845d1a7b..98d999679867f 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -29,7 +29,6 @@ import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MA import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId} class MapOutputTrackerSuite extends SparkFunSuite { @@ -64,17 +63,14 @@ class MapOutputTrackerSuite extends SparkFunSuite { assert(tracker.containsShuffle(10)) val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(1000L, 10000L))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), Array(10000L, 1000L))) - val statuses = tracker.getMapSizesByShuffleLocation(10, 0) - assert(statuses.toSet === - Seq( - (Some(DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000))), - ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), - (Some(DefaultMapShuffleLocations.get(BlockManagerId("b", "hostB", 1000))), - ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000)))) + val statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.map(status => (status._1.get, status._2)).toSet === + Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), + (BlockManagerId("b", "hostB", 1000), ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000)))) .toSet) assert(0 == tracker.getNumCachedSerializedBroadcast) 
tracker.stop() @@ -89,16 +85,16 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerShuffle(10, 2) val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(compressedSize1000, compressedSize10000))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByShuffleLocation(10, 0).nonEmpty) + assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty) assert(0 == tracker.getNumCachedSerializedBroadcast) tracker.unregisterShuffle(10) assert(!tracker.containsShuffle(10)) - assert(tracker.getMapSizesByShuffleLocation(10, 0).isEmpty) + assert(tracker.getMapSizesByExecutorId(10, 0).isEmpty) tracker.stop() rpcEnv.shutdown() @@ -112,9 +108,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerShuffle(10, 2) val compressedSize1000 = MapStatus.compressSize(1000L) val compressedSize10000 = MapStatus.compressSize(10000L) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(compressedSize1000, compressedSize1000, compressedSize1000))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), Array(compressedSize10000, compressedSize1000, compressedSize1000))) assert(0 == tracker.getNumCachedSerializedBroadcast) @@ -125,7 +121,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { // The remaining reduce task might try to grab the output despite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the // stage already being aborted. - intercept[FetchFailedException] { tracker.getMapSizesByShuffleLocation(10, 1) } + intercept[FetchFailedException] { tracker.getMapSizesByExecutorId(10, 1) } tracker.stop() rpcEnv.shutdown() @@ -147,26 +143,25 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.registerShuffle(10, 1) slaveTracker.updateEpoch(masterTracker.getEpoch) // This is expected to fail because no outputs have been registered for the shuffle. 
- intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("a", "hostA", 1000), Array(1000L))) + Some(BlockManagerId("a", "hostA", 1000)), Array(1000L))) slaveTracker.updateEpoch(masterTracker.getEpoch) - assert(slaveTracker.getMapSizesByShuffleLocation(10, 0).toSeq === - Seq( - (Some(DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000))), - ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + assert(slaveTracker.getMapSizesByExecutorId(10, 0) + .map(status => (status._1.get, status._2)).toSeq === + Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) assert(0 == masterTracker.getNumCachedSerializedBroadcast) val masterTrackerEpochBeforeLossOfMapOutput = masterTracker.getEpoch masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) assert(masterTracker.getEpoch > masterTrackerEpochBeforeLossOfMapOutput) slaveTracker.updateEpoch(masterTracker.getEpoch) - intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } // failure should be cached - intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } assert(0 == masterTracker.getNumCachedSerializedBroadcast) masterTracker.stop() @@ -190,7 +185,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, MapStatus( - BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0))) + Some(BlockManagerId("88", "mph", 1000)), Array.fill[Long](10)(0))) val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) when(rpcCallContext.senderAddress).thenReturn(senderAddress) @@ -223,11 +218,11 @@ class MapOutputTrackerSuite extends SparkFunSuite { // on hostA with output size 2 // on hostB with output size 3 tracker.registerShuffle(10, 3) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(2L))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(2L))) - tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 2, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), Array(3L))) // When the threshold is 50%, only host A should be returned as a preferred location @@ -267,11 +262,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { // being sent. 
masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => - val bmId = BlockManagerId("999", "mps", 1000) masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - bmId, - DefaultMapShuffleLocations.get(bmId), - Array.fill[Long](4000000)(0))) + Some(BlockManagerId("999", "mps", 1000)), Array.fill[Long](4000000)(0))) } val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) @@ -319,18 +311,17 @@ class MapOutputTrackerSuite extends SparkFunSuite { val size0 = MapStatus.decompressSize(MapStatus.compressSize(0L)) val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) - tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), Array(size0, size1000, size0, size10000))) - tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), Array(size10000, size0, size1000, size0))) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByShuffleLocation(10, 0, 4) - .map(x => (x._1.get, x._2)).toSeq === + assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === Seq( - (DefaultMapShuffleLocations.get(BlockManagerId("b", "hostB", 1000)), - Seq((ShuffleBlockId(10, 1, 0), size10000), (ShuffleBlockId(10, 1, 2), size1000))), - (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), - Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 3), size10000))) + (Some(BlockManagerId("b", "hostB", 1000)), + Seq((ShuffleBlockId(10, 1, 0), size10000), (ShuffleBlockId(10, 1, 2), size1000))), + (Some(BlockManagerId("a", "hostA", 1000)), + Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 3), size10000))) ) ) @@ -339,4 +330,109 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } + test("shuffle map statuses with null blockManagerIds") { + val rpcEnv = createRpcEnv("test") + val tracker = newTrackerMaster() + tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf)) + tracker.registerShuffle(10, 3) + assert(tracker.containsShuffle(10)) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("a", "hostA", 1000)), + Array(1000L, 10000L))) + tracker.registerMapOutput(10, 1, MapStatus(None, Array(10000L, 1000L))) + tracker.registerMapOutput(10, 2, MapStatus(None, Array(1000L, 10000L))) + var statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (None, + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000), + (ShuffleBlockId(10, 2, 0), size1000))), + (Some(BlockManagerId("a", "hostA", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))) + .toSet) + assert(0 == tracker.getNumCachedSerializedBroadcast) + tracker.removeOutputsOnHost("hostA") + + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), + Array(1000L, 10000L))) + statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (None, + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000), + (ShuffleBlockId(10, 2, 0), size1000))), + (Some(BlockManagerId("b", "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), 
size1000)))) + .toSet) + tracker.unregisterMapOutput(10, 1, null) + + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), + Array(1000L, 10000L))) + statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (Some(BlockManagerId("b", "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000), (ShuffleBlockId(10, 1, 0), size1000))), + (None, + ArrayBuffer((ShuffleBlockId(10, 2, 0), size1000)))) + .toSet) + + val outputs = tracker.getLocationsWithLargestOutputs(10, 0, 2, 0.01) + assert(outputs.get.toSeq === Seq(BlockManagerId("b", "hostB", 1000))) + tracker.stop() + rpcEnv.shutdown() + } + + test("shuffle map statuses with null execIds") { + val rpcEnv = createRpcEnv("test") + val tracker = newTrackerMaster() + tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf)) + tracker.registerShuffle(10, 2) + assert(tracker.containsShuffle(10)) + val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) + val size10000 = MapStatus.decompressSize(MapStatus.compressSize(10000L)) + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId(null, "hostA", 1000)), + Array(1000L, 10000L))) + tracker.registerMapOutput(10, 1, MapStatus(Some(BlockManagerId(null, "hostB", 1000)), + Array(10000L, 1000L))) + var statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (Some(BlockManagerId(null, "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000))), + (Some(BlockManagerId(null, "hostA", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))) + .toSet) + assert(0 == tracker.getNumCachedSerializedBroadcast) + tracker.removeOutputsOnExecutor("a") + + statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (Some(BlockManagerId(null, "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000))), + (Some(BlockManagerId(null, "hostA", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))) + .toSet) + tracker.unregisterMapOutput(10, 1, BlockManagerId(null, "hostA", 1000)) + + tracker.registerMapOutput(10, 0, MapStatus(Some(BlockManagerId("b", "hostB", 1000)), + Array(1000L, 10000L))) + statuses = tracker.getMapSizesByExecutorId(10, 0) + assert(statuses.toSet === + Seq( + (Some(BlockManagerId(null, "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000))), + (Some(BlockManagerId("b", "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))) + .toSet) + val outputs = tracker.getLocationsWithLargestOutputs(10, 0, 2, 0.01) + assert(outputs.get.toSeq === Seq(BlockManagerId("b", "hostB", 1000))) + tracker.stop() + rpcEnv.shutdown() + } + } diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 1d2713151f505..1cd7296e9de53 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -73,7 +73,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC // All blocks must have non-zero size (0 until NUM_BLOCKS).foreach { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) assert(statuses.forall(_._2.forall(blockIdSizePair => blockIdSizePair._2 > 0))) } } @@ -112,7 +112,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers 
with LocalSparkC assert(c.count === 4) val blockSizes = (0 until NUM_BLOCKS).flatMap { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) statuses.flatMap(_._2.map(_._2)) } val nonEmptyBlocks = blockSizes.filter(x => x > 0) @@ -137,7 +137,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(c.count === 4) val blockSizes = (0 until NUM_BLOCKS).flatMap { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) statuses.flatMap(_._2.map(_._2)) } val nonEmptyBlocks = blockSizes.filter(x => x > 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala new file mode 100644 index 0000000000000..d0f32fbfd67a3 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerShufflePluginSuite.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler + +import java.util + +import org.apache.spark.{FetchFailed, HashPartitioner, ShuffleDependency, SparkConf, Success} +import org.apache.spark.api.shuffle.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents} +import org.apache.spark.internal.config +import org.apache.spark.rdd.RDD +import org.apache.spark.shuffle.sort.io.DefaultShuffleDataIO +import org.apache.spark.storage.BlockManagerId + +class PluginShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO { + val defaultShuffleDataIO = new DefaultShuffleDataIO(sparkConf) + override def driver(): ShuffleDriverComponents = + new PluginShuffleDriverComponents(defaultShuffleDataIO.driver()) + + override def executor(): ShuffleExecutorComponents = defaultShuffleDataIO.executor() +} + +class PluginShuffleDriverComponents(delegate: ShuffleDriverComponents) + extends ShuffleDriverComponents { + override def initializeApplication(): util.Map[String, String] = + delegate.initializeApplication() + + override def cleanupApplication(): Unit = + delegate.cleanupApplication() + + override def removeShuffleData(shuffleId: Int, blocking: Boolean): Unit = + delegate.removeShuffleData(shuffleId, blocking) + + override def shouldUnregisterOutputOnHostOnFetchFailure(): Boolean = true +} + +class DAGSchedulerShufflePluginSuite extends DAGSchedulerSuite { + + private def setupTest(): (RDD[_], Int) = { + afterEach() + val conf = new SparkConf() + // unregistering all outputs on a host is enabled for the individual file server case + conf.set(config.SHUFFLE_IO_PLUGIN_CLASS, classOf[PluginShuffleDataIO].getName) + init(conf) + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) + val shuffleId = shuffleDep.shuffleId + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker) + (reduceRdd, shuffleId) + } + + test("Test simple file server") { + val (reduceRdd, shuffleId) = setupTest() + submit(reduceRdd, Array(0, 1)) + + // Perform map task + val mapStatus1 = makeMapStatus(null, "hostA") + val mapStatus2 = makeMapStatus(null, "hostB") + complete(taskSets(0), Seq((Success, mapStatus1), (Success, mapStatus2))) + assertMapShuffleLocations(shuffleId, Seq(mapStatus1, mapStatus2)) + + // perform reduce task + complete(taskSets(1), Seq((Success, 42), (Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty() + } + + test("Test simple file server fetch failure") { + val (reduceRdd, shuffleId) = setupTest() + submit(reduceRdd, Array(0, 1)) + + // Perform map task + val mapStatus1 = makeMapStatus(null, "hostA") + val mapStatus2 = makeMapStatus(null, "hostB") + complete(taskSets(0), Seq((Success, mapStatus1), (Success, mapStatus2))) + + complete(taskSets(1), Seq((Success, 42), + (FetchFailed(BlockManagerId(null, "hostB", 1234), shuffleId, 1, 0, "ignored"), null))) + assertMapShuffleLocations(shuffleId, Seq(mapStatus1, null)) + + scheduler.resubmitFailedStages() + complete(taskSets(2), Seq((Success, mapStatus2))) + + complete(taskSets(3), Seq((Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty() + } + + test("Test simple file fetch server - duplicate host") { + val (reduceRdd, shuffleId) = setupTest() + submit(reduceRdd, Array(0, 1)) + + // Perform map task + val mapStatus1 = makeMapStatus(null, "hostA") + val mapStatus2 = makeMapStatus(null, "hostA") + complete(taskSets(0), Seq((Success, mapStatus1), (Success, mapStatus2))) + + complete(taskSets(1), Seq((Success, 
42), + (FetchFailed(BlockManagerId(null, "hostA", 1234), shuffleId, 1, 0, "ignored"), null))) + assertMapShuffleLocations(shuffleId, Seq(null, null)) // removes both + + scheduler.resubmitFailedStages() + complete(taskSets(2), Seq((Success, mapStatus1), (Success, mapStatus2))) + + complete(taskSets(3), Seq((Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty() + } + + test("Test DFS case - empty BlockManagerId") { + val (reduceRdd, shuffleId) = setupTest() + submit(reduceRdd, Array(0, 1)) + + val mapStatus = makeEmptyMapStatus() + complete(taskSets(0), Seq((Success, mapStatus), (Success, mapStatus))) + assertMapShuffleLocations(shuffleId, Seq(mapStatus, mapStatus)) + + // perform reduce task + complete(taskSets(1), Seq((Success, 42), (Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty() + } + + test("Test DFS case - fetch failure") { + val (reduceRdd, shuffleId) = setupTest() + submit(reduceRdd, Array(0, 1)) + + // Perform map task + val mapStatus = makeEmptyMapStatus() + complete(taskSets(0), Seq((Success, mapStatus), (Success, mapStatus))) + + complete(taskSets(1), Seq((Success, 42), + (FetchFailed(null, shuffleId, 1, 0, "ignored"), null))) + assertMapShuffleLocations(shuffleId, Seq(mapStatus, null)) + + scheduler.resubmitFailedStages() + complete(taskSets(2), Seq((Success, mapStatus))) + + complete(taskSets(3), Seq((Success, 43))) + assert(results === Map(0 -> 42, 1 -> 43)) + assertDataStructuresEmpty() + } + + def makeMapStatus(execId: String, host: String): MapStatus = { + MapStatus(Some(BlockManagerId(execId, host, 1234)), Array.fill[Long](2)(2)) + } + + def makeEmptyMapStatus(): MapStatus = { + MapStatus(None, Array.fill[Long](2)(2)) + } + + def assertMapShuffleLocations(shuffleId: Int, set: Seq[MapStatus]): Unit = { + val actualShuffleLocations = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses + assert(set === actualShuffleLocations.toSeq) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 21b4e56c9e801..7d18e9cbb6f36 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -29,14 +29,12 @@ import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.rdd.{DeterministicLevel, RDD} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} @@ -238,7 +236,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi init(new SparkConf()) } - private def init(testConf: SparkConf): Unit = { + def init(testConf: SparkConf): Unit = { sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf) sparkListener.submittedStageInfos.clear() sparkListener.successfulStages.clear() @@ -308,7 +306,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi it.next.asInstanceOf[Tuple2[_, _]]._1 /** Send the given CompletionEvent messages for the tasks in the TaskSet. */ - private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { + def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) { assert(taskSet.tasks.size >= results.size) for ((result, i) <- results.zipWithIndex) { if (i < taskSet.tasks.size) { @@ -334,7 +332,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } /** Submits a job to the scheduler and returns the job id. */ - private def submit( + def submit( rdd: RDD[_], partitions: Array[Int], func: (TaskContext, Iterator[_]) => _ = jobComputeFunc, @@ -447,30 +445,30 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( (Success, - MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, - MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, makeMapStatus("hostB", 1)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, - MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2))), + MapStatus(Some(BlockManagerId("exec-hostA1", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, - MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2))), + MapStatus(Some(BlockManagerId("exec-hostA2", "hostA", 12345)), Array.fill[Long](1)(2))), (Success, makeMapStatus("hostB", 1)) )) // make sure our test setup is correct val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus1.count(_ != null) === 3) - assert(initialMapStatus1.map{_.location.executorId}.toSet === + assert(initialMapStatus1.map{_.location.get.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) - assert(initialMapStatus2.map{_.location.executorId}.toSet === + assert(initialMapStatus2.map{_.location.get.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) // reduce stage fails with a fetch failure from one host @@ -484,13 +482,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) - assert(mapStatus1(2).location.executorId === "exec-hostB") - assert(mapStatus1(2).location.host === "hostB") + assert(mapStatus1(2).location.get.executorId === "exec-hostB") + assert(mapStatus1(2).location.get.host === "hostB") val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) - assert(mapStatus2(2).location.executorId === "exec-hostB") - assert(mapStatus2(2).location.host === "hostB") + assert(mapStatus2(2).location.get.executorId === "exec-hostB") + assert(mapStatus2(2).location.get.host === "hostB") } test("zero split job") { @@ -702,8 +700,8 @@ class 
DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) assertDataStructuresEmpty() @@ -729,10 +727,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the 2nd attempt pass complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) // we can see both result blocks now - assert(mapOutputTracker - .getMapSizesByShuffleLocation(shuffleId, 0) - .map(_._1.get.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) - .toSet === HashSet("hostA", "hostB")) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get.host).toSet === + HashSet("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() @@ -770,11 +766,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(ExecutorLost("exec-hostA", event)) if (expectFileLoss) { intercept[MetadataFetchFailedException] { - mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0) + mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) } } else { - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) } } } @@ -1067,10 +1063,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) // The MapOutputTracker should know about both map output locations. - assert(mapOutputTracker - .getMapSizesByShuffleLocation(shuffleId, 0) - .map(_._1.get.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) - .toSet === HashSet("hostA", "hostB")) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get.host).toSet === + HashSet("hostA", "hostB")) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(makeCompletionEvent( @@ -1199,14 +1193,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", 2)), (Success, makeMapStatus("hostB", 2)))) // The MapOutputTracker should know about both map output locations. 
- assert(mapOutputTracker - .getMapSizesByShuffleLocation(shuffleId, 0) - .map(_._1.get.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) - .toSet === HashSet("hostA", "hostB")) - assert(mapOutputTracker - .getMapSizesByShuffleLocation(shuffleId, 1) - .map(_._1.get.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) - .toSet === HashSet("hostA", "hostB")) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get.host).toSet === + HashSet("hostA", "hostB")) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1).map(_._1.get.host).toSet === + HashSet("hostA", "hostB")) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(makeCompletionEvent( @@ -1396,8 +1386,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi Success, makeMapStatus("hostA", reduceRdd.partitions.size))) assert(shuffleStage.numAvailableOutputs === 2) - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostB"), makeMaybeShuffleLocation("hostA"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) // finish the next stage normally, which completes the job complete(taskSets(1), Seq((Success, 42), (Success, 43))) @@ -1551,7 +1541,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi reduceIdx <- reduceIdxs } { // this would throw an exception if the map status hadn't been registered - val statuses = mapOutputTracker.getMapSizesByShuffleLocation(stage, reduceIdx) + val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) @@ -1603,7 +1593,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // check that we have all the map output for stage 0 (0 until reduceRdd.partitions.length).foreach { reduceIdx => - val statuses = mapOutputTracker.getMapSizesByShuffleLocation(0, reduceIdx) + val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) @@ -1802,8 +1792,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have hostC complete the resubmitted task complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostC"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) // Make sure that the reduce stage was now submitted. 
assert(taskSets.size === 3) @@ -2065,8 +2055,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"))) // Reducer should run on the same host that map task ran val reduceTaskSet = taskSets(1) @@ -2111,8 +2101,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"))) + assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"))) // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep val reduceTaskSet = taskSets(1) @@ -2275,8 +2265,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) assert(listener1.results.size === 1) // When attempting the second stage, show a fetch failure @@ -2291,9 +2281,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(taskSets(2).stageId === 0) complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostC"), makeMaybeShuffleLocation("hostB"))) - + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) assert(listener2.results.size === 0) // Second stage listener should still not have a result // Stage 1 should now be running as task set 3; make its first task succeed @@ -2301,8 +2290,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(3), Seq( (Success, makeMapStatus("hostB", rdd2.partitions.length)), (Success, makeMapStatus("hostD", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(dep2.shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostB"), makeMaybeShuffleLocation("hostD"))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD"))) assert(listener2.results.size === 1) // Finally, the reduce job should be running as task set 4; make it see a fetch failure, @@ -2340,8 +2329,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) - 
assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeMaybeShuffleLocation("hostA"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1.get).toSet === + HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) assert(listener1.results.size === 1) // When attempting stage1, trigger a fetch failure. @@ -2366,8 +2355,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(taskSets(2).stageId === 0) complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === - Set(makeMaybeShuffleLocation("hostC"), makeMaybeShuffleLocation("hostB"))) + assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1.get).toSet === + Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) // After stage0 is finished, stage1 will be submitted and found there is no missing // partitions in it. Then listener got triggered. @@ -2871,7 +2860,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } - private def assertDataStructuresEmpty(): Unit = { + def assertDataStructuresEmpty(): Unit = { assert(scheduler.activeJobs.isEmpty) assert(scheduler.failedStages.isEmpty) assert(scheduler.jobIdToActiveJob.isEmpty) @@ -2915,18 +2904,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi object DAGSchedulerSuite { def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus = - MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes)) + MapStatus(Some(makeBlockManagerId(host)), Array.fill[Long](reduces)(sizes)) def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) - - def makeShuffleLocation(host: String): MapShuffleLocations = { - DefaultMapShuffleLocations.get(makeBlockManagerId(host)) - } - - def makeMaybeShuffleLocation(host: String): Option[MapShuffleLocations] = { - Some(DefaultMapShuffleLocations.get(makeBlockManagerId(host))) - } } object FailThisAttempt { diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 3c786c0927bc6..0140bb5d25687 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.LocalSparkContext._ import org.apache.spark.internal.config import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId class MapStatusSuite extends SparkFunSuite { @@ -62,11 +61,7 @@ class MapStatusSuite extends SparkFunSuite { stddev <- Seq(0.0, 0.01, 0.5, 1.0) ) { val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) - val bmId = BlockManagerId("a", "b", 10) - val status = MapStatus( - bmId, - DefaultMapShuffleLocations.get(bmId), - sizes) + val status = MapStatus(Some(BlockManagerId("a", "b", 10)), sizes) val status1 = compressAndDecompressMapStatus(status) for (i <- 0 until numSizes) { if (sizes(i) != 0) { @@ -80,7 +75,7 @@ class MapStatusSuite extends SparkFunSuite { test("large tasks should use " + 
classOf[HighlyCompressedMapStatus].getName) { val sizes = Array.fill[Long](2001)(150L) - val status = MapStatus(null, null, sizes) + val status = MapStatus(null, sizes) assert(status.isInstanceOf[HighlyCompressedMapStatus]) assert(status.getSizeForBlock(10) === 150L) assert(status.getSizeForBlock(50) === 150L) @@ -91,13 +86,11 @@ class MapStatusSuite extends SparkFunSuite { test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") { val sizes = Array.tabulate[Long](3000) { i => i.toLong } val avg = sizes.sum / sizes.count(_ != 0) - val bmId = BlockManagerId("a", "b", 10) - val loc = DefaultMapShuffleLocations.get(bmId) - val status = MapStatus(bmId, loc, sizes) + val loc = BlockManagerId("a", "b", 10) + val status = MapStatus(Some(loc), sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) - assert(status1.location == loc.getBlockManagerId) - assert(status1.mapShuffleLocations == loc) + assert(status1.location.get == loc) for (i <- 0 until 3000) { val estimate = status1.getSizeForBlock(i) if (sizes(i) > 0) { @@ -115,13 +108,11 @@ class MapStatusSuite extends SparkFunSuite { val sizes = (0L to 3000L).toArray val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold) val avg = smallBlockSizes.sum / smallBlockSizes.length - val bmId = BlockManagerId("a", "b", 10) - val loc = DefaultMapShuffleLocations.get(bmId) - val status = MapStatus(bmId, loc, sizes) + val loc = BlockManagerId("a", "b", 10) + val status = MapStatus(Some(loc), sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) - assert(status1.location === bmId) - assert(status1.mapShuffleLocations === loc) + assert(status1.location.get == loc) for (i <- 0 until threshold) { val estimate = status1.getSizeForBlock(i) if (sizes(i) > 0) { @@ -174,8 +165,7 @@ class MapStatusSuite extends SparkFunSuite { SparkEnv.set(env) // Value of element in sizes is equal to the corresponding index. 
val sizes = (0L to 2000L).toArray - val bmId = BlockManagerId("exec-0", "host-0", 100) - val status1 = MapStatus(bmId, DefaultMapShuffleLocations.get(bmId), sizes) + val status1 = MapStatus(Some(BlockManagerId("exec-0", "host-0", 100)), sizes) val arrayStream = new ByteArrayOutputStream(102400) val objectOutputStream = new ObjectOutputStream(arrayStream) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) @@ -199,4 +189,12 @@ class MapStatusSuite extends SparkFunSuite { assert(count === 3000) } } + + test("Location can be empty") { + val sizes = (0L to 3000L).toArray + val status = MapStatus(None, sizes) + val status1 = compressAndDecompressMapStatus(status) + assert(status1.isInstanceOf[HighlyCompressedMapStatus]) + assert(status1.location.isEmpty) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 83305a96e6794..aa6db8d0423a3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -192,8 +192,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa shuffleId <- shuffleIds reduceIdx <- (0 until nParts) } { - val statuses = taskScheduler.mapOutputTracker.getMapSizesByShuffleLocation( - shuffleId, reduceIdx) + val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c523d0cb9ce80..cd538719d536b 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -36,9 +36,8 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Kryo._ import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") @@ -351,10 +350,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val ser = new KryoSerializer(conf).newInstance() val denseBlockSizes = new Array[Long](5000) val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) - val bmId = BlockManagerId("exec-1", "host", 1234) Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => ser.serialize(HighlyCompressedMapStatus( - bmId, DefaultMapShuffleLocations.get(bmId), blockSizes)) + Some(BlockManagerId("exec-1", "host", 1234)), blockSizes)) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 6468914bf3185..343049ff2e0c4 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -25,13 +25,11 @@ 
import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.apache.spark._ -import org.apache.spark.api.shuffle.ShuffleLocation import org.apache.spark.internal.config import org.apache.spark.io.CompressionCodec import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.shuffle.io.DefaultShuffleReadSupport -import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId} import org.apache.spark.storage.BlockId @@ -111,20 +109,20 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Make a mocked MapOutputTracker for the shuffle reader to use to determine what // shuffle data to read. - val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => - val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) - (shuffleBlockId, byteOutputStream.size().toLong) - } - val blocksToRetrieve = Seq( - (Option.apply(DefaultMapShuffleLocations.get(localBlockManagerId)), shuffleBlockIdsAndSizes)) val mapOutputTracker = mock(classOf[MapOutputTracker]) - when(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1)) - .thenAnswer(new Answer[Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])]] { + when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)) + .thenAnswer(new Answer[Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])]] { def answer(invocationOnMock: InvocationOnMock): - Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { - blocksToRetrieve.iterator + Iterator[(Option[BlockManagerId], Seq[(BlockId, Long)])] = { + // Test a scenario where all data is local, to avoid creating a bunch of additional mocks + // for the code to read data over the network. + val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => + val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) + (shuffleBlockId, byteOutputStream.size().toLong) + } + Seq((Some(localBlockManagerId), shuffleBlockIdsAndSizes)).toIterator } - }) + }) // Create a mocked shuffle handle to pass into HashShuffleReader. 
val shuffleHandle = { diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala index 4f5bb264170de..c7b452598e7fe 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala @@ -29,7 +29,6 @@ import org.mockito.stubbing.Answer import scala.util.Random import org.apache.spark.{Aggregator, MapOutputTracker, ShuffleDependency, SparkConf, SparkEnv, TaskContext} -import org.apache.spark.api.shuffle.ShuffleLocation import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager} @@ -196,16 +195,15 @@ object BlockStoreShuffleReaderBenchmark extends BenchmarkBase { dataBlockId = remoteBlockManagerId } - when(mapOutputTracker.getMapSizesByShuffleLocation(0, 0, 1)) - .thenAnswer(new Answer[Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])]] { + when(mapOutputTracker.getMapSizesByExecutorId(0, 0, 1)) + .thenAnswer(new Answer[Iterator[(BlockManagerId, Seq[(BlockId, Long)])]] { def answer(invocationOnMock: InvocationOnMock): - Iterator[(Option[ShuffleLocation], Seq[(BlockId, Long)])] = { + Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { val shuffleBlockIdsAndSizes = (0 until NUM_MAPS).map { mapId => val shuffleBlockId = ShuffleBlockId(0, mapId, 0) (shuffleBlockId, dataFileLength) } - Seq((Option.apply(DefaultMapShuffleLocations.get(dataBlockId)), shuffleBlockIdsAndSizes)) - .toIterator + Seq((dataBlockId, shuffleBlockIdsAndSizes)).toIterator } }) diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala index dbd73f2688dfc..72e71915c79f5 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala @@ -20,7 +20,6 @@ package org.apache.spark.shuffle.sort import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport -import org.apache.spark.storage.BlockManagerId /** * Benchmark to measure performance for aggregate primitives. 
@@ -47,10 +46,10 @@ object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = { val conf = new SparkConf(loadDefaults = false) - val shuffleWriteSupport = new DefaultShuffleWriteSupport( - conf, blockResolver, BlockManagerId("0", "localhost", 7090)) conf.set("spark.file.transferTo", String.valueOf(transferTo)) conf.set("spark.shuffle.file.buffer", "32k") + val shuffleWriteSupport = + new DefaultShuffleWriteSupport(conf, blockResolver, blockManager.shuffleServerId) val shuffleWriter = new BypassMergeSortShuffleWriter[String, String]( blockManager, diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index 013c5916284d2..e0c49f04dc309 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Answers.RETURNS_SMART_NULLS -import org.mockito.ArgumentMatchers.{any, anyInt} +import org.mockito.ArgumentMatchers.{any, anyInt, anyString} import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -140,8 +140,8 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte metricsSystem = null, taskMetrics = taskMetrics)) - writeSupport = new DefaultShuffleWriteSupport( - conf, blockResolver, BlockManagerId("0", "localhost", 7090)) + writeSupport = + new DefaultShuffleWriteSupport(conf, blockResolver, blockManager.shuffleServerId) } override def afterEach(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala index b0ff15cb1f790..1a12fc6b509de 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala @@ -23,7 +23,6 @@ import org.apache.spark.{Aggregator, SparkEnv, TaskContext} import org.apache.spark.benchmark.Benchmark import org.apache.spark.shuffle.BaseShuffleHandle import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport -import org.apache.spark.storage.BlockManagerId /** * Benchmark to measure performance for aggregate primitives. 
@@ -78,10 +77,8 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase { when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager) TaskContext.setTaskContext(taskContext) - val writeSupport = new DefaultShuffleWriteSupport( - defaultConf, - blockResolver, - BlockManagerId("0", "localhost", 9099)) + val writeSupport = + new DefaultShuffleWriteSupport(defaultConf, blockResolver, blockManager.shuffleServerId) val shuffleWriter = new SortShuffleWriter[String, String, String]( blockResolver, diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala index 7066ba8fb44df..fff8562c0b50b 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala @@ -19,7 +19,6 @@ package org.apache.spark.shuffle.sort import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.benchmark.Benchmark import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport -import org.apache.spark.storage.BlockManagerId /** * Benchmark to measure performance for aggregate primitives. @@ -44,8 +43,8 @@ object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase { def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = { val conf = new SparkConf(loadDefaults = false) conf.set("spark.file.transferTo", String.valueOf(transferTo)) - val shuffleWriteSupport = new DefaultShuffleWriteSupport( - conf, blockResolver, BlockManagerId("0", "localhost", 9099)) + val shuffleWriteSupport = + new DefaultShuffleWriteSupport(conf, blockResolver, blockManager.shuffleServerId) TaskContext.setTaskContext(taskContext) new UnsafeShuffleWriter[String, String]( diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala index 1f4ef0f203994..3ccb549912782 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriterSuite.scala @@ -37,8 +37,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.util.LimitedInputStream import org.apache.spark.shuffle.IndexShuffleBlockResolver import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.ByteBufferInputStream -import org.apache.spark.util.Utils +import org.apache.spark.util.{ByteBufferInputStream, Utils} class DefaultShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndAfterEach {
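The test updates above all follow one pattern: MapStatus is constructed with an Option[BlockManagerId] rather than a mandatory location plus MapShuffleLocations, and getMapSizesByExecutorId groups block sizes under that same Option. A minimal sketch of that pattern, assuming only the MapStatus apply and location signatures visible in the hunks above (the executor and host names are illustrative, and MapStatus is package-private, so this only compiles inside Spark's own test sources):

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

// A map task may or may not report where its blocks live; readers match on the Option.
val withLocation = MapStatus(Some(BlockManagerId("exec-0", "hostA", 1000)), Array(1000L, 10000L))
val withoutLocation = MapStatus(None, Array(1000L, 10000L))

Seq(withLocation, withoutLocation).foreach { status =>
  status.location match {
    case Some(bmId) => println(s"blocks served by ${bmId.host}:${bmId.port}")
    case None       => println("location tracked outside the driver (e.g. a DFS-backed shuffle plugin)")
  }
}

A None location is how plugins that manage shuffle data outside the driver are represented, which is why the DAGSchedulerShufflePluginSuite and MapOutputTrackerSuite tests above exercise both the Some and None cases.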