Use fraction of map outputs to determine locations
Also removes caching of preferred locations to make the API cleaner
shivaram committed Jun 9, 2015
1 parent 68bc29e commit f5be578
Showing 4 changed files with 51 additions and 34 deletions.
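
The gist of the change: instead of keeping a fixed number of locations with the largest map outputs for each reducer, the tracker now keeps every location that holds at least a given fraction of that reducer's total map output. A minimal standalone sketch of that selection rule, using a plain Map from hypothetical host names to output sizes rather than Spark's MapStatus and BlockManagerId types:

```scala
// Standalone sketch of the fraction-threshold selection, not Spark's API.
// `sizesByLocation` maps a (hypothetical) host name to the bytes of map
// output it holds for a single reducer.
def locationsAboveThreshold(
    sizesByLocation: Map[String, Long],
    fractionThreshold: Double): Seq[String] = {
  val totalOutputSize = sizesByLocation.values.sum
  if (totalOutputSize <= 0L) {
    Seq.empty
  } else {
    sizesByLocation.collect {
      case (loc, size) if size.toDouble / totalOutputSize >= fractionThreshold => loc
    }.toSeq
  }
}

// Example: hostA holds 600 of 1000 bytes, hostB 300, hostC 100.
// At a 0.2 threshold hostA and hostB qualify; at 0.5 only hostA does.
val sizes = Map("hostA" -> 600L, "hostB" -> 300L, "hostC" -> 100L)
println(locationsAboveThreshold(sizes, 0.2)) // List(hostA, hostB) (element order may vary)
println(locationsAboveThreshold(sizes, 0.5)) // List(hostA)
```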
49 changes: 28 additions & 21 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -292,44 +292,51 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }
 
   /**
-   * Return a list of locations which have the largest map outputs given a shuffleId
-   * and a reducerId.
+   * Return a list of locations which have a fraction of map output greater than the specified threshold.
    *
    * @param shuffleId id of the shuffle
    * @param reducerId id of the reduce task
    * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
    *
    * This method is not thread-safe
    */
   def getLocationsWithLargestOutputs(
       shuffleId: Int,
       reducerId: Int,
       numReducers: Int,
-      numTopLocs: Int)
+      fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
-    if (!shuffleIdToReduceLocations.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
-      // Pre-compute the top locations for each reducer and cache it
+
+    if (mapStatuses.contains(shuffleId)) {
       val statuses = mapStatuses(shuffleId)
       if (statuses.nonEmpty) {
-        val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
-        shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
-        var reduceIdx = 0
         // HashMap to add up sizes of all blocks at the same location
         val locs = new HashMap[BlockManagerId, Long]
-        while (reduceIdx < numReducers) {
-          var mapIdx = 0
-          locs.clear()
-          while (mapIdx < statuses.length) {
-            val blockSize = statuses(mapIdx).getSizeForBlock(reduceIdx)
-            if (blockSize > 0) {
-              locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
-                blockSize
-            }
-            mapIdx = mapIdx + 1
-          }
-          val topLocs = CollectionUtils.takeOrdered(locs.toIterator, numTopLocs)(ordering)
-          shuffleIdToReduceLocations(shuffleId) += (reduceIdx -> topLocs.map(_._1).toArray)
-          reduceIdx = reduceIdx + 1
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val blockSize = statuses(mapIdx).getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
+              blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
         }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Only return a result if at least one location satisfies
+        // the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
       }
     }
-    shuffleIdToReduceLocations.get(shuffleId).flatMap(_.get(reducerId))
+    None
   }
 
   def incrementEpoch() {
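
For reference, a usage sketch of the new signature, mirroring the call sites in the test further down; it assumes `tracker` is a MapOutputTrackerMaster that already has map outputs registered for shuffle 10 with a single reducer:

```scala
// Ask for every location holding at least 20% of reducer 0's map output.
val prefs: Option[Array[BlockManagerId]] =
  tracker.getLocationsWithLargestOutputs(
    shuffleId = 10, reducerId = 0, numReducers = 1, fractionThreshold = 0.2)
// `prefs` is None if the shuffle is unknown or no location meets the threshold.
```

Because the per-reducer caching (shuffleIdToReduceLocations) was removed, each call recomputes the per-location sums from the current map statuses.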
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -146,10 +146,13 @@ class DAGScheduler(
   private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
   // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
   private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
-  // Number of preferred locations to use for reducer tasks.
-  // Making this smaller will focus on the locations where the most data can be read locally, but
-  // may lead to more delay in scheduling if all of those locations are busy.
-  private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
+
+  // Fraction of total map output that must be at a location for it to be considered as a preferred
+  // location for a reduce task.
+  // Making this larger will focus on fewer locations where most data can be read locally, but
+  // may lead to more delay in scheduling if those locations are busy.
+  //
+  private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
 
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {

@@ -1411,14 +1414,14 @@ class DAGScheduler(
           }
         }
       case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick the 5 locations with the largest map outputs as preferred
-        // locations
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of data as preferred locations
        if (shuffleLocalityEnabled &&
            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
          // Get the preferred map output locations for this reducer
          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, NUM_REDUCER_PREF_LOCS)
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
          if (topLocsForReducer.nonEmpty) {
            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
          }
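
A quick sanity check on the default: since every preferred location must hold at least REDUCER_PREF_LOCS_FRACTION of a reducer's map output, at most 1 / 0.2 = 5 locations can qualify, which lines up with the previous NUM_REDUCER_PREF_LOCS = 5 cap; the difference is that a host holding only a sliver of the output is no longer selected just to fill out the list.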
15 changes: 11 additions & 4 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -223,10 +223,17 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
       Array(3L)))
 
-    val topLocs = tracker.getLocationsWithLargestOutputs(10, 0, 1, 1)
-    assert(topLocs.nonEmpty)
-    assert(topLocs.get.size === 1)
-    assert(topLocs.get.head === BlockManagerId("a", "hostA", 1000))
+    val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
+    assert(topLocs50.nonEmpty)
+    assert(topLocs50.get.size === 1)
+    assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
+
+    val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
+    assert(topLocs20.nonEmpty)
+    assert(topLocs20.get.size === 2)
+    assert(topLocs20.get.toSet ===
+      Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
 
     tracker.stop()
     rpcEnv.shutdown()
   }
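
Taken together, the two assertions pin down the fixture's size distribution for reducer 0: hostA qualifies at the 0.5 threshold, so it holds at least half of the total map output, while hostB qualifies only at 0.2, so it holds somewhere between 20% and 50%.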
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -821,7 +821,7 @@ class DAGSchedulerSuite
   }
 
   test("reducer locality with different sizes") {
-    val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
+    val numMapTasks = 4
     // Create an shuffleMapRdd with more partitions
     val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)

@@ -834,7 +834,7 @@ class DAGSchedulerSuite
     }
     complete(taskSets(0), statuses)
 
-    // Reducer should prefer the last hosts where output size is larger
+    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
     val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
 
     val reduceTaskSet = taskSets(1)
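
With numMapTasks = 4, the comment above implies host1 ends up with roughly 10% of the reducer's data, below the 0.2 fraction threshold, so only host2 through host4 are expected as preferred locations; that is exactly the set produced by reversing the host list and taking numMapTasks - 1 entries.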
