Remove all return statements in withScope
The closure cleaner doesn't like these statements, for a good
reason.
Andrew Or committed Apr 29, 2015
1 parent d19c4da commit 6e2cfea
Showing 3 changed files with 36 additions and 28 deletions.
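
For context on the commit message: a return inside a block passed by name to a helper such as withScope compiles to a non-local return from the enclosing method, implemented by throwing scala.runtime.NonLocalReturnControl, and a closure cleaner that scans such a body cannot prove the return is safe to execute elsewhere. The sketch below is not part of the commit; withScope here is a stand-in for RDDScope.withScope, and the example names are invented.

object NonLocalReturnSketch {
  // Stand-in for RDDScope.withScope: it only evaluates the by-name body.
  def withScope[U](body: => U): U = body

  def firstEven(xs: Seq[Int]): Option[Int] = withScope {
    xs.foreach { x =>
      // This `return` exits firstEven, not the surrounding closure. The compiler
      // implements it by throwing scala.runtime.NonLocalReturnControl and catching
      // it in firstEven, which only works while firstEven is still on the call stack.
      if (x % 2 == 0) return Some(x)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    println(firstEven(Seq(1, 3, 4))) // Some(4): safe here only because the body
                                     // runs synchronously inside firstEven
  }
}

Rewriting each early return so that the result is the last expression of an if/else, as this commit does in take below, removes the hidden exception-based control flow entirely.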
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -643,6 +643,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body)

58 changes: 30 additions & 28 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -280,6 +280,8 @@ abstract class RDD[T: ClassTag](
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body)

@@ -1199,38 +1201,38 @@ abstract class RDD[T: ClassTag](
    */
   def take(num: Int): Array[T] = withScope {
     if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val buf = new ArrayBuffer[T]
-    val totalParts = this.partitions.length
-    var partsScanned = 0
-    while (buf.size < num && partsScanned < totalParts) {
-      // The number of partitions to try in this iteration. It is ok for this number to be
-      // greater than totalParts because we actually cap it at totalParts in runJob.
-      var numPartsToTry = 1
-      if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
-        // interpolate the number of partitions we need to try, but overestimate it by 50%.
-        // We also cap the estimation in the end.
-        if (buf.size == 0) {
-          numPartsToTry = partsScanned * 4
-        } else {
-          // the left side of max is >=1 whenever partsScanned >= 2
-          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
-          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+      new Array[T](0)
+    } else {
+      val buf = new ArrayBuffer[T]
+      val totalParts = this.partitions.length
+      var partsScanned = 0
+      while (buf.size < num && partsScanned < totalParts) {
+        // The number of partitions to try in this iteration. It is ok for this number to be
+        // greater than totalParts because we actually cap it at totalParts in runJob.
+        var numPartsToTry = 1
+        if (partsScanned > 0) {
+          // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+          // interpolate the number of partitions we need to try, but overestimate it by 50%.
+          // We also cap the estimation in the end.
+          if (buf.size == 0) {
+            numPartsToTry = partsScanned * 4
+          } else {
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+          }
         }
-      }
 
-      val left = num - buf.size
-      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
-      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+        val left = num - buf.size
+        val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
 
-      res.foreach(buf ++= _.take(num - buf.size))
-      partsScanned += numPartsToTry
-    }
+        res.foreach(buf ++= _.take(num - buf.size))
+        partsScanned += numPartsToTry
+      }
 
-    buf.toArray
+      buf.toArray
+    }
   }
 
   /**
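
To make the growth policy in the loop above concrete, here is the same numPartsToTry arithmetic pulled out into a standalone function, exercised with a few made-up sample values (the numbers are illustrative only, not from the commit):

object TakeEstimationSketch {
  // The partition-count estimate used inside RDD.take, reproduced from the diff above.
  def numPartsToTry(num: Int, partsScanned: Int, bufSize: Int): Int = {
    if (partsScanned == 0) {
      1
    } else if (bufSize == 0) {
      // No rows found so far: quadruple and retry.
      partsScanned * 4
    } else {
      // Interpolate how many partitions are needed, overestimate by 50%,
      // then cap the growth at 4x the partitions already scanned.
      val estimated = Math.max((1.5 * num * partsScanned / bufSize).toInt - partsScanned, 1)
      Math.min(estimated, partsScanned * 4)
    }
  }

  def main(args: Array[String]): Unit = {
    println(numPartsToTry(num = 100, partsScanned = 4, bufSize = 0))  // 16 (no rows yet: 4 * 4)
    println(numPartsToTry(num = 100, partsScanned = 4, bufSize = 10)) // 16 (estimate 56, capped at 4 * 4)
    println(numPartsToTry(num = 100, partsScanned = 8, bufSize = 50)) // 16 ((1.5 * 100 * 8 / 50) - 8)
  }
}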
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
@@ -54,6 +54,8 @@ private[spark] object RDDScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    * The name of the scope will be the name of the method that immediately encloses this one.
+   *
+   * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
@@ -68,6 +70,8 @@
    * If nesting is allowed, this concatenates the previous scope with the new one in a way that
    * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
    * this method executed in the body will have no effect.
+   *
+   * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
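
The body of RDDScope.withScope itself is outside this diff. Purely as an assumed sketch of the shape its Scaladoc describes, nested scopes whose names concatenate to signify hierarchy, one way such a helper can be built is shown below; the DynamicVariable storage, the separator, and all names are invented for illustration and are not Spark's implementation.

import scala.util.DynamicVariable

object ScopeSketch {
  private val currentScope = new DynamicVariable[Option[String]](None)

  /**
   * Run `body` in a scope named `name`, nesting under any scope already active.
   * As with RDDScope.withScope, `body` is by-name, so return statements in it
   * would become non-local returns and are not allowed.
   */
  def withScope[T](name: String)(body: => T): T = {
    val nested = currentScope.value match {
      case Some(parent) => s"$parent / $name" // concatenate to signify hierarchy
      case None => name
    }
    currentScope.withValue(Some(nested))(body)
  }

  /** The scope an RDD created at this point would record. */
  def scopeOfNewRDD: Option[String] = currentScope.value

  def main(args: Array[String]): Unit = {
    withScope("textFile") {
      println(scopeOfNewRDD) // Some(textFile)
      withScope("map") {
        println(scopeOfNewRDD) // Some(textFile / map)
      }
    }
  }
}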
