diff --git a/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java b/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java
index e161a09cc57a2..565e7d631e48e 100644
--- a/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java
+++ b/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java
@@ -20,9 +20,10 @@
 import java.lang.annotation.*;
 
 /**
- * Blah blah blah blah blah.
- * This should really be private and not displayed on the docs.
+ * An annotation to mark a method as an RDD operation that encloses its body in a scope.
+ * This is used to compute the scope of an RDD when it is instantiated.
  */
+// TODO: This should really be private[spark]
 @Retention(RetentionPolicy.RUNTIME)
 @Target({ElementType.METHOD})
 public @interface RDDScoped {}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e87757aec72ca..e96f425b8a4c2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1450,7 +1450,13 @@ abstract class RDD[T: ClassTag](
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = sc.getCallSite()
 
-  /** Dem scopes. Tis null if de scope is not defined'eh. TODO: Make this private[spark]. */
+  /**
+   * The scope in which this RDD is defined.
+   *
+   * This is more flexible than the call site and can be defined hierarchically.
+   * For more detail, see the documentation of [[RDDScope]]. This scope is null if
+   * the user instantiates this RDD directly without using any Spark operations.
+   */
   @transient private[spark] val scope = RDDScope.getScope.orNull
 
   private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
@@ -1602,11 +1608,8 @@ abstract class RDD[T: ClassTag](
     firstDebugString(this).mkString("\n")
   }
 
-  override def toString: String = {
-    val _name = Option(name).map(_ + " ").getOrElse("")
-    val _scope = Option(scope).map(" (scope: " + _ + ")").getOrElse("")
-    "%s%s[%d] at %s%s".format(_name, getClass.getSimpleName, id, getCreationSite, _scope)
-  }
+  override def toString: String = "%s%s[%d] at %s".format(
+    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
 
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
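For reference, the reflection pattern that consumes a runtime-retained marker like `@RDDScoped` is easy to try in isolation. The sketch below is hypothetical (names like `ScopeSketch` and `interestingClasses` are invented for illustration) and uses `java.lang.Deprecated` as the stand-in marker, since annotations declared in Scala itself are not retained at runtime; otherwise it mirrors the stack-walking check that `getScope` performs in `RDDScope.scala` below.

```scala
object ScopeSketch {
  // Only reflect on classes that may contain annotated methods,
  // mirroring classesWithScopeMethods in RDDScope.
  private val interestingClasses = Set("ScopeSketch$")

  // Walk the current stack and keep the names of methods that carry the
  // runtime-retained marker annotation (innermost frame first).
  def annotatedFrames: Seq[String] = {
    Thread.currentThread.getStackTrace.toSeq
      .filter { ste => interestingClasses.contains(ste.getClassName) }
      .flatMap { ste =>
        Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
          m.getName == ste.getMethodName &&
            m.getDeclaredAnnotations.exists(_.annotationType() == classOf[Deprecated])
        }
      }
      .map(_.getName)
  }

  @Deprecated def parentOp(): Seq[String] = childOp()
  @Deprecated def childOp(): Seq[String] = annotatedFrames

  def main(args: Array[String]): Unit = {
    println(parentOp())  // List(childOp, parentOp)
  }
}
```

As in the patch, matching is by method name only, so overloads of an annotated method would be conflated.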
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
index f466970b1b883..2557a10c62b4d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
@@ -1,22 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicInteger
+
 import org.apache.spark.annotation.RDDScoped
 
 /**
- *
+ * A collection of utility methods to construct a hierarchical representation of RDD scopes.
+ * An RDD scope tracks the series of operations that created a given RDD.
  */
 private[spark] object RDDScope {
 
-  /**
-   *
-   */
+  // Symbol for delimiting each level of the hierarchy
+  // e.g. grandparent;parent;child
   val SCOPE_NESTING_DELIMITER = ";"
+
+  // Symbol for delimiting the scope name from the ID within each level
   val SCOPE_NAME_DELIMITER = "_"
 
-  /**
-   *
-   */
+  // Counter for generating scope IDs, for differentiating
+  // between different scopes of the same name
+  private val scopeCounter = new AtomicInteger(0)
+
+  // Consider only methods that belong to these classes as potential RDD operations
+  // This is to limit the amount of reflection we do when we traverse the stack trace
   private val classesWithScopeMethods = Set(
     "org.apache.spark.SparkContext",
     "org.apache.spark.rdd.RDD",
@@ -25,45 +48,61 @@ private[spark] object RDDScope {
   )
 
   /**
+   * Make a globally unique scope ID from the scope name.
    *
-   */
-  private val scopeIdCounter = new AtomicInteger(0)
-
-
-  /**
-   *
+   * For instance:
+   *   textFile -> textFile_0
+   *   textFile -> textFile_1
+   *   map -> map_2
+   *   name;with_sensitive;characters -> name-with-sensitive-characters_3
    */
   private def makeScopeId(name: String): String = {
     name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
-      SCOPE_NAME_DELIMITER + scopeIdCounter.getAndIncrement
+      SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
   }
 
   /**
+   * Retrieve the hierarchical scope from the stack trace when an RDD is first created.
+   *
+   * This considers all methods marked with the @RDDScoped annotation and chains them together
+   * in the order they are invoked. Each level in the scope hierarchy represents a unique
+   * invocation of a particular RDD operation.
    *
+   * For example: treeAggregate_0;reduceByKey_1;combineByKey_2;mapPartitions_3
+   * This means this RDD was created by the user calling `treeAggregate`, which calls
+   * `reduceByKey`, then `combineByKey`, and finally `mapPartitions` to create this RDD.
    */
   private[spark] def getScope: Option[String] = {
+
+    // TODO: Note that this approach does not correctly associate the same invocation across RDDs
+    // For instance, a call to `textFile` creates both a HadoopRDD and a MapPartitionsRDD, but
+    // there is no way to associate the invocation across these two RDDs to draw the same scope
+    // around them. This is because the stack trace simply does not provide information for us
+    // to make any reasonable association across RDDs. We may need a higher level approach that
+    // involves setting common variables before and after the RDD operation itself.
+
     val rddScopeNames = Thread.currentThread.getStackTrace
       // Avoid reflecting on all classes in the stack trace
      .filter { ste => classesWithScopeMethods.contains(ste.getClassName) }
      // Return the corresponding method if it has the @RDDScoped annotation
      .flatMap { ste =>
-      // Note that this is an approximation since we match the method only by name
-      // Unfortunate we cannot be more precise because the stack trace does not
-      // include parameter information
-      Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
-        m.getName == ste.getMethodName &&
+        // Note that this is an approximation since we match the method only by name
+        // Unfortunately, we cannot be more precise because the stack trace does not include
+        // parameter information
+        Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
+          m.getName == ste.getMethodName &&
           m.getDeclaredAnnotations.exists { a =>
             a.annotationType() == classOf[RDDScoped]
           }
+        }
       }
-      }
       // Use the method name as the scope name for now
       .map { m => m.getName }
       // It is common for such methods to internally invoke other methods with the same name
-      // (e.g. union, reduceByKey). Here we remove adjacent duplicates such that the scope
-      // chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). This is
-      // surprisingly difficult to express even in Scala.
+      // as aliases (e.g. union, reduceByKey). Here we remove adjacent duplicates such that
+      // the scope chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c).
+      // This is surprisingly difficult to express even in Scala.
       var prev: String = null
       val dedupedRddScopeNames = rddScopeNames.flatMap { n =>
         if (n != prev) {
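The hunk above is cut off mid-stream, so here is a hedged, self-contained sketch of the adjacent-duplicate removal the comment describes; the helper name and structure are illustrative, not the PR's exact continuation.

```scala
object DedupSketch {
  // Remove adjacent duplicates only, preserving non-adjacent repeats:
  // a, a, b, c, b, c, c => a, b, c, b, c
  def dedupAdjacent(names: Seq[String]): Seq[String] = {
    var prev: String = null
    names.flatMap { n =>
      if (n != prev) {
        prev = n
        Some(n)
      } else {
        None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(dedupAdjacent(Seq("a", "a", "b", "c", "b", "c", "c")))
    // List(a, b, c, b, c)
  }
}
```

A fold that compares each element against the head of the accumulator would work too; the mutable `prev` version stays closest to the code in the hunk.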
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 7e4b9f77e8375..11fe336c9c3ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -42,7 +42,7 @@ class RDDInfo(
     import Utils.bytesToString
     val _scope = Option(scope).getOrElse("--")
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
-      "MemorySize: %s; TachyonSize: %s; DiskSize: %s (scope: %s)").format(
+      "MemorySize: %s; TachyonSize: %s; DiskSize: %s [scope: %s]").format(
       name, id, storageLevel.toString, numCachedPartitions, numPartitions,
       bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize), _scope)
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 8e76c4c788840..e5087bb1fcf18 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -93,6 +93,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
 private[spark] object SparkUI {
   val DEFAULT_PORT = 4040
   val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+  val DEFAULT_POOL_NAME = "default"
+  val DEFAULT_RETAINED_STAGES = 1000
+  val DEFAULT_RETAINED_JOBS = 1000
 
   def getUIPort(conf: SparkConf): Int = {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
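The scope string that `RDDInfo` now renders with `[scope: %s]` is built from IDs produced by `makeScopeId` above. The following self-contained sketch (names are illustrative) shows the sanitize-and-number round trip, including how the UI side can later recover the display name by splitting on the name delimiter:

```scala
import java.util.concurrent.atomic.AtomicInteger

object ScopeIdSketch {
  val SCOPE_NESTING_DELIMITER = ";"
  val SCOPE_NAME_DELIMITER = "_"
  private val scopeCounter = new AtomicInteger(0)

  // Sanitize delimiters out of the name, then append a unique counter.
  def makeScopeId(name: String): String = {
    name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
      SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
  }

  def main(args: Array[String]): Unit = {
    println(makeScopeId("textFile"))  // textFile_0
    println(makeScopeId("textFile"))  // textFile_1
    val id = makeScopeId("name;with_sensitive;characters")
    println(id)                                   // name-with-sensitive-characters_2
    println(id.split(SCOPE_NAME_DELIMITER).head)  // name-with-sensitive-characters
  }
}
```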
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 625596885faa1..33e9baffb7910 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData._
 
 /**
@@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._
 @DeveloperApi
 class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
-  import JobProgressListener._
-
   // Define a handful of type aliases so that data structures' types can serve as documentation.
   // These type aliases are public because they're used in the types of public fields:
 
@@ -82,8 +81,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   // To limit the total memory usage of JobProgressListener, we only track information for a fixed
   // number of non-active jobs and stages (there is no limit for active jobs and stages):
-  val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
-  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
+  val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
 
   // We can test for memory leaks by ensuring that collections that track non-active jobs and
   // stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -284,8 +283,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     activeStages(stage.stageId) = stage
     pendingStages.remove(stage.stageId)
     val poolName = Option(stageSubmitted.properties).map {
-      p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
-    }.getOrElse(DEFAULT_POOL_NAME)
+      p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
     stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId),
       new StageUIData)
@@ -517,9 +516,3 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     }
   }
 }
-
-private object JobProgressListener {
-  val DEFAULT_POOL_NAME = "default"
-  val DEFAULT_RETAINED_STAGES = 1000
-  val DEFAULT_RETAINED_JOBS = 1000
-}
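The pool-name lookup above has two fallback paths: the stage may have no `Properties` at all, or the properties may lack the `spark.scheduler.pool` key. A minimal standalone sketch of that logic (the object and method names are invented for illustration):

```scala
import java.util.Properties

object PoolNameSketch {
  val DEFAULT_POOL_NAME = "default"  // mirrors SparkUI.DEFAULT_POOL_NAME

  // Both the missing-Properties case and the missing-key case
  // fall back to the shared default.
  def poolName(properties: Properties): String = {
    Option(properties).map { p =>
      p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
    }.getOrElse(DEFAULT_POOL_NAME)
  }

  def main(args: Array[String]): Unit = {
    println(poolName(null))   // default
    val props = new Properties()
    println(poolName(props))  // default
    props.setProperty("spark.scheduler.pool", "production")
    println(poolName(props))  // production
  }
}
```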
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index f1185638fa0f8..0ca758999244f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -36,12 +36,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   private val progressListener = parent.progressListener
   private val vizListener = parent.vizListener
 
+  /**
+   * Return a DOM element that contains an RDD DAG visualization for this stage.
+   * If there is no visualization information for this stage, return an empty element.
+   */
   private def renderViz(stageId: Int): Seq[Node] = {
     val graph = vizListener.getVizGraph(stageId)
     if (graph.isEmpty) {
-      return Seq.empty
-    }
-    {
+      Seq.empty
+    } else {
diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
index ed1d58b46fcd8..6e3fab9669641 100644
--- a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala
@@ -20,22 +20,42 @@ package org.apache.spark.ui.viz
 import scala.collection.mutable
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
 
 /**
- * A SparkListener that...
+ * A SparkListener that constructs a graph of the RDD DAG for each stage.
+ * This graph will be used for rendering visualization in the UI later.
  */
-private[ui] class VisualizationListener extends SparkListener {
-  private val graphsByStageId = new mutable.HashMap[Int, VizGraph] // stage ID -> viz graph
+private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
 
-  /** */
+  // A list of stage IDs to track the order in which stages are inserted
+  private val stageIds = new mutable.ArrayBuffer[Int]
+
+  // Stage ID -> graph metadata for the stage
+  private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]
+
+  // How many stages to retain graph metadata for
+  private val retainedStages =
+    conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
+
+  /** Return the graph metadata for the given stage, or None if no such information exists. */
   def getVizGraph(stageId: Int): Option[VizGraph] = {
-    graphsByStageId.get(stageId)
+    stageIdToGraph.get(stageId)
  }
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
     val stageId = stageCompleted.stageInfo.stageId
     val rddInfos = stageCompleted.stageInfo.rddInfos
     val vizGraph = VizGraph.makeVizGraph(rddInfos)
-    graphsByStageId(stageId) = vizGraph
+    stageIdToGraph(stageId) = vizGraph
+    stageIds += stageId
+
+    // Remove metadata for old stages
+    if (stageIds.size >= retainedStages) {
+      val toRemove = math.max(retainedStages / 10, 1)
+      stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
+      stageIds.trimStart(toRemove)
+    }
   }
 }
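Note that two small fixes are folded into the hunk above: the listener now takes a `SparkConf` constructor parameter (the original hunk read `conf.getInt(...)` with no `conf` in scope, which would not compile), and `stageIds += stageId` is added so the insertion-order buffer actually grows and eviction can trigger. The retention policy itself is easy to exercise standalone; the sketch below is hypothetical (tiny limit, string "graphs") but uses the same 10%-batch eviction, with insertion order tracked in a separate buffer because `HashMap` iteration order is unspecified:

```scala
import scala.collection.mutable

object RetentionSketch {
  private val retainedStages = 3  // tiny limit for demonstration
  private val stageIds = new mutable.ArrayBuffer[Int]
  private val stageIdToGraph = new mutable.HashMap[Int, String]

  def record(stageId: Int, graph: String): Unit = {
    stageIdToGraph(stageId) = graph
    stageIds += stageId
    // Evict the oldest ~10% (at least one) once the limit is reached
    if (stageIds.size >= retainedStages) {
      val toRemove = math.max(retainedStages / 10, 1)
      stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
      stageIds.trimStart(toRemove)
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 5).foreach { i => record(i, s"graph$i") }
    println(stageIds)                            // ArrayBuffer(4, 5)
    println(stageIdToGraph.keySet.toSeq.sorted)  // List(4, 5) -- oldest evicted
  }
}
```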
diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
index 6bf25599c8a5a..48a9106e4a8b0 100644
--- a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
@@ -23,39 +23,42 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.rdd.RDDScope
 
-/** */
-private[ui] case class VizNode(id: Int, name: String, isCached: Boolean = false)
-
-/** */
-private[ui] case class VizEdge(fromId: Int, toId: Int)
-
 /**
+ * A class that represents an RDD DAG for a stage.
  *
+ * Each scope can have many children scopes and children nodes, and edges can span multiple scopes.
+ * Thus, it is sufficient to keep track of only the root scopes and the root nodes in the graph,
+ * as all children scopes and nodes are transitively included.
  */
+private[ui] case class VizGraph(
+    edges: Seq[VizEdge],
+    rootNodes: Seq[VizNode],
+    rootScopes: Seq[VizScope])
+
+/** A node in the graph that represents an RDD. */
+private[ui] case class VizNode(id: Int, name: String)
+
+/** An edge in the graph that represents an RDD dependency. */
+private[ui] case class VizEdge(fromId: Int, toId: Int)
+
+/** A cluster in the graph that represents a level in the RDD scope hierarchy. */
 private[ui] class VizScope(val id: String) {
   private val _childrenNodes = new ArrayBuffer[VizNode]
   private val _childrenScopes = new ArrayBuffer[VizScope]
 
   val name: String = id.split(RDDScope.SCOPE_NAME_DELIMITER).head
-
   def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
   def childrenScopes: Seq[VizScope] = _childrenScopes.iterator.toSeq
-
   def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode }
   def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope }
 }
 
-/**
- *
- */
-private[ui] case class VizGraph(
-    edges: Seq[VizEdge],
-    rootNodes: Seq[VizNode],
-    rootScopes: Seq[VizScope])
-
 private[ui] object VizGraph {
 
   /**
+   * Construct a VizGraph from a list of RDDInfos.
    *
+   * The information needed to construct this graph includes the names,
+   * IDs, and scopes of all RDDs, and the dependencies between these RDDs.
    */
   def makeVizGraph(rddInfos: Seq[RDDInfo]): VizGraph = {
     val edges = new mutable.HashSet[VizEdge]
@@ -75,13 +78,14 @@ private[ui] object VizGraph {
         // There is no encompassing scope, so this is a root node
         rootNodes += node
       } else {
-        // Attach children scopes and nodes to each scope
+        // Attach children scopes and nodes to each scope in the hierarchy
         var previousScope: VizScope = null
         val scopeIt = rdd.scope.split(RDDScope.SCOPE_NESTING_DELIMITER).iterator
         while (scopeIt.hasNext) {
           val scopeId = scopeIt.next()
           val scope = scopes.getOrElseUpdate(scopeId, new VizScope(scopeId))
-          // Only attach this node to the innermost scope
+          // Only attach this node to the innermost scope so
+          // the node is not duplicated across all levels
           if (!scopeIt.hasNext) {
             scope.attachChildNode(node)
           }
@@ -105,34 +109,35 @@ private[ui] object VizGraph {
   }
 
   /**
+   * Generate the content of a dot file that describes the specified graph.
+   *
+   * Note that this only uses a minimal subset of features available to the DOT specification.
+   * The style is added in the UI later through post-processing in JavaScript.
    *
+   * For the complete specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
    */
   def makeDotFile(graph: VizGraph): String = {
     val dotFile = new StringBuilder
     dotFile.append("digraph G {\n")
-    graph.rootScopes.foreach { scope => dotFile.append(makeDotSubgraph(scope, "  ")) }
-    graph.rootNodes.foreach { node => dotFile.append(s"  ${makeDotNode(node)};\n") }
-    graph.edges.foreach { edge => dotFile.append(s"  ${edge.fromId}->${edge.toId};\n") }
     dotFile.append("}")
-    println(dotFile.toString())
     dotFile.toString()
   }
 
-  /** */
+  /** Return the dot representation of a node. */
   private def makeDotNode(node: VizNode): String = {
     s"""${node.id} [label="${node.name}"]"""
   }
 
-  /** */
+  /** Return the dot representation of a subgraph recursively. */
   private def makeDotSubgraph(scope: VizScope, indent: String): String = {
     val subgraph = new StringBuilder
     subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
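For a sense of the target output, here is a hedged sketch of the kind of minimal DOT text these helpers aim to produce for a two-node graph inside a single scope. It is a hypothetical standalone program (the node names and scope ID are invented), deliberately unstyled to match the "minimal subset of DOT" note above:

```scala
object DotSketch {
  case class Node(id: Int, name: String)

  // Same shape as makeDotNode: id plus a label attribute.
  def dotNode(n: Node): String = s"""${n.id} [label="${n.name}"]"""

  def main(args: Array[String]): Unit = {
    val nodes = Seq(Node(0, "HadoopRDD"), Node(1, "MapPartitionsRDD"))
    val dot = new StringBuilder
    dot.append("digraph G {\n")
    // A scope becomes a cluster subgraph named after its scope ID
    dot.append("  subgraph clustertextFile_0 {\n")
    nodes.foreach { n => dot.append(s"    ${dotNode(n)};\n") }
    dot.append("  }\n")
    dot.append("  0->1;\n")  // an RDD dependency edge
    dot.append("}")
    println(dot.toString())
  }
}
```

The output is valid Graphviz input: `digraph G { subgraph clustertextFile_0 { 0 [label="HadoopRDD"]; 1 [label="MapPartitionsRDD"]; } 0->1; }` with the `cluster` prefix telling DOT layout engines to draw the scope as an enclosing box.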