Skip to content

Commit

Permalink
Connect RDDs across stages + update style
Browse files Browse the repository at this point in the history
This requires us to track incoming and outgoing edges in each
stage on the backend, and render the connecting edges manually
ourselves in d3.
  • Loading branch information
Andrew Or committed May 1, 2015
1 parent ab91416 commit 5c7ce16
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 53 deletions.
192 changes: 149 additions & 43 deletions core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
* limitations under the License.
*/

var VizDefaults = {
stageVizOffset: 160
var VizConstants = {
stageVizOffset: 160,
rddScopeColor: "#AADFFF",
rddDotColor: "#444444",
stageScopeColor: "#FFDDEE",
scopeLabelColor: "#888888",
edgeColor: "#444444",
edgeWidth: "1.5px"
};

/*
Expand All @@ -30,7 +36,7 @@ function toggleShowViz(forJob) {
if (show) {
var shouldRender = d3.select("#viz-graph svg").empty();
if (shouldRender) {
renderViz();
renderViz(forJob);
styleViz(forJob);
}
$("#viz-graph").show();
Expand All @@ -49,7 +55,7 @@ function toggleShowViz(forJob) {
* http://github.com/andrewor14/dagre-d3/dist/dagre-d3.js. For more detail, please
* track the changes in that project after it was forked.
*/
function renderViz() {
function renderViz(forJob) {

// If there is not a dot file to render, report error
if (d3.select("#viz-dot-files").empty()) {
Expand All @@ -61,18 +67,42 @@ function renderViz() {

var svg = d3.select("#viz-graph").append("svg");

// Each div in #viz-dot-files stores the content of one dot file
// On the job page, the visualization for each stage will be rendered separately
// Thus, we will need to render the edges that cross stages later on our own
var crossStageEdges = [];

// Each div child in #viz-dot-files stores the content of one dot file
// Each dot file is used to generate the visualization for a stage
d3.selectAll("#viz-dot-files div").each(function(d, i) {
d3.selectAll("#viz-dot-files > div").each(function(d, i) {
var div = d3.select(this);
var stageId = div.attr("name");
var dot = div.text();
var dot = div.select("div.dot-file").text();
var container = svg.append("g").attr("id", "graph_" + stageId);
// Move the container so it doesn't overlap with the existing ones
container.attr("transform", "translate(" + VizDefaults.stageVizOffset * i + ", 0)");
container.attr("transform", "translate(" + VizConstants.stageVizOffset * i + ", 0)");
renderDot(dot, container);
// If there are any incoming edges into this graph, keep track of them to
// render them separately later (job page only). Note that we cannot draw
// them now because we need to put these edges in a container that is on
// top of all stage graphs.
if (forJob) {
div.selectAll("div.incoming-edge").each(function(v) {
var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4]
crossStageEdges.push(edge);
});
}
});

// Time to draw cross stage edges (job page only)!
if (crossStageEdges.length > 0 && forJob) {
var container = svg.append("g").attr("id", "cross-stage-edges");
for (var i = 0; i < crossStageEdges.length; i++) {
var fromId = "node_" + crossStageEdges[i][0];
var toId = "node_" + crossStageEdges[i][1];
connect(fromId, toId, container);
}
}

// Set the appropriate SVG dimensions to ensure that all elements are displayed
var svgMargin = 20;
var boundingBox = svg.node().getBBox();
Expand Down Expand Up @@ -107,9 +137,7 @@ function renderViz() {
svg.attr("viewBox", newViewBox);
}

/*
*
*/
/* Render the dot file as an SVG in the given container. */
function renderDot(dot, container) {
var escaped_dot = dot
.replace(/&lt;/g, "<")
Expand All @@ -120,56 +148,134 @@ function renderDot(dot, container) {
renderer(container, g);
}

/*
* Style the visualization we just rendered.
* We apply a different style depending on whether this is a stage or a job.
*/
/*
 * Style the visualization we just rendered.
 * Styles common to both pages are applied first; the job-page or stage-page
 * specific styles are applied last, depending on `forJob`.
 */
function styleViz(forJob) {
  // Scope boxes: transparent, with a thick semi-transparent border
  var scopeBoxes = d3.selectAll("svg g.cluster rect");
  scopeBoxes.style("fill", "none");
  scopeBoxes.style("stroke", VizConstants.rddScopeColor);
  scopeBoxes.style("stroke-width", "4px");
  scopeBoxes.style("stroke-opacity", "0.5");

  // Scope labels
  var scopeLabels = d3.selectAll("svg g.cluster text");
  scopeLabels.attr("fill", VizConstants.scopeLabelColor);
  scopeLabels.attr("font-size", "11px");

  // Every path in the SVG gets the edge color and width
  var allPaths = d3.selectAll("svg path");
  allPaths.style("stroke", VizConstants.edgeColor);
  allPaths.style("stroke-width", VizConstants.edgeWidth);
  styleEdgeArrow();

  // Clusters whose name contains "Stage" get their own border color
  function isStageCluster() {
    var clusterName = d3.select(this).attr("name");
    return Boolean(clusterName) && clusterName.indexOf("Stage") !== -1;
  }
  d3.selectAll("svg g.cluster")
    .filter(isStageCluster)
    .select("rect")
    .style("stroke", VizConstants.stageScopeColor);

  // Finish with the page-specific styles
  var applyPageStyle = forJob ? styleJobViz : styleStageViz;
  applyPageStyle();
}

/*
 * Put an arrow at the end of every edge.
 * We need to do this because we manually render some edges ourselves through d3.
 * For these edges, we borrow the arrow marker generated by dagre-d3.
 *
 * If the SVG contains no dagre-d3 marker there is nothing to borrow, and this
 * function returns without touching the document.
 */
function styleEdgeArrow() {
  var dagreD3Marker = d3.select("svg g.edgePaths marker");
  // Guard against graphs without any marker; cloning a missing node would throw
  if (dagreD3Marker.empty()) {
    return;
  }
  d3.select("svg")
    .append(function() { return dagreD3Marker.node().cloneNode(true); })
    .attr("id", "marker-arrow")
    .select("path")
    .attr("fill", VizConstants.edgeColor)
    // "stroke-width" must be set as a style: styleViz has already put an inline
    // stroke-width style on every path, and an inline style wins over an attribute
    .style("stroke-width", "0px");
  // Point every path that is a direct child of a group at the shared marker
  d3.selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)");
  // We no longer need the per-edge marker definitions (the SVG element is <defs>)
  d3.selectAll("svg g.edgePaths defs").remove();
}

/*
 * Apply job-page-specific style to the visualization.
 *
 * Takes no arguments and returns nothing; it only sets inline styles and
 * attributes on elements of the already-rendered SVG.
 *
 * NOTE(review): styleViz calls this function last, so the cluster, cluster-text
 * and edge styles set here override what styleViz applied (including its
 * stage-specific cluster color and the VizConstants edge color/width) —
 * confirm that this overlap is intended and not left over from an older version.
 */
function styleJobViz() {
// Cluster boxes: no fill, thick semi-transparent outline
d3.selectAll("svg g.cluster rect")
.style("fill", "none")
.style("stroke", "#AADFFF")
.style("stroke-width", "4px")
.style("stroke-opacity", "0.5");
// Rectangular nodes: white, slightly transparent, black border
d3.selectAll("svg g.node rect")
.style("fill", "white")
.style("stroke", "black")
.style("stroke-width", "2px")
.style("fill-opacity", "0.8")
.style("stroke-opacity", "0.9");
// Edges rendered inside g.edgePath groups
d3.selectAll("svg g.edgePath path")
.style("stroke", "black")
.style("stroke-width", "2px");
// Cluster labels
d3.selectAll("svg g.cluster text")
.attr("fill", "#AAAAAA")
.attr("font-size", "11px")
// Circular nodes are filled with the RDD dot color
d3.selectAll("svg g.node circle")
.style("fill", VizConstants.rddDotColor);
// Paths in the #cross-stage-edges group are lines only, so they must not be filled
d3.selectAll("svg g#cross-stage-edges path")
.style("fill", "none");
}

/*
 * Apply stage-page-specific style to the visualization.
 *
 * Takes no arguments and returns nothing; it only sets inline styles and
 * attributes on elements of the already-rendered SVG.
 *
 * NOTE(review): styleViz calls this function last, so the cluster, cluster-text
 * and edge styles set here override what styleViz applied (including its
 * stage-specific cluster color and the VizConstants edge color/width) —
 * confirm that this overlap is intended and not left over from an older version.
 */
function styleStageViz() {
// Cluster boxes: no fill, thick semi-transparent outline
d3.selectAll("svg g.cluster rect")
.style("fill", "none")
.style("stroke", "#AADFFF")
.style("stroke-width", "4px")
.style("stroke-opacity", "0.5");
// Rectangular nodes: white, slightly transparent, black border
d3.selectAll("svg g.node rect")
.style("fill", "white")
.style("stroke", "black")
.style("stroke-width", "2px")
.style("fill-opacity", "0.8")
.style("stroke-opacity", "0.9");
// Edges rendered inside g.edgePath groups
d3.selectAll("svg g.edgePath path")
.style("stroke", "black")
.style("stroke-width", "2px");
// Cluster labels
d3.selectAll("svg g.cluster text")
.attr("fill", "#AAAAAA")
.attr("font-size", "11px")
}

/*
 * (Job page only) Return the absolute position of the group element
 * that has the given element ID.
 *
 * The position is the sum of the translations of that group and of each
 * enclosing group, walking up until the parent is no longer a `g` element.
 * Returns { x: 0, y: 0 } if no group element has this ID.
 */
function getAbsolutePosition(groupId) {
  var position = { x: 0, y: 0 };
  for (var group = d3.select("#" + groupId).filter("g");
       !group.empty();
       group = d3.select(group.node().parentNode).filter("g")) {
    // Only the translate component of each transform contributes
    var offset = d3.transform(group.attr("transform")).translate;
    position.x += offset[0];
    position.y += offset[1];
  }
  return position;
}

/*
 * (Job page only) Draw a curved edge between two group elements.
 * The path is appended to `container` without any styling; the caller is
 * expected to style it afterwards.
 */
function connect(fromNodeId, toNodeId, container) {
  var source = getAbsolutePosition(fromNodeId);
  var target = getAbsolutePosition(toNodeId);

  // Start and end the edge at the rim of the node circles rather than at their
  // centers (job page specific), so the arrow head does not bleed into the circle
  var radius = toFloat(d3.select("svg g.nodes circle").attr("r"));
  var arrowPadding = 2; // adjust for arrow stroke width
  if (source.x < target.x) {
    source.x += radius;
    target.x -= radius;
    target.x -= arrowPadding;
  } else if (source.x > target.x) {
    source.x -= radius;
    target.x += radius;
    target.x += arrowPadding;
  }

  // x coordinate at the given fraction of the way from source to target
  var xAt = function(fraction) {
    return source.x + (target.x - source.x) * fraction;
  };

  var points;
  if (source.y == target.y) {
    // Same rank: bow the middle of the edge upward a little so that it does
    // not interfere with whatever lies in between
    var raisedY = source.y - 20;
    points = [
      [source.x, source.y],
      [xAt(0.2), source.y],
      [xAt(0.3), raisedY],
      [xAt(0.7), raisedY],
      [xAt(0.8), target.y],
      [target.x, target.y]
    ];
  } else {
    points = [
      [source.x, source.y],
      [xAt(0.4), source.y],
      [xAt(0.6), target.y],
      [target.x, target.y]
    ];
  }

  var curve = d3.svg.line().interpolate("basis");
  container.append("path").datum(points).attr("d", curve);
}

/* Helper method to convert attributes to numeric values. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
{
graphs.map { g =>
<div name={g.rootScope.id} style="display:none">
{VizGraph.makeDotFile(g, forJob)}
<div class="dot-file">{VizGraph.makeDotFile(g, forJob)}</div>
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
</div>
}
}
Expand Down
43 changes: 34 additions & 9 deletions core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@ package org.apache.spark.ui.viz
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.spark.Logging
import org.apache.spark.rdd.RDDScope
import org.apache.spark.scheduler.StageInfo

/**
* A representation of a generic scoped graph used for storing visualization information.
*
* Each graph is defined with a set of edges and a root scope, which may contain children
* nodes and children scopes.
* nodes and children scopes. Additionally, a graph may also have edges that enter or exit
* the graph from nodes that belong to adjacent graphs.
*/
private[ui] case class VizGraph(edges: Seq[VizEdge], rootScope: VizScope)
private[ui] case class VizGraph(
// Edges whose source and destination nodes both belong to this graph
edges: Seq[VizEdge],
// Edges whose source node belongs to this graph but whose destination node does not
outgoingEdges: Seq[VizEdge],
// Edges whose destination node belongs to this graph but whose source node does not
incomingEdges: Seq[VizEdge],
// Top-level scope of this graph; may contain children nodes and children scopes
rootScope: VizScope)

// A node in the graph. makeVizGraph creates one per RDD, from the RDD's id and name.
private[ui] case class VizNode(id: Int, name: String)
// A directed edge between two nodes, referenced by their ids.
// makeVizGraph creates one from each parent RDD id to the child RDD id.
private[ui] case class VizEdge(fromId: Int, toId: Int)

Expand All @@ -46,7 +53,7 @@ private[ui] class VizScope(val id: String, val name: String) {
def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope }
}

private[ui] object VizGraph {
private[ui] object VizGraph extends Logging {

/**
* Construct a VizGraph for a given stage.
Expand All @@ -56,7 +63,7 @@ private[ui] object VizGraph {
* between two RDDs from the parent to the child.
*/
def makeVizGraph(stage: StageInfo): VizGraph = {
val edges = new mutable.HashSet[VizEdge]
val edges = new ListBuffer[VizEdge]
val nodes = new mutable.HashMap[Int, VizNode]
val scopes = new mutable.HashMap[String, VizScope] // scope ID -> viz scope

Expand All @@ -66,7 +73,10 @@ private[ui] object VizGraph {
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
val rootScope = new VizScope(stageScopeId, stageScopeName)

// Find nodes, edges, and children scopes
// Find nodes, edges, and children scopes that belong to this stage. Each node is an RDD
// that lives either directly in the root scope or in one of the children scopes. Each
// child scope represents a level of the RDD scope and must contain at least one RDD.
// Children scopes can be nested if one RDD operation calls another.
stage.rddInfos.foreach { rdd =>
edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))
Expand Down Expand Up @@ -94,10 +104,23 @@ private[ui] object VizGraph {
}
}

// Remove any edges with nodes belonging to other stages so we do not have orphaned nodes
edges.retain { case VizEdge(f, t) => nodes.contains(f) && nodes.contains(t) }
// Classify each edge as internal, outgoing or incoming
val internalEdges = new ListBuffer[VizEdge]
val outgoingEdges = new ListBuffer[VizEdge]
val incomingEdges = new ListBuffer[VizEdge]
edges.foreach { case e: VizEdge =>
val fromThisGraph = nodes.contains(e.fromId)
val toThisGraph = nodes.contains(e.toId)
(fromThisGraph, toThisGraph) match {
case (true, true) => internalEdges += e
case (true, false) => outgoingEdges += e
case (false, true) => incomingEdges += e
// should never happen
case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e")
}
}

VizGraph(edges.toSeq, rootScope)
VizGraph(internalEdges, outgoingEdges, incomingEdges, rootScope)
}

/**
Expand All @@ -116,7 +139,9 @@ private[ui] object VizGraph {
dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""")
}
dotFile.append("}")
dotFile.toString()
val result = dotFile.toString()
logDebug(result)
result
}

/** Return the dot representation of a node. */
Expand Down

0 comments on commit 5c7ce16

Please sign in to comment.