Rename OperatorScope -> RDDOperationScope
Andrew Or committed May 4, 2015
1 parent 31aae06 commit b1f0fd1
Showing 10 changed files with 45 additions and 42 deletions.
20 changes: 10 additions & 10 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
@@ -20,22 +20,22 @@
*
* This DAG describes the relationships between
* (1) an RDD and its dependencies,
* (2) an RDD and its operator scopes, and
* (3) an RDD's operator scopes and the stage / job hierarchy
* (2) an RDD and its operation scopes, and
* (3) an RDD's operation scopes and the stage / job hierarchy
*
* An operator scope is a general, named code block representing an operation
* that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operator
* An operation scope is a general, named code block representing an operation
* that instantiates RDDs (e.g. filter, textFile, reduceByKey). An operation
* scope can be nested inside of other scopes if the corresponding RDD operation
* invokes other such operations (for more detail, see o.a.s.rdd.OperatorScope).
* invokes other such operations (for more detail, see o.a.s.rdd.RDDOperationScope).
*
* A stage may include one or more operator scopes if the RDD operations are
* A stage may include one or more operation scopes if the RDD operations are
* streamlined into one stage (e.g. rdd.map(...).filter(...).flatMap(...)).
* On the flip side, an operator scope may also include one or many stages,
* On the flip side, an operation scope may also include one or many stages,
* or even jobs if the RDD operation is higher level than Spark's scheduling
* primitives (e.g. take, any SQL query).
*
* In the visualization, an RDD is expressed as a node, and its dependencies
* as directed edges (from parent to child). Operator scopes, stages, and
* as directed edges (from parent to child). Operation scopes, stages, and
* jobs are expressed as clusters that may contain one or many nodes. These
* clusters may be nested inside of each other in the scenarios described
* above.
@@ -54,7 +54,7 @@
var VizConstants = {
rddColor: "#444444",
stageColor: "#FFDDEE",
operatorScopeColor: "#AADFFF",
operationScopeColor: "#AADFFF",
clusterLabelColor: "#888888",
edgeColor: "#444444",
edgeWidth: "1.5px",
@@ -240,7 +240,7 @@ function renderDot(dot, container) {
function styleDagViz(forJob) {
graphContainer().selectAll("svg g.cluster rect")
.style("fill", "white")
.style("stroke", VizConstants.operatorScopeColor)
.style("stroke", VizConstants.operationScopeColor)
.style("stroke-width", "4px")
.style("stroke-opacity", "0.5");
graphContainer().selectAll("svg g.cluster text")
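The doc comment above distinguishes operation scopes from stages and jobs. A minimal sketch of that relationship, assuming a live SparkContext `sc`; the scope names in the comments follow the naming convention described in the comment, not output of this snippet:

```scala
val lines   = sc.textFile("README.md")   // scope "textFile": the RDDs it instantiates
val lengths = lines.map(_.length)        // scope "map": one new RDD
val long    = lengths.filter(_ > 80)     // scope "filter": pipelined into the same stage as map
val sample  = long.take(10)              // scope "take": a higher-level op that may run several jobs
```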
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -661,11 +661,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
*
* Note: Return statements are NOT allowed in the given body.
*/
private def withScope[U](body: => U): U = OperatorScope.withScope[U](this)(body)
private def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body)

// Methods for creating RDDs

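A hedged sketch of how an RDD-producing SparkContext method might use this helper, written as if inside SparkContext.scala; the method name `exampleParallelize` is hypothetical and not part of this commit:

```scala
// Hypothetical illustration only: wrap the body in withScope so every RDD
// created inside shares one scope named after the enclosing operation.
def exampleParallelize(data: Seq[Int]): RDD[Int] = withScope {
  // No return statements allowed here: the body is passed as a by-name parameter.
  parallelize(data)
}
```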
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -279,11 +279,11 @@ abstract class RDD[T: ClassTag](

/**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.OperatorScope}}.
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[spark] def withScope[U](body: => U): U = OperatorScope.withScope[U](sc)(body)
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

// Transformations (return a new RDD)

@@ -1453,11 +1453,11 @@ abstract class RDD[T: ClassTag](
* The scope associated with the operation that created this RDD.
*
* This is more flexible than the call site and can be defined hierarchically. For more
* detail, see the documentation of {{OperatorScope}}. This scope is not defined if the
* detail, see the documentation of {{RDDOperationScope}}. This scope is not defined if the
* user instantiates this RDD himself without using any Spark operations.
*/
@transient private[spark] val scope: Option[OperatorScope] = {
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(OperatorScope.fromJson)
@transient private[spark] val scope: Option[RDDOperationScope] = {
Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}

private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
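A hedged sketch of what the `scope` field means in practice, written as if from inside the org.apache.spark package (the field is private[spark]) and assuming a live SparkContext `sc`:

```scala
// RDDs created through Spark operations carry the scope of the operation that
// created them; an RDD created while RDD_SCOPE_KEY is unset has scope None.
val doubled = sc.parallelize(1 to 10).map(_ * 2)
doubled.scope.foreach(s => println(s.name))   // expected to print something like "map"
```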
@@ -32,25 +32,28 @@ import org.apache.spark.SparkContext
* Examples include, but will not be limited to, existing RDD operations, such as textFile,
* reduceByKey, and treeAggregate.
*
* An operator scope may be nested in other scopes. For instance, a SQL query may enclose
* An operation scope may be nested in other scopes. For instance, a SQL query may enclose
* scopes associated with the public RDD APIs it uses under the hood.
*
* There is no particular relationship between an operator scope and a stage or a job.
* There is no particular relationship between an operation scope and a stage or a job.
* A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
*/
private[spark] class OperatorScope(val name: String, parent: Option[OperatorScope] = None) {
val id: Int = OperatorScope.nextScopeId()
private[spark] class RDDOperationScope(
val name: String,
parent: Option[RDDOperationScope] = None) {

val id: Int = RDDOperationScope.nextScopeId()

def toJson: String = {
OperatorScope.jsonMapper.writeValueAsString(this)
RDDOperationScope.jsonMapper.writeValueAsString(this)
}

/**
* Return a list of scopes that this scope is a part of, including this scope itself.
* The result is ordered from the outermost scope (eldest ancestor) to this scope.
*/
@JsonIgnore
def getAllScopes: Seq[OperatorScope] = {
def getAllScopes: Seq[RDDOperationScope] = {
parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
}
}
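A hedged sketch of the ordering contract documented for getAllScopes, usable from code inside the spark package since the class is private[spark]; the scope names are illustrative:

```scala
// Outermost ancestor first, the receiver itself last.
val sqlScope    = new RDDOperationScope("sql query")
val reduceScope = new RDDOperationScope("reduceByKey", Some(sqlScope))
val mapScope    = new RDDOperationScope("mapPartitions", Some(reduceScope))
mapScope.getAllScopes.map(_.name)   // Seq("sql query", "reduceByKey", "mapPartitions")
```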
@@ -59,15 +62,15 @@ private[spark] class OperatorScope(val name: String, parent: Option[OperatorScop
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object OperatorScope {
private[spark] object RDDOperationScope {
private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val scopeCounter = new AtomicInteger(0)

def fromJson(s: String): OperatorScope = {
jsonMapper.readValue(s, classOf[OperatorScope])
def fromJson(s: String): RDDOperationScope = {
jsonMapper.readValue(s, classOf[RDDOperationScope])
}

/** Return a globally unique operator scope ID. */
/** Return a globally unique operation scope ID. */
def nextScopeId(): Int = scopeCounter.getAndIncrement

/**
@@ -100,12 +103,12 @@ private[spark] object OperatorScope {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(OperatorScope.fromJson)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
// Set the scope only if the higher level caller allows us to do so
if (sc.getLocalProperty(noOverrideKey) == null) {
sc.setLocalProperty(scopeKey, new OperatorScope(name, oldScope).toJson)
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
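A hedged, simplified sketch of the save/set/restore pattern the withScope method above follows; `sc`, `name`, and `body` come from the enclosing method, and the no-override branch shown in the hunk is elided:

```scala
// Remember the old scope, install a new child scope, run the body,
// and restore the old value on the way out.
val oldScopeJson = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)
val oldScope     = Option(oldScopeJson).map(RDDOperationScope.fromJson)
try {
  sc.setLocalProperty(SparkContext.RDD_SCOPE_KEY,
    new RDDOperationScope(name, oldScope).toJson)   // nest under the previous scope
  body                                              // user code runs inside the new scope
} finally {
  sc.setLocalProperty(SparkContext.RDD_SCOPE_KEY, oldScopeJson)  // restore, even on failure
}
```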
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -18,7 +18,7 @@
package org.apache.spark.storage

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{OperatorScope, RDD}
import org.apache.spark.rdd.{RDDOperationScope, RDD}
import org.apache.spark.util.Utils

@DeveloperApi
@@ -28,7 +28,7 @@ class RDDInfo(
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
val scope: Option[OperatorScope] = None)
val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
import org.apache.spark.ui.scope.OperationGraphListener
import org.apache.spark.ui.scope.RDDOperationGraphListener

/**
* Top level user interface for a Spark application.
@@ -39,7 +39,7 @@ private[spark] class SparkUI private (
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
val operationGraphListener: OperationGraphListener,
val operationGraphListener: RDDOperationGraphListener,
var appName: String,
val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -149,7 +149,7 @@ private[spark] object SparkUI {
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
val operationGraphListener = new OperationGraphListener(conf)
val operationGraphListener = new RDDOperationGraphListener(conf)

listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
@@ -48,7 +48,7 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
/**
* A cluster that groups nodes together in an RDDOperationGraph.
*
* This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap),
* This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
*/
private[ui] class RDDOperationCluster(val id: String, val name: String) {
@@ -72,7 +72,7 @@ private[ui] object RDDOperationGraph extends Logging {
* Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
* from the parent to the child.
*
* This does not currently merge common operator scopes across stages. This may be worth
* This does not currently merge common operation scopes across stages. This may be worth
* supporting in the future if we decide to group certain stages within the same job under
* a common scope (e.g. part of a SQL query).
*/
@@ -87,7 +87,7 @@
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)

// Find nodes, edges, and operator scopes that belong to this stage
// Find nodes, edges, and operation scopes that belong to this stage
stage.rddInfos.foreach { rdd =>
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))
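A hedged sketch of the edge direction described above, with a hypothetical `Info` case class standing in for RDDInfo so the snippet is self-contained (RDDOperationEdge is the private[ui] case class shown in this hunk):

```scala
// Edges point from parent RDD to child RDD, so a lineage 1 -> 2 -> 3
// produces RDDOperationEdge(1, 2) and RDDOperationEdge(2, 3).
case class Info(id: Int, name: String, parentIds: Seq[Int])   // hypothetical stand-in for RDDInfo
val infos = Seq(Info(1, "textFile", Nil), Info(2, "map", Seq(1)), Info(3, "filter", Seq(2)))
val edges = infos.flatMap(i => i.parentIds.map(p => RDDOperationEdge(p, i.id)))
// edges == Seq(RDDOperationEdge(1, 2), RDDOperationEdge(2, 3))
```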
@@ -26,7 +26,7 @@ import org.apache.spark.ui.SparkUI
/**
* A SparkListener that constructs a DAG of RDD operations.
*/
private[ui] class OperationGraphListener(conf: SparkConf) extends SparkListener {
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
private val stageIds = new mutable.ArrayBuffer[Int]
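A hedged, minimal sketch of the kind of bookkeeping such a listener performs, using only the public SparkListener API; the class name and details are illustrative, not the real implementation:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Records which stages belong to each submitted job, mirroring jobIdToStageIds above.
class JobToStagesListener extends SparkListener {
  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobIdToStageIds(jobStart.jobId) = jobStart.stageIds
  }
}
```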
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -30,7 +30,7 @@ import org.json4s.JsonAST._

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.OperatorScope
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.storage._

@@ -795,7 +795,7 @@ private[spark] object JsonProtocol {
def rddInfoFromJson(json: JValue): RDDInfo = {
val rddId = (json \ "RDD ID").extract[Int]
val name = (json \ "Name").extract[String]
val scope = Utils.jsonOption(json \ "Scope").map(_.extract[OperatorScope])
val scope = Utils.jsonOption(json \ "Scope").map(_.extract[RDDOperationScope])
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
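A hedged, standalone json4s sketch of the backward-compatibility pattern used above: fields that older Spark versions never wrote come back as JNothing and are turned into a default instead of failing extraction. The `Map[String, String]` stand-in replaces the real RDDOperationScope extraction so the snippet compiles on its own:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: Formats = DefaultFormats
// A pre-1.4 RDDInfo record never wrote a "Scope" field, so the lookup yields JNothing.
val json = parse("""{"RDD ID": 1, "Name": "one"}""")
val scopeOpt = (json \ "Scope") match {
  case JNothing => None
  case other    => Some(other.extract[Map[String, String]])  // stand-in for RDDOperationScope
}
// scopeOpt == None
```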
@@ -29,7 +29,7 @@ import org.scalatest.FunSuite

import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.OperatorScope
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.storage._

@@ -327,7 +327,7 @@ class JsonProtocolSuite extends FunSuite {
test("RDDInfo backward compatibility (scope, parent IDs)") {
// Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
val rddInfo = new RDDInfo(
1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new OperatorScope("fable")))
1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Parent IDs"})
.removeField({ _._1 == "Scope"})
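A hedged sketch of how the compatibility check above continues; this is reconstructed from the visible hunk, so the exact assertions are assumptions rather than the test's verbatim body:

```scala
// Strip the fields that pre-1.4 Spark never wrote, then verify that parsing
// still succeeds and falls back to empty defaults.
val oldStyleJson = JsonProtocol.rddInfoToJson(rddInfo)
  .removeField { case (field, _) => field == "Scope" || field == "Parent IDs" }
val reparsed = JsonProtocol.rddInfoFromJson(oldStyleJson)
assert(reparsed.scope.isEmpty)
assert(reparsed.parentIds.isEmpty)
```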
