diff --git a/data/mllib/streaming_kmeans_data_test.txt b/data/mllib/streaming_kmeans_data_test.txt new file mode 100644 index 0000000000000..649a0d6cf4e22 --- /dev/null +++ b/data/mllib/streaming_kmeans_data_test.txt @@ -0,0 +1,2 @@ +(1.0), [1.7, 0.4, 0.9] +(2.0), [2.2, 1.8, 0.0] diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md index 8e724fbf068b0..44720147be054 100644 --- a/docs/mllib-clustering.md +++ b/docs/mllib-clustering.md @@ -49,27 +49,7 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph. Refer to the [`KMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeans) and [`KMeansModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.KMeansModel) for details on the API. -{% highlight scala %} -import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} -import org.apache.spark.mllib.linalg.Vectors - -// Load and parse the data -val data = sc.textFile("data/mllib/kmeans_data.txt") -val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() - -// Cluster the data into two classes using KMeans -val numClusters = 2 -val numIterations = 20 -val clusters = KMeans.train(parsedData, numClusters, numIterations) - -// Evaluate clustering by computing Within Set Sum of Squared Errors -val WSSSE = clusters.computeCost(parsedData) -println("Within Set Sum of Squared Errors = " + WSSSE) - -// Save and load model -clusters.save(sc, "myModelPath") -val sameModel = KMeansModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/KMeansExample.scala %}
@@ -81,51 +61,7 @@ that is equivalent to the provided example in Scala is given below: Refer to the [`KMeans` Java docs](api/java/org/apache/spark/mllib/clustering/KMeans.html) and [`KMeansModel` Java docs](api/java/org/apache/spark/mllib/clustering/KMeansModel.html) for details on the API. -{% highlight java %} -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.clustering.KMeans; -import org.apache.spark.mllib.clustering.KMeansModel; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.SparkConf; - -public class KMeansExample { - public static void main(String[] args) { - SparkConf conf = new SparkConf().setAppName("K-means Example"); - JavaSparkContext sc = new JavaSparkContext(conf); - - // Load and parse data - String path = "data/mllib/kmeans_data.txt"; - JavaRDD data = sc.textFile(path); - JavaRDD parsedData = data.map( - new Function() { - public Vector call(String s) { - String[] sarray = s.split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) - values[i] = Double.parseDouble(sarray[i]); - return Vectors.dense(values); - } - } - ); - parsedData.cache(); - - // Cluster the data into two classes using KMeans - int numClusters = 2; - int numIterations = 20; - KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations); - - // Evaluate clustering by computing Within Set Sum of Squared Errors - double WSSSE = clusters.computeCost(parsedData.rdd()); - System.out.println("Within Set Sum of Squared Errors = " + WSSSE); - - // Save and load model - clusters.save(sc.sc(), "myModelPath"); - KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath"); - } -} -{% endhighlight %} +{% include_example java/org/apache/spark/examples/mllib/JavaKMeansExample.java %}
@@ -138,27 +74,7 @@ fact the optimal *k* is usually one where there is an "elbow" in the WSSSE graph Refer to the [`KMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeans) and [`KMeansModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel) for more details on the API. -{% highlight python %} -from pyspark.mllib.clustering import KMeans, KMeansModel -from numpy import array -from math import sqrt - -# Load and parse the data -data = sc.textFile("data/mllib/kmeans_data.txt") -parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) - -# Build the model (cluster the data) -clusters = KMeans.train(parsedData, 2, maxIterations=10, - runs=10, initializationMode="random") - -# Evaluate clustering by computing Within Set Sum of Squared Errors -WSSSE = clusters.computeCost(parsedData) -print("Within Set Sum of Squared Error = " + str(WSSSE)) - -# Save and load model -clusters.save(sc, "myModelPath") -sameModel = KMeansModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example python/mllib/k_means_example.py %}
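The "elbow" heuristic mentioned above can be illustrated with a small sketch that trains one model per candidate *k* and prints the resulting WSSSE. This is only a sketch, not part of the patch: the range of *k* values, `maxIterations`, and the app name are arbitrary choices, and it reuses the same whitespace-separated `data/mllib/kmeans_data.txt` as the examples.

{% highlight python %}
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext(appName="KMeansElbowSketch")

# Parse the whitespace-separated points used by the example above.
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])).cache()

# Train one model per candidate k and record its WSSSE; the "elbow" in this
# curve is a common heuristic for choosing k.
for k in range(1, 5):
    model = KMeans.train(parsedData, k, maxIterations=10, initializationMode="random")
    print("k = %d, WSSSE = %f" % (k, model.computeCost(parsedData)))

sc.stop()
{% endhighlight %}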
@@ -188,29 +104,7 @@ to the algorithm. We then output the parameters of the mixture model. Refer to the [`GaussianMixture` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.GaussianMixtureModel) for details on the API. -{% highlight scala %} -import org.apache.spark.mllib.clustering.GaussianMixture -import org.apache.spark.mllib.clustering.GaussianMixtureModel -import org.apache.spark.mllib.linalg.Vectors - -// Load and parse the data -val data = sc.textFile("data/mllib/gmm_data.txt") -val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache() - -// Cluster the data into two classes using GaussianMixture -val gmm = new GaussianMixture().setK(2).run(parsedData) - -// Save and load model -gmm.save(sc, "myGMMModel") -val sameModel = GaussianMixtureModel.load(sc, "myGMMModel") - -// output parameters of max-likelihood model -for (i <- 0 until gmm.k) { - println("weight=%f\nmu=%s\nsigma=\n%s\n" format - (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma)) -} - -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala %}
@@ -222,50 +116,7 @@ that is equivalent to the provided example in Scala is given below:

Refer to the [`GaussianMixture` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixture.html) and [`GaussianMixtureModel` Java docs](api/java/org/apache/spark/mllib/clustering/GaussianMixtureModel.html) for details on the API.

-{% highlight java %}
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.clustering.GaussianMixture;
-import org.apache.spark.mllib.clustering.GaussianMixtureModel;
-import org.apache.spark.mllib.linalg.Vector;
-import org.apache.spark.mllib.linalg.Vectors;
-import org.apache.spark.SparkConf;
-
-public class GaussianMixtureExample {
-  public static void main(String[] args) {
-    SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
-    JavaSparkContext sc = new JavaSparkContext(conf);
-
-    // Load and parse data
-    String path = "data/mllib/gmm_data.txt";
-    JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<Vector> parsedData = data.map(
-      new Function<String, Vector>() {
-        public Vector call(String s) {
-          String[] sarray = s.trim().split(" ");
-          double[] values = new double[sarray.length];
-          for (int i = 0; i < sarray.length; i++)
-            values[i] = Double.parseDouble(sarray[i]);
-          return Vectors.dense(values);
-        }
-      }
-    );
-    parsedData.cache();
-
-    // Cluster the data into two classes using GaussianMixture
-    GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
-
-    // Save and load GaussianMixtureModel
-    gmm.save(sc.sc(), "myGMMModel");
-    GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(), "myGMMModel");
-    // Output the parameters of the mixture model
-    for(int j=0; j < gmm.k(); j++) {
-      System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
-        gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
-    }
-  }
-}
-{% endhighlight %}
+{% include_example java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java %}
@@ -276,23 +127,7 @@ to the algorithm. We then output the parameters of the mixture model. Refer to the [`GaussianMixture` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixture) and [`GaussianMixtureModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.GaussianMixtureModel) for more details on the API. -{% highlight python %} -from pyspark.mllib.clustering import GaussianMixture -from numpy import array - -# Load and parse the data -data = sc.textFile("data/mllib/gmm_data.txt") -parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')])) - -# Build the model (cluster the data) -gmm = GaussianMixture.train(parsedData, 2) - -# output parameters of model -for i in range(2): - print ("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu, - "sigma = ", gmm.gaussians[i].sigma.toArray()) - -{% endhighlight %} +{% include_example python/mllib/gaussian_mixture_example.py %}
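Besides printing the fitted weights, means, and covariances, a Gaussian mixture also provides soft cluster assignments. The following is a minimal sketch, not part of the patch; it reuses `data/mllib/gmm_data.txt` and assumes that `GaussianMixtureModel.predictSoft` is available in the deployed PySpark version.

{% highlight python %}
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import GaussianMixture

sc = SparkContext(appName="GaussianMixtureSoftAssignments")

data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))

gmm = GaussianMixture.train(parsedData, 2)

# predictSoft returns, for each point, its membership probability in each of
# the k Gaussians rather than a single hard cluster label.
softAssignments = gmm.predictSoft(parsedData)
for probs in softAssignments.take(5):
    print(probs)

sc.stop()
{% endhighlight %}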
@@ -334,31 +169,7 @@ which contains the computed clustering assignments. Refer to the [`PowerIterationClustering` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.PowerIterationClusteringModel) for details on the API. -{% highlight scala %} -import org.apache.spark.mllib.clustering.{PowerIterationClustering, PowerIterationClusteringModel} -import org.apache.spark.mllib.linalg.Vectors - -// Load and parse the data -val data = sc.textFile("data/mllib/pic_data.txt") -val similarities = data.map { line => - val parts = line.split(' ') - (parts(0).toLong, parts(1).toLong, parts(2).toDouble) -} - -// Cluster the data into two classes using PowerIterationClustering -val pic = new PowerIterationClustering() - .setK(2) - .setMaxIterations(10) -val model = pic.run(similarities) - -model.assignments.foreach { a => - println(s"${a.id} -> ${a.cluster}") -} - -// Save and load model -model.save(sc, "myModelPath") -val sameModel = PowerIterationClusteringModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala %} A full example that produces the experiment described in the PIC paper can be found under [`examples/`](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala). @@ -377,40 +188,7 @@ which contains the computed clustering assignments. Refer to the [`PowerIterationClustering` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClustering.html) and [`PowerIterationClusteringModel` Java docs](api/java/org/apache/spark/mllib/clustering/PowerIterationClusteringModel.html) for details on the API. -{% highlight java %} -import scala.Tuple2; -import scala.Tuple3; - -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.clustering.PowerIterationClustering; -import org.apache.spark.mllib.clustering.PowerIterationClusteringModel; - -// Load and parse the data -JavaRDD data = sc.textFile("data/mllib/pic_data.txt"); -JavaRDD> similarities = data.map( - new Function>() { - public Tuple3 call(String line) { - String[] parts = line.split(" "); - return new Tuple3<>(new Long(parts[0]), new Long(parts[1]), new Double(parts[2])); - } - } -); - -// Cluster the data into two classes using PowerIterationClustering -PowerIterationClustering pic = new PowerIterationClustering() - .setK(2) - .setMaxIterations(10); -PowerIterationClusteringModel model = pic.run(similarities); - -for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) { - System.out.println(a.id() + " -> " + a.cluster()); -} - -// Save and load model -model.save(sc.sc(), "myModelPath"); -PowerIterationClusteringModel sameModel = PowerIterationClusteringModel.load(sc.sc(), "myModelPath"); -{% endhighlight %} +{% include_example java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java %}
@@ -425,23 +203,7 @@ which contains the computed clustering assignments. Refer to the [`PowerIterationClustering` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClustering) and [`PowerIterationClusteringModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.PowerIterationClusteringModel) for more details on the API. -{% highlight python %} -from __future__ import print_function -from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel - -# Load and parse the data -data = sc.textFile("data/mllib/pic_data.txt") -similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')])) - -# Cluster the data into two classes using PowerIterationClustering -model = PowerIterationClustering.train(similarities, 2, 10) - -model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster))) - -# Save and load model -model.save(sc, "myModelPath") -sameModel = PowerIterationClusteringModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example python/mllib/power_iteration_clustering_example.py %}
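PIC consumes an affinity graph given as `(srcId, dstId, similarity)` tuples, which is the format of `data/mllib/pic_data.txt`. The sketch below, which is illustrative only, builds such an RDD from a handful of made-up 2-D points using a Gaussian kernel (the points and the `sigma` value are invented for the example) and then clusters it.

{% highlight python %}
from math import exp
from pyspark import SparkContext
from pyspark.mllib.clustering import PowerIterationClustering

sc = SparkContext(appName="PICAffinitySketch")

# A few made-up 2-D points forming two loose groups.
points = [(0, (0.0, 0.0)), (1, (1.0, 1.0)), (2, (0.5, 0.5)),
          (3, (9.0, 9.0)), (4, (9.5, 8.5)), (5, (10.0, 9.0))]

# Build a pairwise affinity RDD of (srcId, dstId, similarity) tuples with a
# Gaussian kernel; PowerIterationClustering expects this input format and
# treats the matrix as symmetric, so each pair is listed once.
def similarity(a, b, sigma=2.0):
    dist2 = sum((x - y) ** 2 for x, y in zip(a, b))
    return exp(-dist2 / (2.0 * sigma ** 2))

pairs = [(i, j, similarity(p, q)) for (i, p) in points for (j, q) in points if i < j]
similarities = sc.parallelize(pairs)

model = PowerIterationClustering.train(similarities, k=2, maxIterations=10)
for a in model.assignments().collect():
    print(str(a.id) + " -> " + str(a.cluster))

sc.stop()
{% endhighlight %}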
@@ -587,129 +349,19 @@ to the algorithm. We then output the topics, represented as probability distribu
Refer to the [`LDA` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.LDA) and [`DistributedLDAModel` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.DistributedLDAModel) for details on the API. -{% highlight scala %} -import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel} -import org.apache.spark.mllib.linalg.Vectors - -// Load and parse the data -val data = sc.textFile("data/mllib/sample_lda_data.txt") -val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))) -// Index documents with unique IDs -val corpus = parsedData.zipWithIndex.map(_.swap).cache() - -// Cluster the documents into three topics using LDA -val ldaModel = new LDA().setK(3).run(corpus) - -// Output topics. Each is a distribution over words (matching word count vectors) -println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):") -val topics = ldaModel.topicsMatrix -for (topic <- Range(0, 3)) { - print("Topic " + topic + ":") - for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); } - println() -} - -// Save and load model. -ldaModel.save(sc, "myLDAModel") -val sameModel = DistributedLDAModel.load(sc, "myLDAModel") - -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala %}
Refer to the [`LDA` Java docs](api/java/org/apache/spark/mllib/clustering/LDA.html) and [`DistributedLDAModel` Java docs](api/java/org/apache/spark/mllib/clustering/DistributedLDAModel.html) for details on the API. -{% highlight java %} -import scala.Tuple2; - -import org.apache.spark.api.java.*; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.mllib.clustering.DistributedLDAModel; -import org.apache.spark.mllib.clustering.LDA; -import org.apache.spark.mllib.linalg.Matrix; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.mllib.linalg.Vectors; -import org.apache.spark.SparkConf; - -public class JavaLDAExample { - public static void main(String[] args) { - SparkConf conf = new SparkConf().setAppName("LDA Example"); - JavaSparkContext sc = new JavaSparkContext(conf); - - // Load and parse the data - String path = "data/mllib/sample_lda_data.txt"; - JavaRDD data = sc.textFile(path); - JavaRDD parsedData = data.map( - new Function() { - public Vector call(String s) { - String[] sarray = s.trim().split(" "); - double[] values = new double[sarray.length]; - for (int i = 0; i < sarray.length; i++) - values[i] = Double.parseDouble(sarray[i]); - return Vectors.dense(values); - } - } - ); - // Index documents with unique IDs - JavaPairRDD corpus = JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map( - new Function, Tuple2>() { - public Tuple2 call(Tuple2 doc_id) { - return doc_id.swap(); - } - } - )); - corpus.cache(); - - // Cluster the documents into three topics using LDA - DistributedLDAModel ldaModel = new LDA().setK(3).run(corpus); - - // Output topics. Each is a distribution over words (matching word count vectors) - System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() - + " words):"); - Matrix topics = ldaModel.topicsMatrix(); - for (int topic = 0; topic < 3; topic++) { - System.out.print("Topic " + topic + ":"); - for (int word = 0; word < ldaModel.vocabSize(); word++) { - System.out.print(" " + topics.apply(word, topic)); - } - System.out.println(); - } - - ldaModel.save(sc.sc(), "myLDAModel"); - DistributedLDAModel sameModel = DistributedLDAModel.load(sc.sc(), "myLDAModel"); - } -} -{% endhighlight %} +{% include_example java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java %}
Refer to the [`LDA` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA) and [`LDAModel` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDAModel) for more details on the API. -{% highlight python %} -from pyspark.mllib.clustering import LDA, LDAModel -from pyspark.mllib.linalg import Vectors - -# Load and parse the data -data = sc.textFile("data/mllib/sample_lda_data.txt") -parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) -# Index documents with unique IDs -corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache() - -# Cluster the documents into three topics using LDA -ldaModel = LDA.train(corpus, k=3) - -# Output topics. Each is a distribution over words (matching word count vectors) -print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):") -topics = ldaModel.topicsMatrix() -for topic in range(3): - print("Topic " + str(topic) + ":") - for word in range(0, ldaModel.vocabSize()): - print(" " + str(topics[word][topic])) - -# Save and load model -model.save(sc, "myModelPath") -sameModel = LDAModel.load(sc, "myModelPath") -{% endhighlight %} +{% include_example python/mllib/latent_dirichlet_allocation_example.py %}
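The LDA examples print the full `topicsMatrix()`, which is `vocabSize` x `k`. For larger vocabularies it is often more useful to look only at the heaviest terms per topic; the sketch below does that with the same corpus construction as the Python example above (the top-5 cutoff is an arbitrary choice, and this is not part of the patch).

{% highlight python %}
from numpy import argsort
from pyspark import SparkContext
from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors

sc = SparkContext(appName="LDATopTermsSketch")

data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

ldaModel = LDA.train(corpus, k=3)

# topicsMatrix() is vocabSize x k; sort each column to list the top-weighted
# term indices for every topic instead of printing the whole matrix.
topics = ldaModel.topicsMatrix()
for topic in range(3):
    weights = [topics[word][topic] for word in range(ldaModel.vocabSize())]
    top = argsort(weights)[::-1][:5]
    print("Topic %d top term indices: %s" % (topic, list(top)))

sc.stop()
{% endhighlight %}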
@@ -785,96 +437,16 @@ This example shows how to estimate clusters on streaming data.
Refer to the [`StreamingKMeans` Scala docs](api/scala/index.html#org.apache.spark.mllib.clustering.StreamingKMeans) for details on the API.
+Refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on creating a StreamingContext.

-First we import the necessary classes.
-
-{% highlight scala %}
-
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.clustering.StreamingKMeans
-
-{% endhighlight %}
-
-Then we make an input stream of vectors for training, as well as a stream of labeled data
-points for testing. We assume a StreamingContext `ssc` has been created, see
-[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
-
-{% highlight scala %}
-
-val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
-val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)
-
-{% endhighlight %}
-
-We create a model with random clusters and specify the number of clusters to find
-
-{% highlight scala %}
-
-val numDimensions = 3
-val numClusters = 2
-val model = new StreamingKMeans()
-  .setK(numClusters)
-  .setDecayFactor(1.0)
-  .setRandomCenters(numDimensions, 0.0)
-
-{% endhighlight %}
-
-Now register the streams for training and testing and start the job, printing
-the predicted cluster assignments on new data points as they arrive.
-
-{% highlight scala %}
-
-model.trainOn(trainingData)
-model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
-
-ssc.start()
-ssc.awaitTermination()
-
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala %}
Refer to the [`StreamingKMeans` Python docs](api/python/pyspark.mllib.html#pyspark.mllib.clustering.StreamingKMeans) for more details on the API.
+Refer to the [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for details on creating a StreamingContext.

-First we import the necessary classes.
-
-{% highlight python %}
-from pyspark.mllib.linalg import Vectors
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib.clustering import StreamingKMeans
-{% endhighlight %}
-
-Then we make an input stream of vectors for training, as well as a stream of labeled data
-points for testing. We assume a StreamingContext `ssc` has been created, see
-[Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) for more info.
-
-{% highlight python %}
-def parse(lp):
-    label = float(lp[lp.find('(') + 1: lp.find(',')])
-    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
-    return LabeledPoint(label, vec)
-
-trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
-testData = ssc.textFileStream("/testing/data/dir").map(parse)
-{% endhighlight %}
-
-We create a model with random clusters and specify the number of clusters to find
-
-{% highlight python %}
-model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
-{% endhighlight %}
-
-Now register the streams for training and testing and start the job, printing
-the predicted cluster assignments on new data points as they arrive.
-
-{% highlight python %}
-model.trainOn(trainingData)
-print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))
-
-ssc.start()
-ssc.awaitTermination()
-{% endhighlight %}
+{% include_example python/mllib/streaming_k_means_example.py %}
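The streaming examples rely on two small text formats: training records are bare dense vectors, while test records carry a label in parentheses, as in the new `data/mllib/streaming_kmeans_data_test.txt` (for example `(1.0), [1.7, 0.4, 0.9]`). A standalone sketch of just the parsing step, needing no StreamingContext, using the same `Vectors.parse` and label-extraction helper as the examples:

{% highlight python %}
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

# Training records are bare vectors, e.g. "[1.7, 0.4, 0.9]".
training_line = "[1.7, 0.4, 0.9]"
print(Vectors.parse(training_line))

# Test records carry a label in parentheses, as in streaming_kmeans_data_test.txt:
# "(1.0), [1.7, 0.4, 0.9]".
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

print(parse("(1.0), [1.7, 0.4, 0.9]"))
{% endhighlight %}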
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
new file mode 100644
index 0000000000000..4d1c64aa3cfcc
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.GaussianMixture;
+import org.apache.spark.mllib.clustering.GaussianMixtureModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaGaussianMixtureExample {
+  public static void main(String[] args) {
+
+    SparkConf conf = new SparkConf().setAppName("JavaGaussianMixtureExample");
+    JavaSparkContext jsc = new JavaSparkContext(conf);
+
+    // $example on$
+    // Load and parse data
+    String path = "data/mllib/gmm_data.txt";
+    JavaRDD<String> data = jsc.textFile(path);
+    JavaRDD<Vector> parsedData = data.map(
+      new Function<String, Vector>() {
+        public Vector call(String s) {
+          String[] sarray = s.trim().split(" ");
+          double[] values = new double[sarray.length];
+          for (int i = 0; i < sarray.length; i++)
+            values[i] = Double.parseDouble(sarray[i]);
+          return Vectors.dense(values);
+        }
+      }
+    );
+    parsedData.cache();
+
+    // Cluster the data into two classes using GaussianMixture
+    GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
+
+    // Save and load GaussianMixtureModel
+    gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
+    GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
+      "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
+
+    // Output the parameters of the mixture model
+    for (int j = 0; j < gmm.k(); j++) {
+      System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
+        gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
+    }
+    // $example off$
+
+    jsc.stop();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
new file mode 100644
index 0000000000000..a24606a2e9384
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.KMeans;
+import org.apache.spark.mllib.clustering.KMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaKMeansExample {
+  public static void main(String[] args) {
+
+    SparkConf conf = new SparkConf().setAppName("JavaKMeansExample");
+    JavaSparkContext jsc = new JavaSparkContext(conf);
+
+    // $example on$
+    // Load and parse data
+    String path = "data/mllib/kmeans_data.txt";
+    JavaRDD<String> data = jsc.textFile(path);
+    JavaRDD<Vector> parsedData = data.map(
+      new Function<String, Vector>() {
+        public Vector call(String s) {
+          String[] sarray = s.split(" ");
+          double[] values = new double[sarray.length];
+          for (int i = 0; i < sarray.length; i++)
+            values[i] = Double.parseDouble(sarray[i]);
+          return Vectors.dense(values);
+        }
+      }
+    );
+    parsedData.cache();
+
+    // Cluster the data into two classes using KMeans
+    int numClusters = 2;
+    int numIterations = 20;
+    KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
+
+    // Evaluate clustering by computing Within Set Sum of Squared Errors
+    double WSSSE = clusters.computeCost(parsedData.rdd());
+    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
+
+    // Save and load model
+    clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
+    KMeansModel sameModel = KMeansModel.load(jsc.sc(),
+      "target/org/apache/spark/JavaKMeansExample/KMeansModel");
+    // $example off$
+
+    jsc.stop();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
new file mode 100644
index 0000000000000..4d8b65c54432a
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+// $example on$
+import scala.Tuple2;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.mllib.clustering.DistributedLDAModel;
+import org.apache.spark.mllib.clustering.LDA;
+import org.apache.spark.mllib.clustering.LDAModel;
+import org.apache.spark.mllib.linalg.Matrix;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.Vectors;
+// $example off$
+
+public class JavaLatentDirichletAllocationExample {
+  public static void main(String[] args) {
+
+    SparkConf conf = new SparkConf().setAppName("JavaLatentDirichletAllocationExample");
+    JavaSparkContext jsc = new JavaSparkContext(conf);
+
+    // $example on$
+    // Load and parse the data
+    String path = "data/mllib/sample_lda_data.txt";
+    JavaRDD<String> data = jsc.textFile(path);
+    JavaRDD<Vector> parsedData = data.map(
+      new Function<String, Vector>() {
+        public Vector call(String s) {
+          String[] sarray = s.trim().split(" ");
+          double[] values = new double[sarray.length];
+          for (int i = 0; i < sarray.length; i++)
+            values[i] = Double.parseDouble(sarray[i]);
+          return Vectors.dense(values);
+        }
+      }
+    );
+    // Index documents with unique IDs
+    JavaPairRDD<Long, Vector> corpus =
+      JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(
+        new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
+          public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
+            return doc_id.swap();
+          }
+        }
+      )
+    );
+    corpus.cache();
+
+    // Cluster the documents into three topics using LDA
+    LDAModel ldaModel = new LDA().setK(3).run(corpus);
+
+    // Output topics.
Each is a distribution over words (matching word count vectors) + System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() + + " words):"); + Matrix topics = ldaModel.topicsMatrix(); + for (int topic = 0; topic < 3; topic++) { + System.out.print("Topic " + topic + ":"); + for (int word = 0; word < ldaModel.vocabSize(); word++) { + System.out.print(" " + topics.apply(word, topic)); + } + System.out.println(); + } + + ldaModel.save(jsc.sc(), + "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel"); + DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(), + "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel"); + // $example off$ + + jsc.stop(); + } +} diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java index 6c6f9768f015e..b62fa90c3420f 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java @@ -24,8 +24,10 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +// $example on$ import org.apache.spark.mllib.clustering.PowerIterationClustering; import org.apache.spark.mllib.clustering.PowerIterationClusteringModel; +// $example off$ /** * Java example for graph clustering using power iteration clustering (PIC). @@ -36,6 +38,7 @@ public static void main(String[] args) { JavaSparkContext sc = new JavaSparkContext(sparkConf); @SuppressWarnings("unchecked") + // $example on$ JavaRDD> similarities = sc.parallelize(Lists.newArrayList( new Tuple3(0L, 1L, 0.9), new Tuple3(1L, 2L, 0.9), @@ -51,6 +54,7 @@ public static void main(String[] args) { for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) { System.out.println(a.id() + " -> " + a.cluster()); } + // $example off$ sc.stop(); } diff --git a/examples/src/main/python/mllib/gaussian_mixture_example.py b/examples/src/main/python/mllib/gaussian_mixture_example.py new file mode 100644 index 0000000000000..a60e799d62eb1 --- /dev/null +++ b/examples/src/main/python/mllib/gaussian_mixture_example.py @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import print_function + +# $example on$ +from numpy import array +# $example off$ + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="GaussianMixtureExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/gmm_data.txt") + parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')])) + + # Build the model (cluster the data) + gmm = GaussianMixture.train(parsedData, 2) + + # Save and load model + gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") + sameModel = GaussianMixtureModel\ + .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") + + # output parameters of model + for i in range(2): + print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu, + "sigma = ", gmm.gaussians[i].sigma.toArray()) + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/k_means_example.py b/examples/src/main/python/mllib/k_means_example.py new file mode 100644 index 0000000000000..5c397e62ef10e --- /dev/null +++ b/examples/src/main/python/mllib/k_means_example.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from __future__ import print_function + +# $example on$ +from numpy import array +from math import sqrt +# $example off$ + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import KMeans, KMeansModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="KMeansExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/kmeans_data.txt") + parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) + + # Build the model (cluster the data) + clusters = KMeans.train(parsedData, 2, maxIterations=10, + runs=10, initializationMode="random") + + # Evaluate clustering by computing Within Set Sum of Squared Errors + def error(point): + center = clusters.centers[clusters.predict(point)] + return sqrt(sum([x**2 for x in (point - center)])) + + WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) + print("Within Set Sum of Squared Error = " + str(WSSSE)) + + # Save and load model + clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel") + sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py new file mode 100644 index 0000000000000..2a1bef5f207b7 --- /dev/null +++ b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import LDA, LDAModel +from pyspark.mllib.linalg import Vectors +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="LatentDirichletAllocationExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/sample_lda_data.txt") + parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) + # Index documents with unique IDs + corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache() + + # Cluster the documents into three topics using LDA + ldaModel = LDA.train(corpus, k=3) + + # Output topics. 
Each is a distribution over words (matching word count vectors) + print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + + " words):") + topics = ldaModel.topicsMatrix() + for topic in range(3): + print("Topic " + str(topic) + ":") + for word in range(0, ldaModel.vocabSize()): + print(" " + str(topics[word][topic])) + + # Save and load model + ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel") + sameModel = LDAModel\ + .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/power_iteration_clustering_example.py b/examples/src/main/python/mllib/power_iteration_clustering_example.py new file mode 100644 index 0000000000000..ca19c0ccb60c8 --- /dev/null +++ b/examples/src/main/python/mllib/power_iteration_clustering_example.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="PowerIterationClusteringExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/pic_data.txt") + similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')])) + + # Cluster the data into two classes using PowerIterationClustering + model = PowerIterationClustering.train(similarities, 2, 10) + + model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster))) + + # Save and load model + model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") + sameModel = PowerIterationClusteringModel\ + .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/streaming_k_means_example.py b/examples/src/main/python/mllib/streaming_k_means_example.py new file mode 100644 index 0000000000000..e82509ad3ffb6 --- /dev/null +++ b/examples/src/main/python/mllib/streaming_k_means_example.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +# $example on$ +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.clustering import StreamingKMeans +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="StreamingKMeansExample") # SparkContext + ssc = StreamingContext(sc, 1) + + # $example on$ + # we make an input stream of vectors for training, + # as well as a stream of vectors for testing + def parse(lp): + label = float(lp[lp.find('(') + 1: lp.find(')')]) + vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(',')) + + return LabeledPoint(label, vec) + + trainingData = sc.textFile("data/mllib/kmeans_data.txt")\ + .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) + + testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse) + + trainingQueue = [trainingData] + testingQueue = [testingData] + + trainingStream = ssc.queueStream(trainingQueue) + testingStream = ssc.queueStream(testingQueue) + + # We create a model with random clusters and specify the number of clusters to find + model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0) + + # Now register the streams for training and testing and start the job, + # printing the predicted cluster assignments on new data points as they arrive. + model.trainOn(trainingStream) + + result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features))) + result.pprint() + + ssc.start() + ssc.stop(stopSparkContext=True, stopGraceFully=True) + # $example off$ + + print("Final centers: " + str(model.latestModel().centers)) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala new file mode 100644 index 0000000000000..b1b3a79d87ae1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +// $example on$ +import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel} +import org.apache.spark.mllib.linalg.Vectors +// $example off$ + +object GaussianMixtureExample { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName("GaussianMixtureExample") + val sc = new SparkContext(conf) + + // $example on$ + // Load and parse the data + val data = sc.textFile("data/mllib/gmm_data.txt") + val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache() + + // Cluster the data into two classes using GaussianMixture + val gmm = new GaussianMixture().setK(2).run(parsedData) + + // Save and load model + gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel") + val sameModel = GaussianMixtureModel.load(sc, + "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel") + + // output parameters of max-likelihood model + for (i <- 0 until gmm.k) { + println("weight=%f\nmu=%s\nsigma=\n%s\n" format + (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma)) + } + // $example off$ + + sc.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala new file mode 100644 index 0000000000000..c4d71d862f375 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// scalastyle:off println +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +// $example on$ +import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} +import org.apache.spark.mllib.linalg.Vectors +// $example off$ + +object KMeansExample { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName("KMeansExample") + val sc = new SparkContext(conf) + + // $example on$ + // Load and parse the data + val data = sc.textFile("data/mllib/kmeans_data.txt") + val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() + + // Cluster the data into two classes using KMeans + val numClusters = 2 + val numIterations = 20 + val clusters = KMeans.train(parsedData, numClusters, numIterations) + + // Evaluate clustering by computing Within Set Sum of Squared Errors + val WSSSE = clusters.computeCost(parsedData) + println("Within Set Sum of Squared Errors = " + WSSSE) + + // Save and load model + clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel") + val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel") + // $example off$ + + sc.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala new file mode 100644 index 0000000000000..f2c8ec01439f1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.mllib + +import org.apache.spark.{SparkConf, SparkContext} +// $example on$ +import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} +import org.apache.spark.mllib.linalg.Vectors +// $example off$ + +object LatentDirichletAllocationExample { + + def main(args: Array[String]) { + + val conf = new SparkConf().setAppName("LatentDirichletAllocationExample") + val sc = new SparkContext(conf) + + // $example on$ + // Load and parse the data + val data = sc.textFile("data/mllib/sample_lda_data.txt") + val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))) + // Index documents with unique IDs + val corpus = parsedData.zipWithIndex.map(_.swap).cache() + + // Cluster the documents into three topics using LDA + val ldaModel = new LDA().setK(3).run(corpus) + + // Output topics. 
Each is a distribution over words (matching word count vectors) + println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):") + val topics = ldaModel.topicsMatrix + for (topic <- Range(0, 3)) { + print("Topic " + topic + ":") + for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); } + println() + } + + // Save and load model. + ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel") + val sameModel = DistributedLDAModel.load(sc, + "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel") + // $example off$ + + sc.stop() + } +} +// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index bb9c1cbca99cd..a81c9b383ddec 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -22,7 +22,9 @@ import org.apache.log4j.{Level, Logger} import scopt.OptionParser import org.apache.spark.{SparkConf, SparkContext} +// $example on$ import org.apache.spark.mllib.clustering.PowerIterationClustering +// $example off$ import org.apache.spark.rdd.RDD /** @@ -90,6 +92,7 @@ object PowerIterationClusteringExample { Logger.getRootLogger.setLevel(Level.WARN) + // $example on$ val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints) val model = new PowerIterationClustering() .setK(params.k) @@ -101,12 +104,13 @@ object PowerIterationClusteringExample { val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignmentsStr = assignments .map { case (k, v) => - s"$k -> ${v.sorted.mkString("[", ",", "]")}" - }.mkString(", ") + s"$k -> ${v.sorted.mkString("[", ",", "]")}" + }.mkString(", ") val sizesStr = assignments.map { _._2.length }.sorted.mkString("(", ",", ")") println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr") + // $example off$ sc.stop() } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala index af03724a8ac62..7888af79f87f4 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala @@ -19,10 +19,12 @@ package org.apache.spark.examples.mllib import org.apache.spark.SparkConf +// $example on$ import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.streaming.{Seconds, StreamingContext} +// $example off$ /** * Estimate clusters on one stream of data and make predictions @@ -58,7 +60,8 @@ object StreamingKMeansExample { System.exit(1) } - val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample") + // $example on$ + val conf = new SparkConf().setAppName("StreamingKMeansExample") val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) @@ -74,6 +77,7 @@ object StreamingKMeansExample { ssc.start() ssc.awaitTermination() + // $example off$ } } // scalastyle:on println