Skip to content

Commit

Permalink
Make the server and client use the same timezone
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 28, 2015
1 parent fed8219 commit 04c7500
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
*/


var globalMinX = 0;
var globalMaxX = 0;
var timelineMarginLeft = 50;
var distributionMinX = 0;
var distributionMaxX = 0;
var binCount = 10;

// An invisible div to show details of a point in the graph
Expand Down Expand Up @@ -53,19 +54,18 @@ function hideGraphTooltip() {
*/
function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) {
d3.select(d3.select(id).node().parentNode).style("padding", "8px 0 8px 8px").style("border-right", "0px solid white");
var margin = {top: 20, right: 27, bottom: 30, left: 50};
var margin = {top: 20, right: 27, bottom: 30, left: timelineMarginLeft};
var width = 500 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;

var x = d3.time.scale().domain([minX, maxX]).range([0, width]);
var x = d3.scale.linear().domain([minX, maxX]).range([0, width]);
var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);

var timeFormat = d3.time.format("%H:%M:%S")
var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(timeFormat);
var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5);

var line = d3.svg.line()
.x(function(d) { return x(new Date(d.x)); })
.x(function(d) { return x(d.x); })
.y(function(d) { return y(d.y); });

var svg = d3.select(id).append("svg")
Expand Down Expand Up @@ -106,7 +106,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) {
.attr("cy", function(d) { return y(d.y); })
.attr("r", function(d) { return 3; })
.on('mouseover', function(d) {
var tip = d.y + " " + unitY + " at " + timeFormat(new Date(d.x));
var tip = d.y + " " + unitY + " at " + timeFormat[d.x];
showGraphTooltip(tip, d3.event.pageX + 5, d3.event.pageY - 25);
// show the point
d3.select(this)
Expand Down Expand Up @@ -137,15 +137,13 @@ function drawDistribution(id, values, minY, maxY, unitY) {
var width = 300 - margin.left - margin.right;
var height = 150 - margin.top - margin.bottom;

//var binCount = values.length > 100 ? 100 : values.length;
var formatBinValue = d3.format(",.2f");

var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values);

var x = d3.scale.linear()
.domain([globalMinX, globalMaxX])
//.domain([0, d3.max(data, function(d) { return d.y; })])
.domain([distributionMinX, distributionMaxX])
.range([0, width]);

var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
Expand All @@ -165,9 +163,6 @@ function drawDistribution(id, values, minY, maxY, unitY) {
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
// .append("text")
// .attr("transform", "translate(0," + (-3) + ")")
// .text(unitY);

var bar = svg.selectAll(".bar")
.data(data)
Expand Down Expand Up @@ -199,8 +194,17 @@ function drawDistribution(id, values, minY, maxY, unitY) {
});
}

function prepareTimeline(minY, maxY) {
var y = d3.scale.linear().domain([0, maxY]).tickFormat(5);
console.log(y(maxY));
var numOfChars = y(maxY).length;
var maxPx = numOfChars * 8 + 10;
// Make sure we have enough space to show the ticks in the y axis of timeline
timelineMarginLeft = maxPx > timelineMarginLeft? maxPx : timelineMarginLeft;
}

function prepareDistribution(values, minY, maxY) {
var data = d3.layout.histogram().range([minY, maxY]).bins(binCount)(values);
var maxBarSize = d3.max(data, function(d) { return d.y; });
globalMaxX = maxBarSize > globalMaxX? maxBarSize : globalMaxX;
distributionMaxX = maxBarSize > distributionMaxX? maxBarSize : distributionMaxX;
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
retainedBatches.lastOption
}

def retainedBatches: Seq[BatchInfo] = {
def retainedBatches: Seq[BatchInfo] = synchronized {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.streaming.ui

import java.text.SimpleDateFormat
import java.util.Date
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.ArrayBuffer
Expand All @@ -30,6 +32,8 @@ import org.apache.spark.util.Distribution
/**
* @param divId the `id` used in the html `div` tag
* @param data the data for the timeline graph
* @param minX the min value of X axis
* @param maxX the max value of X axis
* @param minY the min value of Y axis
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
Expand All @@ -41,6 +45,7 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX:
val jsForData = data.map { case (x, y) =>
s"""{"x": $x, "y": $y}"""
}.mkString("[", ",", "]")
jsCollector.addPreparedStatement(s"prepareTimeline($minY, $maxY);")
jsCollector.addStatement(
s"drawTimeline('#$divId', $jsForData, $minX, $maxX, $minY, $maxY, '$unitY');")

Expand All @@ -67,17 +72,21 @@ private[ui] case class DistributionUIData(
}
}

private[ui] case class LongStreamingUIData(data: Seq[(Long, Long)]) {
private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) {

val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

val formattedAvg: String = StreamingPage.formatDurationOption(avg)

val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max)
}

private[ui] case class DoubleStreamingUIData(data: Seq[(Long, Double)]) {
private[ui] case class DoubleStatUIData(data: Seq[(Long, Double)]) {

val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

val formattedAvg: String = avg.map(_.formatted("%.2f")).getOrElse("-")

val max: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).max)
}

Expand All @@ -89,7 +98,6 @@ private[ui] class StreamingPage(parent: StreamingTab)

private val listener = parent.listener
private val startTime = System.currentTimeMillis()
private val emptyCell = "-"

/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
Expand All @@ -104,6 +112,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
}

/**
* Generate html that will load css/js files for StreamingPage
*/
private def generateLoadResources(): Seq[Node] = {
// scalastyle:off
<script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
Expand All @@ -130,24 +141,34 @@ private[ui] class StreamingPage(parent: StreamingTab)
</div>
}

private def generateTimeMap(times: Seq[Long]): Seq[Node] = {
val dateFormat = new SimpleDateFormat("HH:mm:ss")
val js = "var timeFormat = {};\n" + times.map { time =>
val formattedTime = dateFormat.format(new Date(time))
s"timeFormat[$time] = '$formattedTime';"
}.mkString("\n")

<script>{Unparsed(js)}</script>
}

private def generateStatTable(): Seq[Node] = {
val batchInfos = listener.retainedBatches

val batchTimes = batchInfos.map(_.batchTime.milliseconds)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max

val eventRateForAllReceivers = DoubleStreamingUIData(batchInfos.map { batchInfo =>
val eventRateForAllReceivers = DoubleStatUIData(batchInfos.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})

val schedulingDelay = LongStreamingUIData(batchInfos.flatMap { batchInfo =>
val schedulingDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
val processingTime = LongStreamingUIData(batchInfos.flatMap { batchInfo =>
val processingTime = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
val totalDelay = LongStreamingUIData(batchInfos.flatMap { batchInfo =>
val totalDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
})

Expand All @@ -167,6 +188,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val maxEventRate = eventRateForAllReceivers.max.map(_.ceil.toLong).getOrElse(0L)
val minEventRate = 0L

// JavaScript to show/hide the receiver sub table.
val triangleJs =
s"""$$('#inputs-table').toggle('collapsed');
|if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML')
Expand Down Expand Up @@ -258,7 +280,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
<span onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
<strong>Input Rate</strong>
</div>
<div>Avg: {eventRateForAllReceivers.avg.map(_.formatted("%.2f")).getOrElse(emptyCell)} events/sec</div>
<div>Avg: {eventRateForAllReceivers.formattedAvg} events/sec</div>
</td>
<td class="timeline">{timelineDataForEventRateOfAllReceivers}</td>
<td class="distribution">{distributionDataForEventRateOfAllReceivers}</td>
Expand All @@ -271,23 +293,23 @@ private[ui] class StreamingPage(parent: StreamingTab)
<tr>
<td style="vertical-align: middle;">
<div><strong>Scheduling Delay</strong></div>
<div>Avg: {formatDurationOption(schedulingDelay.avg)}</div>
<div>Avg: {schedulingDelay.formattedAvg}</div>
</td>
<td class="timeline">{timelineDataForSchedulingDelay}</td>
<td class="distribution">{distributionDataForSchedulingDelay}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div><strong>Processing Time</strong></div>
<div>Avg: {formatDurationOption(processingTime.avg)}</div>
<div>Avg: {processingTime.formattedAvg}</div>
</td>
<td class="timeline">{timelineDataForProcessingTime}</td>
<td class="distribution">{distributionDataForProcessingTime}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div><strong>Total Delay</strong></div>
<div>Avg: {formatDurationOption(totalDelay.avg)}</div>
<div>Avg: {totalDelay.formattedAvg}</div>
</td>
<td class="timeline">{timelineDataForTotalDelay}</td>
<td class="distribution">{distributionDataForTotalDelay}</td>
Expand All @@ -296,7 +318,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
</table>
// scalastyle:on

table ++ jsCollector.toHtml
generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
}

private def generateInputReceiversTable(
Expand Down Expand Up @@ -366,7 +388,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
maxY,
"events/sec").toHtml(jsCollector)

// scalastyle:off
<tr>
<td rowspan="2" style="vertical-align: middle;">
<div>
Expand All @@ -385,7 +406,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
</td>
<td class="distribution">{distributionForEventsRate}</td>
</tr>
// scalastyle:on
}

/**
Expand Down Expand Up @@ -417,13 +437,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
}

private object StreamingPage {
private[ui] object StreamingPage {
val BLACK_RIGHT_TRIANGLE_HTML = "&#9654;"
val BLACK_DOWN_TRIANGLE_HTML = "&#9660;"

val emptyCell = "-"

/**
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
def formatDurationOption(msOption: Option[Long]): String = {
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
}
}

/**
* A helper class that allows the user to add JavaScript statements which will be executed when the
* DOM has finished loading.
*/
private[ui] class JsCollector {
/**
* JavaScript statements that will execute before `statements`
*/
private val preparedStatements = ArrayBuffer[String]()

/**
* JavaScript statements that will execute after `preparedStatements`
*/
private val statements = ArrayBuffer[String]()

def addPreparedStatement(js: String): Unit = {
Expand All @@ -434,6 +474,9 @@ private[ui] class JsCollector {
statements += js
}

/**
* Generate a html snippet that will execute all scripts when the DOM has finished loading.
*/
def toHtml: Seq[Node] = {
val js =
s"""
Expand Down

0 comments on commit 04c7500

Please sign in to comment.