From b1f0fd16d55612226e211b4e0b469cb9a573ff42 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sun, 3 May 2015 18:29:09 -0700
Subject: [PATCH] Rename OperatorScope -> RDDOperationScope

---
 .../apache/spark/ui/static/spark-dag-viz.js   | 20 +++++++-------
 .../scala/org/apache/spark/SparkContext.scala |  4 +--
 .../main/scala/org/apache/spark/rdd/RDD.scala | 10 +++----
 ...torScope.scala => RDDOperationScope.scala} | 27 ++++++++++---------
 .../org/apache/spark/storage/RDDInfo.scala    |  4 +--
 .../scala/org/apache/spark/ui/SparkUI.scala   |  6 ++---
 .../spark/ui/scope/RDDOperationGraph.scala    |  6 ++---
 ....scala => RDDOperationGraphListener.scala} |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala  |  4 +--
 .../apache/spark/util/JsonProtocolSuite.scala |  4 +--
 10 files changed, 45 insertions(+), 42 deletions(-)
 rename core/src/main/scala/org/apache/spark/rdd/{OperatorScope.scala => RDDOperationScope.scala} (83%)
 rename core/src/main/scala/org/apache/spark/ui/scope/{OperationGraphListener.scala => RDDOperationGraphListener.scala} (96%)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
index c304d15cdffa4..41758fc16a72a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -20,22 +20,22 @@
  *
  * This DAG describes the relationships between
  * (1) an RDD and its dependencies,
- * (2) an RDD and its operator scopes, and
- * (3) an RDD's operator scopes and the stage / job hierarchy
+ * (2) an RDD and its operation scopes, and
+ * (3) an RDD's operation scopes and the stage / job hierarchy
  *
- * An operator scope is a general, named code block representing an operation
- * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operator
+ * An operation scope is a general, named code block representing an operation
+ * that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation
  * scope can be nested inside of other scopes if the corresponding RDD operation
- * invokes other such operations (for more detail, see o.a.s.rdd.OperatorScope).
+ * invokes other such operations (for more detail, see o.a.s.rdd.RDDOperationScope).
  *
- * A stage may include one or more operator scopes if the RDD operations are
+ * A stage may include one or more operation scopes if the RDD operations are
 * streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
- * On the flip side, an operator scope may also include one or many stages,
+ * On the flip side, an operation scope may also include one or many stages,
 * or even jobs if the RDD operation is higher level than Spark's scheduling
 * primitives (e.g. take, any SQL query).
 *
 * In the visualization, an RDD is expressed as a node, and its dependencies
- * as directed edges (from parent to child). Operator scopes, stages, and
+ * as directed edges (from parent to child). Operation scopes, stages, and
 * jobs are expressed as clusters that may contain one or many nodes. These
 * clusters may be nested inside of each other in the scenarios described
 * above.
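Illustrative sketch, not part of the patch: a chained word count shows how each public RDD operation opens its own operation scope while the narrow transformations are pipelined into one stage and the shuffle introduced by reduceByKey starts another. The app name, master, and "input.txt" path are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ScopeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("scope-demo").setMaster("local[*]"))
    val counts = sc.textFile("input.txt")   // scope "textFile"
      .flatMap(_.split(" "))                // scope "flatMap", same stage
      .map(word => (word, 1))               // scope "map", same stage
      .reduceByKey(_ + _)                   // scope "reduceByKey", new stage after the shuffle
    counts.collect().foreach(println)
    sc.stop()
  }
}
```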
@@ -54,7 +54,7 @@ var VizConstants = {
   rddColor: "#444444",
   stageColor: "#FFDDEE",
-  operatorScopeColor: "#AADFFF",
+  operationScopeColor: "#AADFFF",
   clusterLabelColor: "#888888",
   edgeColor: "#444444",
   edgeWidth: "1.5px",
@@ -240,7 +240,7 @@ function renderDot(dot, container) {
 function styleDagViz(forJob) {
   graphContainer().selectAll("svg g.cluster rect")
     .style("fill", "white")
-    .style("stroke", VizConstants.operatorScopeColor)
+    .style("stroke", VizConstants.operationScopeColor)
     .style("stroke-width", "4px")
     .style("stroke-opacity", "0.5");
   graphContainer().selectAll("svg g.cluster text")
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f5d7729c3dd90..95f2d19f04f43 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -661,11 +661,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * Execute a block of code in a scope such that all new RDDs created in this body will
-   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
+   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private def withScope[U](body: => U): U = OperatorScope.withScope[U](this)(body)
+  private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)
 
   // Methods for creating RDDs
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index da3638c2519d3..4dbfd60094336 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -279,11 +279,11 @@ abstract class RDD[T: ClassTag](
 
   /**
    * Execute a block of code in a scope such that all new RDDs created in this body will
-   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
+   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
    *
    * Note: Return statements are NOT allowed in the given body.
    */
-  private[spark] def withScope[U](body: => U): U = OperatorScope.withScope[U](sc)(body)
+  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
 
   // Transformations (return a new RDD)
 
@@ -1453,11 +1453,11 @@ abstract class RDD[T: ClassTag](
    * The scope associated with the operation that created this RDD.
    *
    * This is more flexible than the call site and can be defined hierarchically. For more
-   * detail, see the documentation of {{OperatorScope}}. This scope is not defined if the
+   * detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
    * user instantiates this RDD himself without using any Spark operations.
    */
-  @transient private[spark] val scope: Option[OperatorScope] = {
-    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(OperatorScope.fromJson)
+  @transient private[spark] val scope: Option[RDDOperationScope] = {
+    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
   }
 
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
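A hedged aside on the "Return statements are NOT allowed" note in both withScope scaladocs: the body is a by-name parameter, and a `return` inside such a block is compiled into a control-flow exception that unwinds through the helper, which is presumably why the scaladoc forbids it. A minimal, Spark-free sketch; `runInScope` is an invented stand-in for the private withScope.

```scala
object ByNameReturnDemo {
  def runInScope[U](body: => U): U = {
    // Stand-in for withScope: a real implementation would set a thread-local
    // scope here, run the body, and restore the old scope in the finally block.
    try body finally {
      // restore previous scope here
    }
  }

  def bad(): Int = runInScope {
    return 42  // compiled as a NonLocalReturnControl exception thrown through runInScope
  }

  def main(args: Array[String]): Unit = {
    println(bad())  // still prints 42, but only because `bad` catches the control exception
  }
}
```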
diff --git a/core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
similarity index 83%
rename from core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala
rename to core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 056b111e7a193..dbb8a8efa404a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OperatorScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -32,17 +32,20 @@ import org.apache.spark.SparkContext
  * Examples include, but will not be limited to, existing RDD operations, such as textFile,
  * reduceByKey, and treeAggregate.
  *
- * An operator scope may be nested in other scopes. For instance, a SQL query may enclose
+ * An operation scope may be nested in other scopes. For instance, a SQL query may enclose
  * scopes associated with the public RDD APIs it uses under the hood.
  *
- * There is no particular relationship between an operator scope and a stage or a job.
+ * There is no particular relationship between an operation scope and a stage or a job.
  * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
  */
-private[spark] class OperatorScope(val name: String, parent: Option[OperatorScope] = None) {
-  val id: Int = OperatorScope.nextScopeId()
+private[spark] class RDDOperationScope(
+    val name: String,
+    parent: Option[RDDOperationScope] = None) {
+
+  val id: Int = RDDOperationScope.nextScopeId()
 
   def toJson: String = {
-    OperatorScope.jsonMapper.writeValueAsString(this)
+    RDDOperationScope.jsonMapper.writeValueAsString(this)
   }
 
   /**
@@ -50,7 +53,7 @@ private[spark] class OperatorScope(val name: String, parent: Option[OperatorScop
    * The result is ordered from the outermost scope (eldest ancestor) to this scope.
    */
   @JsonIgnore
-  def getAllScopes: Seq[OperatorScope] = {
+  def getAllScopes: Seq[RDDOperationScope] = {
     parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
   }
 }
@@ -59,15 +62,15 @@ private[spark] class OperatorScop
 * A collection of utility methods to construct a hierarchical representation of RDD scopes.
 * An RDD scope tracks the series of operations that created a given RDD.
 */
-private[spark] object OperatorScope {
+private[spark] object RDDOperationScope {
   private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
   private val scopeCounter = new AtomicInteger(0)
 
-  def fromJson(s: String): OperatorScope = {
-    jsonMapper.readValue(s, classOf[OperatorScope])
+  def fromJson(s: String): RDDOperationScope = {
+    jsonMapper.readValue(s, classOf[RDDOperationScope])
   }
 
-  /** Return a globally unique operator scope ID. */
+  /** Return a globally unique operation scope ID. */
   def nextScopeId(): Int = scopeCounter.getAndIncrement
 
   /**
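For illustration only, a stand-alone analogue of the class above (the real RDDOperationScope is private[spark]); it assumes jackson-module-scala is on the classpath, as it is for Spark itself. `DemoScope` and the printed JSON shape are assumptions, not Spark API.

```scala
import java.util.concurrent.atomic.AtomicInteger
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

class DemoScope(val name: String, parent: Option[DemoScope] = None) {
  val id: Int = DemoScope.nextScopeId()
  def toJson: String = DemoScope.mapper.writeValueAsString(this)
  // Without @JsonIgnore, Jackson would treat getAllScopes as a "allScopes" property.
  @JsonIgnore
  def getAllScopes: Seq[DemoScope] =
    parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
}

object DemoScope {
  private val counter = new AtomicInteger(0)
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  def nextScopeId(): Int = counter.getAndIncrement

  def main(args: Array[String]): Unit = {
    val sql = new DemoScope("sqlQuery")                 // outer scope
    val rbk = new DemoScope("reduceByKey", Some(sql))   // nested scope
    println(rbk.getAllScopes.map(_.name))               // List(sqlQuery, reduceByKey)
    println(rbk.toJson)                                 // e.g. {"name":"reduceByKey","id":1}
  }
}
```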
@@ -100,12 +103,12 @@ private[spark] object OperatorScope {
     val scopeKey = SparkContext.RDD_SCOPE_KEY
     val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
     val oldScopeJson = sc.getLocalProperty(scopeKey)
-    val oldScope = Option(oldScopeJson).map(OperatorScope.fromJson)
+    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
     val oldNoOverride = sc.getLocalProperty(noOverrideKey)
     try {
       // Set the scope only if the higher level caller allows us to do so
       if (sc.getLocalProperty(noOverrideKey) == null) {
-        sc.setLocalProperty(scopeKey, new OperatorScope(name, oldScope).toJson)
+        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
       }
       // Optionally disallow the child body to override our scope
       if (!allowNesting) {
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index cc2132b17d7b3..251cda6c649d6 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -18,7 +18,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.{OperatorScope, RDD}
+import org.apache.spark.rdd.{RDDOperationScope, RDD}
 import org.apache.spark.util.Utils
 
 @DeveloperApi
@@ -28,7 +28,7 @@ class RDDInfo(
     val numPartitions: Int,
     var storageLevel: StorageLevel,
     val parentIds: Seq[Int],
-    val scope: Option[OperatorScope] = None)
+    val scope: Option[RDDOperationScope] = None)
   extends Ordered[RDDInfo] {
 
   var numCachedPartitions = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index c4d6a2ba34bcc..a5271f0574e6c 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
 import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
-import org.apache.spark.ui.scope.OperationGraphListener
+import org.apache.spark.ui.scope.RDDOperationGraphListener
 
 /**
  * Top level user interface for a Spark application.
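The RDDOperationScope.withScope hunk above shows the core mechanism: save the caller's scope, install a new child scope unless a no-override flag is set, and restore everything afterwards. A simplified, runnable sketch of that save/replace/restore pattern, with a plain ThreadLocal standing in for SparkContext local properties and a fake JSON encoding; none of the names here are Spark API.

```scala
object WithScopeSketch {
  private val currentScope = new ThreadLocal[String]   // holds the scope "JSON"
  private val noOverride = new ThreadLocal[String]     // non-null disables nested scopes

  def withScope[T](name: String, allowNesting: Boolean = false)(body: => T): T = {
    val oldScope = currentScope.get()
    val oldNoOverride = noOverride.get()
    try {
      // Set the scope only if the higher level caller allows us to do so
      if (noOverride.get() == null) {
        currentScope.set(s"""{"name":"$name","parent":${Option(oldScope).getOrElse("null")}}""")
      }
      // Optionally disallow the child body to override our scope
      if (!allowNesting) {
        noOverride.set("true")
      }
      body
    } finally {
      currentScope.set(oldScope)                        // restore, even on failure
      noOverride.set(oldNoOverride)
    }
  }

  def main(args: Array[String]): Unit = {
    withScope("reduceByKey") {
      withScope("mapPartitions") {                      // ignored: reduceByKey disallowed nesting
        println(currentScope.get())                     // {"name":"reduceByKey","parent":null}
      }
    }
  }
}
```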
@@ -39,7 +39,7 @@ private[spark] class SparkUI private (
     val executorsListener: ExecutorsListener,
     val jobProgressListener: JobProgressListener,
     val storageListener: StorageListener,
-    val operationGraphListener: OperationGraphListener,
+    val operationGraphListener: RDDOperationGraphListener,
     var appName: String,
     val basePath: String)
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -149,7 +149,7 @@ private[spark] object SparkUI {
     val storageStatusListener = new StorageStatusListener
     val executorsListener = new ExecutorsListener(storageStatusListener)
     val storageListener = new StorageListener(storageStatusListener)
-    val operationGraphListener = new OperationGraphListener(conf)
+    val operationGraphListener = new RDDOperationGraphListener(conf)
 
     listenerBus.addListener(environmentListener)
     listenerBus.addListener(storageStatusListener)
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 65941349cdc9c..7547f071ccc02 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -48,7 +48,7 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
 /**
  * A cluster that groups nodes together in an RDDOperationGraph.
  *
- * This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap),
+ * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
 * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
 */
 private[ui] class RDDOperationCluster(val id: String, val name: String) {
@@ -72,7 +72,7 @@ private[ui] object RDDOperationGraph extends Logging {
   * Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
   * from the parent to the child.
   *
-  * This does not currently merge common operator scopes across stages. This may be worth
+  * This does not currently merge common operation scopes across stages. This may be worth
   * supporting in the future if we decide to group certain stages within the same job under
   * a common scope (e.g. part of a SQL query).
   */
@@ -87,7 +87,7 @@ private[ui] object RDDOperationGraph extends Logging {
       { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
 
-    // Find nodes, edges, and operator scopes that belong to this stage
+    // Find nodes, edges, and operation scopes that belong to this stage
     stage.rddInfos.foreach { rdd =>
       edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
       val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
similarity index 96%
rename from core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala
rename to core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index ffe0cb814956e..2884a49f31122 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.ui.SparkUI
 /**
  * A SparkListener that constructs a DAG of RDD operations.
 */
-private[ui] class OperationGraphListener(conf: SparkConf) extends SparkListener {
+private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
   private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
   private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
   private val stageIds = new mutable.ArrayBuffer[Int]
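The listener above is private[ui]; as a rough, hedged approximation of what it collects, a listener built only on the public SparkListener API can record which RDDs (and their parent links) belong to each submitted stage. `StageRDDListener` is an invented name, not part of Spark.

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

class StageRDDListener extends SparkListener {
  // stage id -> (rdd id, rdd name, parent rdd ids): enough to draw nodes and edges
  val stageToRdds = new mutable.HashMap[Int, Seq[(Int, String, Seq[Int])]]

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val info = event.stageInfo
    stageToRdds(info.stageId) = info.rddInfos.map(r => (r.id, r.name, r.parentIds))
  }
}

// Usage: sc.addSparkListener(new StageRDDListener())   // sc is an existing SparkContext
```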
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 0fb457aeae689..5d4f380963584 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -30,7 +30,7 @@ import org.json4s.JsonAST._
 
 import org.apache.spark._
 import org.apache.spark.executor._
-import org.apache.spark.rdd.OperatorScope
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
@@ -795,7 +795,7 @@ private[spark] object JsonProtocol {
   def rddInfoFromJson(json: JValue): RDDInfo = {
     val rddId = (json \ "RDD ID").extract[Int]
     val name = (json \ "Name").extract[String]
-    val scope = Utils.jsonOption(json \ "Scope").map(_.extract[OperatorScope])
+    val scope = Utils.jsonOption(json \ "Scope").map(_.extract[RDDOperationScope])
     val parentIds = Utils.jsonOption(json \ "Parent IDs")
       .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
       .getOrElse(Seq.empty)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e04167a1442b6..0c5221d10d79d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark._
 import org.apache.spark.executor._
-import org.apache.spark.rdd.OperatorScope
+import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
@@ -327,7 +327,7 @@ class JsonProtocolSuite extends FunSuite {
   test("RDDInfo backward compatibility (scope, parent IDs)") {
     // Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
     val rddInfo = new RDDInfo(
-      1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new OperatorScope("fable")))
+      1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
     val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
       .removeField({ _._1 == "Parent IDs"})
       .removeField({ _._1 == "Scope"})
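The test above exercises backward compatibility by writing current JSON, stripping the new "Scope" and "Parent IDs" fields, and reading the result back. A self-contained json4s sketch of that strip-then-read pattern; the field names here are placeholders rather than the exact JsonProtocol schema.

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object BackwardCompatSketch {
  def main(args: Array[String]): Unit = {
    // JSON as the current writer would produce it, including the new "Scope" field.
    val current: JValue =
      ("RDD ID" -> 1) ~ ("Name" -> "one") ~ ("Scope" -> ("id" -> 7) ~ ("name" -> "fable"))
    // Pretend the JSON came from an older writer that knew nothing about scopes.
    val old = current.removeField { case (field, _) => field == "Scope" }
    println(compact(render(old)))   // {"RDD ID":1,"Name":"one"}
    // A tolerant reader treats the missing field as None instead of failing.
    val scope = old \ "Scope" match {
      case JNothing => None
      case other => Some(other)
    }
    println(scope)                  // None
  }
}
```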