From 6e2cfeae9db3b05ac836a229e888af1a54e4f9d3 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 28 Apr 2015 17:40:30 -0700
Subject: [PATCH] Remove all return statements in `withScope`

The closure cleaner doesn't like these statements, for a good reason.
---
 .../scala/org/apache/spark/SparkContext.scala |  2 +
 .../main/scala/org/apache/spark/rdd/RDD.scala | 58 ++++++++++---------
 .../scala/org/apache/spark/rdd/RDDScope.scala |  4 ++
 3 files changed, 36 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 32ea23c266f96..ecf9b4260c206 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -643,6 +643,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body)
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 863fc80f5f669..a1adadbe115d2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -280,6 +280,8 @@ abstract class RDD[T: ClassTag](
   /**
    * Execute a block of code in a scope.
    * All new RDDs created in this body will be part of the same scope.
+   *
+   * Note: Return statements are NOT allowed in the given body.
    */
   private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body)
 
@@ -1199,38 +1201,38 @@ abstract class RDD[T: ClassTag](
    */
   def take(num: Int): Array[T] = withScope {
     if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val buf = new ArrayBuffer[T]
-    val totalParts = this.partitions.length
-    var partsScanned = 0
-    while (buf.size < num && partsScanned < totalParts) {
-      // The number of partitions to try in this iteration. It is ok for this number to be
-      // greater than totalParts because we actually cap it at totalParts in runJob.
-      var numPartsToTry = 1
-      if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
-        // interpolate the number of partitions we need to try, but overestimate it by 50%.
-        // We also cap the estimation in the end.
-        if (buf.size == 0) {
-          numPartsToTry = partsScanned * 4
-        } else {
-          // the left side of max is >=1 whenever partsScanned >= 2
-          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
-          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+      new Array[T](0)
+    } else {
+      val buf = new ArrayBuffer[T]
+      val totalParts = this.partitions.length
+      var partsScanned = 0
+      while (buf.size < num && partsScanned < totalParts) {
+        // The number of partitions to try in this iteration. It is ok for this number to be
+        // greater than totalParts because we actually cap it at totalParts in runJob.
+        var numPartsToTry = 1
+        if (partsScanned > 0) {
+          // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+          // interpolate the number of partitions we need to try, but overestimate it by 50%.
+          // We also cap the estimation in the end.
+          if (buf.size == 0) {
+            numPartsToTry = partsScanned * 4
+          } else {
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+          }
         }
-      }
 
-      val left = num - buf.size
-      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
-      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+        val left = num - buf.size
+        val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
 
-      res.foreach(buf ++= _.take(num - buf.size))
-      partsScanned += numPartsToTry
-    }
+        res.foreach(buf ++= _.take(num - buf.size))
+        partsScanned += numPartsToTry
+      }
 
-    buf.toArray
+      buf.toArray
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
index b8986acff4263..319e90a66f467 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
@@ -54,6 +54,8 @@ private[spark] object RDDScope {
   /**
    * Execute the given body such that all RDDs created in this body will have the same scope.
    * The name of the scope will be the name of the method that immediately encloses this one.
+   *
+   * Note: Return statements are NOT allowed in body.
   */
   private[spark] def withScope[T](
       sc: SparkContext,
@@ -68,6 +70,8 @@ private[spark] object RDDScope {
    * If nesting is allowed, this concatenates the previous scope with the new one in a way that
    * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to
    * this method executed in the body will have no effect.
+   *
+   * Note: Return statements are NOT allowed in body.
    */
   private[spark] def withScope[T](
       sc: SparkContext,
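Note on why the returns had to go (illustration only, not part of the patch): in Scala 2, a `return` inside a by-name block such as the one passed to `withScope` compiles to a non-local return, i.e. the block throws a scala.runtime.NonLocalReturnControl that unwinds through the wrapper back to the enclosing method, which is the kind of construct the closure cleaner rejects. The sketch below is a minimal stand-alone demonstration; `withScope`, `firstNonEmptyOld`, and `firstNonEmptyNew` here are hypothetical stand-ins written for this note, not Spark code.

import scala.runtime.NonLocalReturnControl

object ReturnInScopeDemo {

  // Simplified stand-in for RDDScope.withScope: a wrapper that takes a by-name
  // body and wants to do some bookkeeping around it.
  def withScope[T](body: => T): T = {
    try {
      body
    } catch {
      // A `return` inside `body` does not return "normally": it throws
      // NonLocalReturnControl to unwind all the way to the enclosing method,
      // bypassing whatever the wrapper intended to do after `body` finished.
      case e: NonLocalReturnControl[_] =>
        println(s"body escaped withScope via ${e.getClass.getName}")
        throw e
    }
  }

  // Old style (what this patch removes): an early `return` inside the block.
  def firstNonEmptyOld(xs: Seq[String]): String = withScope {
    for (x <- xs) {
      if (x.nonEmpty) return x // non-local return, unwinds through withScope
    }
    ""
  }

  // New style (what this patch switches to): a single expression, no `return`.
  def firstNonEmptyNew(xs: Seq[String]): String = withScope {
    xs.find(_.nonEmpty).getOrElse("")
  }

  def main(args: Array[String]): Unit = {
    println(firstNonEmptyOld(Seq("", "a"))) // logs the escape, then prints "a"
    println(firstNonEmptyNew(Seq("", "a"))) // just prints "a"
  }
}

Rewriting the body as a single if/else expression, as the patch does for RDD.take, keeps the same behavior while producing a closure with no return instructions in it.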