Skip to content

Commit

Permalink
Add a few missing scopes to certain RDD methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Apr 17, 2015
1 parent 6b3403b commit a9ed4f9
Showing 1 changed file with 14 additions and 6 deletions.
20 changes: 14 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD containing the distinct elements in this RDD.
*/
// Wrapped in withScope so this operation is attributed to a named scope
// (per this commit's intent of adding missing scopes) — NOTE(review): exact
// withScope semantics defined elsewhere in RDD.scala; confirm there.
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}

/**
* Return a new RDD that has exactly numPartitions partitions.
Expand Down Expand Up @@ -429,7 +431,7 @@ abstract class RDD[T: ClassTag](
def takeSample(
withReplacement: Boolean,
num: Int,
seed: Long = Utils.random.nextLong): Array[T] = {
seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0

if (num < 0) {
Expand Down Expand Up @@ -487,7 +489,9 @@ abstract class RDD[T: ClassTag](
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
*/
// Symbolic alias for `union`; wrapped in withScope so the operation is
// attributed to a named scope like the other instrumented RDD methods.
def ++(other: RDD[T]): RDD[T] = withScope {
  this.union(other)
}

/**
* Return this RDD sorted by the given key function.
Expand Down Expand Up @@ -567,8 +571,9 @@ abstract class RDD[T: ClassTag](
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
// Delegates to the partitioner-taking overload using the default partitioner;
// wrapped in withScope so the operation is attributed to a named scope.
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
Expand All @@ -579,8 +584,11 @@ abstract class RDD[T: ClassTag](
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
// Delegates to the partitioner-taking overload with a HashPartitioner of the
// requested size; wrapped in withScope so the operation is attributed to a
// named scope.
def groupBy[K](
    f: T => K,
    numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy(f, new HashPartitioner(numPartitions))
}

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
Expand Down Expand Up @@ -739,7 +747,7 @@ abstract class RDD[T: ClassTag](
mapPartitionsWithIndex { (index, iter) =>
val a = constructA(index)
iter.map(t => {f(t, a); t})
}.foreach(_ => {})
}
}

/**
Expand Down

0 comments on commit a9ed4f9

Please sign in to comment.