SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering.

Unlike the per-key implicit conversion that a view bound entails, a context bound on `Ordering` doesn't require creating new ordering objects per row. Additionally, [view bounds are going to be deprecated](https://issues.scala-lang.org/browse/SI-7629), so we should get rid of them while the APIs are still flexible.
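To make the difference concrete, here is a minimal sketch (not part of this commit; the helper names are hypothetical) contrasting the two kinds of bounds:

// View bound: each comparison implicitly wraps a key in a fresh Ordered[K].
def maxKeyViewBound[K <% Ordered[K]](keys: Seq[K]): K =
  keys.reduce((a, b) => if (a < b) b else a)

// Context bound: one Ordering[K] is resolved from implicit scope and reused,
// so no per-row wrapper objects are allocated.
def maxKeyContextBound[K : Ordering](keys: Seq[K]): K = {
  val ord = implicitly[Ordering[K]]
  keys.reduce((a, b) => if (ord.lt(a, b)) b else a)
}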

Author: Michael Armbrust <michael@databricks.com>

Closes apache#410 from marmbrus/viewBounds and squashes the following commits:

c574221 [Michael Armbrust] fix example.
812008e [Michael Armbrust] Update Java API.
1b9b85c [Michael Armbrust] Update scala doc.
35798a8 [Michael Armbrust] Remove view bounds on Ordered in favor of a context bound on Ordering.
marmbrus authored and rxin committed Apr 18, 2014
1 parent 81a152c commit c399baa
Showing 5 changed files with 30 additions and 18 deletions.
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
     private val ascending: Boolean = true)
   extends Partitioner {
 
+  private val ordering = implicitly[Ordering[K]]
+
   // An array of upper bounds for the first (partitions - 1) partitions
   private val rangeBounds: Array[K] = {
     if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
       val rddSize = rdd.count()
       val maxSampleSize = partitions * 20.0
       val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+      val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
       if (rddSample.length == 0) {
         Array()
       } else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     var partition = 0
     if (rangeBounds.length < 1000) {
       // If we have fewer than 1000 partitions, use naive linear search
-      while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
         partition += 1
       }
     } else {
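The key mechanics here, sketched outside the patch (names hypothetical): a context bound such as `K : Ordering : ClassTag` desugars into an extra implicit parameter list, and `implicitly[Ordering[K]]` simply returns the instance already in scope, so the partitioner captures one `Ordering[K]` at construction time instead of wrapping every key:

import scala.reflect.ClassTag

// Hypothetical desugaring of `class C[K : Ordering : ClassTag, V]`:
class DesugaredPartitioner[K, V](partitions: Int)(implicit ord: Ordering[K], kt: ClassTag[K]) {
  // implicitly[Ordering[K]] resolves to `ord`; no new Ordering is allocated.
  private val ordering = implicitly[Ordering[K]]
}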
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1273,7 +1273,7 @@ object SparkContext extends Logging {
       rdd: RDD[(K, V)]) =
     new SequenceFileRDDFunctions(rdd)
 
-  implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
       rdd: RDD[(K, V)]) =
     new OrderedRDDFunctions[K, V, (K, V)](rdd)
 
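A usage sketch (assumes an existing SparkContext `sc`; not from the patch): after `import org.apache.spark.SparkContext._`, the rewritten conversion picks up whatever `Ordering[K]` is in implicit scope, e.g. the standard string ordering for string keys:

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
// sortByKey now requires an implicit Ordering[String], found in scala.math.Ordering.
val sorted = pairs.sortByKey()  // Array(("a", 1), ("b", 2), ("c", 3)) after collect()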
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
   }
 
@@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * order of the keys).
    */
   def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = {
-    class KeyOrdering(val a: K) extends Ordered[K] {
-      override def compare(b: K) = comp.compare(a, b)
-    }
-    implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
+    implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
     fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
   }
 
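Why a bare `implicit val ordering = comp` is enough, sketched with hypothetical names: `scala.math.Ordering`'s companion provides `implicit def comparatorToOrdering[A](implicit cmp: Comparator[A]): Ordering[A]` among its low-priority implicits, so placing a `Comparator[K]` in implicit scope lets the compiler materialize the `Ordering[K]` that the context bound on `OrderedRDDFunctions` demands:

import java.util.Comparator

def sortWithComparator[K](comp: Comparator[K], keys: Seq[K]): Seq[K] = {
  implicit val c: Comparator[K] = comp
  // implicitly[Ordering[K]] resolves via Ordering.comparatorToOrdering(c).
  keys.sorted(implicitly[Ordering[K]])
}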
26 changes: 21 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -24,15 +24,31 @@ import org.apache.spark.{Logging, RangePartitioner}
 /**
  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
  * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
- * use these functions. They will work with any key type that has a `scala.math.Ordered`
- * implementation.
+ * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in
+ * scope. Ordering objects already exist for all of the standard primitive types. Users can also
+ * define their own orderings for custom types, or to override the default ordering. The implicit
+ * ordering that is in the closest scope will be used.
+ *
+ * {{{
+ *   import org.apache.spark.SparkContext._
+ *
+ *   val rdd: RDD[(String, Int)] = ...
+ *   implicit val caseInsensitiveOrdering = new Ordering[String] {
+ *     override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
+ *   }
+ *
+ *   // Sort by key, using the above case insensitive ordering.
+ *   rdd.sortByKey()
+ * }}}
  */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+class OrderedRDDFunctions[K : Ordering : ClassTag,
                           V: ClassTag,
                           P <: Product2[K, V] : ClassTag](
     self: RDD[P])
   extends Logging with Serializable {
 
+  private val ordering = implicitly[Ordering[K]]
+
   /**
    * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
    * `collect` or `save` on the resulting RDD will return or output an ordered list of records
@@ -45,9 +61,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
     shuffled.mapPartitions(iter => {
       val buf = iter.toArray
       if (ascending) {
-        buf.sortWith((x, y) => x._1 < y._1).iterator
+        buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
       } else {
-        buf.sortWith((x, y) => x._1 > y._1).iterator
+        buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
       }
     }, preservesPartitioning = true)
   }
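A standalone sketch of the partition-local sort above, with hypothetical data, showing the `Ordering` calls that replace the `<`/`>` operators on `Ordered`:

val ordering = implicitly[Ordering[String]]
val buf = Array(("b", 2), ("c", 3), ("a", 1))

val ascendingSort  = buf.sortWith((x, y) => ordering.lt(x._1, y._1))  // a, b, c
val descendingSort = buf.sortWith((x, y) => ordering.gt(x._1, y._1))  // c, b, a
// The descending case could equally be written buf.sortBy(_._1)(ordering.reverse).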
@@ -23,7 +23,7 @@ import scala.Array
 import scala.reflect._
 
 private[spark] object CollectionsUtils {
-  def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
+  def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
     classTag[K] match {
       case ClassTag.Float =>
         (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
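The truncated diff shows only the `Float` specialization. For reference, a hypothetical fallback for non-primitive keys (a sketch, not the code elided from this file) can drive `java.util.Arrays.binarySearch` with the implicit `Ordering` directly, since `Ordering[K]` extends `java.util.Comparator[K]`:

import java.util.{Arrays, Comparator}

def fallbackBinarySearch[K](implicit ord: Ordering[K]): (Array[K], K) => Int =
  (l, x) => Arrays.binarySearch(
    l.asInstanceOf[Array[AnyRef]],    // safe only for non-primitive key arrays
    x.asInstanceOf[AnyRef],
    ord.asInstanceOf[Comparator[AnyRef]])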
