Implement a programmatic representation of operator scopes
Rather than doing our own string parsing magic, serialize the scopes
to and from JSON and operate directly on the class that implements
these scopes.
Andrew Or committed May 4, 2015
1 parent 5a7faf4 commit 83f9c58
Showing 10 changed files with 116 additions and 105 deletions.
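In short: where scope information was previously threaded through a delimiter-encoded string, it now round-trips through a small class serialized with Jackson. Below is a rough, self-contained sketch of the idea only; the Scope case class, the values, and the printed JSON are illustrative and not code from this commit.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object ScopeEncodingSketch {
  // Old representation: nesting flattened into a string such as "textFile_0;map_1",
  // which required delimiter parsing and was fragile for names containing ';' or '_'.
  val oldStyle = "textFile_0;map_1"

  // New representation: a plain object graph serialized to JSON, so nesting and
  // names survive round trips without any custom parsing.
  case class Scope(id: Int, name: String, parent: Option[Scope] = None)

  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  def main(args: Array[String]): Unit = {
    val nested = Scope(1, "map", Some(Scope(0, "textFile")))
    // Prints something like:
    // {"id":1,"name":"map","parent":{"id":0,"name":"textFile","parent":null}}
    println(mapper.writeValueAsString(nested))
  }
}
```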
@@ -23,10 +23,10 @@
* (2) an RDD and its operator scopes, and
* (3) an RDD's operator scopes and the stage / job hierarchy
*
* An operator scope is a general, named code block representing a public
* operation that instantiates RDDs (e.g. filter, textFile, reduceByKey).
* An operator scope can be nested inside of other scopes if the corresponding
* RDD operation invokes other such operations.
* An operator scope is a general, named code block representing an operation
* that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operator
* scope can be nested inside of other scopes if the corresponding RDD operation
* invokes other such operations (for more detail, see o.a.s.rdd.OperatorScope).
*
* A stage may include one or more operator scopes if the RDD operations are
* streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
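As a concrete illustration of the comment above, consider an ordinary driver program (user-level code, not part of this commit; the scope names in the comments are indicative only):

```scala
// Assumes an existing SparkContext `sc`.
val counts = sc.textFile("hdfs://logs")   // one operator scope, e.g. "textFile"
  .map(line => (line, 1))                 // another scope, "map"
  .reduceByKey(_ + _)                     // "reduceByKey", which is itself built on other
                                          // RDD operations and may enclose their scopes

// textFile, map, and the map side of reduceByKey are pipelined into a single stage, so one
// stage can span several operator scopes; the shuffle introduced by reduceByKey starts a new
// stage that still falls under the "reduceByKey" scope.
```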
48 changes: 24 additions & 24 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -660,12 +660,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/**
* Execute a block of code in a scope.
* All new RDDs created in this body will be part of the same scope.
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body)
private def withScope[U](body: => U): U = OperatorScope.withScope[U](this)(body)
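Every RDD-producing method in this file is wrapped in this helper; a hypothetical new method would follow the same pattern (myNewSource is invented for illustration):

```scala
def myNewSource(path: String): RDD[String] = withScope {
  assertNotStopped()
  textFile(path)  // RDDs created in this body are associated with the enclosing scope
}
```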

// Methods for creating RDDs

@@ -679,7 +679,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
@@ -690,14 +690,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withRDDScope {
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}

/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withRDDScope {
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
@@ -709,7 +709,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
@@ -744,7 +744,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
@@ -793,7 +793,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@Experimental
def binaryFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
@@ -825,7 +825,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def binaryRecords(
path: String,
recordLength: Int,
conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withRDDScope {
conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope {
assertNotStopped()
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
@@ -866,7 +866,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
@@ -886,7 +886,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
@@ -940,14 +940,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile[K, V, F](path, defaultMinPartitions)
}

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
@@ -970,7 +970,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withRDDScope {
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
@@ -1004,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = withRDDScope {
vClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
@@ -1024,7 +1024,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int
): RDD[(K, V)] = withRDDScope {
): RDD[(K, V)] = withScope {
assertNotStopped()
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -1041,7 +1041,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
valueClass: Class[V]): RDD[(K, V)] = withRDDScope {
valueClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
@@ -1072,7 +1072,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
withRDDScope {
withScope {
assertNotStopped()
val kc = kcf()
val vc = vcf()
@@ -1094,18 +1094,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions): RDD[T] = withRDDScope {
minPartitions: Int = defaultMinPartitions): RDD[T] = withScope {
assertNotStopped()
sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions)
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withRDDScope {
protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope {
new CheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope {
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
new PartitionerAwareUnionRDD(this, rdds)
Expand All @@ -1115,7 +1115,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withRDDScope {
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
union(Seq(first) ++ rest)
}

@@ -18,39 +18,58 @@
package org.apache.spark.rdd

import java.util.concurrent.atomic.AtomicInteger

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.SparkContext

/**
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
* A general, named code block representing an operation that instantiates RDDs.
*
* All RDDs instantiated in the corresponding code block will store a pointer to this object.
* Examples include, but will not be limited to, existing RDD operations, such as textFile,
* reduceByKey, and treeAggregate.
*
* An operator scope may be nested in other scopes. For instance, a SQL query may enclose
* scopes associated with the public RDD APIs it uses under the hood.
*
* There is no particular relationship between an operator scope and a stage or a job.
* A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
*/
private[spark] object RDDScope {
private[spark] class OperatorScope(val name: String, parent: Option[OperatorScope] = None) {
val id: Int = OperatorScope.nextScopeId()

// Symbol for delimiting each level of the hierarchy
// e.g. grandparent;parent;child
val SCOPE_NESTING_DELIMITER = ";"
def toJson: String = {
OperatorScope.jsonMapper.writeValueAsString(this)
}

// Symbol for delimiting the scope name from the ID within each level
val SCOPE_NAME_DELIMITER = "_"
/**
* Return a list of scopes that this scope is a part of, including this scope itself.
* The result is ordered from the outermost scope (eldest ancestor) to this scope.
*/
@JsonIgnore
def getAllScopes: Seq[OperatorScope] = {
parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
}
}
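For reference, a small illustration of the class above. OperatorScope is private[spark], so a snippet like this would only compile inside Spark itself:

```scala
val outer = new OperatorScope("treeAggregate")
val inner = new OperatorScope("reduce", Some(outer))

inner.getAllScopes.map(_.name)        // Seq("treeAggregate", "reduce"): eldest ancestor first
inner.toJson                          // nested JSON with the parent scope embedded
OperatorScope.fromJson(inner.toJson)  // the inverse, used when reading the local property back
```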

// Counter for generating scope IDs, for differentiating
// between different scopes of the same name
/**
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object OperatorScope {
private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val scopeCounter = new AtomicInteger(0)

/**
* Make a globally unique scope ID from the scope name.
*
* For instance:
* textFile -> textFile_0
* textFile -> textFile_1
* map -> map_2
* name;with_sensitive;characters -> name-with-sensitive-characters_3
*/
private def makeScopeId(name: String): String = {
name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
def fromJson(s: String): OperatorScope = {
jsonMapper.readValue(s, classOf[OperatorScope])
}

/** Return a globally unique operator scope ID. */
def nextScopeId(): Int = scopeCounter.getAndIncrement

/**
* Execute the given body such that all RDDs created in this body will have the same scope.
* The name of the scope will be the name of the method that immediately encloses this one.
@@ -80,14 +99,13 @@ private[spark] object RDDScope {
// Save the old scope to restore it later
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScope = sc.getLocalProperty(scopeKey)
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(OperatorScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
// Set the scope only if the higher level caller allows us to do so
if (sc.getLocalProperty(noOverrideKey) == null) {
val oldScopeId = Option(oldScope).map { _ + SCOPE_NESTING_DELIMITER }.getOrElse("")
val newScopeId = oldScopeId + makeScopeId(name)
sc.setLocalProperty(scopeKey, newScopeId)
sc.setLocalProperty(scopeKey, new OperatorScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
Expand All @@ -96,7 +114,7 @@ private[spark] object RDDScope {
body
} finally {
// Remember to restore any state that was modified before exiting
sc.setLocalProperty(scopeKey, oldScope)
sc.setLocalProperty(scopeKey, oldScopeJson)
sc.setLocalProperty(noOverrideKey, oldNoOverride)
}
}
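The helper above is essentially a save-and-restore around a thread-local property. A minimal, self-contained sketch of that pattern, with a ThreadLocal standing in for SparkContext local properties and an illustrative Scope class in place of OperatorScope:

```scala
object ScopedProperty {
  final case class Scope(name: String, parent: Option[Scope] = None)

  private val current = new ThreadLocal[Option[Scope]] {
    override def initialValue(): Option[Scope] = None
  }

  def withScope[U](name: String)(body: => U): U = {
    val old = current.get()              // remember the caller's scope
    current.set(Some(Scope(name, old)))  // nest a child scope for the body
    try body
    finally current.set(old)             // always restore, even if body throws
  }
}
```

Nested calls such as ScopedProperty.withScope("textFile") { ScopedProperty.withScope("hadoopFile") { ... } } see a two-level chain inside the inner body and the original value again once both blocks exit, which mirrors the bookkeeping of the real helper when nesting is allowed.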
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -278,12 +278,12 @@ abstract class RDD[T: ClassTag](
}

/**
* Execute a block of code in a scope.
* All new RDDs created in this body will be part of the same scope.
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body)
private[spark] def withScope[U](body: => U): U = OperatorScope.withScope[U](sc)(body)

// Transformations (return a new RDD)

@@ -1450,13 +1450,15 @@ abstract class RDD[T: ClassTag](
@transient private[spark] val creationSite = sc.getCallSite()

/**
* The scope in which this RDD is defined.
* The scope associated with the operation that created this RDD.
*
* This is more flexible than the call site and can be defined hierarchically.
* For more detail, see the documentation of {{RDDScope}}. This scope is null if
* the user instantiates this RDD himself without using any Spark operations.
* This is more flexible than the call site and can be defined hierarchically. For more
* detail, see the documentation of {{OperatorScope}}. This scope is not defined if the
* user instantiates this RDD himself without using any Spark operations.
*/
@transient private[spark] val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)
@transient private[spark] val scope: Option[OperatorScope] = {
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(OperatorScope.fromJson)
}
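Because scope is now an Option[OperatorScope] rather than a raw string, internal callers can pattern-match instead of re-parsing a delimited value. A hypothetical consumer (describeScope is invented; rdd.scope is private[spark], so code like this would live inside Spark):

```scala
def describeScope(rdd: RDD[_]): String = rdd.scope match {
  case Some(s) => s.getAllScopes.map(_.name).mkString(" / ")  // e.g. "treeAggregate / reduce"
  case None    => "(no scope: RDD was created outside any Spark operation)"
}
```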

private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
