Skip to content

Commit

Permalink
Change time unit in the graphs automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Apr 28, 2015
1 parent 4c0b43f commit c81a1ee
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@

.line {
fill: none;
stroke: steelblue;
stroke: #0088cc;
stroke-width: 1.5px;
}

.bar rect {
fill: steelblue;
fill: #0088cc;
shape-rendering: crispEdges;
}

.bar rect:hover {
//fill: rgb(49, 91, 125);
fill: rgb(100, 185, 255);
fill: #005580;
}

.timeline {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) {
var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);

var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; });
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5);
var formatYValue = d3.format(",.2f");
var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue);

var line = d3.svg.line()
.x(function(d) { return x(d.x); })
Expand Down Expand Up @@ -117,7 +118,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) {
.attr("cy", function(d) { return y(d.y); })
.attr("r", function(d) { return 3; })
.on('mouseover', function(d) {
var tip = d.y + " " + unitY + " at " + timeFormat[d.x];
var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x];
showBootstrapTooltip(d3.select(this).node(), tip);
//showGraphTooltip(tip, d3.event.pageX + 5, d3.event.pageY - 25);
// show the point
Expand Down Expand Up @@ -210,9 +211,8 @@ function drawDistribution(id, values, minY, maxY, unitY) {
}

function prepareTimeline(minY, maxY) {
var y = d3.scale.linear().domain([0, maxY]).tickFormat(5);
console.log(y(maxY));
var numOfChars = y(maxY).length;
var formatYValue = d3.format(",.2f");
var numOfChars = formatYValue(maxY).length;
var maxPx = numOfChars * 8 + 10;
// Make sure we have enough space to show the ticks in the y axis of timeline
timelineMarginLeft = maxPx > timelineMarginLeft? maxPx : timelineMarginLeft;
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.ui

import java.text.SimpleDateFormat
import java.util.concurrent.TimeUnit
import java.util.{Locale, Date}

import scala.xml.{Node, Text}
Expand All @@ -34,6 +35,39 @@ private[spark] object UIUtils extends Logging {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

/**
* Return the short string for a `TimeUnit`.
*/
def shortTimeUnitString(unit: TimeUnit): String = unit match {
case TimeUnit.NANOSECONDS => "ns"
case TimeUnit.MICROSECONDS => "us"
case TimeUnit.MILLISECONDS => "ms"
case TimeUnit.SECONDS => "s"
case TimeUnit.MINUTES => "min"
case TimeUnit.HOURS => "h"
case TimeUnit.DAYS => "d"
}

/**
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value after converting with
* the `TimeUnit`.
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
return (milliseconds, TimeUnit.MILLISECONDS)
}
val seconds = milliseconds.toDouble / 1000
if (seconds < 60) {
return (seconds, TimeUnit.SECONDS)
}
val minutes = seconds / 60
if (minutes < 60) {
return (minutes, TimeUnit.MINUTES)
}
val hours = minutes / 60
(hours, TimeUnit.HOURS)
}

def formatDate(date: Date): String = dateFormat.get.format(date)

def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.streaming.ui

import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.ArrayBuffer
Expand All @@ -38,8 +39,8 @@ import org.apache.spark.util.Distribution
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
*/
private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long,
minY: Long, maxY: Long, unitY: String) {
private[ui] class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long,
minY: Double, maxY: Double, unitY: String) {

def toHtml(jsCollector: JsCollector): Seq[Node] = {
val jsForData = data.map { case (x, y) =>
Expand All @@ -60,8 +61,8 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX:
* @param maxY the max value of Y axis
* @param unitY the unit of Y axis
*/
private[ui] case class DistributionUIData(
divId: String, data: Seq[_], minY: Long, maxY: Long, unitY: String) {
private[ui] class DistributionUIData(
divId: String, data: Seq[_], minY: Double, maxY: Double, unitY: String) {

def toHtml(jsCollector: JsCollector): Seq[Node] = {
val jsForData = data.mkString("[", ",", "]")
Expand All @@ -72,7 +73,11 @@ private[ui] case class DistributionUIData(
}
}

private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {

def timelineData(unit: TimeUnit) = data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))

def distributionData(unit: TimeUnit) = data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))

val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

Expand All @@ -81,7 +86,7 @@ private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max)
}

private[ui] case class DoubleStatUIData(data: Seq[(Long, Double)]) {
private[ui] class DoubleStatUIData(val data: Seq[(Long, Double)]) {

val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

Expand Down Expand Up @@ -158,30 +163,31 @@ private[ui] class StreamingPage(parent: StreamingTab)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max

val eventRateForAllReceivers = DoubleStatUIData(batchInfos.map { batchInfo =>
val eventRateForAllReceivers = new DoubleStatUIData(batchInfos.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})

val schedulingDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
val schedulingDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
val processingTime = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
val processingTime = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
})
val totalDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
val totalDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo =>
batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
})

val jsCollector = new JsCollector

// Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the
// Y axis ranges same.
val maxTime =
val _maxTime =
(for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
m1 max m2 max m3).getOrElse(0L)
List(1, 2, 3).sum
// Should start at 0
val minTime = 0L
val (maxTime, unit) = UIUtils.normalizeDuration(_maxTime)
val formattedUnit = UIUtils.shortTimeUnitString(unit)

// Use the max input rate for all receivers' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
Expand All @@ -196,7 +202,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
|else $$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');""".stripMargin.replaceAll("\\n", "")

val timelineDataForEventRateOfAllReceivers =
TimelineUIData(
new TimelineUIData(
"all-receiver-events-timeline",
eventRateForAllReceivers.data,
minBatchTime,
Expand All @@ -206,81 +212,83 @@ private[ui] class StreamingPage(parent: StreamingTab)
"events/sec").toHtml(jsCollector)

val distributionDataForEventRateOfAllReceivers =
DistributionUIData(
new DistributionUIData(
"all-receiver-events-distribution",
eventRateForAllReceivers.data.map(_._2),
minEventRate,
maxEventRate,
"events/sec").toHtml(jsCollector)

val timelineDataForSchedulingDelay =
TimelineUIData(
new TimelineUIData(
"scheduling-delay-timeline",
schedulingDelay.data,
schedulingDelay.timelineData(unit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val distributionDataForSchedulingDelay =
DistributionUIData(
new DistributionUIData(
"scheduling-delay-distribution",
schedulingDelay.data.map(_._2),
schedulingDelay.distributionData(unit),
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val timelineDataForProcessingTime =
TimelineUIData(
new TimelineUIData(
"processing-time-timeline",
processingTime.data,
processingTime.timelineData(unit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val distributionDataForProcessingTime =
DistributionUIData(
new DistributionUIData(
"processing-time-distribution",
processingTime.data.map(_._2),
processingTime.distributionData(unit),
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val timelineDataForTotalDelay =
TimelineUIData(
new TimelineUIData(
"total-delay-timeline",
totalDelay.data,
totalDelay.timelineData(unit),
minBatchTime,
maxBatchTime,
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val distributionDataForTotalDelay =
DistributionUIData(
new DistributionUIData(
"total-delay-distribution",
totalDelay.data.map(_._2),
totalDelay.distributionData(unit),
minTime,
maxTime,
"ms").toHtml(jsCollector)
formattedUnit).toHtml(jsCollector)

val table =
// scalastyle:off
<table class="table table-bordered" style="width: auto">
<thead>
<tr><th></th><th>Timelines</th><th>Histograms</th></tr>
<tr><th style="width: 160px;"></th><th style="width: 492px;">Timelines</th><th style="width: 300px;">Histograms</th></tr>
</thead>
<tbody>
<tr>
<td style="vertical-align: middle; width: 200px;">
<td style="vertical-align: middle; width: 160px;">
<div style="width: 160px;">
<div>
<span onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
<strong>Input Rate</strong>
</div>
<div>Avg: {eventRateForAllReceivers.formattedAvg} events/sec</div>
</div>
</td>
<td class="timeline">{timelineDataForEventRateOfAllReceivers}</td>
<td class="distribution">{distributionDataForEventRateOfAllReceivers}</td>
Expand Down Expand Up @@ -325,19 +333,19 @@ private[ui] class StreamingPage(parent: StreamingTab)
jsCollector: JsCollector,
minX: Long,
maxX: Long,
minY: Long,
maxY: Long): Seq[Node] = {
minY: Double,
maxY: Double): Seq[Node] = {
val content = listener.receivedRecordsDistributions.map { case (receiverId, distribution) =>
generateInputReceiverRow(jsCollector, receiverId, distribution, minX, maxX, minY, maxY)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)

<table class="table table-bordered" style="width: auto">
<thead>
<tr>
<th></th>
<th style="width: 166.7px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Status</div></th>
<th style="width: 166.7px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Location</div></th>
<th style="width: 166.7px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Last Error Time</div></th>
<th style="width: 151px;"></th>
<th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Status</div></th>
<th style="width: 167px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Location</div></th>
<th style="width: 166px; padding: 8px 0 8px 0"><div style="margin: 0 8px 0 8px">Last Error Time</div></th>
<th>Last Error Message</th>
</tr>
</thead>
Expand All @@ -353,8 +361,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
distribution: Option[Distribution],
minX: Long,
maxX: Long,
minY: Long,
maxY: Long): Seq[Node] = {
minY: Double,
maxY: Double): Seq[Node] = {
val avgReceiverEvents = distribution.map(_.statCounter.mean.toLong)
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
Expand All @@ -371,7 +379,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val receivedRecords = listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq())

val timelineForEventRate =
TimelineUIData(
new TimelineUIData(
s"receiver-$receiverId-events-timeline",
receivedRecords,
minX,
Expand All @@ -381,19 +389,21 @@ private[ui] class StreamingPage(parent: StreamingTab)
"events/sec").toHtml(jsCollector)

val distributionForEventsRate =
DistributionUIData(
new DistributionUIData(
s"receiver-$receiverId-events-distribution",
receivedRecords.map(_._2),
minY,
maxY,
"events/sec").toHtml(jsCollector)

<tr>
<td rowspan="2" style="vertical-align: middle; width: 193px;">
<td rowspan="2" style="vertical-align: middle; width: 151px;">
<div style="width: 151px;">
<div>
<strong>{receiverName}</strong>
</div>
<div>Avg: {avgReceiverEvents.map(_.toString).getOrElse(emptyCell)} events/sec</div>
</div>
</td>
<td>{receiverActive}</td>
<td>{receiverLocation}</td>
Expand Down Expand Up @@ -449,6 +459,16 @@ private[ui] object StreamingPage {
def formatDurationOption(msOption: Option[Long]): String = {
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
}

def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 // not used yet
case TimeUnit.MICROSECONDS => milliseconds * 1000 // not used yet
case TimeUnit.MILLISECONDS => milliseconds
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
}

/**
Expand Down

0 comments on commit c81a1ee

Please sign in to comment.