Skip to content

Commit

Permalink
Move time related methods to Streaming's UIUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed May 4, 2015
1 parent d5d86f6 commit e275e23
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 65 deletions.
37 changes: 0 additions & 37 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,6 @@ private[spark] object UIUtils extends Logging {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

/**
* Return the short string for a `TimeUnit`.
*/
def shortTimeUnitString(unit: TimeUnit): String = unit match {
case TimeUnit.NANOSECONDS => "ns"
case TimeUnit.MICROSECONDS => "us"
case TimeUnit.MILLISECONDS => "ms"
case TimeUnit.SECONDS => "sec"
case TimeUnit.MINUTES => "min"
case TimeUnit.HOURS => "hrs"
case TimeUnit.DAYS => "days"
}

/**
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
* after converting, also with its TimeUnit.
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
return (milliseconds, TimeUnit.MILLISECONDS)
}
val seconds = milliseconds.toDouble / 1000
if (seconds < 60) {
return (seconds, TimeUnit.SECONDS)
}
val minutes = seconds / 60
if (minutes < 60) {
return (minutes, TimeUnit.MINUTES)
}
val hours = minutes / 60
if (hours < 24) {
return (hours, TimeUnit.HOURS)
}
val days = hours / 24
(days, TimeUnit.DAYS)
}

def formatDate(date: Date): String = dateFormat.get.format(date)

def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import scala.xml.{Node, Unparsed}

import org.apache.spark.Logging
import org.apache.spark.ui._
import org.apache.spark.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils}

/**
* @param timelineDivId the timeline `id` used in the html `div` tag
Expand Down Expand Up @@ -103,13 +103,13 @@ private[ui] class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
* Converting the original data as per `unit`.
*/
def timelineData(unit: TimeUnit): Seq[(Long, Double)] =
data.map(x => x._1 -> StreamingPage.convertToTimeUnit(x._2, unit))
data.map(x => x._1 -> UIUtils.convertToTimeUnit(x._2, unit))

/**
* Converting the original data as per `unit`.
*/
def histogramData(unit: TimeUnit): Seq[Double] =
data.map(x => StreamingPage.convertToTimeUnit(x._2, unit))
data.map(x => UIUtils.convertToTimeUnit(x._2, unit))

val avg: Option[Long] = if (data.isEmpty) None else Some(data.map(_._2).sum / data.size)

Expand Down Expand Up @@ -149,17 +149,17 @@ private[ui] class StreamingPage(parent: StreamingTab)
generateStatTable() ++
generateBatchListTables()
}
UIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
SparkUIUtils.headerSparkPage("Streaming Statistics", content, parent, Some(5000))
}

