
Set preferred locations for reduce tasks
This is another attempt at apache#1697 addressing some of the earlier concerns.
This adds a couple of thresholds based on the number of map and reduce tasks
beyond which we don't use preferred locations for reduce tasks.

This patch also fixes some bugs in DAGSchedulerSuite where the MapStatus
objects created didn't have the right number of reducers set.
shivaram committed Feb 12, 2015
1 parent 99bd500 commit 3b464b7
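In rough terms, the change gates reducer locality on the size of the shuffle and, when it applies, prefers the hosts holding the most map output for a reducer. A minimal standalone sketch of that idea follows; the object, thresholds, and helper names are illustrative analogues, not the Spark internals added in this commit.

// Sketch only: illustrative analogues of the thresholds and top-N selection in this commit.
object ReduceLocalitySketch {
  val MapThreshold = 1000     // assumed analogue of SHUFFLE_PREF_MAP_THRESHOLD
  val ReduceThreshold = 1000  // assumed analogue of SHUFFLE_PREF_REDUCE_THRESHOLD
  val NumPrefLocs = 5         // assumed analogue of NUM_REDUCER_PREF_LOCS

  // bytesByHost: map-output bytes each host produced for the reducer in question.
  def preferredHosts(numMaps: Int, numReduces: Int, bytesByHost: Map[String, Long]): Seq[String] = {
    if (numMaps >= MapThreshold || numReduces >= ReduceThreshold) {
      Seq.empty // large shuffle: computing per-reducer locations is not worth the cost
    } else {
      bytesByHost.toSeq.sortBy(-_._2).take(NumPrefLocs).map(_._1)
    }
  }

  def main(args: Array[String]): Unit = {
    val bytes = Map("hostA" -> 100L, "hostB" -> 500L, "hostC" -> 50L)
    // Prints the hosts largest-output first: hostB, hostA, hostC
    println(preferredHosts(numMaps = 3, numReduces = 1, bytes))
  }
}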
Showing 3 changed files with 118 additions and 13 deletions.
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -237,6 +237,12 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

// For each shuffleId we also maintain a Map from reducerId -> (location, size)
// Lazily populated whenever the statuses are requested from DAGScheduler
private val statusByReducer =
new TimeStampedHashMap[Int, HashMap[Int, Array[(BlockManagerId, Long)]]]()


// For cleaning up TimeStampedHashMaps
private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
@@ -282,13 +288,37 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
override def unregisterShuffle(shuffleId: Int) {
mapStatuses.remove(shuffleId)
cachedSerializedStatuses.remove(shuffleId)
statusByReducer.remove(shuffleId)
}

/** Check if the given shuffle is being tracked */
def containsShuffle(shuffleId: Int): Boolean = {
cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
}

// Return the list of locations and blockSizes for each reducer.
// The map is keyed by reducerId and for each reducer the value contains the array
// of (location, size) of map outputs.
//
// This method is not thread-safe
def getStatusByReducer(shuffleId: Int, numReducers: Int): Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
val statuses = mapStatuses(shuffleId)
if (statuses.length > 0) {
statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
var r = 0
while (r < numReducers) {
val locs = statuses.map { s =>
(s.location, s.getSizeForBlock(r))
}
statusByReducer(shuffleId) += (r -> locs)
r = r + 1
}
}
}
statusByReducer.get(shuffleId)
}

def incrementEpoch() {
epochLock.synchronized {
epoch += 1
@@ -336,6 +366,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
private def cleanup(cleanupTime: Long) {
mapStatuses.clearOldValues(cleanupTime)
cachedSerializedStatuses.clearOldValues(cleanupTime)
statusByReducer.clearOldValues(cleanupTime)
}
}

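For reference, a hedged sketch (with mock types in place of Spark's MapStatus and BlockManagerId) of the per-reducer index that getStatusByReducer lazily builds above: the per-map statuses are transposed so that each reducer id maps to the (location, size) pair contributed by every map task.

// Sketch only: MapOutput stands in for MapStatus, a String for BlockManagerId.
object StatusByReducerSketch {
  final case class MapOutput(location: String, sizesByReducer: Array[Long])

  def statusByReducer(statuses: Array[MapOutput], numReducers: Int): Map[Int, Array[(String, Long)]] = {
    (0 until numReducers).map { r =>
      r -> statuses.map(s => (s.location, s.sizesByReducer(r)))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val statuses = Array(
      MapOutput("hostA", Array(10L, 20L)),
      MapOutput("hostB", Array(30L, 5L)))
    // Reducer 0 reads 10 bytes from hostA and 30 from hostB; reducer 1 reads 20 and 5.
    statusByReducer(statuses, numReducers = 2).foreach { case (r, locs) =>
      println(s"reducer $r -> ${locs.mkString(", ")}")
    }
  }
}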
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,6 +39,7 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util._
import org.apache.spark.util.collection.{Utils => CollectionUtils}
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -128,6 +129,15 @@ class DAGScheduler(

private val outputCommitCoordinator = env.outputCommitCoordinator

// Number of map and reduce tasks above which we do not assign preferred locations
// based on map output sizes.
private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
// NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
// Number of preferred locations to use for reducer tasks
private[scheduler] val NUM_REDUCER_PREF_LOCS = 5


// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1295,7 +1305,7 @@ class DAGScheduler(
{
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd,partition))) {
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
@@ -1320,6 +1330,22 @@
return locs
}
}
case s: ShuffleDependency[_, _, _] =>
if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
// Assign preferred locations for reducers by looking at map output location and sizes
val mapStatuses = mapOutputTracker.getStatusByReducer(s.shuffleId, rdd.partitions.size)
mapStatuses.map { status =>
// Get the map output locations for this reducer
if (status.contains(partition)) {
// Select first few locations as preferred locations for the reducer
val topLocs = CollectionUtils.takeOrdered(status(partition).iterator,
NUM_REDUCER_PREF_LOCS)(Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
}
}
}

case _ =>
}
Nil
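The takeOrdered call above keeps the few locations with the largest map output for the reducer. A small standalone sketch of that selection, using a plain sortBy instead of Spark's CollectionUtils and with assumed host names and sizes:

object TopLocsSketch {
  def main(args: Array[String]): Unit = {
    val numPrefLocs = 5 // assumed analogue of NUM_REDUCER_PREF_LOCS
    val statusForReducer = Seq(
      ("hostA", 10L), ("hostB", 70L), ("hostC", 40L),
      ("hostD", 90L), ("hostE", 20L), ("hostF", 60L))
    // Same effect as takeOrdered(...)(Ordering.by(_._2).reverse): largest sizes first.
    val topLocs = statusForReducer.sortBy(_._2)(Ordering[Long].reverse).take(numPrefLocs)
    println(topLocs.map(_._1)) // List(hostD, hostB, hostF, hostC, hostE)
  }
}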
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -438,8 +438,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// the 2nd ResultTask failed
complete(taskSets(1), Seq(
(Success, 42),
@@ -449,7 +449,7 @@
// ask the scheduler to try it again
scheduler.resubmitFailedStages()
// have the 2nd attempt pass
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
// we can see both result blocks now
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
complete(taskSets(3), Seq((Success, 43)))
@@ -464,8 +464,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
// The MapOutputTracker should know about both map output locations.
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
Array("hostA", "hostB"))
@@ -507,14 +507,18 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assert(newEpoch > oldEpoch)
val taskSet = taskSets(0)
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a non-failed host
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
// should work because it's a new epoch
taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -739,19 +743,63 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
assertDataStructuresEmpty
}

test("shuffle with reducer locality") {
// Create a shuffleMapRdd with 1 partition
val shuffleMapRdd = new MyRDD(sc, 1, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1))))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostA")))

// Reducer should run on the same host that the map task ran on
val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(Seq("hostA")))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

test("reducer locality with different sizes") {
val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
// Create a shuffleMapRdd with more partitions
val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))

val statuses = (1 to numMapTasks).map { i =>
(Success, makeMapStatus("host" + i, 1, (10*i).toByte))
}
complete(taskSets(0), statuses)

// Reducer should prefer the hosts with the larger map output sizes (the later hosts)
val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)

val reduceTaskSet = taskSets(1)
assertLocations(reduceTaskSet, Seq(hosts))
complete(reduceTaskSet, Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}

/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
*/
private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
assert(hosts.size === taskSet.tasks.size)
for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
assert(taskLocs.map(_.host) === expectedLocs)
assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
}
}

private def makeMapStatus(host: String, reduces: Int): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
