diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 73271ab3cdbc4..b98fb6c68af3f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -292,8 +292,13 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   }

   /**
-   * Return a list of locations which have the largest map outputs given a shuffleId
-   * and a reducerId.
+   * Return a list of locations that each have a fraction of map output greater than the threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
    *
    * This method is not thread-safe
    */
@@ -301,35 +306,37 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       shuffleId: Int,
       reducerId: Int,
       numReducers: Int,
-      numTopLocs: Int)
+      fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
-    if (!shuffleIdToReduceLocations.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+
+    if (mapStatuses.contains(shuffleId)) {
       // Pre-compute the top locations for each reducer and cache it
       val statuses = mapStatuses(shuffleId)
       if (statuses.nonEmpty) {
-        val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
-        shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
-        var reduceIdx = 0
         // HashMap to add up sizes of all blocks at the same location
         val locs = new HashMap[BlockManagerId, Long]
-        while (reduceIdx < numReducers) {
-          var mapIdx = 0
-          locs.clear()
-          while (mapIdx < statuses.length) {
-            val blockSize = statuses(mapIdx).getSizeForBlock(reduceIdx)
-            if (blockSize > 0) {
-              locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
-                blockSize
-            }
-            mapIdx = mapIdx + 1
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val blockSize = statuses(mapIdx).getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
+              blockSize
+            totalOutputSize += blockSize
           }
-          val topLocs = CollectionUtils.takeOrdered(locs.toIterator, numTopLocs)(ordering)
-          shuffleIdToReduceLocations(shuffleId) += (reduceIdx -> topLocs.map(_._1).toArray)
-          reduceIdx = reduceIdx + 1
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Only add this reducer to our map if we have any locations which satisfy
+        // the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
         }
       }
     }
-    shuffleIdToReduceLocations.get(shuffleId).flatMap(_.get(reducerId))
+    None
   }

   def incrementEpoch() {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7758b8c4f714f..a5086648e72f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -146,10 +146,13 @@ class DAGScheduler(
   private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
   // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
   private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
-  // Number of preferred locations to use for reducer tasks.
-  // Making this smaller will focus on the locations where the most data can be read locally, but
-  // may lead to more delay in scheduling if all of those locations are busy.
-  private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
+
+  // Fraction of total map output that must be at a location for it to be considered as a preferred
+  // location for a reduce task.
+  // Making this larger will focus on fewer locations where most data can be read locally, but
+  // may lead to more delay in scheduling if those locations are busy.
+  //
+  private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2

   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -1411,14 +1414,14 @@ class DAGScheduler(
           }
         }
       case s: ShuffleDependency[_, _, _] =>
-        // For shuffle dependencies, pick the 5 locations with the largest map outputs as preferred
-        // locations
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of data as preferred locations
         if (shuffleLocalityEnabled &&
             rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
             s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
           // Get the preferred map output locations for this reducer
           val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
-            partition, rdd.partitions.size, NUM_REDUCER_PREF_LOCS)
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
           if (topLocsForReducer.nonEmpty) {
             return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
           }
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index d8d3a4cc32486..39dd1cc194997 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -223,10 +223,17 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(3L)))

-    val topLocs = tracker.getLocationsWithLargestOutputs(10, 0, 1, 1)
-    assert(topLocs.nonEmpty)
-    assert(topLocs.get.size === 1)
-    assert(topLocs.get.head === BlockManagerId("a", "hostA", 1000))
+    val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
+    assert(topLocs50.nonEmpty)
+    assert(topLocs50.get.size === 1)
+    assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
+
+    val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
+    assert(topLocs20.nonEmpty)
+    assert(topLocs20.get.size === 2)
+    assert(topLocs20.get.toSet ===
+      Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
+
     tracker.stop()
     rpcEnv.shutdown()
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 79d66186d9e09..8994666002712 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -821,7 +821,7 @@ class DAGSchedulerSuite
   }

   test("reducer locality with different sizes") {
-    val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
+    val numMapTasks = 4
     // Create an shuffleMapRdd with more partitions
     val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -834,7 +834,7 @@ class DAGSchedulerSuite
     }
     complete(taskSets(0), statuses)

-    // Reducer should prefer the last hosts where output size is larger
+    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)

     val reduceTaskSet = taskSets(1)
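
The rule this patch introduces can be summarized as: a location is kept as a preferred
location for a reducer only if it holds at least fractionThreshold of that reducer's total
map output (REDUCER_PREF_LOCS_FRACTION = 0.2 when called from DAGScheduler). The standalone
Scala sketch below is illustrative only and not part of the patch; the object name, the
locationsAboveFraction helper, and the host/size values are invented, while the real code
operates on MapStatus objects and BlockManagerIds inside MapOutputTrackerMaster.

object ReducerPrefLocsSketch {
  // Standalone illustration (not Spark code): keep only the locations whose share of the
  // reducer's total map output meets the fraction threshold, mirroring the filter added in
  // getLocationsWithLargestOutputs.
  def locationsAboveFraction(
      sizesByLocation: Map[String, Long],
      fractionThreshold: Double): Set[String] = {
    val totalOutputSize = sizesByLocation.values.sum
    if (totalOutputSize == 0L) {
      Set.empty
    } else {
      sizesByLocation.filter { case (_, size) =>
        size.toDouble / totalOutputSize >= fractionThreshold
      }.keySet
    }
  }

  def main(args: Array[String]): Unit = {
    // hostA holds 4 of 7 bytes (~57%), hostB holds 3 of 7 (~43%), matching the
    // 2 + 2 vs 3 split registered in the MapOutputTrackerSuite test above.
    val sizes = Map("hostA" -> 4L, "hostB" -> 3L)
    println(locationsAboveFraction(sizes, 0.5)) // Set(hostA)
    println(locationsAboveFraction(sizes, 0.2)) // Set(hostA, hostB)
  }
}

With these numbers, a 0.5 threshold keeps only hostA while 0.2 keeps both hosts, which is
what the topLocs50 and topLocs20 assertions in MapOutputTrackerSuite check.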