diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index 109114aedd1bd..637b5c08c839b 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -15,8 +15,14 @@ * limitations under the License. */ -var VizDefaults = { - stageVizOffset: 160 +var VizConstants = { + stageVizOffset: 160, + rddScopeColor: "#AADFFF", + rddDotColor: "#444444", + stageScopeColor: "#FFDDEE", + scopeLabelColor: "#888888", + edgeColor: "#444444", + edgeWidth: "1.5px" }; /* @@ -30,7 +36,7 @@ function toggleShowViz(forJob) { if (show) { var shouldRender = d3.select("#viz-graph svg").empty(); if (shouldRender) { - renderViz(); + renderViz(forJob); styleViz(forJob); } $("#viz-graph").show(); @@ -49,7 +55,7 @@ function toggleShowViz(forJob) { * http://github.com/andrewor14/dagre-d3/dist/dagre-d3.js. For more detail, please * track the changes in that project after it was forked. */ -function renderViz() { +function renderViz(forJob) { // If there is not a dot file to render, report error if (d3.select("#viz-dot-files").empty()) { @@ -61,18 +67,42 @@ function renderViz() { var svg = d3.select("#viz-graph").append("svg"); - // Each div in #viz-dot-files stores the content of one dot file + // On the job page, the visualization for each stage will rendered separately + // Thus, we will need to render the edges that cross stages later on our own + var crossStageEdges = []; + + // Each div child in #viz-dot-files stores the content of one dot file // Each dot file is used to generate the visualization for a stage - d3.selectAll("#viz-dot-files div").each(function(d, i) { + d3.selectAll("#viz-dot-files > div").each(function(d, i) { var div = d3.select(this); var stageId = div.attr("name"); - var dot = div.text(); + var dot = div.select("div.dot-file").text(); var container = svg.append("g").attr("id", "graph_" + stageId); // Move the container so it doesn't overlap with the existing ones - container.attr("transform", "translate(" + VizDefaults.stageVizOffset * i + ", 0)"); + container.attr("transform", "translate(" + VizConstants.stageVizOffset * i + ", 0)"); renderDot(dot, container); + // If there are any incoming edges into this graph, keep track of them to + // render them separately later (job page only). Note that we cannot draw + // them now because we need to put these edges in a container that is on + // top of all stage graphs. + if (forJob) { + div.selectAll("div.incoming-edge").each(function(v) { + var edge = d3.select(this).text().split(","); // e.g. 3,4 => [3, 4] + crossStageEdges.push(edge); + }); + } }); + // Time to draw cross stage edges (job page only)! + if (crossStageEdges.length > 0 && forJob) { + var container = svg.append("g").attr("id", "cross-stage-edges"); + for (var i = 0; i < crossStageEdges.length; i++) { + var fromId = "node_" + crossStageEdges[i][0]; + var toId = "node_" + crossStageEdges[i][1]; + connect(fromId, toId, container); + } + } + // Set the appropriate SVG dimensions to ensure that all elements are displayed var svgMargin = 20; var boundingBox = svg.node().getBBox(); @@ -107,9 +137,7 @@ function renderViz() { svg.attr("viewBox", newViewBox); } -/* - * - */ +/* Render the dot file as an SVG in the given container. */ function renderDot(dot, container) { var escaped_dot = dot .replace(/</g, "<") @@ -120,11 +148,31 @@ function renderDot(dot, container) { renderer(container, g); } -/* - * Style the visualization we just rendered. - * We apply a different style depending on whether this is a stage or a job. - */ +/* Style the visualization we just rendered. */ function styleViz(forJob) { + d3.selectAll("svg g.cluster rect") + .style("fill", "none") + .style("stroke", VizConstants.rddScopeColor) + .style("stroke-width", "4px") + .style("stroke-opacity", "0.5"); + d3.selectAll("svg g.cluster text") + .attr("fill", VizConstants.scopeLabelColor) + .attr("font-size", "11px") + d3.selectAll("svg path") + .style("stroke", VizConstants.edgeColor) + .style("stroke-width", VizConstants.edgeWidth); + styleEdgeArrow(); + + // Apply a different color to stage clusters + d3.selectAll("svg g.cluster") + .filter(function() { + var name = d3.select(this).attr("name"); + return name && name.indexOf("Stage") > -1; + }) + .select("rect") + .style("stroke", VizConstants.stageScopeColor); + + // Apply any job or stage specific styles if (forJob) { styleJobViz(); } else { @@ -132,44 +180,102 @@ function styleViz(forJob) { } } +/* + * Put an arrow at the end of every edge. + * We need to do this because we manually render some edges ourselves through d3. + * For these edges, we borrow the arrow marker generated by dagre-d3. + */ +function styleEdgeArrow() { + var dagreD3Marker = d3.select("svg g.edgePaths marker").node() + d3.select("svg") + .append(function() { return dagreD3Marker.cloneNode(true); }) + .attr("id", "marker-arrow") + .select("path") + .attr("fill", VizConstants.edgeColor) + .attr("strokeWidth", "0px"); + d3.selectAll("svg g > path").attr("marker-end", "url(#marker-arrow)") + d3.selectAll("svg g.edgePaths def").remove(); // We no longer need these +} + +/* Apply job-page-specific style to the visualization. */ function styleJobViz() { - d3.selectAll("svg g.cluster rect") - .style("fill", "none") - .style("stroke", "#AADFFF") - .style("stroke-width", "4px") - .style("stroke-opacity", "0.5"); - d3.selectAll("svg g.node rect") - .style("fill", "white") - .style("stroke", "black") - .style("stroke-width", "2px") - .style("fill-opacity", "0.8") - .style("stroke-opacity", "0.9"); - d3.selectAll("svg g.edgePath path") - .style("stroke", "black") - .style("stroke-width", "2px"); - d3.selectAll("svg g.cluster text") - .attr("fill", "#AAAAAA") - .attr("font-size", "11px") + d3.selectAll("svg g.node circle") + .style("fill", VizConstants.rddDotColor); + d3.selectAll("svg g#cross-stage-edges path") + .style("fill", "none"); } +/* Apply stage-page-specific style to the visualization. */ function styleStageViz() { - d3.selectAll("svg g.cluster rect") - .style("fill", "none") - .style("stroke", "#AADFFF") - .style("stroke-width", "4px") - .style("stroke-opacity", "0.5"); d3.selectAll("svg g.node rect") .style("fill", "white") .style("stroke", "black") .style("stroke-width", "2px") .style("fill-opacity", "0.8") .style("stroke-opacity", "0.9"); - d3.selectAll("svg g.edgePath path") - .style("stroke", "black") - .style("stroke-width", "2px"); - d3.selectAll("svg g.cluster text") - .attr("fill", "#AAAAAA") - .attr("font-size", "11px") +} + +/* + * (Job page only) Return the absolute position of the + * group element identified by the given selector. + */ +function getAbsolutePosition(groupId) { + var obj = d3.select("#" + groupId).filter("g"); + var _x = 0, _y = 0; + while (!obj.empty()) { + var transformText = obj.attr("transform"); + var translate = d3.transform(transformText).translate + _x += translate[0]; + _y += translate[1]; + obj = d3.select(obj.node().parentNode).filter("g") + } + return { x: _x, y: _y }; +} + +/* + * (Job page only) Connect two group elements with a curved edge. + * This assumes that the path will be styled later. + */ +function connect(fromNodeId, toNodeId, container) { + var from = getAbsolutePosition(fromNodeId); + var to = getAbsolutePosition(toNodeId); + + // Account for node radius (this is highly-specific to the job page) + // Otherwise the arrow heads will bleed into the circle itself + var delta = toFloat(d3.select("svg g.nodes circle").attr("r")); + var markerEndDelta = 2; // adjust for arrow stroke width + if (from.x < to.x) { + from.x = from.x + delta; + to.x = to.x - delta - markerEndDelta; + } else if (from.x > to.x) { + from.x = from.x - delta; + to.x = to.x + delta + markerEndDelta; + } + + if (from.y == to.y) { + // If they are on the same rank, curve the middle part of the edge + // upward a little to avoid interference with things in between + var points = [ + [from.x, from.y], + [from.x + (to.x - from.x) * 0.2, from.y], + [from.x + (to.x - from.x) * 0.3, from.y - 20], + [from.x + (to.x - from.x) * 0.7, from.y - 20], + [from.x + (to.x - from.x) * 0.8, to.y], + [to.x, to.y] + ]; + } else { + var points = [ + [from.x, from.y], + [from.x + (to.x - from.x) * 0.4, from.y], + [from.x + (to.x - from.x) * 0.6, to.y], + [to.x, to.y] + ]; + } + + var line = d3.svg.line().interpolate("basis"); + container.append("path") + .datum(points) + .attr("d", line); } /* Helper method to convert attributes to numeric values. */ diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala index 901af143043fd..a62980454488c 100644 --- a/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala @@ -58,7 +58,9 @@ private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener { { graphs.map { g =>
- {VizGraph.makeDotFile(g, forJob)} +
{VizGraph.makeDotFile(g, forJob)}
+ { g.incomingEdges.map { e =>
{e.fromId},{e.toId}
} } + { g.outgoingEdges.map { e =>
{e.fromId},{e.toId}
} }
} } diff --git a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala index 0f225eef4b810..b3dff90a431a3 100644 --- a/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala @@ -20,6 +20,7 @@ package org.apache.spark.ui.viz import scala.collection.mutable import scala.collection.mutable.ListBuffer +import org.apache.spark.Logging import org.apache.spark.rdd.RDDScope import org.apache.spark.scheduler.StageInfo @@ -27,9 +28,15 @@ import org.apache.spark.scheduler.StageInfo * A representation of a generic scoped graph used for storing visualization information. * * Each graph is defined with a set of edges and a root scope, which may contain children - * nodes and children scopes. + * nodes and children scopes. Additionally, a graph may also have edges that enter or exit + * the graph from nodes that belong to adjacent graphs. */ -private[ui] case class VizGraph(edges: Seq[VizEdge], rootScope: VizScope) +private[ui] case class VizGraph( + edges: Seq[VizEdge], + outgoingEdges: Seq[VizEdge], + incomingEdges: Seq[VizEdge], + rootScope: VizScope) + private[ui] case class VizNode(id: Int, name: String) private[ui] case class VizEdge(fromId: Int, toId: Int) @@ -46,7 +53,7 @@ private[ui] class VizScope(val id: String, val name: String) { def attachChildScope(childScope: VizScope): Unit = { _childrenScopes += childScope } } -private[ui] object VizGraph { +private[ui] object VizGraph extends Logging { /** * Construct a VizGraph for a given stage. @@ -56,7 +63,7 @@ private[ui] object VizGraph { * between two RDDs from the parent to the child. */ def makeVizGraph(stage: StageInfo): VizGraph = { - val edges = new mutable.HashSet[VizEdge] + val edges = new ListBuffer[VizEdge] val nodes = new mutable.HashMap[Int, VizNode] val scopes = new mutable.HashMap[String, VizScope] // scope ID -> viz scope @@ -66,7 +73,10 @@ private[ui] object VizGraph { { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" } val rootScope = new VizScope(stageScopeId, stageScopeName) - // Find nodes, edges, and children scopes + // Find nodes, edges, and children scopes that belong to this stage. Each node is an RDD + // that lives either directly in the root scope or in one of the children scopes. Each + // children scope represents a level of the RDD scope and must contain at least one RDD. + // Children scopes can be nested if one RDD operation calls another. stage.rddInfos.foreach { rdd => edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) } val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name)) @@ -94,10 +104,23 @@ private[ui] object VizGraph { } } - // Remove any edges with nodes belonging to other stages so we do not have orphaned nodes - edges.retain { case VizEdge(f, t) => nodes.contains(f) && nodes.contains(t) } + // Classify each edge as internal, outgoing or incoming + val internalEdges = new ListBuffer[VizEdge] + val outgoingEdges = new ListBuffer[VizEdge] + val incomingEdges = new ListBuffer[VizEdge] + edges.foreach { case e: VizEdge => + val fromThisGraph = nodes.contains(e.fromId) + val toThisGraph = nodes.contains(e.toId) + (fromThisGraph, toThisGraph) match { + case (true, true) => internalEdges += e + case (true, false) => outgoingEdges += e + case (false, true) => incomingEdges += e + // should never happen + case _ => logWarning(s"Found an orphan edge in stage ${stage.stageId}: $e") + } + } - VizGraph(edges.toSeq, rootScope) + VizGraph(internalEdges, outgoingEdges, incomingEdges, rootScope) } /** @@ -116,7 +139,9 @@ private[ui] object VizGraph { dotFile.append(s""" ${edge.fromId}->${edge.toId} [lineInterpolate="basis"];\n""") } dotFile.append("}") - dotFile.toString() + val result = dotFile.toString() + logDebug(result) + result } /** Return the dot representation of a node. */