Skip to content

Commit

Permalink
SPARK-4968: takeOrdered to skip reduce step in case mappers return no…
Browse files Browse the repository at this point in the history
… partitions
  • Loading branch information
Yash Datta authored and Yash Datta committed Dec 29, 2014
1 parent 14fa87b commit 5974d10
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1146,15 +1146,20 @@ abstract class RDD[T: ClassTag](
if (num == 0) {
Array.empty
} else {
mapPartitions { items =>
val mapRDDs = mapPartitions { items =>
// Priority keeps the largest elements, so let's reverse the ordering.
val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
Iterator.single(queue)
}.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
if (mapRDDs.partitions.size == 0) {
Array.empty
} else {
mapRDDs.reduce { (queue1, queue2) =>
queue1 ++= queue2
queue1
}.toArray.sorted(ord)
}
}
}

Expand Down

0 comments on commit 5974d10

Please sign in to comment.