Commit

Changed the streaming UI to attach itself as a tab in the Spark UI.
tdas committed Apr 10, 2014
1 parent 827e81a commit 1af239b
Showing 8 changed files with 241 additions and 405 deletions.
105 changes: 0 additions & 105 deletions core/src/main/scala/org/apache/spark/ui/FooTab.scala

This file was deleted.

15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -61,7 +61,9 @@ private[spark] object UIUtils {
appName: String,
title: String,
tabs: Seq[UITab],
activeTab: UITab) : Seq[Node] = {
activeTab: UITab,
refreshInterval: Option[Int] = None
) : Seq[Node] = {

val header = tabs.map { tab =>
<li class={if (tab == activeTab) "active" else ""}>
@@ -78,8 +80,17 @@ private[spark] object UIUtils {
type="text/css" />
<script src={prependBaseUri("/static/sorttable.js")} ></script>
<title>{appName} - {title}</title>
<script type="text/JavaScript">
<!--
function timedRefresh(timeoutPeriod) {
if (timeoutPeriod > 0) {
setTimeout("location.reload(true);",timeoutPeriod);
}
}
// -->
</script>
</head>
<body>
<body onload={s"JavaScript:timedRefresh(${refreshInterval.getOrElse(-1)});"}>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri(basePath, "/")} class="brand">
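The hunks above add an optional auto-refresh hook to the common page header: a page that passes a refreshInterval gets a timedRefresh() call wired into the body onload handler, while the default of None leaves existing pages untouched. A minimal, self-contained sketch of that pattern follows; renderPage, its page body, and the 5000 ms value are illustrative stand-ins, not the real UIUtils.headerSparkPage signature.

import scala.xml.Node

// Sketch only: mirrors the optional-refresh pattern from the diff above.
def renderPage(title: String, refreshInterval: Option[Int] = None): Seq[Node] = {
  <html>
    <head>
      <script type="text/JavaScript">
        {"function timedRefresh(t) { if (t > 0) setTimeout('location.reload(true);', t); }"}
      </script>
      <title>{title}</title>
    </head>
    <body onload={s"JavaScript:timedRefresh(${refreshInterval.getOrElse(-1)});"}>
      <h1>{title}</h1>
    </body>
  </html>
}

// Existing callers keep compiling unchanged; a streaming page can opt into a 5 s reload:
val staticPage     = renderPage("Environment")
val refreshingPage = renderPage("Streaming", refreshInterval = Some(5000))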
@@ -17,30 +17,28 @@

package org.apache.spark.streaming

import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag

import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.Props
import akka.actor.SupervisorStrategy
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import scala.collection.Map
import scala.collection.mutable.Queue
import scala.reflect.ClassTag

import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.MetadataCleaner
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.ui.StreamingUI
import org.apache.spark.streaming.ui.StreamingTab
import org.apache.spark.util.MetadataCleaner

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -159,8 +157,8 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

private[streaming] val ui = new StreamingUI(this)
ui.bind()
private[streaming] val ui = new StreamingTab(this)
ui.start()

/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
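The StreamingContext change above swaps the standalone UI (new StreamingUI(this); ui.bind(), which presumably bound its own server) for new StreamingTab(this); ui.start(), a tab that registers with the application's existing Spark UI. A toy sketch of that shape, using hypothetical WebUI/Tab traits rather than the real Spark classes:

// Hypothetical types for illustration only; the real StreamingTab wires into the Spark UI.
trait Tab { def name: String }
trait WebUI { def attachTab(tab: Tab): Unit }

class StreamingTabSketch(parent: WebUI) extends Tab {
  val name = "Streaming"
  // Register with the parent web UI instead of binding a separate HTTP port.
  def start(): Unit = parent.attachTab(this)
}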
@@ -31,7 +31,7 @@ import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.{AddBlocks, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.util.{AkkaUtils, Utils}

@@ -237,7 +237,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
level: StorageLevel
) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
logDebug("Pushed block " + blockId)
}

@@ -251,7 +251,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
level: StorageLevel
) {
env.blockManager.putBytes(blockId, bytes, level)
trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, -1, metadata))
trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
}

/** Set the ID of the DStream that this receiver is associated with */
@@ -52,7 +52,7 @@ private[streaming] case class RegisterReceiver(
host: String,
receiverActor: ActorRef
) extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
@@ -153,7 +153,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
case RegisterReceiver(streamId, typ, host, receiverActor) =>
registerReceiver(streamId, typ, host, receiverActor, sender)
sender ! true
case AddBlocks(receivedBlockInfo) =>
case AddBlock(receivedBlockInfo) =>
addBlocks(receivedBlockInfo)
case DeregisterReceiver(streamId, message) =>
deregisterReceiver(streamId, message)
@@ -0,0 +1,131 @@
package org.apache.spark.streaming.ui

import scala.collection.mutable.{HashMap, Queue}

import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.Distribution

private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener {

private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.maxBatches", 100)
private var totalCompletedBatches = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]

val batchDuration = ssc.graph.batchDuration.milliseconds

override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
synchronized {
receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
}
}

override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
}

override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
}

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
totalCompletedBatches += 1L
}

def numNetworkReceivers = synchronized {
ssc.graph.getNetworkInputStreams().size
}

def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}

def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
}

def waitingBatches: Seq[BatchInfo] = synchronized {
waitingBatchInfos.values.toSeq
}

def runningBatches: Seq[BatchInfo] = synchronized {
runningBatchInfos.values.toSeq
}

def completedBatches: Seq[BatchInfo] = synchronized {
completedBatchInfos.toSeq
}

def processingDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.processingDelay)
}

def schedulingDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.schedulingDelay)
}

def totalDelayDistribution: Option[Distribution] = synchronized {
extractDistribution(_.totalDelay)
}

def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
(0 until numNetworkReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
batchInfo.get(receiverId).getOrElse(Array.empty)
}
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
// calculate records per second for each batch
blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
}
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
}.toMap
}

def lastReceivedBatchRecords: Map[Int, Long] = {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numNetworkReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
}.toMap
}.getOrElse {
(0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
}
}

def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
receiverInfos.get(receiverId)
}

def lastCompletedBatch: Option[BatchInfo] = {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
}

def lastReceivedBatch: Option[BatchInfo] = {
allBatches.lastOption
}

private def allBatches: Seq[BatchInfo] = synchronized {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
}

private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
}
}
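A hedged usage sketch for the listener above, assuming an existing StreamingContext named ssc. summary() is a hypothetical helper, and because the class is private[ui] this code would have to live in the org.apache.spark.streaming.ui package; the accessors are the ones defined in the file, and registration uses StreamingContext.addStreamingListener.

val listener = new StreamingProgressListener(ssc)
ssc.addStreamingListener(listener)  // receive batch and receiver events

// Build a plain-text summary; a real UI page would render HTML instead.
def summary(): String = {
  val lastProcessing = listener.lastCompletedBatch
    .flatMap(_.processingDelay).map(d => s"$d ms").getOrElse("n/a")
  s"""Batch interval:     ${listener.batchDuration} ms
     |Network receivers:  ${listener.numNetworkReceivers}
     |Completed batches:  ${listener.numTotalCompletedBatches}
     |Waiting / running:  ${listener.waitingBatches.size} / ${listener.runningBatches.size}
     |Last processing:    $lastProcessing
     |Last batch records: ${listener.lastReceivedBatchRecords.values.sum}
     |""".stripMargin
}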
(Two additional changed files are not shown above.)