Remove view bounds on Ordered in favor of a context bound on Ordering.
marmbrus committed Apr 15, 2014
1 parent 268b535 commit 35798a8
Showing 4 changed files with 12 additions and 8 deletions.
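
As background, the commit swaps one type-class encoding for another: a view bound `K <% Ordered[K]` desugars to an implicit conversion parameter `K => Ordered[K]`, while a context bound `K : Ordering` desugars to an implicit `Ordering[K]` instance. A minimal sketch of the difference (the `maxByOrdered`/`maxByOrdering` helpers are hypothetical, not part of this change):

// View bound: the compiler threads an implicit conversion K => Ordered[K],
// so comparison operators can be used directly on keys.
// (View bounds were later deprecated in Scala.)
def maxByOrdered[K <% Ordered[K]](a: K, b: K): K = if (a < b) b else a

// Context bound: the compiler threads an implicit Ordering[K] instance,
// which is consulted explicitly.
def maxByOrdering[K : Ordering](a: K, b: K): K = {
  val ord = implicitly[Ordering[K]]
  if (ord.lt(a, b)) b else a
}
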
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
* equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*/
-class RangePartitioner[K <% Ordered[K]: ClassTag, V](
+class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {

+  private val ordering = implicitly[Ordering[K]]
+
// An array of upper bounds for the first (partitions - 1) partitions
private val rangeBounds: Array[K] = {
if (partitions == 1) {
@@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
val rddSize = rdd.count()
val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
-val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
+val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted
if (rddSample.length == 0) {
Array()
} else {
@@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V](
var partition = 0
if (rangeBounds.length < 1000) {
// If we have fewer than 1000 partitions, use naive search
-while (partition < rangeBounds.length && k > rangeBounds(partition)) {
+while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
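
The loop above now consults the implicit Ordering rather than Ordered's `>` operator. Pulled out as a standalone sketch (a hypothetical helper, not code from this commit):

// Returns the index of the first upper bound that is >= k; keys larger than
// every bound fall into the last partition.
def findPartition[K](k: K, rangeBounds: Array[K])(implicit ord: Ordering[K]): Int = {
  var partition = 0
  while (partition < rangeBounds.length && ord.gt(k, rangeBounds(partition))) {
    partition += 1
  }
  partition
}
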
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1267,7 +1267,7 @@ object SparkContext extends Logging {
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

-implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
+implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions[K, V, (K, V)](rdd)

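
The practical effect of the new bound on `rddToOrderedRDDFunctions` is that a key type no longer has to extend `Ordered`; any implicit `Ordering[K]` in scope now suffices. A hypothetical usage sketch (`Version` is an invented example type):

// With a context bound, supplying an implicit Ordering is enough to sort by key.
case class Version(major: Int, minor: Int)
object Version {
  implicit val ordering: Ordering[Version] = Ordering.by(v => (v.major, v.minor))
}
// e.g. sc.parallelize(Seq(Version(1, 2) -> "a", Version(1, 0) -> "b")).sortByKey()
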
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -27,12 +27,14 @@ import org.apache.spark.{Logging, RangePartitioner}
* use these functions. They will work with any key type that has a `scala.math.Ordered`
* implementation.
*/
-class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+class OrderedRDDFunctions[K : Ordering : ClassTag,
V: ClassTag,
P <: Product2[K, V] : ClassTag](
self: RDD[P])
extends Logging with Serializable {

+  private val ordering = implicitly[Ordering[K]]
+
/**
* Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
* `collect` or `save` on the resulting RDD will return or output an ordered list of records
@@ -45,9 +47,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
shuffled.mapPartitions(iter => {
val buf = iter.toArray
if (ascending) {
-buf.sortWith((x, y) => x._1 < y._1).iterator
+buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator
} else {
-buf.sortWith((x, y) => x._1 > y._1).iterator
+buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator
}
}, preservesPartitioning = true)
}
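
The two `sortWith` calls above delegate the key comparison to the Ordering; they behave the same as sorting by key with the Ordering (or its reverse) directly. A standalone sketch, assuming Int keys for illustration:

val ord = implicitly[Ordering[Int]]
val buf = Array(3 -> "c", 1 -> "a", 2 -> "b")
buf.sortWith((x, y) => ord.lt(x._1, y._1))  // ascending, as in the diff
buf.sortWith((x, y) => ord.gt(x._1, y._1))  // descending
// Equivalently: buf.sortBy(_._1)(ord) and buf.sortBy(_._1)(ord.reverse)
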
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala
@@ -23,7 +23,7 @@ import scala.Array
import scala.reflect._

private[spark] object CollectionsUtils {
-def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = {
+def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = {
classTag[K] match {
case ClassTag.Float =>
(l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
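
The primitive cases shown dispatch to the specialized `java.util.Arrays.binarySearch` overloads, which need no Ordering. For reference types, `scala.math.Ordering` extends `java.util.Comparator`, so a generic case can hand the Ordering to the comparator-based overload. A sketch of what such a fallback might look like (assumed; that branch is not shown in this excerpt):

// Generic binary search driven by the implicit Ordering, used as a
// java.util.Comparator. The casts are safe at runtime because generics erase.
def genericSearch[K : Ordering]: (Array[K], K) => Int = {
  val comparator = implicitly[Ordering[K]].asInstanceOf[java.util.Comparator[AnyRef]]
  (l, x) => java.util.Arrays.binarySearch(
    l.asInstanceOf[Array[AnyRef]], x.asInstanceOf[AnyRef], comparator)
}
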
