From e275e23266e49c29a27fcbf41f90e57c739f4405 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 4 May 2015 16:58:44 -0700 Subject: [PATCH] Move time related methods to Streaming's UIUtils --- .../scala/org/apache/spark/ui/UIUtils.scala | 37 ---------- .../spark/streaming/ui/StreamingPage.scala | 39 ++++------ .../apache/spark/streaming/ui/UIUtils.scala | 74 +++++++++++++++++++ .../spark/streaming}/ui/UIUtilsSuite.scala | 19 ++++- 4 files changed, 104 insertions(+), 65 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala rename {core/src/test/scala/org/apache/spark => streaming/src/test/scala/org/apache/spark/streaming}/ui/UIUtilsSuite.scala (71%) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 3a2dadb5661a9..2ddc55d56567f 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -35,43 +35,6 @@ private[spark] object UIUtils extends Logging { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") } - /** - * Return the short string for a `TimeUnit`. - */ - def shortTimeUnitString(unit: TimeUnit): String = unit match { - case TimeUnit.NANOSECONDS => "ns" - case TimeUnit.MICROSECONDS => "us" - case TimeUnit.MILLISECONDS => "ms" - case TimeUnit.SECONDS => "sec" - case TimeUnit.MINUTES => "min" - case TimeUnit.HOURS => "hrs" - case TimeUnit.DAYS => "days" - } - - /** - * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value - * after converting, also with its TimeUnit. - */ - def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = { - if (milliseconds < 1000) { - return (milliseconds, TimeUnit.MILLISECONDS) - } - val seconds = milliseconds.toDouble / 1000 - if (seconds < 60) { - return (seconds, TimeUnit.SECONDS) - } - val minutes = seconds / 60 - if (minutes < 60) { - return (minutes, TimeUnit.MINUTES) - } - val hours = minutes / 60 - if (hours < 24) { - return (hours, TimeUnit.HOURS) - } - val days = hours / 24 - (days, TimeUnit.DAYS) - } - def formatDate(date: Date): String = dateFormat.get.format(date) def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala index 5526c85c181c0..f8ed28addabb0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -27,7 +27,7 @@ import scala.xml.{Node, Unparsed} import org.apache.spark.Logging import org.apache.spark.ui._ -import org.apache.spark.ui.UIUtils._ +import org.apache.spark.ui.{UIUtils => SparkUIUtils} /** * @param timelineDivId the timeline `id` used in the html `div` tag @@ -103,13 +103,13 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) { * Converting the original data as per `unit`. */ def timelineData(unit: TimeUnit): Seq[(Long, Double)] = - data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit)) + data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit)) /** * Converting the original data as per `unit`. */ def histogramData(unit: TimeUnit): Seq[Double] = - data.map(x => StreamingPage.convertToTimeUnit(x._2, unit)) + data.map(x => UIUtils.convertToTimeUnit(x._2, unit)) val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size) @@ -149,7 +149,7 @@ private[ui] class StreamingPage(parent: StreamingTab) generateStatTable() ++ generateBatchListTables() } - UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000)) + SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000)) } /** @@ -157,9 +157,9 @@ private[ui] class StreamingPage(parent: StreamingTab) */ private def generateLoadResources(): Seq[Node] = { // scalastyle:off - - - + + + // scalastyle:on } @@ -168,15 +168,15 @@ private[ui] class StreamingPage(parent: StreamingTab) val timeSinceStart = System.currentTimeMillis() - startTime
Running batches of - {formatDurationVerbose(listener.batchDuration)} + {SparkUIUtils.formatDurationVerbose(listener.batchDuration)} for - {formatDurationVerbose(timeSinceStart)} + {SparkUIUtils.formatDurationVerbose(timeSinceStart)} since - {UIUtils.formatDate(startTime)} + {SparkUIUtils.formatDate(startTime)}

@@ -247,7 +247,7 @@ private[ui] class StreamingPage(parent: StreamingTab) | document.title, window.location.pathname + '?show-streams-detail=' + status);""" .stripMargin.replaceAll("\\n", "") // it must be only one single line - val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit) + val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit) val jsCollector = new JsCollector @@ -423,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab) if (msg.size > 100) msg.take(97) + "..." else msg }.getOrElse(emptyCell) val receiverLastErrorTime = receiverInfo.map { - r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime) + r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime) }.getOrElse(emptyCell) val receivedRecords = new EventRateUIData(eventRates) @@ -491,22 +491,9 @@ private[ui] object StreamingPage { * Returns a human-readable string representing a duration such as "5 second 35 ms" */ def formatDurationOption(msOption: Option[Long]): String = { - msOption.map(formatDurationVerbose).getOrElse(emptyCell) + msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell) } - /** - * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it - * will discard the fractional part. - */ - def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match { - case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 - case TimeUnit.MICROSECONDS => milliseconds * 1000 - case TimeUnit.MILLISECONDS => milliseconds - case TimeUnit.SECONDS => milliseconds / 1000.0 - case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0 - case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0 - case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0 - } } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala new file mode 100644 index 0000000000000..c206f973b2c66 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import java.util.concurrent.TimeUnit + +object UIUtils { + + /** + * Return the short string for a `TimeUnit`. + */ + def shortTimeUnitString(unit: TimeUnit): String = unit match { + case TimeUnit.NANOSECONDS => "ns" + case TimeUnit.MICROSECONDS => "us" + case TimeUnit.MILLISECONDS => "ms" + case TimeUnit.SECONDS => "sec" + case TimeUnit.MINUTES => "min" + case TimeUnit.HOURS => "hrs" + case TimeUnit.DAYS => "days" + } + + /** + * Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value + * after converting, also with its TimeUnit. + */ + def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = { + if (milliseconds < 1000) { + return (milliseconds, TimeUnit.MILLISECONDS) + } + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return (seconds, TimeUnit.SECONDS) + } + val minutes = seconds / 60 + if (minutes < 60) { + return (minutes, TimeUnit.MINUTES) + } + val hours = minutes / 60 + if (hours < 24) { + return (hours, TimeUnit.HOURS) + } + val days = hours / 24 + (days, TimeUnit.DAYS) + } + + /** + * Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it + * will discard the fractional part. + */ + def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match { + case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000 + case TimeUnit.MICROSECONDS => milliseconds * 1000 + case TimeUnit.MILLISECONDS => milliseconds + case TimeUnit.SECONDS => milliseconds / 1000.0 + case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0 + case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0 + case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0 + } +} diff --git a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala similarity index 71% rename from core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala rename to streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala index 636b2c389e05e..6df1a63ab2e37 100644 --- a/core/src/test/scala/org/apache/spark/ui/UIUtilsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/UIUtilsSuite.scala @@ -15,8 +15,7 @@ * limitations under the License. */ - -package org.apache.spark.ui +package org.apache.spark.streaming.ui import java.util.concurrent.TimeUnit @@ -49,4 +48,20 @@ class UIUtilsSuite extends FunSuite with Matchers{ time should be (expectedTime +- 1E-6) unit should be (expectedUnit) } + + test("convertToTimeUnit") { + verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS) + verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS) + verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS) + verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS) + verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES) + verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS) + verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS) + } + + private def verifyConvertToTimeUnit( + expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = { + val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit) + convertedTime should be (expectedTime +- 1E-6) + } }