diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1a54ed1073f64..07ab98fe7f31f 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -301,7 +301,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   // of (location, size) of map outputs.
   //
   // This method is not thread-safe
-  def getStatusByReducer(shuffleId: Int, numReducers: Int): Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
+  def getStatusByReducer(
+      shuffleId: Int,
+      numReducers: Int)
+    : Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
     if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
       val statuses = mapStatuses(shuffleId)
       if (statuses.length > 0) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6314c4595876e..fb7ffca0ceadb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1339,8 +1339,9 @@ class DAGScheduler(
       // Get the map output locations for this reducer
      if (status.contains(partition)) {
        // Select first few locations as preferred locations for the reducer
-       val topLocs = CollectionUtils.takeOrdered(status(partition).iterator,
-         NUM_REDUCER_PREF_LOCS)(Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
+       val topLocs = CollectionUtils.takeOrdered(
+         status(partition).iterator, NUM_REDUCER_PREF_LOCS)
+         (Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
        return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
      }
    }
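For context on the DAGScheduler hunk: it keeps only the few locations holding the most map output bytes for a reducer, ordered by size descending. Below is a minimal, self-contained sketch of that selection, using plain Scala sorting in place of CollectionUtils.takeOrdered and a hypothetical Host stand-in for BlockManagerId/TaskLocation; names like TopLocsSketch and NumPrefLocs are illustrative, not Spark code.

    // A minimal sketch (not Spark code): pick the top-N locations by map output size.
    object TopLocsSketch {
      // Hypothetical stand-in for BlockManagerId / TaskLocation.
      case class Host(host: String, executorId: String)

      // Plays the role of NUM_REDUCER_PREF_LOCS in the diff above.
      val NumPrefLocs = 5

      // Given (location, bytes of map output at that location) for one reducer,
      // keep the locations with the largest sizes, largest first.
      def topLocations(sizesByLoc: Seq[(Host, Long)]): Seq[Host] =
        sizesByLoc
          .sorted(Ordering.by[(Host, Long), Long](_._2).reverse) // same ordering pattern as the diff
          .take(NumPrefLocs)
          .map(_._1)

      def main(args: Array[String]): Unit = {
        val sizes = Seq((Host("a", "1"), 10L), (Host("b", "2"), 300L), (Host("c", "3"), 42L))
        println(topLocations(sizes).map(_.host).mkString(", ")) // prints: b, c, a
      }
    }

The reverse Ordering.by on the size field is what makes the largest map outputs come first, so the reducer's preferred locations are the hosts that would minimize remote shuffle reads.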