/**
* Generate html that will load css/js files for StreamingPage
*/
private def generateLoadResources(): Seq[Node] = {
// scalastyle:off
<script src={UIUtils.prependBaseUri("/static/d3.min.js")}></script>
<link rel="stylesheet" href={UIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
<script src={UIUtils.prependBaseUri("/static/streaming-page.js")}></script>
<script src={SparkUIUtils.prependBaseUri("/static/d3.min.js")}></script>
<link rel="stylesheet" href={SparkUIUtils.prependBaseUri("/static/streaming-page.css")} type="text/css"/>
<script src={SparkUIUtils.prependBaseUri("/static/streaming-page.js")}></script>
// scalastyle:on
}

Expand All @@ -168,15 +168,15 @@ private[ui] class StreamingPage(parent: StreamingTab)
val timeSinceStart = System.currentTimeMillis() - startTime
<div>Running batches of
<strong>
{formatDurationVerbose(listener.batchDuration)}
{SparkUIUtils.formatDurationVerbose(listener.batchDuration)}
</strong>
for
<strong>
{formatDurationVerbose(timeSinceStart)}
{SparkUIUtils.formatDurationVerbose(timeSinceStart)}
</strong>
since
<strong>
{UIUtils.formatDate(startTime)}
{SparkUIUtils.formatDate(startTime)}
</strong>
</div>
<br />
Expand Down Expand Up @@ -247,7 +247,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
| document.title, window.location.pathname + '?show-streams-detail=' + status);"""
.stripMargin.replaceAll("\\n", "") // it must be only one single line

val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)
val batchInterval = UIUtils.convertToTimeUnit(listener.batchDuration, normalizedUnit)

val jsCollector = new JsCollector

Expand Down Expand Up @@ -423,7 +423,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
if (msg.size > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
r => if (r.lastErrorTime < 0) "-" else SparkUIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
val receivedRecords = new EventRateUIData(eventRates)

Expand Down Expand Up @@ -491,22 +491,9 @@ private[ui] object StreamingPage {
* Returns a human-readable string representing a duration such as "5 second 35 ms"
*/
def formatDurationOption(msOption: Option[Long]): String = {
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
msOption.map(SparkUIUtils.formatDurationVerbose).getOrElse(emptyCell)
}

/**
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
* will discard the fractional part.
*/
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
case TimeUnit.MICROSECONDS => milliseconds * 1000
case TimeUnit.MILLISECONDS => milliseconds
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.ui

import java.util.concurrent.TimeUnit

object UIUtils {

/**
* Return the short string for a `TimeUnit`.
*/
def shortTimeUnitString(unit: TimeUnit): String = unit match {
case TimeUnit.NANOSECONDS => "ns"
case TimeUnit.MICROSECONDS => "us"
case TimeUnit.MILLISECONDS => "ms"
case TimeUnit.SECONDS => "sec"
case TimeUnit.MINUTES => "min"
case TimeUnit.HOURS => "hrs"
case TimeUnit.DAYS => "days"
}

/**
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
* after converting, also with its TimeUnit.
*/
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
if (milliseconds < 1000) {
return (milliseconds, TimeUnit.MILLISECONDS)
}
val seconds = milliseconds.toDouble / 1000
if (seconds < 60) {
return (seconds, TimeUnit.SECONDS)
}
val minutes = seconds / 60
if (minutes < 60) {
return (minutes, TimeUnit.MINUTES)
}
val hours = minutes / 60
if (hours < 24) {
return (hours, TimeUnit.HOURS)
}
val days = hours / 24
(days, TimeUnit.DAYS)
}

/**
* Convert `milliseconds` to the specified `unit`. We cannot use `TimeUnit.convert` because it
* will discard the fractional part.
*/
def convertToTimeUnit(milliseconds: Long, unit: TimeUnit): Double = unit match {
case TimeUnit.NANOSECONDS => milliseconds * 1000 * 1000
case TimeUnit.MICROSECONDS => milliseconds * 1000
case TimeUnit.MILLISECONDS => milliseconds
case TimeUnit.SECONDS => milliseconds / 1000.0
case TimeUnit.MINUTES => milliseconds / 1000.0 / 60.0
case TimeUnit.HOURS => milliseconds / 1000.0 / 60.0 / 60.0
case TimeUnit.DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
* limitations under the License.
*/


package org.apache.spark.ui
package org.apache.spark.streaming.ui

import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -49,4 +48,20 @@ class UIUtilsSuite extends FunSuite with Matchers{
time should be (expectedTime +- 1E-6)
unit should be (expectedUnit)
}

test("convertToTimeUnit") {
verifyConvertToTimeUnit(60.0 * 1000 * 1000 * 1000, 60 * 1000, TimeUnit.NANOSECONDS)
verifyConvertToTimeUnit(60.0 * 1000 * 1000, 60 * 1000, TimeUnit.MICROSECONDS)
verifyConvertToTimeUnit(60 * 1000, 60 * 1000, TimeUnit.MILLISECONDS)
verifyConvertToTimeUnit(60, 60 * 1000, TimeUnit.SECONDS)
verifyConvertToTimeUnit(1, 60 * 1000, TimeUnit.MINUTES)
verifyConvertToTimeUnit(1.0 / 60, 60 * 1000, TimeUnit.HOURS)
verifyConvertToTimeUnit(1.0 / 60 / 24, 60 * 1000, TimeUnit.DAYS)
}

private def verifyConvertToTimeUnit(
expectedTime: Double, milliseconds: Long, unit: TimeUnit): Unit = {
val convertedTime = UIUtils.convertToTimeUnit(milliseconds, unit)
convertedTime should be (expectedTime +- 1E-6)
}
}

0 comments on commit e275e23

Please sign in to comment.