Address code review comments
shivaram committed Jun 4, 2015
1 parent e7d5449 commit 0df3180
Showing 3 changed files with 70 additions and 39 deletions.
core/src/main/scala/org/apache/spark/MapOutputTracker.scala (51 changes: 30 additions & 21 deletions)
@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map, HashMap}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag

@@ -30,6 +30,7 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util._
+import org.apache.spark.util.collection.{Utils => CollectionUtils}
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int)
@@ -232,11 +233,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
 
-  // For each shuffleId we also maintain a Map from reducerId -> (location, size)
+  // For each shuffleId we also maintain a Map from reducerId -> (locations with largest outputs)
   // Lazily populated whenever the statuses are requested from DAGScheduler
-  private val statusByReducer =
-    new TimeStampedHashMap[Int, HashMap[Int, Array[(BlockManagerId, Long)]]]()
-
+  private val shuffleIdToReduceLocations =
+    new TimeStampedHashMap[Int, HashMap[Int, Array[BlockManagerId]]]()
 
   // For cleaning up TimeStampedHashMaps
   private val metadataCleaner =
@@ -283,38 +283,47 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   override def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
     cachedSerializedStatuses.remove(shuffleId)
-    statusByReducer.remove(shuffleId)
+    shuffleIdToReduceLocations.remove(shuffleId)
   }
 
   /** Check if the given shuffle is being tracked */
   def containsShuffle(shuffleId: Int): Boolean = {
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
-  // Return the list of locations and blockSizes for each reducer.
-  // The map is keyed by reducerId and for each reducer the value contains the array
-  // of (location, size) of map outputs.
-  //
-  // This method is not thread-safe
-  def getStatusByReducer(
+  /**
+   * Return a list of locations which have the largest map outputs given a shuffleId
+   * and a reducerId.
+   *
+   * This method is not thread-safe
+   */
+  def getLocationsWithLargestOutputs(
       shuffleId: Int,
-      numReducers: Int)
-    : Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
-    if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+      reducerId: Int,
+      numReducers: Int,
+      numTopLocs: Int)
+    : Option[Array[BlockManagerId]] = {
+    if (!shuffleIdToReduceLocations.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+      // Pre-compute the top locations for each reducer and cache it
       val statuses = mapStatuses(shuffleId)
-      if (statuses.length > 0) {
-        statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
+      if (statuses.nonEmpty) {
+        val ordering = Ordering.by[(BlockManagerId, Long), Long](_._2).reverse
+        shuffleIdToReduceLocations(shuffleId) = new HashMap[Int, Array[BlockManagerId]]
         var r = 0
         while (r < numReducers) {
+          // Add up sizes of all blocks at the same location
          val locs = statuses.map { s =>
            (s.location, s.getSizeForBlock(r))
-          }
-          statusByReducer(shuffleId) += (r -> locs)
+          }.groupBy(_._1).mapValues { sizes =>
+            sizes.map(_._2).reduceLeft(_ + _)
+          }.toIterator
+          val topLocs = CollectionUtils.takeOrdered(locs, numTopLocs)(ordering)
+          shuffleIdToReduceLocations(shuffleId) += (r -> topLocs.map(_._1).toArray)
          r = r + 1
        }
      }
    }
-    statusByReducer.get(shuffleId)
+    shuffleIdToReduceLocations.get(shuffleId).flatMap(_.get(reducerId))
   }
 
   def incrementEpoch() {
@@ -364,7 +373,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
-    statusByReducer.clearOldValues(cleanupTime)
+    shuffleIdToReduceLocations.clearOldValues(cleanupTime)
   }
 }

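For readers following the MapOutputTracker change above: the new getLocationsWithLargestOutputs caches, for each reducer, the map-output locations that serve the most data. Below is a minimal, Spark-free sketch of that aggregation (made-up host names and sizes; a plain sort stands in for CollectionUtils.takeOrdered, which keeps only the top numTopLocs entries).

```scala
// Standalone sketch (not Spark code): for one reducer, total the bytes each
// location would serve and return the top-N locations by that total.
object TopLocationsSketch {
  def topLocations(blocks: Seq[(String, Long)], numTopLocs: Int): Seq[String] = {
    blocks
      .groupBy { case (loc, _) => loc }                          // group block sizes by location
      .map { case (loc, perLoc) => loc -> perLoc.map(_._2).sum } // total bytes per location
      .toSeq
      .sortBy { case (_, total) => -total }                      // largest outputs first
      .take(numTopLocs)
      .map { case (loc, _) => loc }
  }

  def main(args: Array[String]): Unit = {
    // Three map outputs feeding this reducer: two small blocks on hostA, one larger on hostB.
    val blocksForReducer = Seq(("hostA", 1L), ("hostA", 1L), ("hostB", 3L), ("hostC", 1L))
    println(topLocations(blocksForReducer, 2)) // hostB first (3 bytes total), then hostA (2 bytes)
  }
}
```

In the committed code the result is computed once per shuffle and cached in shuffleIdToReduceLocations, keyed by reducer, so repeated lookups from the DAGScheduler do not recompute the aggregation.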
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (33 changes: 15 additions & 18 deletions)
@@ -38,7 +38,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.TaskMemoryManager
 import org.apache.spark.util._
-import org.apache.spark.util.collection.{Utils => CollectionUtils}
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
 /**
@@ -139,10 +138,11 @@ class DAGScheduler(
     taskScheduler.setDAGScheduler(this)
 
   // Number of map, reduce tasks above which we do not assign preferred locations
-  // based on map output sizes.
-  private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // based on map output sizes. We limit the size of jobs for which assign preferred locations
+  // as sorting the locations by size becomes expensive.
+  private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
   // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
-  private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+  private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
   // Number of preferred locations to use for reducer tasks
   private[scheduler] val NUM_REDUCER_PREF_LOCS = 5

@@ -1393,31 +1393,28 @@ class DAGScheduler(
     if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
+
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
+        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+        // that has any placement preferences. Ideally we would choose based on transfer sizes,
+        // but this will do for now.
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case s: ShuffleDependency[_, _, _] =>
+        // For shuffle dependencies, pick the 5 locations with the largest map outputs as preferred
+        // locations
        if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
-          // Assign preferred locations for reducers by looking at map output location and sizes
-          val mapStatuses = mapOutputTracker.getStatusByReducer(s.shuffleId, rdd.partitions.size)
-          mapStatuses.map { status =>
-            // Get the map output locations for this reducer
-            if (status.contains(partition)) {
-              // Select first few locations as preferred locations for the reducer
-              val topLocs = CollectionUtils.takeOrdered(
-                status(partition).iterator, NUM_REDUCER_PREF_LOCS)(
-                Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
-              return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
-            }
+          // Get the preferred map output locations for this reducer
+          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId, partition,
+            rdd.partitions.size, NUM_REDUCER_PREF_LOCS)
+          if (topLocsForReducer.nonEmpty) {
+            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
          }
        }

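The DAGScheduler side of the change reduces to a small decision: only consult the tracker for size-based reducer preferences when both the map and reduce sides are below the thresholds, and report no preference otherwise. A rough, Spark-free sketch of that flow follows (the tracker call is stubbed and the names are illustrative, not Spark APIs).

```scala
// Illustrative only: mirrors the gating in getPreferredLocsInternal without Spark types.
object ReducerLocalitySketch {
  val shufflePrefMapThreshold = 1000    // cf. SHUFFLE_PREF_MAP_THRESHOLD
  val shufflePrefReduceThreshold = 1000 // cf. SHUFFLE_PREF_REDUCE_THRESHOLD
  val numReducerPrefLocs = 5            // cf. NUM_REDUCER_PREF_LOCS

  // Stand-in for MapOutputTrackerMaster.getLocationsWithLargestOutputs.
  def locationsWithLargestOutputs(
      shuffleId: Int, reducerId: Int, numReducers: Int, numTopLocs: Int): Option[Seq[String]] =
    Some(Seq("hostB", "hostA"))

  def preferredReducerLocs(
      shuffleId: Int, reducerId: Int, numMaps: Int, numReducers: Int): Seq[String] = {
    // Size-based preferences are only worth computing for reasonably small jobs.
    if (numReducers < shufflePrefReduceThreshold && numMaps < shufflePrefMapThreshold) {
      locationsWithLargestOutputs(shuffleId, reducerId, numReducers, numReducerPrefLocs)
        .getOrElse(Nil)
    } else {
      Nil // large jobs: let the scheduler place reduce tasks anywhere
    }
  }

  def main(args: Array[String]): Unit = {
    println(preferredReducerLocs(shuffleId = 10, reducerId = 0, numMaps = 3, numReducers = 1))
  }
}
```

In the real code the locations come back as BlockManagerIds and are wrapped as TaskLocation(loc.host, loc.executorId) before being returned to the scheduler.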
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala (25 changes: 25 additions & 0 deletions)
@@ -205,4 +205,29 @@ class MapOutputTrackerSuite extends SparkFunSuite {
     // masterTracker.stop() // this throws an exception
     rpcEnv.shutdown()
   }
+
+  test("getLocationsWithLargestOutputs with multiple outputs in same machine") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = new MapOutputTrackerMaster(conf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    // Setup 3 map tasks
+    // on hostA with output size 1
+    // on hostA with output size 1
+    // on hostB with output size 2
+    tracker.registerShuffle(10, 3)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(1L)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
+      Array(1L)))
+    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
+      Array(2L)))
+
+    val topLocs = tracker.getLocationsWithLargestOutputs(10, 0, 1, 1)
+    assert(topLocs.nonEmpty)
+    assert(topLocs.get.size === 1)
+    assert(topLocs.get.head === BlockManagerId("a", "hostA", 1000))
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
 }
