Re-implement scopes using closures instead of annotations
The problem with annotations is that there is no way to associate
an RDD's scope with another's. This is because the stack trace
simply does not expose enough information for us to associate one
instance of a method invocation with another.

So, we're back to closures. Note that this approach still suffers from
the same not-serializable issue discussed previously; that is being
fixed separately in the ClosureCleaner.
Andrew Or committed Apr 28, 2015
1 parent 52187fc commit c3bfcae
Showing 6 changed files with 313 additions and 443 deletions.
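The closure-based approach works because the scope is established dynamically around the body of each wrapped method, so nesting falls out naturally instead of having to be reconstructed from stack traces. Below is a minimal, self-contained sketch of that idea; the names are hypothetical and this is not the RDDScope implementation the commit delegates to, which appears to record the scope in SparkContext local properties (see the spark.rdd.scope keys added at the end of the SparkContext diff) rather than in a plain thread-local.

object ScopeSketch {
  // The stack of scopes enclosing the current call, innermost first.
  private val current = new ThreadLocal[List[String]] {
    override def initialValue(): List[String] = Nil
  }

  /** Run `body` inside a named scope; nested calls see the enclosing scopes. */
  def withScope[U](name: String)(body: => U): U = {
    val outer = current.get()        // remember the enclosing scopes
    current.set(name :: outer)       // enter the new scope
    try body                         // anything created in here can read currentScopes
    finally current.set(outer)       // always restore on the way out
  }

  /** The scopes active at the point of the call, innermost first. */
  def currentScopes: Seq[String] = current.get()
}

object ScopeSketchDemo extends App {
  ScopeSketch.withScope("textFile") {
    ScopeSketch.withScope("hadoopFile") {
      // The inner creation sees both scopes, so whatever it builds can be
      // associated with the outer scope -- the association the annotation
      // plus stack-trace approach could not recover.
      assert(ScopeSketch.currentScopes == Seq("hadoopFile", "textFile"))
    }
  }
}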
98 changes: 44 additions & 54 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -631,6 +631,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}

/**
* Execute a block of code in a scope.
* All new RDDs created in this body will be part of the same scope.
*/
private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body)

// Methods for creating RDDs

/** Distribute a local Scala collection to form an RDD.
@@ -641,8 +647,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
@RDDScoped
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
@@ -651,16 +658,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* This method is identical to `parallelize`.
*/
@RDDScoped
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
parallelize(seq, numSlices)
}

/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
@RDDScoped
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withRDDScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
@@ -670,8 +675,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
@RDDScoped
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
@@ -704,9 +708,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
@RDDScoped
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, String)] = {
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withRDDScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
@@ -751,9 +755,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
@RDDScoped
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withRDDScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
NewFileInputFormat.addInputPath(job, new Path(path))
@@ -780,9 +784,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
@RDDScoped
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
def binaryRecords(
path: String,
recordLength: Int,
conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withRDDScope {
assertNotStopped()
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
@@ -818,14 +823,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope {
assertNotStopped()
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
@@ -840,14 +843,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -876,10 +877,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
@@ -901,16 +901,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
hadoopFile[K, V, F](path, defaultMinPartitions)
}

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
@RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
@@ -928,13 +927,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withRDDScope {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
@@ -962,12 +960,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = {
vClass: Class[V]): RDD[(K, V)] = withRDDScope {
assertNotStopped()
// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
@@ -983,12 +980,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
): RDD[(K, V)] = {
): RDD[(K, V)] = withRDDScope {
assertNotStopped()
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -1002,8 +998,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
@RDDScoped
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V]): RDD[(K, V)] = withRDDScope {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
@@ -1030,12 +1028,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = withRDDScope {
assertNotStopped()
val kc = kcf()
val vc = vcf()
@@ -1054,26 +1050,20 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
@RDDScoped
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions
): RDD[T] = {
minPartitions: Int = defaultMinPartitions): RDD[T] = withRDDScope {
assertNotStopped()
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

@RDDScoped
protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withRDDScope {
new CheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
@RDDScoped
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
@@ -1083,9 +1073,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
@RDDScoped
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withRDDScope {
union(Seq(first) ++ rest)
}

/** Get an RDD that has no partitions or elements. */
def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
@@ -2039,10 +2029,10 @@ object SparkContext extends Logging {
}

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"

/**
* Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
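The RDDScope object that SparkContext.withRDDScope delegates to is not among the files shown here, but the two new local-property keys above (spark.rdd.scope and spark.rdd.scope.noOverride) suggest how it could work. The following is a hedged sketch under that assumption, not the actual implementation; in particular, how a scope is named, and whether the noOverride key really protects an enclosing scope when one public API method delegates to another (e.g. makeRDD calling parallelize), are guesses.

package org.apache.spark

// Sketch only: a withScope utility built on SparkContext local properties,
// mirroring what the private withRDDScope helper added above appears to rely on.
private[spark] object RDDScopeSketch {

  // These literals match the constants added to the SparkContext companion object.
  private val scopeKey = "spark.rdd.scope"
  private val noOverrideKey = "spark.rdd.scope.noOverride"

  def withScope[U](sc: SparkContext)(body: => U): U = {
    val oldScope = sc.getLocalProperty(scopeKey)
    val noOverride = sc.getLocalProperty(noOverrideKey) != null
    try {
      // Start a new scope unless a nested call asked to keep the enclosing one
      // (the presumed purpose of the noOverride key).
      if (!noOverride) {
        sc.setLocalProperty(scopeKey, newScopeName())
      }
      body
    } finally {
      // The closure makes the restore trivial: however the body exits,
      // the caller's scope is reinstated here.
      sc.setLocalProperty(scopeKey, oldScope)
    }
  }

  // Placeholder; how the real implementation names a scope is not shown here.
  private def newScopeName(): String = java.util.UUID.randomUUID().toString
}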
29 changes: 0 additions & 29 deletions core/src/main/scala/org/apache/spark/annotation/RDDScoped.java

This file was deleted.

16 changes: 5 additions & 11 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -24,7 +24,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
import org.apache.spark.annotation.RDDScoped

/**
* A set of asynchronous RDD actions available through an implicit conversion.
@@ -34,8 +33,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for counting the number of elements in the RDD.
*/
@RDDScoped
def countAsync(): FutureAction[Long] = {
def countAsync(): FutureAction[Long] = self.withScope {
val totalCount = new AtomicLong
self.context.submitJob(
self,
@@ -55,8 +53,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving all elements of this RDD.
*/
@RDDScoped
def collectAsync(): FutureAction[Seq[T]] = {
def collectAsync(): FutureAction[Seq[T]] = self.withScope {
val results = new Array[Array[T]](self.partitions.length)
self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length),
(index, data) => results(index) = data, results.flatten.toSeq)
@@ -65,8 +62,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Returns a future for retrieving the first num elements of the RDD.
*/
@RDDScoped
def takeAsync(num: Int): FutureAction[Seq[T]] = {
def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
val f = new ComplexFutureAction[Seq[T]]

f.run {
@@ -113,8 +109,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to all elements of this RDD.
*/
@RDDScoped
def foreachAsync(f: T => Unit): FutureAction[Unit] = {
def foreachAsync(f: T => Unit): FutureAction[Unit] = self.withScope {
val cleanF = self.context.clean(f)
self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length),
(index, data) => Unit, Unit)
@@ -123,8 +118,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
/**
* Applies a function f to each partition of this RDD.
*/
@RDDScoped
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = self.withScope {
self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length),
(index, data) => Unit, Unit)
}
(The remaining changed files were not loaded in this view.)
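The asynchronous actions above now wrap their bodies in self.withScope, an RDD-level helper defined in one of the files not expanded here; presumably it mirrors SparkContext.withRDDScope and forwards to the same RDDScope machinery with the RDD's own context. The practical effect this commit is after can be illustrated with one of the wrapped SparkContext methods (assumed semantics, since the scope-consuming side is not shown in this view):

import org.apache.spark.SparkContext

object ScopeUsageSketch extends App {
  val sc = new SparkContext("local[2]", "scope-demo")

  // textFile is now `withRDDScope { hadoopFile(...).map(...) }`, so the
  // HadoopRDD and the MapPartitionsRDD it builds internally would both be
  // tagged with the single enclosing "textFile" scope -- the association that
  // the stack-trace/annotation approach could not provide.
  val lines = sc.textFile("README.md")
  println(lines.count())

  sc.stop()
}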
