Commit
Use labeled points and predictOnValues in examples
freeman-lab committed Oct 29, 2014
1 parent 77dbd3f commit ad9bdc2
Showing 2 changed files with 12 additions and 6 deletions.
5 changes: 3 additions & 2 deletions docs/mllib-clustering.md
@@ -180,6 +180,7 @@ First we import the necessary classes.
{% highlight scala %}

import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans

{% endhighlight %}
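
The snippets below assume a `StreamingContext` has already been created as `ssc`. A minimal sketch of that setup (the app name and the 5-second batch interval are illustrative choices, not part of this change):

{% highlight scala %}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative configuration: any app name and batch interval will do
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(5))

{% endhighlight %}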
@@ -189,7 +190,7 @@ Then we make an input stream of vectors for training, as well as one for testing.
{% highlight scala %}

val trainingData = ssc.textFileStream("/training/data/dir").map(Vectors.parse)
-val testData = ssc.textFileStream("/testing/data/dir").map(Vectors.parse)
+val testData = ssc.textFileStream("/testing/data/dir").map(LabeledPoint.parse)

{% endhighlight %}
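
As a rough illustration of the two row formats these parsers expect (training rows are plain vectors, test rows are labeled points; the values here are made up):

{% highlight scala %}

// A training row has the form [x1,x2,...,xn]
val vector = Vectors.parse("[1.2,3.4,5.6]")

// A test row has the form (y,[x1,x2,...,xn]), where y is a label or identifier
val point = LabeledPoint.parse("(1.0,[1.2,3.4,5.6])")
point.label     // 1.0
point.features  // the vector [1.2,3.4,5.6]

{% endhighlight %}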

@@ -211,7 +212,7 @@ Now register the streams for training and testing and start the job, printing the predicted cluster assignments on new data points as they arrive.
{% highlight scala %}

model.trainOn(trainingData)
-model.predictOn(testData).print()
+model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
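// Note: predictOnValues expects a DStream of (key, vector) pairs and keeps the
// key with each prediction, so the output is a stream of (label, cluster index)
// pairs, whereas predictOn takes bare vectors and returns only cluster indices.
// That is why the test points are keyed by their labels in the map above.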

ssc.start()
ssc.awaitTermination()
13 changes: 9 additions & 4 deletions examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeans.scala
@@ -18,6 +18,7 @@
package org.apache.spark.examples.mllib

import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -27,9 +28,13 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
* on another stream, where the data streams arrive as text files
* into two different directories.
*
- * The rows of the text files must be vector data in the form
+ * The rows of the training text files must be vector data in the form
* `[x1,x2,x3,...,xn]`
- * Where n is the number of dimensions. n must be the same for train and test.
+ * Where n is the number of dimensions.
+ *
+ * The rows of the test text files must be labeled data in the form
+ * `(y,[x1,x2,x3,...,xn])`
+ * Where y is some identifier. n must be the same for train and test.
*
* Usage: StreamingKMeans <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
*
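* For example, with a Spark build that includes the examples (the directory names
* and parameter values below are purely illustrative: a 5 second batch duration,
* 3 clusters, 2 dimensions):
*
*   $ ./bin/run-example mllib.StreamingKMeans trainingDir testDir 5 3 2
*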
@@ -57,15 +62,15 @@ object StreamingKMeans {
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
-val testData = ssc.textFileStream(args(1)).map(Vectors.parse)
+val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt)
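// setDecayFactor(1.0) weights all batches equally (values below 1.0 would discount
// older batches), and setRandomCenters initializes the k cluster centers randomly
// at the requested dimensionality; note that the exact setRandomCenters signature
// has varied across Spark versions.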

model.trainOn(trainingData)
-model.predictOn(testData).print()
+model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
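// print() writes the first few elements of each batch to the driver log, so the
// output here is a per-batch sample of (label, predicted cluster index) pairs.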

ssc.start()
ssc.awaitTermination()
