diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 88f6d32a0f272..603330e71956a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -137,6 +137,8 @@ class DAGScheduler( private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) + // Flag to control if reduce tasks are assigned preferred locations + private val shuffleLocalityEnabled = sc.getConf.getBoolean("spark.shuffle.locality.enabled", true) // Number of map, reduce tasks above which we do not assign preferred locations // based on map output sizes. We limit the size of jobs for which assign preferred locations // as sorting the locations by size becomes expensive. @@ -1410,7 +1412,8 @@ class DAGScheduler( case s: ShuffleDependency[_, _, _] => // For shuffle dependencies, pick the 5 locations with the largest map outputs as preferred // locations - if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD && + if (shuffleLocalityEnabled && + rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD && s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) { // Get the preferred map output locations for this reducer val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,