Address code review comments
shivaram committed Jun 9, 2015
1 parent 897a914 commit 2ef2d39
Showing 4 changed files with 10 additions and 10 deletions.
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -304,27 +304,25 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     : Option[Array[BlockManagerId]] = {
 
     if (mapStatuses.contains(shuffleId)) {
-      // Pre-compute the top locations for each reducer and cache it
       val statuses = mapStatuses(shuffleId)
       if (statuses.nonEmpty) {
         // HashMap to add up sizes of all blocks at the same location
         val locs = new HashMap[BlockManagerId, Long]
         var totalOutputSize = 0L
         var mapIdx = 0
         while (mapIdx < statuses.length) {
-          val blockSize = statuses(mapIdx).getSizeForBlock(reducerId)
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
           if (blockSize > 0) {
-            locs(statuses(mapIdx).location) = locs.getOrElse(statuses(mapIdx).location, 0L) +
-              blockSize
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
             totalOutputSize += blockSize
           }
           mapIdx = mapIdx + 1
         }
         val topLocs = locs.filter { case (loc, size) =>
           size.toDouble / totalOutputSize >= fractionThreshold
         }
-        // Only add this reducer to our map if we have any locations which satisfy
-        // the required threshold
+        // Return if we have any locations which satisfy the required threshold
         if (topLocs.nonEmpty) {
           return Some(topLocs.map(_._1).toArray)
         }
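For readers skimming the hunk above: the method aggregates, for one reducer, the map-output bytes per location and keeps only locations holding at least fractionThreshold of the total. A minimal standalone sketch of that logic, with plain Strings standing in for BlockManagerId (the helper name and simplified types are illustrative, not Spark API):

import scala.collection.mutable.HashMap

// Sum block sizes per location, then keep locations holding at least
// `fractionThreshold` of the reducer's total input.
def locationsAboveThreshold(
    blockSizes: Seq[(String, Long)],
    fractionThreshold: Double): Option[Array[String]] = {
  val locs = new HashMap[String, Long]
  var totalOutputSize = 0L
  for ((location, blockSize) <- blockSizes if blockSize > 0) {
    locs(location) = locs.getOrElse(location, 0L) + blockSize
    totalOutputSize += blockSize
  }
  val topLocs = locs.filter { case (_, size) =>
    size.toDouble / totalOutputSize >= fractionThreshold
  }
  if (topLocs.nonEmpty) Some(topLocs.keys.toArray) else None
}

// Mirroring the test suite below: 4 of 7 bytes on hostA, 3 on hostB.
locationsAboveThreshold(Seq(("hostA", 4L), ("hostB", 3L)), 0.5)  // Some(Array("hostA"))
locationsAboveThreshold(Seq(("hostA", 4L), ("hostB", 3L)), 0.2)  // both hosts qualify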
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -142,7 +142,7 @@ class DAGScheduler(
     sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
   // Number of map, reduce tasks above which we do not assign preferred locations
   // based on map output sizes. We limit the size of jobs for which assign preferred locations
-  // as sorting the locations by size becomes expensive.
+  // as computing the top locations by size becomes expensive.
   private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
   // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
   private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
@@ -151,7 +151,6 @@ class DAGScheduler(
   // location for a reduce task.
   // Making this larger will focus on fewer locations where most data can be read locally, but
   // may lead to more delay in scheduling if those locations are busy.
-  //
   private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
 
   // Called by TaskScheduler to report task's starting.
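To make the comment's tradeoff concrete: every preferred location must hold at least REDUCER_PREF_LOCS_FRACTION of the reducer's input, so a fraction of 0.2 can yield at most 1 / 0.2 = 5 preferred locations per reduce task. Below is a sketch of how these constants could gate the lookup; the wrapper is hypothetical, and only getLocationsWithLargestOutputs and the constant values come from this diff:

import org.apache.spark.MapOutputTrackerMaster
import org.apache.spark.storage.BlockManagerId

// Hypothetical gate: skip the preferred-location computation for large jobs,
// as the comments above describe.
def reducerPrefLocs(
    tracker: MapOutputTrackerMaster,
    shuffleId: Int,
    reducerId: Int,
    numMapTasks: Int,
    numReduceTasks: Int): Option[Array[BlockManagerId]] = {
  val mapThreshold = 1000     // SHUFFLE_PREF_MAP_THRESHOLD
  val reduceThreshold = 1000  // SHUFFLE_PREF_REDUCE_THRESHOLD
  if (numMapTasks < mapThreshold && numReduceTasks < reduceThreshold) {
    // With a 0.2 fraction, at most five locations can each hold >= 20%.
    tracker.getLocationsWithLargestOutputs(shuffleId, reducerId, numReduceTasks, 0.2)
  } else {
    None
  }
}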
3 changes: 3 additions & 0 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -223,11 +223,14 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
         Array(3L)))
 
+    // When the threshold is 50%, only host A should be returned as a preferred location
+    // as it has 4 out of 7 bytes of output.
     val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
     assert(topLocs50.nonEmpty)
     assert(topLocs50.get.size === 1)
     assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
 
+    // When the threshold is 20%, both hosts should be returned as preferred locations.
     val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
     assert(topLocs20.nonEmpty)
     assert(topLocs20.get.size === 2)
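A quick check of the arithmetic behind the new test comments (per the test, hostA holds 4 of the 7 output bytes and hostB the other 3):

val sizes = Map("hostA" -> 4L, "hostB" -> 3L)
val total = sizes.values.sum.toDouble      // 7.0
sizes.filter(_._2 / total >= 0.5).keys     // hostA only: 4/7 ≈ 0.57, 3/7 ≈ 0.43
sizes.filter(_._2 / total >= 0.2).keys     // both hosts pass at 20%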
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -800,7 +800,7 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
-  test("shuffle with reducer locality") {
+  test("reduce tasks should be placed locally with map output") {
     // Create an shuffleMapRdd with 1 partition
     val shuffleMapRdd = new MyRDD(sc, 1, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -820,7 +820,7 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty
   }
 
-  test("reducer locality with different sizes") {
+  test("reduce task locality preferences should only include machines with largest map outputs") {
     val numMapTasks = 4
     // Create an shuffleMapRdd with more partitions
     val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
