From 914b8ffd575b30670cc9829d25d6f277e7746841 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 10 Apr 2014 19:30:19 -0700 Subject: [PATCH] Moved utils functions to UIUtils. --- .../scala/org/apache/spark/ui/UIUtils.scala | 108 +++++++++++-- .../spark/streaming/StreamingContext.scala | 3 +- ...ala => StreamingJobProgressListener.scala} | 2 +- .../spark/streaming/ui/StreamingPage.scala | 153 +++--------------- .../spark/streaming/ui/StreamingTab.scala | 31 +++- .../org/apache/spark/streaming/UISuite.scala | 55 ++++++- 6 files changed, 197 insertions(+), 155 deletions(-) rename streaming/src/main/scala/org/apache/spark/streaming/ui/{StreamingProgressListener.scala => StreamingJobProgressListener.scala} (98%) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index fcda341ae5941..b210c8d852898 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -18,12 +18,13 @@ package org.apache.spark.ui import java.text.SimpleDateFormat -import java.util.Date +import java.util.{Locale, Date} import scala.xml.Node +import org.apache.spark.Logging /** Utility functions for generating XML pages with spark content. */ -private[spark] object UIUtils { +private[spark] object UIUtils extends Logging { // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. 
private val dateFormat = new ThreadLocal[SimpleDateFormat]() { @@ -49,6 +50,80 @@ private[spark] object UIUtils { "%.1f h".format(hours) } + /** Generate a verbose human-readable string representing a duration such as "5 second 35 ms" */ + def formatDurationVerbose(ms: Long): String = { + try { + val second = 1000L + val minute = 60 * second + val hour = 60 * minute + val day = 24 * hour + val week = 7 * day + val year = 365 * day + + def toString(num: Long, unit: String): String = { + if (num == 0) { + "" + } else if (num == 1) { + s"$num $unit" + } else { + s"$num ${unit}s" + } + } + + val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms" + val secondString = toString((ms % minute) / second, "second") + val minuteString = toString((ms % hour) / minute, "minute") + val hourString = toString((ms % day) / hour, "hour") + val dayString = toString((ms % week) / day, "day") + val weekString = toString((ms % year) / week, "week") + val yearString = toString(ms / year, "year") + + Seq( + second -> millisecondsString, + minute -> s"$secondString $millisecondsString", + hour -> s"$minuteString $secondString", + day -> s"$hourString $minuteString $secondString", + week -> s"$dayString $hourString $minuteString", + year -> s"$weekString $dayString $hourString" + ).foreach { case (durationLimit, durationString) => + if (ms < durationLimit) { + // if time is less than the limit (upto year) + return durationString + } + } + // if time is more than a year + return s"$yearString $weekString $dayString" + } catch { + case e: Exception => + logError("Error converting time to string", e) + // if there is some error, return blank string + return "" + } + } + + /** Generate a human-readable string representing a number (e.g. 
100 K) */ + def formatNumber(records: Double): String = { + val trillion = 1e12 + val billion = 1e9 + val million = 1e6 + val thousand = 1e3 + + val (value, unit) = { + if (records >= 2*trillion) { + (records / trillion, " T") + } else if (records >= 2*billion) { + (records / billion, " B") + } else if (records >= 2*million) { + (records / million, " M") + } else if (records >= 2*thousand) { + (records / thousand, " K") + } else { + (records, "") + } + } + "%.1f%s".formatLocal(Locale.US, value, unit) + } + // Yarn has to go through a proxy so the base uri is provided and has to be on all links val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("") @@ -146,21 +221,36 @@ private[spark] object UIUtils { /** Returns an HTML table constructed by generating a row for each object in a sequence. */ def listingTable[T]( headers: Seq[String], - makeRow: T => Seq[Node], - rows: Seq[T], + generateDataRow: T => Seq[Node], + data: Seq[T], fixedWidth: Boolean = false): Seq[Node] = { - val colWidth = 100.toDouble / headers.size - val colWidthAttr = if (fixedWidth) colWidth + "%" else "" var tableClass = "table table-bordered table-striped table-condensed sortable" if (fixedWidth) { tableClass += " table-fixed" } - + val colWidth = 100.toDouble / headers.size + val colWidthAttr = if (fixedWidth) colWidth + "%" else "" + val headerRow: Seq[Node] = { + // if none of the headers have "\n" in them + if (headers.forall(!_.contains("\n"))) { + // represent header as simple text + headers.map(h => {h}) + } else { + // represent header text as list while respecting "\n" + headers.map { case h => + + + + } + } + } - {headers.map(h => )} + {headerRow} - {rows.map(r => makeRow(r))} + {data.map(r => generateDataRow(r))}
{h}
} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 906d4067a14eb..ff5d0aaa3d0bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -157,8 +157,7 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter - private[streaming] val ui = new StreamingTab(this) - ui.start() + private[streaming] val uiTab = new StreamingTab(this) /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala similarity index 98% rename from streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala rename to streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 32a4644e2a3e9..8921b99f53a23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -28,7 +28,7 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted import org.apache.spark.util.Distribution -private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener { +private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener { private val waitingBatchInfos = new HashMap[Time, BatchInfo] private val runningBatchInfos = new HashMap[Time, BatchInfo] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 5cd900c2f88f0..58960812e1205 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -17,30 +17,24 @@ package org.apache.spark.streaming.ui -import java.util.{Calendar, Locale} +import java.util.Calendar import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.Logging import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ import org.apache.spark.util.Distribution /** Page for Spark Web UI that shows statistics of a streaming job */ private[ui] class StreamingPage(parent: StreamingTab) extends UIPage("") with Logging { - private val ssc = parent.ssc - private val sc = ssc.sparkContext - private val sparkUI = sc.ui - private val listener = new StreamingProgressListener(ssc) - private val calendar = Calendar.getInstance() - private val startTime = calendar.getTime() + private val listener = parent.streamingListener + private val startTime = Calendar.getInstance().getTime() private val emptyCellTest = "-" - ssc.addStreamingListener(listener) - parent.attachPage(this) - /** Render the page */ override def render(request: HttpServletRequest): Seq[Node] = { val content = @@ -49,7 +43,7 @@ private[ui] class StreamingPage(parent: StreamingTab) generateNetworkStatsTable() ++ generateBatchStatsTable() UIUtils.headerSparkPage( - content, sparkUI.basePath, sc.appName, "Streaming", sparkUI.getTabs, parent, Some(5000)) + content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000)) } /** Generate basic stats of the streaming program */ @@ -60,13 +54,13 @@ private[ui] class StreamingPage(parent: StreamingTab) Started at: {startTime.toString}
  • - Time since start: {msDurationToString(timeSinceStart)} + Time since start: {formatDurationVerbose(timeSinceStart)}
  • Network receivers: {listener.numNetworkReceivers}
  • - Batch interval: {msDurationToString(listener.batchDuration)} + Batch interval: {formatDurationVerbose(listener.batchDuration)}
  &nbsp; • Processed batches: {listener.numTotalCompletedBatches} @@ -85,7 +79,7 @@ private[ui] class StreamingPage(parent: StreamingTab) val headerRow = Seq( "Receiver", "Location", - "Records in last batch", + "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime())+ "]", "Minimum rate\n[records/sec]", "25th percentile rate\n[records/sec]", "Median rate\n[records/sec]", @@ -96,15 +90,15 @@ private[ui] class StreamingPage(parent: StreamingTab) val receiverInfo = listener.receiverInfo(receiverId) val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest) - val receiverLastBatchRecords = numberToString(lastBatchReceivedRecord(receiverId)) + val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId)) val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => - d.getQuantiles().map(r => numberToString(r.toLong)) + d.getQuantiles().map(r => formatNumber(r.toLong)) }.getOrElse { Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest) } Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats } - Some(listingTable(headerRow, dataRows, fixedWidth = true)) + Some(listingTable(headerRow, dataRows)) } else { None } @@ -124,19 +118,19 @@ private[ui] class StreamingPage(parent: StreamingTab) val processingDelayQuantilesRow = { Seq( "Processing Time", - msDurationToString(lastCompletedBatch.flatMap(_.processingDelay)) + formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay)) ) ++ getQuantiles(listener.processingDelayDistribution) } val schedulingDelayQuantilesRow = { Seq( "Scheduling Delay", - msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay)) + formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay)) ) ++ getQuantiles(listener.schedulingDelayDistribution) } val totalDelayQuantilesRow = { Seq( "Total Delay", - 
msDurationToString(lastCompletedBatch.flatMap(_.totalDelay)) + formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay)) ) ++ getQuantiles(listener.totalDelayDistribution) } val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", @@ -146,7 +140,7 @@ private[ui] class StreamingPage(parent: StreamingTab) schedulingDelayQuantilesRow, totalDelayQuantilesRow ) - Some(listingTable(headerRow, dataRows, fixedWidth = true)) + Some(listingTable(headerRow, dataRows)) } else { None } @@ -162,130 +156,25 @@ private[ui] class StreamingPage(parent: StreamingTab) content } - /** - * Returns a human-readable string representing a number - */ - private def numberToString(records: Double): String = { - val trillion = 1e12 - val billion = 1e9 - val million = 1e6 - val thousand = 1e3 - - val (value, unit) = { - if (records >= 2*trillion) { - (records / trillion, " T") - } else if (records >= 2*billion) { - (records / billion, " B") - } else if (records >= 2*million) { - (records / million, " M") - } else if (records >= 2*thousand) { - (records / thousand, " K") - } else { - (records, "") - } - } - "%.1f%s".formatLocal(Locale.US, value, unit) - } /** * Returns a human-readable string representing a duration such as "5 second 35 ms" */ - private def msDurationToString(ms: Long): String = { - try { - val second = 1000L - val minute = 60 * second - val hour = 60 * minute - val day = 24 * hour - val week = 7 * day - val year = 365 * day - - def toString(num: Long, unit: String): String = { - if (num == 0) { - "" - } else if (num == 1) { - s"$num $unit" - } else { - s"$num ${unit}s" - } - } - - val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms" - val secondString = toString((ms % minute) / second, "second") - val minuteString = toString((ms % hour) / minute, "minute") - val hourString = toString((ms % day) / hour, "hour") - val dayString = toString((ms % week) / day, "day") - val weekString = toString((ms % year) / week, "week") 
- val yearString = toString(ms / year, "year") - - Seq( - second -> millisecondsString, - minute -> s"$secondString $millisecondsString", - hour -> s"$minuteString $secondString", - day -> s"$hourString $minuteString $secondString", - week -> s"$dayString $hourString $minuteString", - year -> s"$weekString $dayString $hourString" - ).foreach { case (durationLimit, durationString) => - if (ms < durationLimit) { - // if time is less than the limit (upto year) - return durationString - } - } - // if time is more than a year - return s"$yearString $weekString $dayString" - } catch { - case e: Exception => - logError("Error converting time to string", e) - // if there is some error, return blank string - return "" - } - } - - /** - * Returns a human-readable string representing a duration such as "5 second 35 ms" - */ - private def msDurationToString(msOption: Option[Long]): String = { - msOption.map(msDurationToString).getOrElse(emptyCellTest) + private def formatDurationOption(msOption: Option[Long]): String = { + msOption.map(formatDurationVerbose).getOrElse(emptyCellTest) } /** Get quantiles for any time distribution */ private def getQuantiles(timeDistributionOption: Option[Distribution]) = { - timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) } + timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) } } - /** Generate an HTML table constructed by generating a row for each object in a sequence. 
*/ - def listingTable[T]( - headerRow: Seq[String], - dataRows: Seq[Seq[String]], - fixedWidth: Boolean = false - ): Seq[Node] = { - - val colWidth = 100.toDouble / headerRow.size - val colWidthAttr = if (fixedWidth) colWidth + "%" else "" - var tableClass = "table table-bordered table-striped table-condensed sortable" - if (fixedWidth) { - tableClass += " table-fixed" - } - - def generateHeaderRow(header: Seq[String]): Seq[Node] = { - headerRow.map { case h => - - - - } - } - + /** Generate HTML table from string data */ + private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = { def generateDataRow(data: Seq[String]): Seq[Node] = { {data.map(d => {d})} } - - - {generateHeaderRow(headerRow)} - - {dataRows.map(r => generateDataRow(r))} - -
    + UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index 1aaf7764b5ceb..5a817b067e4fe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -18,15 +18,34 @@ package org.apache.spark.streaming.ui import org.apache.spark.streaming.StreamingContext -import org.apache.spark.ui.UITab +import org.apache.spark.ui.{SparkUI, UITab} import org.apache.spark.Logging +import java.util.concurrent.atomic.AtomicInteger -/** Spark Web UI tab that shows statistics of a streaming job */ -private[spark] class StreamingTab(val ssc: StreamingContext) - extends UITab("streaming") with Logging { +/** Streaming tab in the Spark web UI */ +private[spark] class StreamingTab(ssc: StreamingContext) + extends UITab(StreamingTab.streamingTabName) with Logging { - val streamingPage = new StreamingPage(this) - ssc.sc.ui.attachTab(this) + val parent = ssc.sc.ui + val streamingListener = new StreamingJobProgressListener(ssc) + val basePath = parent.basePath + val appName = parent.appName + + ssc.addStreamingListener(streamingListener) + attachPage(new StreamingPage(this)) + parent.attachTab(this) + + def headerTabs = parent.getTabs def start() { } } + +object StreamingTab { + private val atomicInteger = new AtomicInteger(0) + + /** Generate the name of the streaming tab. 
For the first streaming tab it will be */ + def streamingTabName: String = { + val count = atomicInteger.getAndIncrement + if (count == 0) "streaming" else s"streaming-$count" + } +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala index 8f6e3ea9dce40..5bba5d9a39dd7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -17,15 +17,60 @@ package org.apache.spark.streaming -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} -import org.apache.spark.streaming.dstream.InputDStream import scala.reflect.ClassTag -import org.apache.spark.rdd.RDD import scala.util.Random +import scala.io.Source + +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ +import org.scalatest.matchers.ShouldMatchers + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.InputDStream + +class UISuite extends FunSuite with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfter { + var sc: SparkContext = null + var ssc: StreamingContext = null + + override def beforeAll() { + val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) + conf.set("spark.cleaner.ttl", "1800") + sc = new SparkContext(conf) + } + + override def afterAll() { + if (sc != null) sc.stop() + } + + before { + ssc = new StreamingContext(sc, Seconds(1)) + } + + after { + if (ssc != null) { + ssc.stop() + ssc = null + } + } -class UISuite extends FunSuite with BeforeAndAfterAll { + test("streaming tab in spark UI") { + val ssc = new StreamingContext(sc, Seconds(1)) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val uiData = Source.fromURL( + ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + 
"/streaming").mkString + assert(uiData.contains("streaming")) + } + } + + test("multiple streaming tabs") { + val ssc1 = new StreamingContext(sc, Seconds(1)) + val ssc2 = new StreamingContext(sc, Seconds(2)) + ssc1.uiTab.prefix should not be ssc2.uiTab.prefix + } - test("Testing") { + ignore("Testing") { runStreaming(1000000) }