
Use InputInfo
zsxwing committed May 2, 2015
1 parent b50fa32 commit 3be4b7a
Showing 7 changed files with 74 additions and 56 deletions.
@@ -266,8 +266,8 @@ $(function() {
}
}

if (getParameterFromURL("show-receivers-detail") == "true") {
// Show the details for all receivers
if (getParameterFromURL("show-streams-detail") == "true") {
// Show the details for all InputDStream
$('#inputs-table').toggle('collapsed');
$('#triangle').html('▼');
}
@@ -110,6 +110,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}

def getInputStreamName(streamId: Int): Option[String] = synchronized {
inputStreams.find(_.id == streamId).map(_.name)
}

def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
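The new getInputStreamName helper resolves a stream id back to the name of the InputDStream registered under it, returning None for unknown ids. A minimal standalone sketch of the same lookup pattern, in plain Scala with no Spark dependency (the Stream case class and the registry contents are hypothetical stand-ins for inputStreams):

// Sketch only: Stream stands in for InputDStream; just id and name matter here.
case class Stream(id: Int, name: String)

object NameLookupSketch {
  private val inputStreams = Seq(Stream(0, "SocketInputDStream-0"), Stream(1, "FileInputDStream-1"))

  // Same shape as DStreamGraph.getInputStreamName: Option[String], None when the id is unknown.
  def getInputStreamName(streamId: Int): Option[String] =
    inputStreams.find(_.id == streamId).map(_.name)

  def main(args: Array[String]): Unit = {
    println(getInputStreamName(0)) // Some(SocketInputDStream-0)
    println(getInputStreamName(9)) // None
  }
}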
@@ -44,6 +44,11 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is a unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()

/**
* The name of this InputDStream. By default, it's the class name with its id.
*/
def name: String = s"${getClass.getSimpleName}-$id"

/**
* Checks whether the 'time' is valid wrt slideDuration for generating RDD.
* Additionally it also ensures valid times are in strictly increasing order.
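The default name concatenates the runtime class's simple name and the stream id, so an input stream that never overrides it shows up in the UI as something like SocketInputDStream-0. A self-contained sketch of that naming scheme, assuming a hypothetical id counter in place of StreamingContext.getNewInputStreamId:

// Hypothetical stand-in for the id counter provided by StreamingContext.
object StreamIds { private var next = -1; def newId(): Int = { next += 1; next } }

abstract class FakeInputDStream {
  val id: Int = StreamIds.newId()
  // Mirrors the new default: simple class name plus the stream id.
  def name: String = s"${getClass.getSimpleName}-$id"
}

class SocketLikeDStream extends FakeInputDStream
class FileLikeDStream extends FakeInputDStream {
  override def name: String = s"file stream [$id]" // subclasses can still override the default
}

object NamingSketch {
  def main(args: Array[String]): Unit = {
    println(new SocketLikeDStream().name) // SocketLikeDStream-0
    println(new FileLikeDStream().name)   // file stream [1]
  }
}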
@@ -26,7 +26,7 @@ private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobI

private[ui] case class BatchUIData(
val batchTime: Time,
val receiverNumRecords: Map[Int, Long],
val streamIdToNumRecords: Map[Int, Long],
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
@@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = receiverNumRecords.map(_._2).sum
def numRecords: Long = streamIdToNumRecords.values.sum
}

private[ui] object BatchUIData {
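After the rename, numRecords is simply the sum of the per-stream counts; values.sum reads more directly than mapping over the key-value pairs. A quick illustration with the same numbers the listener suite uses further below:

object NumRecordsSketch {
  def main(args: Array[String]): Unit = {
    // Per-stream record counts, keyed by InputDStream id (values match the test suite).
    val streamIdToNumRecords: Map[Int, Long] = Map(0 -> 300L, 1 -> 300L)
    val numRecords: Long = streamIdToNumRecords.values.sum
    println(numRecords) // 600
  }
}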
@@ -144,7 +144,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}

def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
def numReceivers: Int = synchronized {
receiverInfos.size
}

def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
@@ -174,35 +176,41 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
completedBatchUIData.toSeq
}

def allReceivers: Seq[Int] = synchronized {
receiverInfos.keys.toSeq
def streamName(streamId: Int): Option[String] = {
ssc.graph.getInputStreamName(streamId)
}

/**
* Return all of the event rates for each receiver in each batch.
* Return all InputDStream Ids
*/
def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)

/**
* Return all of the event rates for each InputDStream in each batch.
*/
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
val latestBatches = retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.receiverNumRecords)
val _retainedBatches = retainedBatches
val latestBatches = _retainedBatches.map { batchUIData =>
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
}
(0 until numReceivers).map { receiverId =>
streamIds.map { streamId =>
val eventRates = latestBatches.map {
case (batchTime, receiverNumRecords) =>
val numRecords = receiverNumRecords.getOrElse(receiverId, 0L)
case (batchTime, streamIdToNumRecords) =>
val numRecords = streamIdToNumRecords.getOrElse(streamId, 0L)
(batchTime, numRecords * 1000.0 / batchDuration)
}
(receiverId, eventRates)
(streamId, eventRates)
}.toMap
}

def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
streamIds.map { streamId =>
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
}.toMap
}.getOrElse {
(0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
streamIds.map(streamId => (streamId, 0L)).toMap
}
}

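The listener now iterates over streamIds instead of 0 until numReceivers, so an InputDStream that has no receiver still gets a rate series, and each point is numRecords * 1000.0 / batchDuration, i.e. records per second when batchDuration is in milliseconds. A compact sketch of that per-stream rate computation (all batch data below is made up):

object EventRateSketch {
  def main(args: Array[String]): Unit = {
    val batchDuration = 1000L // batch interval in ms, hypothetical
    val streamIds = Seq(0, 1)
    // (batch time in ms, per-stream record counts); stream 1 reported nothing in the second batch.
    val latestBatches = Seq(
      (1000L, Map(0 -> 300L, 1 -> 300L)),
      (2000L, Map(0 -> 100L))
    )
    val ratesPerStream: Map[Int, Seq[(Long, Double)]] = streamIds.map { streamId =>
      val eventRates = latestBatches.map { case (batchTime, counts) =>
        (batchTime, counts.getOrElse(streamId, 0L) * 1000.0 / batchDuration)
      }
      (streamId, eventRates)
    }.toMap
    println(ratesPerStream(1)) // List((1000,300.0), (2000,0.0))
  }
}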
@@ -207,7 +207,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max

val eventRateForAllReceivers = new EventRateUIData(batches.map { batchInfo =>
val eventRateForAllStreams = new EventRateUIData(batches.map { batchInfo =>
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
})

@@ -231,37 +231,37 @@
val (maxTime, normalizedUnit) = UIUtils.normalizeDuration(_maxTime)
val formattedUnit = UIUtils.shortTimeUnitString(normalizedUnit)

// Use the max input rate for all receivers' graphs to make the Y axis ranges same.
// Use the max input rate for all InputDStreams' graphs to make the Y axis ranges same.
// If it's not an integral number, just use its ceil integral number.
val maxEventRate = eventRateForAllReceivers.max.map(_.ceil.toLong).getOrElse(0L)
val maxEventRate = eventRateForAllStreams.max.map(_.ceil.toLong).getOrElse(0L)
val minEventRate = 0L

// JavaScript to show/hide the receiver sub table.
// JavaScript to show/hide the InputDStreams sub table.
val triangleJs =
s"""$$('#inputs-table').toggle('collapsed');
|var status = false;
|if ($$(this).html() == '$BLACK_RIGHT_TRIANGLE_HTML') {
|$$(this).html('$BLACK_DOWN_TRIANGLE_HTML');status = true;}
|else {$$(this).html('$BLACK_RIGHT_TRIANGLE_HTML');status = false;}
|window.history.pushState('',
| document.title, window.location.pathname + '?show-receivers-detail=' + status);"""
| document.title, window.location.pathname + '?show-streams-detail=' + status);"""
.stripMargin.replaceAll("\\n", "") // it must be only one single line

val batchInterval = StreamingPage.convertToTimeUnit(listener.batchDuration, normalizedUnit)

val jsCollector = new JsCollector

val graphUIDataForEventRateOfAllReceivers =
val graphUIDataForEventRateOfAllStreams =
new GraphUIData(
"all-receiver-events-timeline",
"all-receiver-events-histogram",
eventRateForAllReceivers.data,
"all-stream-events-timeline",
"all-stream-events-histogram",
eventRateForAllStreams.data,
minBatchTime,
maxBatchTime,
minEventRate,
maxEventRate,
"events/sec")
graphUIDataForEventRateOfAllReceivers.generateDataJs(jsCollector)
graphUIDataForEventRateOfAllStreams.generateDataJs(jsCollector)

val graphUIDataForSchedulingDelay =
new GraphUIData(
@@ -299,7 +299,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
formattedUnit)
graphUIDataForTotalDelay.generateDataJs(jsCollector)

val hasReceiver = listener.allReceivers.nonEmpty
// It's false before the user registers the first InputDStream
val hasStream = listener.streamIds.nonEmpty

val numCompletedBatches = listener.retainedCompletedBatches.size
val numActiveBatches = batchTimes.length - numCompletedBatches
@@ -317,21 +318,21 @@
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div>
{if (hasReceiver) {
{if (hasStream) {
<span id="triangle" onclick={Unparsed(triangleJs)}>{Unparsed(BLACK_RIGHT_TRIANGLE_HTML)}</span>
}}
<strong>Input Rate</strong>
</div>
<div>Avg: {eventRateForAllReceivers.formattedAvg} events/sec</div>
<div>Avg: {eventRateForAllStreams.formattedAvg} events/sec</div>
</div>
</td>
<td class="timeline">{graphUIDataForEventRateOfAllReceivers.generateTimelineHtml(jsCollector)}</td>
<td class="histogram">{graphUIDataForEventRateOfAllReceivers.generateHistogramHtml(jsCollector)}</td>
<td class="timeline">{graphUIDataForEventRateOfAllStreams.generateTimelineHtml(jsCollector)}</td>
<td class="histogram">{graphUIDataForEventRateOfAllStreams.generateHistogramHtml(jsCollector)}</td>
</tr>
{if (hasReceiver) {
{if (hasStream) {
<tr id="inputs-table" style="display: none;" >
<td colspan="3">
{generateInputReceiversTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
{generateInputDStreamsTable(jsCollector, minBatchTime, maxBatchTime, minEventRate, maxEventRate)}
</td>
</tr>
}}
@@ -372,14 +373,14 @@
generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
}

private def generateInputReceiversTable(
private def generateInputDStreamsTable(
jsCollector: JsCollector,
minX: Long,
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
val content = listener.allReceivers.map { receiverId =>
generateInputReceiverRow(jsCollector, receiverId, minX, maxX, minY, maxY)
val content = listener.receivedEventRateWithBatchTime.map { case (streamId, eventRates) =>
generateInputDStreamRow(jsCollector, streamId, eventRates, minX, maxX, minY, maxY)
}.foldLeft[Seq[Node]](Nil)(_ ++ _)

// scalastyle:off
@@ -400,33 +401,36 @@
// scalastyle:on
}

private def generateInputReceiverRow(
private def generateInputDStreamRow(
jsCollector: JsCollector,
receiverId: Int,
streamId: Int,
eventRates: Seq[(Long, Double)],
minX: Long,
maxX: Long,
minY: Double,
maxY: Double): Seq[Node] = {
val receiverInfo = listener.receiverInfo(receiverId)
val receiverName = receiverInfo.map(_.name).getOrElse(s"Receiver-$receiverId")
// If this is a ReceiverInputDStream, we need to show the receiver info. Or we only need the
// InputDStream name.
val receiverInfo = listener.receiverInfo(streamId)
val receiverName = receiverInfo.map(_.name).
orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
val receiverActive = receiverInfo.map { info =>
if (info.active) "ACTIVE" else "INACTIVE"
}.getOrElse(emptyCell)
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
val receiverLastError = listener.receiverInfo(receiverId).map { info =>
val receiverLastError = receiverInfo.map { info =>
val msg = s"${info.lastErrorMessage} - ${info.lastError}"
if (msg.size > 100) msg.take(97) + "..." else msg
}.getOrElse(emptyCell)
val receiverLastErrorTime = receiverInfo.map {
r => if (r.lastErrorTime < 0) "-" else UIUtils.formatDate(r.lastErrorTime)
}.getOrElse(emptyCell)
val receivedRecords =
new EventRateUIData(listener.receivedEventRateWithBatchTime.get(receiverId).getOrElse(Seq()))
val receivedRecords = new EventRateUIData(eventRates)

val graphUIDataForEventRate =
new GraphUIData(
s"receiver-$receiverId-events-timeline",
s"receiver-$receiverId-events-histogram",
s"stream-$streamId-events-timeline",
s"stream-$streamId-events-histogram",
receivedRecords.data,
minX,
maxX,
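Each row's label now falls back through three sources: the registered receiver's name, then the InputDStream name from the listener, then a generic Stream-<id> placeholder. The Option chain can be exercised on its own (all names below are hypothetical):

object NameFallbackSketch {
  // receiverName wins, then streamName, then the generic placeholder.
  def label(receiverName: Option[String], streamName: Option[String], streamId: Int): String =
    receiverName.orElse(streamName).getOrElse(s"Stream-$streamId")

  def main(args: Array[String]): Unit = {
    println(label(Some("MyReceiver-0"), Some("SocketInputDStream-0"), 0)) // MyReceiver-0
    println(label(None, Some("DirectKafkaInputDStream-1"), 1))            // DirectKafkaInputDStream-1
    println(label(None, None, 2))                                         // Stream-2
  }
}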
@@ -94,7 +94,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be
Seq(OutputOpIdAndSparkJobId(0, 0),
@@ -182,7 +182,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map.empty)
batchUIData.get.streamIdToNumRecords should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))

@@ -211,17 +211,14 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)

for (_ <- 0 until 2 * limit) {
val receivedBlockInfo = Map(
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
1 -> Array(ReceivedBlockInfo(1, 300, null))
)
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)

// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

// onBatchStarted
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))

// onJobStart
Expand All @@ -238,7 +235,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.onJobStart(jobStart4)

// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}

