From 83f9c585e3b6c5df66378a29be71a80bfa9f8372 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Sun, 3 May 2015 18:09:42 -0700 Subject: [PATCH] Implement a programmatic representation of operator scopes Rather than doing our own string parsing magic, serialize the scopes to and from JSON and operate directly on the class that implements these scopes. --- .../apache/spark/ui/static/spark-dag-viz.js | 8 +-- .../scala/org/apache/spark/SparkContext.scala | 48 ++++++------- .../{RDDScope.scala => OperatorScope.scala} | 72 ++++++++++++------- .../main/scala/org/apache/spark/rdd/RDD.scala | 18 ++--- .../org/apache/spark/storage/RDDInfo.scala | 14 ++-- .../org/apache/spark/ui/viz/VizGraph.scala | 13 ++-- .../org/apache/spark/util/JsonProtocol.scala | 9 +-- .../apache/spark/storage/StorageSuite.scala | 4 +- .../spark/ui/storage/StorageTabSuite.scala | 14 ++-- .../apache/spark/util/JsonProtocolSuite.scala | 21 ++---- 10 files changed, 116 insertions(+), 105 deletions(-) rename core/src/main/scala/org/apache/spark/rdd/{RDDScope.scala => OperatorScope.scala} (60%) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index c10daab73f5ed..c304d15cdffa4 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -23,10 +23,10 @@ * (2) an RDD and its operator scopes, and * (3) an RDD's operator scopes and the stage / job hierarchy * - * An operator scope is a general, named code block representing a public - * operation that instantiates RDDs (e.g. filter, textFile, reduceByKey). - * An operator scope can be nested inside of other scopes if the corresponding - * RDD operation invokes other such operations. + * An operator scope is a general, named code block representing an operation + * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operator + * scope can be nested inside of other scopes if the corresponding RDD operation + * invokes other such operations (for more detail, see o.a.s.rdd.OperatorScope). * * A stage may include one or more operator scopes if the RDD operations are * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)). diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 714bfb5c216de..f5d7729c3dd90 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -660,12 +660,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** - * Execute a block of code in a scope. - * All new RDDs created in this body will be part of the same scope. + * Execute a block of code in a scope such that all new RDDs created in this body will + * be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}. * * Note: Return statements are NOT allowed in the given body. 
*/ - private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body) + private def withScope[U](body: => U): U = OperatorScope.withScope[U](this)(body) // Methods for creating RDDs @@ -679,7 +679,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def parallelize[T: ClassTag]( seq: Seq[T], - numSlices: Int = defaultParallelism): RDD[T] = withRDDScope { + numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } @@ -690,14 +690,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def makeRDD[T: ClassTag]( seq: Seq[T], - numSlices: Int = defaultParallelism): RDD[T] = withRDDScope { + numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withRDDScope { + def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) @@ -709,7 +709,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def textFile( path: String, - minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope { + minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString) @@ -744,7 +744,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def wholeTextFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withRDDScope { + minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking @@ -793,7 +793,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli @Experimental def binaryFiles( path: String, - minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withRDDScope { + minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking @@ -825,7 +825,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def binaryRecords( path: String, recordLength: Int, - conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withRDDScope { + conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope { assertNotStopped() conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path, @@ -866,7 +866,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope { + minPartitions: Int = 
defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf before broadcasting it. SparkHadoopUtil.get.addCredentials(conf) @@ -886,7 +886,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope { + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) @@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile(path, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], @@ -940,14 +940,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * copy them using a `map` function. */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile[K, V, F](path, defaultMinPartitions) } /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { newAPIHadoopFile( path, fm.runtimeClass.asInstanceOf[Class[F]], @@ -970,7 +970,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withRDDScope { + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() // The call to new NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves @@ -1004,7 +1004,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], - vClass: Class[V]): RDD[(K, V)] = withRDDScope { + vClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf. Required to access secure HDFS. 
val jconf = new JobConf(conf) @@ -1024,7 +1024,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli keyClass: Class[K], valueClass: Class[V], minPartitions: Int - ): RDD[(K, V)] = withRDDScope { + ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) @@ -1041,7 +1041,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def sequenceFile[K, V]( path: String, keyClass: Class[K], - valueClass: Class[V]): RDD[(K, V)] = withRDDScope { + valueClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } @@ -1072,7 +1072,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { - withRDDScope { + withScope { assertNotStopped() val kc = kcf() val vc = vcf() @@ -1094,18 +1094,18 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions): RDD[T] = withRDDScope { + minPartitions: Int = defaultMinPartitions): RDD[T] = withScope { assertNotStopped() sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } - protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withRDDScope { + protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope { new CheckpointRDD[T](this, path) } /** Build the union of a list of RDDs. */ - def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope { + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope { val partitioners = rdds.flatMap(_.partitioner).toSet if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) @@ -1115,7 +1115,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs passed as variable-length arguments. */ - def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withRDDScope { + def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope { union(Seq(first) ++ rest) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala b/core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala similarity index 60% rename from core/src/main/scala/org/apache/spark/rdd/RDDScope.scala rename to core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala index 319e90a66f467..056b111e7a193 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala @@ -18,39 +18,58 @@ package org.apache.spark.rdd import java.util.concurrent.atomic.AtomicInteger + +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + import org.apache.spark.SparkContext /** - * A collection of utility methods to construct a hierarchical representation of RDD scopes. - * An RDD scope tracks the series of operations that created a given RDD. 
+ * A general, named code block representing an operation that instantiates RDDs. + * + * All RDDs instantiated in the corresponding code block will store a pointer to this object. + * Examples include, but will not be limited to, existing RDD operations, such as textFile, + * reduceByKey, and treeAggregate. + * + * An operator scope may be nested in other scopes. For instance, a SQL query may enclose + * scopes associated with the public RDD APIs it uses under the hood. + * + * There is no particular relationship between an operator scope and a stage or a job. + * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take). */ -private[spark] object RDDScope { +private[spark] class OperatorScope(val name: String, parent: Option[OperatorScope] = None) { + val id: Int = OperatorScope.nextScopeId() - // Symbol for delimiting each level of the hierarchy - // e.g. grandparent;parent;child - val SCOPE_NESTING_DELIMITER = ";" + def toJson: String = { + OperatorScope.jsonMapper.writeValueAsString(this) + } - // Symbol for delimiting the scope name from the ID within each level - val SCOPE_NAME_DELIMITER = "_" + /** + * Return a list of scopes that this scope is a part of, including this scope itself. + * The result is ordered from the outermost scope (eldest ancestor) to this scope. + */ + @JsonIgnore + def getAllScopes: Seq[OperatorScope] = { + parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this) + } +} - // Counter for generating scope IDs, for differentiating - // between different scopes of the same name +/** + * A collection of utility methods to construct a hierarchical representation of RDD scopes. + * An RDD scope tracks the series of operations that created a given RDD. + */ +private[spark] object OperatorScope { + private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule) private val scopeCounter = new AtomicInteger(0) - /** - * Make a globally unique scope ID from the scope name. - * - * For instance: - * textFile -> textFile_0 - * textFile -> textFile_1 - * map -> map_2 - * name;with_sensitive;characters -> name-with-sensitive-characters_3 - */ - private def makeScopeId(name: String): String = { - name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") + - SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement + def fromJson(s: String): OperatorScope = { + jsonMapper.readValue(s, classOf[OperatorScope]) } + /** Return a globally unique operator scope ID. */ + def nextScopeId(): Int = scopeCounter.getAndIncrement + /** * Execute the given body such that all RDDs created in this body will have the same scope. * The name of the scope will be the name of the method that immediately encloses this one. 
@@ -80,14 +99,13 @@ private[spark] object RDDScope { // Save the old scope to restore it later val scopeKey = SparkContext.RDD_SCOPE_KEY val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY - val oldScope = sc.getLocalProperty(scopeKey) + val oldScopeJson = sc.getLocalProperty(scopeKey) + val oldScope = Option(oldScopeJson).map(OperatorScope.fromJson) val oldNoOverride = sc.getLocalProperty(noOverrideKey) try { // Set the scope only if the higher level caller allows us to do so if (sc.getLocalProperty(noOverrideKey) == null) { - val oldScopeId = Option(oldScope).map { _ + SCOPE_NESTING_DELIMITER }.getOrElse("") - val newScopeId = oldScopeId + makeScopeId(name) - sc.setLocalProperty(scopeKey, newScopeId) + sc.setLocalProperty(scopeKey, new OperatorScope(name, oldScope).toJson) } // Optionally disallow the child body to override our scope if (!allowNesting) { @@ -96,7 +114,7 @@ private[spark] object RDDScope { body } finally { // Remember to restore any state that was modified before exiting - sc.setLocalProperty(scopeKey, oldScope) + sc.setLocalProperty(scopeKey, oldScopeJson) sc.setLocalProperty(noOverrideKey, oldNoOverride) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index dec6872233143..da3638c2519d3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -278,12 +278,12 @@ abstract class RDD[T: ClassTag]( } /** - * Execute a block of code in a scope. - * All new RDDs created in this body will be part of the same scope. + * Execute a block of code in a scope such that all new RDDs created in this body will + * be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}. * * Note: Return statements are NOT allowed in the given body. */ - private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body) + private[spark] def withScope[U](body: => U): U = OperatorScope.withScope[U](sc)(body) // Transformations (return a new RDD) @@ -1450,13 +1450,15 @@ abstract class RDD[T: ClassTag]( @transient private[spark] val creationSite = sc.getCallSite() /** - * The scope in which this RDD is defined. + * The scope associated with the operation that created this RDD. * - * This is more flexible than the call site and can be defined hierarchically. - * For more detail, see the documentation of {{RDDScope}}. This scope is null if - * the user instantiates this RDD himself without using any Spark operations. + * This is more flexible than the call site and can be defined hierarchically. For more + * detail, see the documentation of {{OperatorScope}}. This scope is not defined if the + * user instantiates this RDD himself without using any Spark operations. 
*/ - @transient private[spark] val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) + @transient private[spark] val scope: Option[OperatorScope] = { + Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(OperatorScope.fromJson) + } private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 99e3d13c46003..cc2132b17d7b3 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -18,7 +18,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{OperatorScope, RDD} import org.apache.spark.util.Utils @DeveloperApi @@ -27,8 +27,8 @@ class RDDInfo( val name: String, val numPartitions: Int, var storageLevel: StorageLevel, - val scope: String, - val parentIds: Seq[Int]) + val parentIds: Seq[Int], + val scope: Option[OperatorScope] = None) extends Ordered[RDDInfo] { var numCachedPartitions = 0 @@ -41,12 +41,10 @@ class RDDInfo( override def toString: String = { import Utils.bytesToString - val _scope = Option(scope).getOrElse("--") ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " + - "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s [scope: %s]").format( + "MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s").format( name, id, storageLevel.toString, numCachedPartitions, numPartitions, - bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize), - _scope) + bytesToString(memSize), bytesToString(externalBlockStoreSize), bytesToString(diskSize)) } override def compare(that: RDDInfo): Int = { @@ -58,6 +56,6 @@ private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(rdd.id.toString) val parentIds = rdd.dependencies.map(_.rdd.id) - new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, rdd.scope, parentIds) + new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope) } } diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala index 2b3d8bc70be71..fc562c97fe7d7 100644 --- a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import scala.collection.mutable.ListBuffer import org.apache.spark.Logging -import org.apache.spark.rdd.RDDScope +import org.apache.spark.rdd.OperatorScope import org.apache.spark.scheduler.StageInfo /** @@ -53,8 +53,6 @@ private[ui] class VizCluster(val id: String, val name: String) { private val _childrenNodes = new ListBuffer[VizNode] private val _childrenClusters = new ListBuffer[VizCluster] - def this(id: String) { this(id, id.split(RDDScope.SCOPE_NAME_DELIMITER).head) } - def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq def childrenClusters: Seq[VizCluster] = _childrenClusters.iterator.toSeq def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode } @@ -97,9 +95,12 @@ private[ui] object VizGraph extends Logging { } else { // Otherwise, this RDD belongs to an inner cluster, // which may be nested inside of other clusters - val rddClusters = rdd.scope - .split(RDDScope.SCOPE_NESTING_DELIMITER) - .map { scopeId => 
clusters.getOrElseUpdate(scopeId, new VizCluster(scopeId)) } + val rddScopes = rdd.scope.map { scope => scope.getAllScopes }.getOrElse(Seq.empty) + val rddClusters = rddScopes.map { scope => + val clusterId = scope.name + "_" + scope.id + val clusterName = scope.name + clusters.getOrElseUpdate(clusterId, new VizCluster(clusterId, clusterName)) + } // Build the cluster hierarchy for this RDD rddClusters.sliding(2).foreach { pc => if (pc.size == 2) { diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c8425039b9c6d..0fb457aeae689 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -28,10 +28,11 @@ import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.JsonAST._ +import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.rdd.OperatorScope import org.apache.spark.scheduler._ import org.apache.spark.storage._ -import org.apache.spark._ /** * Serializes SparkListener events to/from JSON. This protocol provides strong backwards- @@ -373,7 +374,7 @@ private[spark] object JsonProtocol { val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList) ("RDD ID" -> rddInfo.id) ~ ("Name" -> rddInfo.name) ~ - ("Scope" -> Option(rddInfo.scope)) ~ + ("Scope" -> rddInfo.scope.map(_.toJson)) ~ ("Parent IDs" -> parentIds) ~ ("Storage Level" -> storageLevel) ~ ("Number of Partitions" -> rddInfo.numPartitions) ~ @@ -794,7 +795,7 @@ private[spark] object JsonProtocol { def rddInfoFromJson(json: JValue): RDDInfo = { val rddId = (json \ "RDD ID").extract[Int] val name = (json \ "Name").extract[String] - val scope = Utils.jsonOption(json \ "Scope").map(_.extract[String]).orNull + val scope = Utils.jsonOption(json \ "Scope").map(_.extract[OperatorScope]) val parentIds = Utils.jsonOption(json \ "Parent IDs") .map { l => l.extract[List[JValue]].map(_.extract[Int]) } .getOrElse(Seq.empty) @@ -807,7 +808,7 @@ private[spark] object JsonProtocol { .getOrElse(json \ "Tachyon Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, scope, parentIds) + val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize rddInfo.externalBlockStoreSize = externalBlockStoreSize diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index 4c32d508ff813..17193ddbfd894 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -287,8 +287,8 @@ class StorageSuite extends FunSuite { // For testing StorageUtils.updateRddInfo private def stockRDDInfos: Seq[RDDInfo] = { - val info0 = new RDDInfo(0, "0", 10, memAndDisk, "scoop", Seq(3)) - val info1 = new RDDInfo(1, "1", 3, memAndDisk, "scoop", Seq(4)) + val info0 = new RDDInfo(0, "0", 10, memAndDisk, Seq(3)) + val info1 = new RDDInfo(1, "1", 3, memAndDisk, Seq(4)) Seq(info0, info1) } diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index d35414674e52a..7b38e6d9473e1 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -35,10 +35,10 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { private val none = StorageLevel.NONE private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) private val taskInfo1 = new TaskInfo(1, 1, 1, 1, "big", "cat", TaskLocality.ANY, false) - private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, "scoop", Seq(10)) - private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, "scoop", Seq(10)) - private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, "scoop", Seq(10)) - private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, "scoop", Seq(10)) + private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly, Seq(10)) + private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly, Seq(10)) + private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk, Seq(10)) + private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk, Seq(10)) private val bm1 = BlockManagerId("big", "dog", 1) before { @@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { assert(storageListener.rddInfoList.size === 2) // Submitting RDDInfos with duplicate IDs does nothing - val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10)) + val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, Seq(10)) rddInfo0Cached.numCachedPartitions = 1 val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details") bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached)) @@ -168,8 +168,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { test("verify StorageTab contains all cached rdds") { - val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4)) - val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, "scoop", Seq(4)) + val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, Seq(4)) + val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, Seq(4)) val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details") val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details") val taskMetrics0 = new TaskMetrics diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 20f28cae85ce3..e04167a1442b6 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.FunSuite import org.apache.spark._ import org.apache.spark.executor._ +import org.apache.spark.rdd.OperatorScope import org.apache.spark.scheduler._ import org.apache.spark.storage._ @@ -325,11 +326,12 @@ class JsonProtocolSuite extends FunSuite { test("RDDInfo backward compatibility (scope, parent IDs)") { // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties - val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8)) + val rddInfo = new RDDInfo( + 1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new OperatorScope("fable"))) val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo) - .removeField({ _._1 == "Scope"}) .removeField({ _._1 == "Parent IDs"}) - val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, null, Seq.empty) + .removeField({ _._1 == "Scope"}) + val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None) 
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson)) } @@ -663,7 +665,7 @@ class JsonProtocolSuite extends FunSuite { } private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { - val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, "layer", Seq(1, 4, 7)) + val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7)) r.numCachedPartitions = c r.memSize = d r.diskSize = e @@ -804,7 +806,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 101, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1190,7 +1191,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 1, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1232,7 +1232,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 2, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1250,7 +1249,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 3, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1292,7 +1290,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 3, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1310,7 +1307,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 4, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1328,7 +1324,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 5, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1370,7 +1365,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 4, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1388,7 +1382,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 5, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1406,7 +1399,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 6, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true, @@ -1424,7 +1416,6 @@ class JsonProtocolSuite extends FunSuite { | { | "RDD ID": 7, | "Name": "mayor", - | "Scope": "layer", | "Parent IDs": [1, 4, 7], | "Storage Level": { | "Use Disk": true,
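
Note (illustrative, not part of the patch): the sketch below mirrors the serialization scheme this change introduces, using a hypothetical standalone case class named Scope in place of the private[spark] OperatorScope, and assuming jackson-databind and jackson-module-scala are on the classpath. It shows the JSON that replaces the old delimiter-based "grandparent;parent;child" / "name_id" string encoding, and how the ancestor chain is recovered on the read side, much as OperatorScope.fromJson and getAllScopes do in the patch.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for the patch's OperatorScope (which is private[spark]).
case class Scope(name: String, id: Int, parent: Option[Scope] = None) {
  // Ancestor chain ordered from the outermost scope to this scope,
  // mirroring OperatorScope.getAllScopes in the patch.
  def allScopes: Seq[Scope] = parent.map(_.allScopes).getOrElse(Seq.empty) :+ this
}

object ScopeDemo extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // A "map" scope nested inside a "textFile" scope, as withScope would build it
  // when one RDD operation invokes another.
  val nested = Scope("map", 1, Some(Scope("textFile", 0)))

  // This JSON string is what would be stored under SparkContext.RDD_SCOPE_KEY.
  val json = mapper.writeValueAsString(nested)
  println(json) // e.g. {"name":"map","id":1,"parent":{"name":"textFile","id":0,"parent":null}}

  // Round trip on the read side, as OperatorScope.fromJson would do.
  val parsed = mapper.readValue(json, classOf[Scope])
  println(parsed.allScopes.map(s => s"${s.name}_${s.id}")) // List(textFile_0, map_1)
}

Serializing the whole parent chain keeps the local property a single string, so SparkContext.setLocalProperty needs no changes, while the UI side (VizGraph) can rebuild its cluster hierarchy from getAllScopes instead of splitting on SCOPE_NESTING_DELIMITER.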