From 04c750016e711448e6281789e39ef482030050db Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 27 Apr 2015 18:50:33 -0700 Subject: [PATCH] Make the server and client use the same timezone --- .../apache/spark/ui/static/streaming-page.js | 34 +++++---- .../ui/StreamingJobProgressListener.scala | 2 +- .../spark/streaming/ui/StreamingPage.scala | 73 +++++++++++++++---- 3 files changed, 78 insertions(+), 31 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js index 2e03b89c12660..5b39ec6d198f0 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -16,8 +16,9 @@ */ -var globalMinX = 0; -var globalMaxX = 0; +var timelineMarginLeft = 50; +var distributionMinX = 0; +var distributionMaxX = 0; var binCount = 10; // An invisible div to show details of a point in the graph @@ -53,19 +54,18 @@ function hideGraphTooltip() { */ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) { d3.select(d3.select(id).node().parentNode).style("padding", "8px 0 8px 8px").style("border-right", "0px solid white"); - var margin = {top: 20, right: 27, bottom: 30, left: 50}; + var margin = {top: 20, right: 27, bottom: 30, left: timelineMarginLeft}; var width = 500 - margin.left - margin.right; var height = 150 - margin.top - margin.bottom; - var x = d3.time.scale().domain([minX, maxX]).range([0, width]); + var x = d3.scale.linear().domain([minX, maxX]).range([0, width]); var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); - var timeFormat = d3.time.format("%H:%M:%S") - var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(timeFormat); + var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; }); var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5); var line = d3.svg.line() - .x(function(d) { return x(new Date(d.x)); }) + .x(function(d) { return x(d.x); }) .y(function(d) { return y(d.y); }); var svg = d3.select(id).append("svg") @@ -106,7 +106,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) { .attr("cy", function(d) { return y(d.y); }) .attr("r", function(d) { return 3; }) .on('mouseover', function(d) { - var tip = d.y + " " + unitY + " at " + timeFormat(new Date(d.x)); + var tip = d.y + " " + unitY + " at " + timeFormat[d.x]; showGraphTooltip(tip, d3.event.pageX + 5, d3.event.pageY - 25); // show the point d3.select(this) @@ -137,15 +137,13 @@ function drawDistribution(id, values, minY, maxY, unitY) { var width = 300 - margin.left - margin.right; var height = 150 - margin.top - margin.bottom; - //var binCount = values.length > 100 ? 100 : values.length; var formatBinValue = d3.format(",.2f"); var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values); var x = d3.scale.linear() - .domain([globalMinX, globalMaxX]) - //.domain([0, d3.max(data, function(d) { return d.y; })]) + .domain([distributionMinX, distributionMaxX]) .range([0, width]); var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5); @@ -165,9 +163,6 @@ function drawDistribution(id, values, minY, maxY, unitY) { svg.append("g") .attr("class", "y axis") .call(yAxis) - // .append("text") - // .attr("transform", "translate(0," + (-3) + ")") - // .text(unitY); var bar = svg.selectAll(".bar") .data(data) @@ -199,8 +194,17 @@ function drawDistribution(id, values, minY, maxY, unitY) { }); } +function prepareTimeline(minY, maxY) { + var y = d3.scale.linear().domain([0, maxY]).tickFormat(5); + console.log(y(maxY)); + var numOfChars = y(maxY).length; + var maxPx = numOfChars * 8 + 10; + // Make sure we have enough space to show the ticks in the y axis of timeline + timelineMarginLeft = maxPx > timelineMarginLeft? maxPx : timelineMarginLeft; +} + function prepareDistribution(values, minY, maxY) { var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values); var maxBarSize = d3.max(data, function(d) { return d.y; }); - globalMaxX = maxBarSize > globalMaxX? maxBarSize : globalMaxX; + distributionMaxX = maxBarSize > distributionMaxX? maxBarSize : distributionMaxX; } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index de7e5fbe0b67d..0b5402a6d9fce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -196,7 +196,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext) retainedBatches.lastOption } - def retainedBatches: Seq[BatchInfo] = { + def retainedBatches: Seq[BatchInfo] = synchronized { (waitingBatchInfos.values.toSeq ++ runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index aa4a246a62470..6abdb98abb5dd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.ui +import java.text.SimpleDateFormat +import java.util.Date import javax.servlet.http.HttpServletRequest import scala.collection.mutable.ArrayBuffer @@ -30,6 +32,8 @@ import org.apache.spark.util.Distribution /** * @param divId the `id` used in the html `div` tag * @param data the data for the timeline graph + * @param minX the min value of X axis + * @param maxX the max value of X axis * @param minY the min value of Y axis * @param maxY the max value of Y axis * @param unitY the unit of Y axis @@ -41,6 +45,7 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: val jsForData = data.map { case (x, y) => s"""{"x": $x, "y": $y}""" }.mkString("[", ",", "]") + jsCollector.addPreparedStatement(s"prepareTimeline($minY, $maxY);") jsCollector.addStatement( s"drawTimeline('#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY');") @@ -67,17 +72,21 @@ private[ui] case class DistributionUIData( } } -private[ui] case class LongStreamingUIData(data: Seq[(Long, Long)]) { +private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) { val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) + val formattedAvg: String = StreamingPage.formatDurationOption(avg) + val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max) } -private[ui] case class DoubleStreamingUIData(data: Seq[(Long, Double)]) { +private[ui] case class DoubleStatUIData(data: Seq[(Long, Double)]) { val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) + val formattedAvg: String = avg.map(_.formatted("%.2f")).getOrElse("-") + val max: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).max) } @@ -89,7 +98,6 @@ private[ui] class StreamingPage(parent: StreamingTab) private val listener = parent.listener private val startTime = System.currentTimeMillis() - private val emptyCell = "-" /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { @@ -104,6 +112,9 @@ private[ui] class StreamingPage(parent: StreamingTab) UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000)) } + /** + * Generate html that will load css/js files for StreamingPage + */ private def generateLoadResources(): Seq[Node] = { // scalastyle:off @@ -130,6 +141,16 @@ private[ui] class StreamingPage(parent: StreamingTab) } + private def generateTimeMap(times: Seq[Long]): Seq[Node] = { + val dateFormat = new SimpleDateFormat("HH:mm:ss") + val js = "var timeFormat = {};\n" + times.map { time => + val formattedTime = dateFormat.format(new Date(time)) + s"timeFormat[$time] = '$formattedTime';" + }.mkString("\n") + + + } + private def generateStatTable(): Seq[Node] = { val batchInfos = listener.retainedBatches @@ -137,17 +158,17 @@ private[ui] class StreamingPage(parent: StreamingTab) val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max - val eventRateForAllReceivers = DoubleStreamingUIData(batchInfos.map { batchInfo => + val eventRateForAllReceivers = DoubleStatUIData(batchInfos.map { batchInfo => (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration) }) - val schedulingDelay = LongStreamingUIData(batchInfos.flatMap { batchInfo => + val schedulingDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _) }) - val processingTime = LongStreamingUIData(batchInfos.flatMap { batchInfo => + val processingTime = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _) }) - val totalDelay = LongStreamingUIData(batchInfos.flatMap { batchInfo => + val totalDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _) }) @@ -167,6 +188,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val maxEventRate = eventRateForAllReceivers.max.map(_.ceil.toLong).getOrElse(0L) val minEventRate = 0L + // JavaScript to show/hide the receiver sub table. val triangleJs = s"""$$('#inputs-table').toggle('collapsed'); |if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') @@ -258,7 +280,7 @@ private[ui] class StreamingPage(parent: StreamingTab) {Unparsed(BLACK_RIGHT_TRIANGLE_HTML)} Input Rate -
Avg: {eventRateForAllReceivers.avg.map(_.formatted("%.2f")).getOrElse(emptyCell)} events/sec
+
Avg: {eventRateForAllReceivers.formattedAvg} events/sec
{timelineDataForEventRateOfAllReceivers} {distributionDataForEventRateOfAllReceivers} @@ -271,7 +293,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
Scheduling Delay
-
Avg: {formatDurationOption(schedulingDelay.avg)}
+
Avg: {schedulingDelay.formattedAvg}
{timelineDataForSchedulingDelay} {distributionDataForSchedulingDelay} @@ -279,7 +301,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
Processing Time
-
Avg: {formatDurationOption(processingTime.avg)}
+
Avg: {processingTime.formattedAvg}
{timelineDataForProcessingTime} {distributionDataForProcessingTime} @@ -287,7 +309,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
Total Delay
-
Avg: {formatDurationOption(totalDelay.avg)}
+
Avg: {totalDelay.formattedAvg}
{timelineDataForTotalDelay} {distributionDataForTotalDelay} @@ -296,7 +318,7 @@ private[ui] class StreamingPage(parent: StreamingTab) // scalastyle:on - table ++ jsCollector.toHtml + generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml } private def generateInputReceiversTable( @@ -366,7 +388,6 @@ private[ui] class StreamingPage(parent: StreamingTab) maxY, "events/sec").toHtml(jsCollector) - // scalastyle:off
@@ -385,7 +406,6 @@ private[ui] class StreamingPage(parent: StreamingTab) {distributionForEventsRate} - // scalastyle:on } /** @@ -417,13 +437,33 @@ private[ui] class StreamingPage(parent: StreamingTab) } } -private object StreamingPage { +private[ui] object StreamingPage { val BLACK_RIGHT_TRIANGLE_HTML = "▶" val BLACK_DOWN_TRIANGLE_HTML = "▼" + + val emptyCell = "-" + + /** + * Returns a human-readable string representing a duration such as "5 second 35 ms" + */ + def formatDurationOption(msOption: Option[Long]): String = { + msOption.map(formatDurationVerbose).getOrElse(emptyCell) + } } +/** + * A helper class that allows the user to add JavaScript statements which will be executed when the + * DOM has finished loading. + */ private[ui] class JsCollector { + /** + * JavaScript statements that will execute before `statements` + */ private val preparedStatements = ArrayBuffer[String]() + + /** + * JavaScript statements that will execute after `preparedStatements` + */ private val statements = ArrayBuffer[String]() def addPreparedStatement(js: String): Unit = { @@ -434,6 +474,9 @@ private[ui] class JsCollector { statements += js } + /** + * Generate a html snippet that will execute all scripts when the DOM has finished loading. + */ def toHtml: Seq[Node] = { val js = s"""