Merge branch 'master' into SPARK-6263
Lewuathe committed May 10, 2015
2 parents a353354 + bf7e81a commit 454c73d
Showing 20 changed files with 1,123 additions and 132 deletions.
16 changes: 16 additions & 0 deletions LICENSE
@@ -836,6 +836,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

+========================================================================
+For vis.js (core/src/main/resources/org/apache/spark/ui/static/vis.min.js):
+========================================================================
+Copyright (C) 2010-2015 Almende B.V.
+
+Vis.js is dual licensed under both
+
+  * The Apache 2.0 License
+    http://www.apache.org/licenses/LICENSE-2.0
+
+and
+
+  * The MIT License
+    http://opensource.org/licenses/MIT
+
+Vis.js may be distributed under either license.

========================================================================
BSD-style licenses
@@ -19,7 +19,8 @@ div#application-timeline, div#job-timeline {
margin-bottom: 30px;
}

-#application-timeline div.legend-area {
+#application-timeline div.legend-area,
+#job-timeline div.legend-area {
margin-top: 5px;
}

31 changes: 12 additions & 19 deletions core/src/main/resources/org/apache/spark/ui/static/timeline-view.js
@@ -39,23 +39,24 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {

function setupJobEventAction() {
$(".item.range.job.application-timeline-object").each(function() {
-    var getJobId = function(baseElem) {
+    var getSelectorForJobEntry = function(baseElem) {
var jobIdText = $($(baseElem).find(".application-timeline-content")[0]).text();
var jobId = jobIdText.match("\\(Job (\\d+)\\)")[1];
-      return jobId;
+      return "#job-" + jobId;
};

$(this).click(function() {
window.location.href = "job/?id=" + getJobId(this);
var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href")
window.location.href = jobPagePath
});

$(this).hover(
function() {
$("#job-" + getJobId(this)).addClass("corresponding-item-hover");
$(getSelectorForJobEntry(this)).addClass("corresponding-item-hover");
$($(this).find("div.application-timeline-content")[0]).tooltip("show");
},
function() {
$("#job-" + getJobId(this)).removeClass("corresponding-item-hover");
$(getSelectorForJobEntry(this)).removeClass("corresponding-item-hover");
$($(this).find("div.application-timeline-content")[0]).tooltip("hide");
}
);
@@ -97,32 +98,24 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {

function setupStageEventAction() {
$(".item.range.stage.job-timeline-object").each(function() {
-    var getStageIdAndAttempt = function(baseElem) {
+    var getSelectorForStageEntry = function(baseElem) {
var stageIdText = $($(baseElem).find(".job-timeline-content")[0]).text();
var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)")[1].split(".");
-      return stageIdAndAttempt;
+      return "#stage-" + stageIdAndAttempt[0] + "-" + stageIdAndAttempt[1];
};

$(this).click(function() {
-      var idAndAttempt = getStageIdAndAttempt(this);
-      var id = idAndAttempt[0];
-      var attempt = idAndAttempt[1];
-      window.location.href = "../../stages/stage/?id=" + id + "&attempt=" + attempt;
+      var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href")
+      window.location.href = stagePagePath
});

$(this).hover(
function() {
-        var idAndAttempt = getStageIdAndAttempt(this);
-        var id = idAndAttempt[0];
-        var attempt = idAndAttempt[1];
-        $("#stage-" + id + "-" + attempt).addClass("corresponding-item-hover");
+        $(getSelectorForStageEntry(this)).addClass("corresponding-item-hover");
$($(this).find("div.job-timeline-content")[0]).tooltip("show");
},
function() {
-        var idAndAttempt = getStageIdAndAttempt(this);
-        var id = idAndAttempt[0];
-        var attempt = idAndAttempt[1];
-        $("#stage-" + id + "-" + attempt).removeClass("corresponding-item-hover");
+        $(getSelectorForStageEntry(this)).removeClass("corresponding-item-hover");
$($(this).find("div.job-timeline-content")[0]).tooltip("hide");
}
);
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1161,8 +1161,8 @@ abstract class RDD[T: ClassTag](
*/
@Experimental
def countApproxDistinct(p: Int, sp: Int): Long = withScope {
-    require(p >= 4, s"p ($p) must be at least 4")
-    require(sp <= 32, s"sp ($sp) cannot be greater than 32")
+    require(p >= 4, s"p ($p) must be >= 4")
+    require(sp <= 32, s"sp ($sp) must be <= 32")
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
val zeroCounter = new HyperLogLogPlus(p, sp)
aggregate(zeroCounter)(
@@ -1187,8 +1187,9 @@ abstract class RDD[T: ClassTag](
* It must be greater than 0.000017.
*/
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
+    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
-    countApproxDistinct(p, 0)
+    countApproxDistinct(if (p < 4) 4 else p, 0)
}

/**
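
Note on the countApproxDistinct change above: HyperLogLogPlus rejects any precision p below 4, but the formula deriving p from relativeSD can yield smaller values for coarse accuracy targets, which is why the call site now clamps. A minimal sketch of the arithmetic (values worked out by hand, not taken from the commit):

    // For relativeSD = 0.5 (exercised by the new test below):
    //   p = ceil(2 * log(1.054 / 0.5) / log 2) = ceil(2.15) = 3
    // 3 < 4 would make HyperLogLogPlus throw, so the clamp promotes it to 4.
    val relativeSD = 0.5
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt // 3
    val safeP = if (p < 4) 4 else p                                           // 4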
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -89,6 +89,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val simpleRdd = sc.makeRDD(uniformDistro, 10)
assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
+    assert(error(simpleRdd.countApproxDistinct(0.02), size) < 0.1)
+    assert(error(simpleRdd.countApproxDistinct(0.5), size) < 0.22)
}

test("SparkContext.union") {
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -1915,7 +1915,7 @@ In that case, consider
[reducing](#reducing-the-processing-time-of-each-batch) the batch processing time.

The progress of a Spark Streaming program can also be monitored using the
-[StreamingListener](api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface,
+[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) interface,
which allows you to get receiver status and processing times. Note that this is a developer API
and it is likely to be improved upon (i.e., more information reported) in the future.

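
As a side note on the corrected link: a minimal sketch of the StreamingListener developer API described above (the listener name and output are illustrative; the trait and event classes are assumed from org.apache.spark.streaming.scheduler, which is what the fixed anchor points at):

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Illustrative listener reporting per-batch processing times.
    class BatchTimeListener extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        println(s"Batch ${info.batchTime}: processed in ${info.processingDelay.getOrElse(-1L)} ms")
      }
    }

    // Registration, assuming an existing StreamingContext named ssc:
    // ssc.addStreamingListener(new BatchTimeListener)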
@@ -26,7 +26,7 @@ import scopt.OptionParser
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
+import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

@@ -48,6 +48,7 @@ object LDAExample {
topicConcentration: Double = -1,
vocabSize: Int = 10000,
stopwordFile: String = "",
+      algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10) extends AbstractParams[Params]

@@ -78,6 +79,10 @@ object LDAExample {
.text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
s" default: ${defaultParams.stopwordFile}")
.action((x, c) => c.copy(stopwordFile = x))
opt[String]("algorithm")
.text(s"inference algorithm to use. em and online are supported." +
s" default: ${defaultParams.algorithm}")
.action((x, c) => c.copy(algorithm = x))
opt[String]("checkpointDir")
.text(s"Directory for checkpointing intermediate results." +
s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
@@ -128,7 +133,17 @@ object LDAExample {

// Run LDA.
val lda = new LDA()
-    lda.setK(params.k)
+
+    val optimizer = params.algorithm.toLowerCase match {
+      case "em" => new EMLDAOptimizer
+      // add (1.0 / actualCorpusSize) to MiniBatchFraction to be more robust on tiny datasets.
+      case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
+      case _ => throw new IllegalArgumentException(
+        s"Only em, online are supported but got ${params.algorithm}.")
+    }
+
+    lda.setOptimizer(optimizer)
+      .setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
@@ -137,14 +152,18 @@ object LDAExample {
sc.setCheckpointDir(params.checkpointDir.get)
}
val startTime = System.nanoTime()
-    val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel]
+    val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9

println(s"Finished training LDA model. Summary:")
println(s"\t Training time: $elapsed sec")
-    val avgLogLikelihood = ldaModel.logLikelihood / actualCorpusSize.toDouble
-    println(s"\t Training data average log likelihood: $avgLogLikelihood")
-    println()
+
+    if (ldaModel.isInstanceOf[DistributedLDAModel]) {
+      val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
+      val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
+      println(s"\t Training data average log likelihood: $avgLogLikelihood")
+      println()
+    }

// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
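
One point worth spelling out in the hunk above: the two optimizers return different model types, which is why the log-likelihood report is now guarded. A minimal sketch of that relationship (pattern matching is an equivalent way to write the guarded branch; the helper name is illustrative):

    import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDAModel}

    // EMLDAOptimizer yields a DistributedLDAModel, which carries the training
    // log likelihood; OnlineLDAOptimizer yields a local model without it.
    def reportLikelihood(model: LDAModel, corpusSize: Long): Unit = model match {
      case dist: DistributedLDAModel =>
        println(s"\t Training data average log likelihood: ${dist.logLikelihood / corpusSize.toDouble}")
      case _ =>
        // online LDA: no training log likelihood to report
    }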
