Fill in documentation + miscellaneous minor changes
For instance, this adds the ability to throw away old stage graphs.
Andrew Or committed Apr 27, 2015
1 parent fe7816f commit 8dd5af2
Showing 9 changed files with 141 additions and 76 deletions.
@@ -20,9 +20,10 @@
import java.lang.annotation.*;

/**
* Blah blah blah blah blah.
* This should really be private and not displayed on the docs.
* An annotation to mark a method as an RDD operation that encloses its body in a scope.
* This is used to compute the scope of an RDD when it is instantiated.
*/
// TODO: This should really be private[spark]
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RDDScoped {}
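For context, a brief sketch of how such an annotation might be applied. The class and method below (ExampleOperations, makeRDD) are illustrative placeholders, not methods that Spark actually annotates:

import org.apache.spark.SparkContext
import org.apache.spark.annotation.RDDScoped
import org.apache.spark.rdd.RDD

class ExampleOperations(sc: SparkContext) {
  // Marking the method lets the scope machinery discover it on the stack trace
  // when the enclosed RDD is instantiated, and fold it into the RDD's scope.
  @RDDScoped
  def makeRDD(data: Seq[Int]): RDD[Int] = {
    sc.parallelize(data)
  }
}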
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1450,7 +1450,13 @@ abstract class RDD[T: ClassTag](
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
@transient private[spark] val creationSite = sc.getCallSite()

/** Dem scopes. Tis null if de scope is not defined'eh. TODO: Make this private[spark]. */
/**
* The scope in which this RDD is defined.
*
* This is more flexible than the call site and can be defined hierarchically.
* For more detail, see the documentation of {{RDDScope}}. This scope is null if
* the user instantiates this RDD himself without using any Spark operations.
*/
@transient private[spark] val scope = RDDScope.getScope.orNull

private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
@@ -1602,11 +1608,8 @@ abstract class RDD[T: ClassTag](
firstDebugString(this).mkString("\n")
}

override def toString: String = {
val _name = Option(name).map(_ + " ").getOrElse("")
val _scope = Option(scope).map(" (scope: " + _ + ")").getOrElse("")
"%s%s[%d] at %s%s".format(_name, getClass.getSimpleName, id, getCreationSite, _scope)
}
override def toString: String = "%s%s[%d] at %s".format(
Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)

def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
85 changes: 62 additions & 23 deletions core/src/main/scala/org/apache/spark/rdd/RDDScope.scala
@@ -1,22 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.annotation.RDDScoped

/**
*
* A collection of utility methods to construct a hierarchical representation of RDD scopes.
* An RDD scope tracks the series of operations that created a given RDD.
*/
private[spark] object RDDScope {

/**
*
*/
// Symbol for delimiting each level of the hierarchy
// e.g. grandparent;parent;child
val SCOPE_NESTING_DELIMITER = ";"

// Symbol for delimiting the scope name from the ID within each level
val SCOPE_NAME_DELIMITER = "_"

/**
*
*/
// Counter for generating scope IDs, for differentiating
// between different scopes of the same name
private val scopeCounter = new AtomicInteger(0)

// Consider only methods that belong to these classes as potential RDD operations
// This is to limit the amount of reflection we do when we traverse the stack trace
private val classesWithScopeMethods = Set(
"org.apache.spark.SparkContext",
"org.apache.spark.rdd.RDD",
@@ -25,45 +48,61 @@ private[spark] object RDDScope {
)

/**
* Make a globally unique scope ID from the scope name.
*
*/
private val scopeIdCounter = new AtomicInteger(0)


/**
*
* For instance:
* textFile -> textFile_0
* textFile -> textFile_1
* map -> map_2
* name;with_sensitive;characters -> name-with-sensitive-characters_3
*/
private def makeScopeId(name: String): String = {
name.replace(SCOPE_NESTING_DELIMITER, "-").replace(SCOPE_NAME_DELIMITER, "-") +
SCOPE_NAME_DELIMITER + scopeIdCounter.getAndIncrement
SCOPE_NAME_DELIMITER + scopeCounter.getAndIncrement
}

/**
* Retrieve the hierarchical scope from the stack trace when an RDD is first created.
*
* This considers all methods marked with the @RDDScoped annotation and chains them together
* in the order they are invoked. Each level in the scope hierarchy represents a unique
* invocation of a particular RDD operation.
*
* For example: treeAggregate_0;reduceByKey_1;combineByKey_2;mapPartitions_3
* This means this RDD is created by the user calling treeAggregate, which calls
* `reduceByKey`, and then `combineByKey`, and then `mapPartitions` to create this RDD.
*/
private[spark] def getScope: Option[String] = {

// TODO: Note that this approach does not correctly associate the same invocation across RDDs
// For instance, a call to `textFile` creates both a HadoopRDD and a MapPartitionsRDD, but
// there is no way to associate the invocation across these two RDDs to draw the same scope
// around them. This is because the stack trace simply does not provide information for us
// to make any reasonable association across RDDs. We may need a higher level approach that
// involves setting common variables before and after the RDD operation itself.

val rddScopeNames = Thread.currentThread.getStackTrace
// Avoid reflecting on all classes in the stack trace
.filter { ste => classesWithScopeMethods.contains(ste.getClassName) }
// Return the corresponding method if it has the @RDDScoped annotation
.flatMap { ste =>
// Note that this is an approximation since we match the method only by name
// Unfortunate we cannot be more precise because the stack trace does not
// include parameter information
Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
m.getName == ste.getMethodName &&
// Note that this is an approximation since we match the method only by name
// Unfortunate we cannot be more precise because the stack trace does not include
// parameter information
Class.forName(ste.getClassName).getDeclaredMethods.find { m =>
m.getName == ste.getMethodName &&
m.getDeclaredAnnotations.exists { a =>
a.annotationType() == classOf[RDDScoped]
}
}
}
}
// Use the method name as the scope name for now
.map { m => m.getName }

// It is common for such methods to internally invoke other methods with the same name
// (e.g. union, reduceByKey). Here we remove adjacent duplicates such that the scope
// chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). This is
// surprisingly difficult to express even in Scala.
// as aliases (e.g. union, reduceByKey). Here we remove adjacent duplicates such that
// the scope chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c).
// This is surprisingly difficult to express even in Scala.
var prev: String = null
val dedupedRddScopeNames = rddScopeNames.flatMap { n =>
if (n != prev) {
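The adjacent-duplicate removal that the diff truncates here can be expressed, for illustration, roughly as follows. This is a minimal sketch of the behavior described in the comment above, not the exact code in this commit; dedupAdjacent is a made-up name:

// Collapse runs of equal adjacent names, e.g. a, a, b, c, b, c, c => a, b, c, b, c.
def dedupAdjacent(names: Seq[String]): Seq[String] = {
  names.foldLeft(List.empty[String]) { (acc, n) =>
    if (acc.headOption.contains(n)) acc else n :: acc
  }.reverse
}

// dedupAdjacent(Seq("a", "a", "b", "c", "b", "c", "c"))
// => List(a, b, c, b, c)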
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -42,7 +42,7 @@ class RDDInfo(
import Utils.bytesToString
val _scope = Option(scope).getOrElse("--")
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
"MemorySize: %s; TachyonSize: %s; DiskSize: %s (scope: %s)").format(
"MemorySize: %s; TachyonSize: %s; DiskSize: %s [scope: %s]").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize), _scope)
}
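For illustration, a string produced by this format would look roughly like the following; all values, including the storage level rendering, are made up:

// RDD "my cached rdd" (3) StorageLevel: StorageLevel(false, true, false, true, 1); CachedPartitions: 4;
// TotalPartitions: 8; MemorySize: 16.0 MB; TachyonSize: 0.0 B; DiskSize: 0.0 B [scope: map_2]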
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -93,6 +93,9 @@ private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String)
private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000

def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData._

/**
@@ -38,8 +39,6 @@ import org.apache.spark.ui.jobs.UIData._
@DeveloperApi
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {

import JobProgressListener._

// Define a handful of type aliases so that data structures' types can serve as documentation.
// These type aliases are public because they're used in the types of public fields:

@@ -82,8 +81,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// To limit the total memory usage of JobProgressListener, we only track information for a fixed
// number of non-active jobs and stages (there is no limit for active jobs and stages):

val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)

// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
@@ -284,8 +283,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
activeStages(stage.stageId) = stage
pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)

stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
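For reference, the "spark.scheduler.pool" property consulted above is typically set as a local property on the SparkContext before a job is submitted; if it is absent, the listener falls back to SparkUI.DEFAULT_POOL_NAME. A brief sketch, assuming an existing SparkContext named sc and an illustrative pool name:

sc.setLocalProperty("spark.scheduler.pool", "production")
sc.parallelize(1 to 100).count()  // stages from this job are attributed to pool "production"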
@@ -517,9 +516,3 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}

}

private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000
}
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -36,12 +36,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val progressListener = parent.progressListener
private val vizListener = parent.vizListener

/**
* Return a DOM element that contains an RDD DAG visualization for this stage.
* If there is no visualization information for this stage, return an empty element.
*/
private def renderViz(stageId: Int): Seq[Node] = {
val graph = vizListener.getVizGraph(stageId)
if (graph.isEmpty) {
return Seq.empty
}
{
Seq.empty
} else {
<div id="viz-dot-file" style="display:none">
{VizGraph.makeDotFile(graph.get)}
</div>
@@ -20,22 +20,40 @@ package org.apache.spark.ui.viz
import scala.collection.mutable

import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI

/**
* A SparkListener that...
* A SparkListener that constructs a graph of the RDD DAG for each stage.
* This graph will be used for rendering visualization in the UI later.
*/
private[ui] class VisualizationListener extends SparkListener {
private val graphsByStageId = new mutable.HashMap[Int, VizGraph] // stage ID -> viz graph

/** */
// A list of stage IDs to track the order in which stages are inserted
private val stageIds = new mutable.ArrayBuffer[Int]

// Stage ID -> graph metadata for the stage
private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]

// How many stages to retain graph metadata for
private val retainedStages =
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)

/** Return the graph metadata for the given stage, or None if no such information exists. */
def getVizGraph(stageId: Int): Option[VizGraph] = {
graphsByStageId.get(stageId)
stageIdToGraph.get(stageId)
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
val stageId = stageCompleted.stageInfo.stageId
val rddInfos = stageCompleted.stageInfo.rddInfos
val vizGraph = VizGraph.makeVizGraph(rddInfos)
graphsByStageId(stageId) = vizGraph
stageIdToGraph(stageId) = vizGraph

// Remove metadata for old stages
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.trimStart(toRemove)
}
}
}
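The eviction logic above is what the commit message refers to as throwing away old stage graphs. For illustration, a minimal standalone sketch of the same retention pattern, decoupled from the listener; names such as BoundedGraphStore are illustrative only:

import scala.collection.mutable

class BoundedGraphStore[G](retainedStages: Int) {
  private val insertionOrder = new mutable.ArrayBuffer[Int]  // stage IDs, oldest first
  private val graphs = new mutable.HashMap[Int, G]

  def get(stageId: Int): Option[G] = graphs.get(stageId)

  def put(stageId: Int, graph: G): Unit = {
    graphs(stageId) = graph
    insertionOrder += stageId
    // When over the limit, drop the oldest ~10% (at least one) in insertion order.
    if (insertionOrder.size > retainedStages) {
      val toRemove = math.max(retainedStages / 10, 1)
      insertionOrder.take(toRemove).foreach(graphs.remove)
      insertionOrder.trimStart(toRemove)
    }
  }
}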