Commit dd653a1

Add timeline and histogram graphs for streaming statistics

zsxwing committed Apr 15, 2015
1 parent 6be9189 commit dd653a1

Showing 9 changed files with 468 additions and 32 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
@@ -30,6 +30,7 @@ log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
sorttable.js
d3.min.js
.*avsc
.*txt
.*json
30 changes: 30 additions & 0 deletions LICENSE
@@ -643,6 +643,36 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

========================================================================
For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js):
========================================================================

Copyright (c) 2010-2015, Michael Bostock
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

* The name Michael Bostock may not be used to endorse or promote products
derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/d3.min.js

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

.graph {
  font: 10px sans-serif;
}

.axis path, .axis line {
  fill: none;
  stroke: #000;
  shape-rendering: crispEdges;
}

.line {
  fill: none;
  stroke: steelblue;
  stroke-width: 1.5px;
}

.bar rect {
  fill: steelblue;
  shape-rendering: crispEdges;
}

.bar text {
  fill: #000;
}
120 changes: 120 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Draws a time-series line chart; `data` is an array of {x: timestampMs, y: value} points
// and `unit` labels the y axis.
function drawTimeline(id, data, unit) {
  var margin = {top: 20, right: 20, bottom: 70, left: 50};
  var width = 500 - margin.left - margin.right;
  var height = 250 - margin.top - margin.bottom;

  var x = d3.time.scale().range([0, width]);
  var y = d3.scale.linear().range([height, 0]);

  var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(d3.time.format("%H:%M:%S"));
  var yAxis = d3.svg.axis().scale(y).orient("left");

  var line = d3.svg.line()
    .x(function(d) { return x(new Date(d.x)); })
    .y(function(d) { return y(d.y); });

  var svg = d3.select(id).append("svg")
    .attr("width", width + margin.left + margin.right)
    .attr("height", height + margin.top + margin.bottom)
    .append("g")
    .attr("transform", "translate(" + margin.left + "," + margin.top + ")");

  x.domain(d3.extent(data, function(d) { return d.x; }));
  y.domain(d3.extent(data, function(d) { return d.y; }));

  svg.append("g")
    .attr("class", "x axis")
    .attr("transform", "translate(0," + height + ")")
    .call(xAxis)
    .selectAll("text")
      .style("text-anchor", "end")
      .attr("dx", "-.8em")
      .attr("dy", ".15em")
      .attr("transform", "rotate(-65)");

  svg.append("g")
    .attr("class", "y axis")
    .call(yAxis)
    .append("text")
      .attr("x", 25)
      .attr("y", -5)
      .attr("dy", ".71em")
      .style("text-anchor", "end")
      .text(unit);

  svg.append("path")
    .datum(data)
    .attr("class", "line")
    .attr("d", line);
}

// Draws a horizontal histogram of the raw numeric `values`, grouped into roughly 10 bins.
function drawDistribution(id, values, unit) {
  var margin = {top: 10, right: 30, bottom: 30, left: 50};
  var width = 500 - margin.left - margin.right;
  var height = 250 - margin.top - margin.bottom;

  var y = d3.scale.linear()
    .domain(d3.extent(values, function(d) { return d; }))
    .range([height, 0]);
  var data = d3.layout.histogram().bins(y.ticks(10))(values);

  var x = d3.scale.linear()
    .domain([0, d3.max(data, function(d) { return d.y; })])
    .range([0, width]);

  var xAxis = d3.svg.axis().scale(x).orient("bottom");
  var yAxis = d3.svg.axis().scale(y).orient("left");

  var svg = d3.select(id).append("svg")
    .attr("width", width + margin.left + margin.right)
    .attr("height", height + margin.top + margin.bottom)
    .append("g")
    .attr("transform", "translate(" + margin.left + "," + margin.top + ")");

  var bar = svg.selectAll(".bar")
    .data(data)
    .enter().append("g")
    .attr("class", "bar");

  bar.append("rect")
    .attr("x", 1)
    .attr("y", function(d) { return y(d.x) - margin.bottom + margin.top; })
    .attr("width", function(d) { return x(d.y); })
    .attr("height", function(d) { return (height - margin.bottom) / data.length; });

  bar.append("text")
    .attr("dy", ".75em")
    .attr("x", function(d) { return x(d.y) + 10; })
    .attr("y", function(d) { return y(d.x) + 16 - margin.bottom; })
    .attr("text-anchor", "middle")
    .text(function(d) { return d.y; });

  svg.append("g")
    .attr("class", "x axis")
    .attr("transform", "translate(0," + height + ")")
    .call(xAxis);

  svg.append("g")
    .attr("class", "y axis")
    .call(yAxis);
}
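
The page that calls these helpers is not shown in this diff, but drawTimeline expects an array of {x: batchTimeMs, y: value} objects, which matches the (batch time, records-per-second) pairs produced by the listener below. A minimal sketch, assuming a hypothetical TimelineSnippet helper (the object, method, and divId are illustrative, not part of this commit), of how a Scala UI page could emit the matching drawTimeline call:

object TimelineSnippet {
  /** Build a <script> tag that draws a timeline into the element selected by "#divId". */
  def timelineScript(divId: String, data: Seq[(Long, Double)], unit: String): String = {
    // drawTimeline(id, data, unit) expects an array of {x: batchTimeMs, y: value} objects.
    val jsData = data.map { case (t, v) => s"""{"x": $t, "y": $v}""" }.mkString("[", ",", "]")
    s"""<script>drawTimeline("#$divId", $jsData, "$unit");</script>"""
  }

  def main(args: Array[String]): Unit = {
    // Two batches, one second apart, with their records-per-second rates.
    val sample = Seq((1429056000000L, 120.0), (1429056001000L, 98.5))
    println(timelineScript("records-timeline", sample, "records/sec"))
  }
}
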
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import java.io.PrintStream

import scala.collection.immutable.IndexedSeq
import scala.collection.mutable.ArrayBuffer

/**
* Util for getting some stats from a small sample of numeric values, with some handy
@@ -67,6 +68,34 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
    out.println(statCounter)
    showQuantiles(out)
  }

  /**
   * Group the values into `bins` buckets centered on evenly spaced points between the min and
   * the max, returning (bin point, count) pairs.
   */
  def histogram(bins: Int): Seq[(Double, Int)] = {
    require(bins > 0)
    val stat = statCounter
    val binSize = (stat.max - stat.min) / (bins - 1)
    val points = new Array[Double](bins)
    for (i <- 0 until (bins - 1)) {
      points(i) = stat.min + binSize * i
    }
    points(bins - 1) = stat.max
    val counts = new Array[Int](bins)
    for (i <- startIdx until endIdx) {
      val v = data(i)
      var j = 0
      var find = false
      while (!find && j < bins - 1) {
        if (v < points(j) + binSize / 2) {
          counts(j) += 1
          find = true
        }
        j += 1
      }
      if (!find) {
        counts(bins - 1) += 1
      }
    }
    points.zip(counts)
  }
}

private[spark] object Distribution {
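
A minimal sketch of how the new histogram method behaves; since Distribution is private[spark], the example lives in the org.apache.spark.util package, and the HistogramExample object and sample values are illustrative only:

package org.apache.spark.util

object HistogramExample {
  def main(args: Array[String]): Unit = {
    val delays = Array(12.0, 15.0, 15.0, 40.0, 41.0, 70.0)
    // The primary constructor shown above: (data, startIdx, endIdx).
    val dist = new Distribution(delays, 0, delays.length)
    // With 3 bins: binSize = (70 - 12) / 2 = 29 and the bin points are 12, 41 and 70.
    // Each value is counted into the bin whose point is nearest, giving
    // Seq((12.0, 3), (41.0, 2), (70.0, 1)).
    dist.histogram(3).foreach { case (point, count) => println(s"$point -> $count") }
  }
}
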
streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -58,4 +58,11 @@ case class BatchInfo(
   */
  def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
    .map(x => x._1 + x._2).headOption

  /**
   * The number of records received by the receivers in this batch.
   */
  def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
    infos.map(_.numRecords).sum
  }.sum
}
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -39,6 +39,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
  private var totalReceivedRecords = 0L
  private var totalProcessedRecords = 0L
  private val receiverInfos = new HashMap[Int, ReceiverInfo]
  private val receiverLastErrorTime = new HashMap[Int, Long]

  val batchDuration = ssc.graph.batchDuration.milliseconds

@@ -51,6 +52,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
  override def onReceiverError(receiverError: StreamingListenerReceiverError) {
    synchronized {
      receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
      receiverLastErrorTime(receiverError.receiverInfo.streamId) = System.currentTimeMillis()
    }
  }

@@ -149,6 +151,31 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
    }.toMap
  }

  /** (batch time, records per second) points for each receiver over the retained batches. */
  def receivedRecordsWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
    val latestBlockInfos = retainedBatches.take(batchInfoLimit).map { batchInfo =>
      (batchInfo.batchTime.milliseconds, batchInfo.receivedBlockInfo)
    }
    (0 until numReceivers).map { receiverId =>
      val blockInfoOfParticularReceiver = latestBlockInfos.map {
        case (batchTime, receivedBlockInfo) =>
          (batchTime, receivedBlockInfo.get(receiverId).getOrElse(Array.empty))
      }
      val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map {
        case (batchTime, blockInfo) =>
          // calculate records per second for each batch
          (batchTime, blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
      }
      (receiverId, recordsOfParticularReceiver)
    }.toMap
  }

  /** (batch time, records per second) points aggregated over all receivers. */
  def allReceivedRecordsWithBatchTime: Seq[(Long, Double)] = synchronized {
    retainedBatches.take(batchInfoLimit).map { batchInfo =>
      (batchInfo.batchTime.milliseconds, batchInfo.numRecords.toDouble * 1000 / batchDuration)
    }
  }

  def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
    lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
@@ -164,6 +191,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
    receiverInfos.get(receiverId)
  }

  def receiverLastErrorTimeo(receiverId: Int): Option[Long] = synchronized {
    receiverLastErrorTime.get(receiverId)
  }

  def lastCompletedBatch: Option[BatchInfo] = synchronized {
    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
  }
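
Both new listener methods normalize a batch's record count into a per-second rate before handing it to the timeline. A minimal, self-contained sketch of that conversion with a worked example (the RateExample object and helper name are illustrative, not part of the listener):

object RateExample {
  /** Convert a batch's record count into records per second, as the listener methods above do. */
  def recordsPerSecond(numRecords: Long, batchDurationMs: Long): Double =
    numRecords.toDouble * 1000 / batchDurationMs

  def main(args: Array[String]): Unit = {
    // A 2000 ms batch that received 500 records is reported as 250.0 records/sec.
    println(recordsPerSecond(500L, 2000L))
  }
}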