Commit b95f918

Revert "Make removing map output by exec id configurable (#608)"

This reverts commit 5abde44.

mccheah committed Oct 3, 2019
1 parent: 94262f8

Showing 4 changed files with 16 additions and 82 deletions.
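At the API level, this revert removes the three-valued MapOutputUnregistrationStrategy enum from ShuffleDriverComponents and restores the boolean shouldUnregisterOutputOnHostOnFetchFailure() hook. The DAGScheduler consequently goes back to always unregistering the failed executor's map outputs on a fetch failure, with host-wide unregistration as the only remaining opt-in; the test-only async plugin (the MAP_OUTPUT_ONLY case) is dropped along with its test.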
ShuffleDriverComponents.java:

@@ -24,12 +24,6 @@
 
 public interface ShuffleDriverComponents {
 
-  enum MapOutputUnregistrationStrategy {
-    MAP_OUTPUT_ONLY,
-    EXECUTOR,
-    HOST,
-  }
-
   /**
    * @return additional SparkConf values necessary for the executors.
    */
@@ -41,8 +35,8 @@ default void registerShuffle(int shuffleId) throws IOException {}
 
   default void removeShuffle(int shuffleId, boolean blocking) throws IOException {}
 
-  default MapOutputUnregistrationStrategy unregistrationStrategyOnFetchFailure() {
-    return MapOutputUnregistrationStrategy.EXECUTOR;
+  default boolean shouldUnregisterOutputOnHostOnFetchFailure() {
+    return false;
   }
 
   default boolean isMapOutputLostWhenMapperLost(int shuffleId, int mapId, Optional<BlockManagerId> mapperLocation) {
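With the enum gone, a plugin that wants host-wide cleanup on fetch failure just overrides the restored default, as the test suite's PluginShuffleDriverComponents does further down. A minimal standalone sketch in Scala (the class name HostCleanupDriverComponents and the delegating setup are illustrative, not part of this change):

    import java.util

    import org.apache.spark.shuffle.api.ShuffleDriverComponents

    // Hypothetical plugin: forwards lifecycle calls to a delegate and opts in
    // to host-wide map-output unregistration after a fetch failure.
    class HostCleanupDriverComponents(delegate: ShuffleDriverComponents)
        extends ShuffleDriverComponents {

      override def initializeApplication(): util.Map[String, String] =
        delegate.initializeApplication()

      override def cleanupApplication(): Unit =
        delegate.cleanupApplication()

      override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit =
        delegate.removeShuffle(shuffleId, blocking)

      // Restored hook: true means a fetch failure invalidates every map
      // output on the failing host, not just the failing executor's.
      override def shouldUnregisterOutputOnHostOnFetchFailure(): Boolean = true
    }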
ShuffleDriverComponents implementation:

@@ -48,11 +48,8 @@ public void removeShuffle(int shuffleId, boolean blocking) {
   }
 
   @Override
-  public MapOutputUnregistrationStrategy unregistrationStrategyOnFetchFailure() {
-    if (shouldUnregisterOutputOnHostOnFetchFailure) {
-      return MapOutputUnregistrationStrategy.HOST;
-    }
-    return MapOutputUnregistrationStrategy.EXECUTOR;
+  public boolean shouldUnregisterOutputOnHostOnFetchFailure() {
+    return shouldUnregisterOutputOnHostOnFetchFailure;
  }
 
  private void checkInitialized() {
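The built-in implementation simply reports a flag captured at initialization. In upstream Spark the equivalent behavior is driven by spark.files.fetchFailure.unRegisterOutputOnHost; a sketch of that wiring, assuming this fork reads the same key (the diff does not show where the field is set):

    import org.apache.spark.SparkConf

    // Sketch: capture the host-unregistration flag from the conf once and
    // surface it through the restored boolean hook.
    class ConfBackedFlag(conf: SparkConf) {
      private val unregisterOnHost: Boolean =
        conf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false)

      def shouldUnregisterOutputOnHostOnFetchFailure(): Boolean = unregisterOnHost
    }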
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (4 additions, 10 deletions):

@@ -43,7 +43,6 @@ import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{DeterministicLevel, RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.shuffle.api.ShuffleDriverComponents.MapOutputUnregistrationStrategy
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
@@ -1673,8 +1672,7 @@ private[spark] class DAGScheduler(
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           if (bmAddress.executorId == null) {
-            if (shuffleDriverComponents.unregistrationStrategyOnFetchFailure() ==
-                MapOutputUnregistrationStrategy.HOST) {
+            if (shuffleDriverComponents.shouldUnregisterOutputOnHostOnFetchFailure()) {
               val currentEpoch = task.epoch
               val host = bmAddress.host
               logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
@@ -1683,8 +1681,7 @@
             }
           } else {
             val hostToUnregisterOutputs =
-              if (shuffleDriverComponents.unregistrationStrategyOnFetchFailure() ==
-                  MapOutputUnregistrationStrategy.HOST) {
+              if (shuffleDriverComponents.shouldUnregisterOutputOnHostOnFetchFailure()) {
                 // We had a fetch failure with the external shuffle service, so we
                 // assume all shuffle data on the node is bad.
                 Some(bmAddress.host)
@@ -1866,11 +1863,8 @@
         logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
         mapOutputTracker.removeOutputsOnHost(host)
       case None =>
-        if (shuffleDriverComponents.unregistrationStrategyOnFetchFailure() ==
-            MapOutputUnregistrationStrategy.EXECUTOR) {
-          logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-          mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
+        logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
+        mapOutputTracker.removeOutputsOnExecutor(execId)
     }
     clearCacheLocs()
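In effect, the scheduler's three-way strategy (map output only / executor / host) collapses back to a two-way decision. A tiny illustrative model of the restored logic (not actual scheduler code):

    // MAP_OUTPUT_ONLY is gone: a fetch failure now always unregisters at
    // least the failing executor's outputs; host-wide cleanup stays opt-in.
    sealed trait UnregisterTarget
    final case class OnHost(host: String) extends UnregisterTarget
    final case class OnExecutor(execId: String) extends UnregisterTarget

    def unregisterTarget(
        execId: String,
        host: String,
        unregisterOnHost: Boolean): UnregisterTarget =
      if (unregisterOnHost) OnHost(host) else OnExecutor(execId)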
DAGSchedulerShufflePluginSuite.scala:

@@ -22,7 +22,6 @@
 import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.shuffle.api.{ShuffleDataIO, ShuffleDriverComponents, ShuffleExecutorComponents}
-import org.apache.spark.shuffle.api.ShuffleDriverComponents.MapOutputUnregistrationStrategy
 import org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO
 import org.apache.spark.storage.BlockManagerId
@@ -45,42 +44,16 @@ class PluginShuffleDriverComponents(delegate: ShuffleDriverComponents)
   override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit =
     delegate.removeShuffle(shuffleId, blocking)
 
-  override def unregistrationStrategyOnFetchFailure():
-      ShuffleDriverComponents.MapOutputUnregistrationStrategy =
-    MapOutputUnregistrationStrategy.HOST
-}
-
-class AsyncShuffleDataIO(sparkConf: SparkConf) extends ShuffleDataIO {
-  val localDiskShuffleDataIO = new LocalDiskShuffleDataIO(sparkConf)
-  override def driver(): ShuffleDriverComponents =
-    new AsyncShuffleDriverComponents(localDiskShuffleDataIO.driver())
-
-  override def executor(): ShuffleExecutorComponents = localDiskShuffleDataIO.executor()
-}
-
-class AsyncShuffleDriverComponents(delegate: ShuffleDriverComponents)
-    extends ShuffleDriverComponents {
-  override def initializeApplication(): util.Map[String, String] =
-    delegate.initializeApplication()
-
-  override def cleanupApplication(): Unit =
-    delegate.cleanupApplication()
-
-  override def removeShuffle(shuffleId: Int, blocking: Boolean): Unit =
-    delegate.removeShuffle(shuffleId, blocking)
-
-  override def unregistrationStrategyOnFetchFailure():
-      ShuffleDriverComponents.MapOutputUnregistrationStrategy =
-    MapOutputUnregistrationStrategy.MAP_OUTPUT_ONLY
+  override def shouldUnregisterOutputOnHostOnFetchFailure(): Boolean = true
 }
 
 class DAGSchedulerShufflePluginSuite extends DAGSchedulerSuite {
 
-  private def setupTest(pluginClass: Class[_]): (RDD[_], Int) = {
+  private def setupTest(): (RDD[_], Int) = {
     afterEach()
     val conf = new SparkConf()
     // unregistering all outputs on a host is enabled for the individual file server case
-    conf.set(config.SHUFFLE_IO_PLUGIN_CLASS, pluginClass.getName)
+    conf.set(config.SHUFFLE_IO_PLUGIN_CLASS, classOf[PluginShuffleDataIO].getName)
     init(conf)
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -89,32 +62,8 @@ class DAGSchedulerShufflePluginSuite extends DAGSchedulerSuite {
     (reduceRdd, shuffleId)
   }
 
-  test("Test async") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[AsyncShuffleDataIO])
-    submit(reduceRdd, Array(0, 1))
-
-    // Perform map task
-    val mapStatus1 = makeMapStatus("exec1", "hostA")
-    val mapStatus2 = makeMapStatus("exec1", "hostA")
-    complete(taskSets(0), Seq((Success, mapStatus1), (Success, mapStatus2)))
-    assertMapShuffleLocations(shuffleId, Seq(mapStatus1, mapStatus2))
-
-
-    // perform reduce task
-    complete(taskSets(1), Seq((Success, 42),
-      (FetchFailed(BlockManagerId("exec1", "hostA", 1234), shuffleId, 1, 0, "ignored"), null)))
-    assertMapShuffleLocations(shuffleId, Seq(mapStatus1, null))
-
-    scheduler.resubmitFailedStages()
-    complete(taskSets(2), Seq((Success, mapStatus2)))
-
-    complete(taskSets(3), Seq((Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
-    assertDataStructuresEmpty()
-  }
-
   test("Test simple file server") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[PluginShuffleDataIO])
+    val (reduceRdd, shuffleId) = setupTest()
     submit(reduceRdd, Array(0, 1))
 
     // Perform map task
@@ -130,7 +79,7 @@
   }
 
   test("Test simple file server fetch failure") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[PluginShuffleDataIO])
+    val (reduceRdd, shuffleId) = setupTest()
     submit(reduceRdd, Array(0, 1))
 
     // Perform map task
@@ -151,7 +100,7 @@
   }
 
   test("Test simple file fetch server - duplicate host") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[PluginShuffleDataIO])
+    val (reduceRdd, shuffleId) = setupTest()
     submit(reduceRdd, Array(0, 1))
 
     // Perform map task
@@ -172,7 +121,7 @@
   }
 
   test("Test DFS case - empty BlockManagerId") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[PluginShuffleDataIO])
+    val (reduceRdd, shuffleId) = setupTest()
     submit(reduceRdd, Array(0, 1))
 
     val mapStatus = makeEmptyMapStatus()
@@ -186,7 +135,7 @@
   }
 
   test("Test DFS case - fetch failure") {
-    val (reduceRdd, shuffleId) = setupTest(classOf[PluginShuffleDataIO])
+    val (reduceRdd, shuffleId) = setupTest()
     submit(reduceRdd, Array(0, 1))
 
     // Perform map task
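With the async plugin removed, every remaining test runs against PluginShuffleDataIO, which is why setupTest no longer takes a parameter. To exercise just this suite, the usual Spark sbt invocation should work (assuming the standard Spark build layout):

    build/sbt "core/testOnly *DAGSchedulerShufflePluginSuite"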
