diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css index efef31f8ea9e1..4ace2043d4129 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css @@ -32,18 +32,18 @@ .line { fill: none; - stroke: steelblue; + stroke: #0088cc; stroke-width: 1.5px; } .bar rect { - fill: steelblue; + fill: #0088cc; shape-rendering: crispEdges; } .bar rect:hover { //fill: rgb(49, 91, 125); - fill: rgb(100, 185, 255); + fill: #005580; } .timeline { diff --git a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js index e5f6f3eba3b76..2c1b9c1809bd0 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js +++ b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js @@ -73,7 +73,8 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) { var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]); var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(function(d) { return timeFormat[d]; }); - var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5); + var formatYValue = d3.format(",.2f"); + var yAxis = d3.svg.axis().scale(y).orient("left").ticks(5).tickFormat(formatYValue); var line = d3.svg.line() .x(function(d) { return x(d.x); }) @@ -117,7 +118,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, unitY) { .attr("cy", function(d) { return y(d.y); }) .attr("r", function(d) { return 3; }) .on('mouseover', function(d) { - var tip = d.y + " " + unitY + " at " + timeFormat[d.x]; + var tip = formatYValue(d.y) + " " + unitY + " at " + timeFormat[d.x]; showBootstrapTooltip(d3.select(this).node(), tip); //showGraphTooltip(tip, d3.event.pageX + 5, d3.event.pageY - 25); // show the point @@ -210,9 +211,8 @@ function drawDistribution(id, values, minY, maxY, unitY) { } function prepareTimeline(minY, maxY) { - var y = d3.scale.linear().domain([0, maxY]).tickFormat(5); - console.log(y(maxY)); - var numOfChars = y(maxY).length; + var formatYValue = d3.format(",.2f"); + var numOfChars = formatYValue(maxY).length; var maxPx = numOfChars * 8 + 10; // Make sure we have enough space to show the ticks in the y axis of timeline timelineMarginLeft = maxPx > timelineMarginLeft? maxPx : timelineMarginLeft; diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index f07864141a21c..827b945ec2787 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.ui import java.text.SimpleDateFormat +import java.util.concurrent.TimeUnit import java.util.{Locale, Date} import scala.xml.{Node, Text} @@ -34,6 +35,39 @@ private[spark] object UIUtils extends Logging { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } + /** + * Return the short string for a `TimeUnit`. + */ + def shortTimeUnitString(unit: TimeUnit): String = unit match { + case TimeUnit.NANOSECONDS => "ns" + case TimeUnit.MICROSECONDS => "us" + case TimeUnit.MILLISECONDS => "ms" + case TimeUnit.SECONDS => "s" + case TimeUnit.MINUTES => "min" + case TimeUnit.HOURS => "h" + case TimeUnit.DAYS => "d" + } + + /** + * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value after converting with + * the `TimeUnit`. + */ + def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = { + if (milliseconds < 1000) { + return (milliseconds, TimeUnit.MILLISECONDS) + } + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return (seconds, TimeUnit.SECONDS) + } + val minutes = seconds / 60 + if (minutes < 60) { + return (minutes, TimeUnit.MINUTES) + } + val hours = minutes / 60 + (hours, TimeUnit.HOURS) + } + def formatDate(date: Date): String = dateFormat.get.format(date) def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index b7185a269d4c6..2fe9fbd8a2811 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.ui import java.text.SimpleDateFormat import java.util.Date +import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest import scala.collection.mutable.ArrayBuffer @@ -38,8 +39,8 @@ import org.apache.spark.util.Distribution * @param maxY the max value of Y axis * @param unitY the unit of Y axis */ -private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long, - minY: Long, maxY: Long, unitY: String) { +private[ui] class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: Long, maxX: Long, + minY: Double, maxY: Double, unitY: String) { def toHtml(jsCollector: JsCollector): Seq[Node] = { val jsForData = data.map { case (x, y) => @@ -60,8 +61,8 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX: * @param maxY the max value of Y axis * @param unitY the unit of Y axis */ -private[ui] case class DistributionUIData( - divId: String, data: Seq[_], minY: Long, maxY: Long, unitY: String) { +private[ui] class DistributionUIData( + divId: String, data: Seq[_], minY: Double, maxY: Double, unitY: String) { def toHtml(jsCollector: JsCollector): Seq[Node] = { val jsForData = data.mkString("[", ",", "]") @@ -72,7 +73,11 @@ private[ui] case class DistributionUIData( } } -private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) { +private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) { + + def timelineData(unit: TimeUnit) = data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit)) + + def distributionData(unit: TimeUnit) = data.map(x => StreamingPage.convertToTimeUnit(x._2, unit)) val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) @@ -81,7 +86,7 @@ private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) { val max: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).max) } -private[ui] case class DoubleStatUIData(data: Seq[(Long, Double)]) { +private[ui] class DoubleStatUIData(val data: Seq[(Long, Double)]) { val avg: Option[Double] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) @@ -158,17 +163,17 @@ private[ui] class StreamingPage(parent: StreamingTab) val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max - val eventRateForAllReceivers = DoubleStatUIData(batchInfos.map { batchInfo => + val eventRateForAllReceivers = new DoubleStatUIData(batchInfos.map { batchInfo => (batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration) }) - val schedulingDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => + val schedulingDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _) }) - val processingTime = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => + val processingTime = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _) }) - val totalDelay = MillisecondsStatUIData(batchInfos.flatMap { batchInfo => + val totalDelay = new MillisecondsStatUIData(batchInfos.flatMap { batchInfo => batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _) }) @@ -176,12 +181,13 @@ private[ui] class StreamingPage(parent: StreamingTab) // Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the // Y axis ranges same. - val maxTime = + val _maxTime = (for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield m1 max m2 max m3).getOrElse(0L) - List(1, 2, 3).sum // Should start at 0 val minTime = 0L + val (maxTime, unit) = UIUtils.normalizeDuration(_maxTime) + val formattedUnit = UIUtils.shortTimeUnitString(unit) // Use the max input rate for all receivers' graphs to make the Y axis ranges same. // If it's not an integral number, just use its ceil integral number. @@ -196,7 +202,7 @@ private[ui] class StreamingPage(parent: StreamingTab) |else $$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');""".stripMargin.replaceAll("\\n", "") val timelineDataForEventRateOfAllReceivers = - TimelineUIData( + new TimelineUIData( "all-receiver-events-timeline", eventRateForAllReceivers.data, minBatchTime, @@ -206,7 +212,7 @@ private[ui] class StreamingPage(parent: StreamingTab) "events/sec").toHtml(jsCollector) val distributionDataForEventRateOfAllReceivers = - DistributionUIData( + new DistributionUIData( "all-receiver-events-distribution", eventRateForAllReceivers.data.map(_._2), minEventRate, @@ -214,73 +220,75 @@ private[ui] class StreamingPage(parent: StreamingTab) "events/sec").toHtml(jsCollector) val timelineDataForSchedulingDelay = - TimelineUIData( + new TimelineUIData( "scheduling-delay-timeline", - schedulingDelay.data, + schedulingDelay.timelineData(unit), minBatchTime, maxBatchTime, minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val distributionDataForSchedulingDelay = - DistributionUIData( + new DistributionUIData( "scheduling-delay-distribution", - schedulingDelay.data.map(_._2), + schedulingDelay.distributionData(unit), minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val timelineDataForProcessingTime = - TimelineUIData( + new TimelineUIData( "processing-time-timeline", - processingTime.data, + processingTime.timelineData(unit), minBatchTime, maxBatchTime, minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val distributionDataForProcessingTime = - DistributionUIData( + new DistributionUIData( "processing-time-distribution", - processingTime.data.map(_._2), + processingTime.distributionData(unit), minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val timelineDataForTotalDelay = - TimelineUIData( + new TimelineUIData( "total-delay-timeline", - totalDelay.data, + totalDelay.timelineData(unit), minBatchTime, maxBatchTime, minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val distributionDataForTotalDelay = - DistributionUIData( + new DistributionUIData( "total-delay-distribution", - totalDelay.data.map(_._2), + totalDelay.distributionData(unit), minTime, maxTime, - "ms").toHtml(jsCollector) + formattedUnit).toHtml(jsCollector) val table = // scalastyle:off - + - @@ -325,8 +333,8 @@ private[ui] class StreamingPage(parent: StreamingTab) jsCollector: JsCollector, minX: Long, maxX: Long, - minY: Long, - maxY: Long): Seq[Node] = { + minY: Double, + maxY: Double): Seq[Node] = { val content = listener.receivedRecordsDistributions.map { case (receiverId, distribution) => generateInputReceiverRow(jsCollector, receiverId, distribution, minX, maxX, minY, maxY) }.foldLeft[Seq[Node]](Nil)(_ ++ _) @@ -334,10 +342,10 @@ private[ui] class StreamingPage(parent: StreamingTab)
TimelinesHistograms
TimelinesHistograms
+ +
{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)} Input Rate
Avg: {eventRateForAllReceivers.formattedAvg} events/sec
+
{timelineDataForEventRateOfAllReceivers} {distributionDataForEventRateOfAllReceivers}
- - - - + + + + @@ -353,8 +361,8 @@ private[ui] class StreamingPage(parent: StreamingTab) distribution: Option[Distribution], minX: Long, maxX: Long, - minY: Long, - maxY: Long): Seq[Node] = { + minY: Double, + maxY: Double): Seq[Node] = { val avgReceiverEvents = distribution.map(_.statCounter.mean.toLong) val receiverInfo = listener.receiverInfo(receiverId) val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId") @@ -371,7 +379,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val receivedRecords = listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq()) val timelineForEventRate = - TimelineUIData( + new TimelineUIData( s"receiver-$receiverId-events-timeline", receivedRecords, minX, @@ -381,7 +389,7 @@ private[ui] class StreamingPage(parent: StreamingTab) "events/sec").toHtml(jsCollector) val distributionForEventsRate = - DistributionUIData( + new DistributionUIData( s"receiver-$receiverId-events-distribution", receivedRecords.map(_._2), minY, @@ -389,11 +397,13 @@ private[ui] class StreamingPage(parent: StreamingTab) "events/sec").toHtml(jsCollector) - @@ -449,6 +459,16 @@ private[ui] object StreamingPage { def formatDurationOption(msOption: Option[Long]): String = { msOption.map(formatDurationVerbose).getOrElse(emptyCell) } + + def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match { + case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 // not used yet + case TimeUnit.MICROSECONDS => milliseconds * 1000 // not used yet + case TimeUnit.MILLISECONDS => milliseconds + case TimeUnit.SECONDS => milliseconds / 1000.0 + case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0 + case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0 + case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0 + } } /**
Status
Location
Last Error Time
Status
Location
Last Error Time
Last Error Message
+ +
{receiverName}
Avg: {avgReceiverEvents.map(_.toString).getOrElse(emptyCell)} events/sec
+
{receiverActive} {receiverLocation}