In [None]:
%ShowTypes on

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// For implicit transformation of RDDs to DataFrames
import sqlContext.implicits._

// For telling Spark to look in the local file system
import java.io._
def localpath(path: String): String = {
    "file://" + new java.io.File(".").getCanonicalPath + "/" + path
}

// For timing expression evaluation
def time[R](block: => R): R = {
    val start: Long = System.nanoTime()
    val result = block
    val end: Long = System.nanoTime()
    val duration: Double = (end - start) / 1000000000.0
    println("Elapsed time: " + duration + "s")
    result
}

println("Using Spark version " + sc.version)

# Spark MLlib
<!-- requirement: small_data/gutenberg -->
*Official documentation [here](https://spark.apache.org/docs/latest/mllib-guide.html).*

## Algorithms

Spark supports a number of machine-learning algorithms.

- Classification and Regression
    - Linear SVM, logistic regression
    - Linear regression
    - Naive Bayes
    - Decision Trees
    - Random Forests and Gradient-Boosted Trees
- Clustering
    - K-means (and streaming K-means)
    - Gaussian Mixture Models
    - Latent Dirichlet Allocation
- Dimensionality Reduction
    - SVD and PCA
- Lower-level optimization primitives
    - Stochastic Gradient Descent
    - Limited-memory BFGS (L-BFGS)

### Parallelized SGD

For linear models like SVM, Linear Regression, and Logistic Regression, the cost function we're trying to optimize is essentially an average over the individual error terms from each data point. This structure is particularly well suited to parallelization. For example, in linear regression, recall that the gradient of the log-likelihood is

$$\begin{align}
\frac{\partial \log(L(\beta))}{\partial \beta} &= -\frac{\partial}{\partial \beta} \frac{1}{2}\sum_j \|y_j - X_{j \cdot} \cdot \beta\|^2 \\
&= -\frac{1}{2}\sum_j \frac{\partial}{\partial \beta} \|y_j - X_{j \cdot} \cdot \beta\|^2 \\
& = \sum_j (y_j - X_{j \cdot} \cdot \beta) X_{j \cdot}^T \\
& \approx \sum_{\text{sample } j} (y_j - X_{j \cdot} \cdot \beta) X_{j \cdot}^T
\end{align}$$

The key *mathematical properties* we have used are:

1. the error functions are the sum of error contributions of different training instances
1. linearity of the derivative
1. associativity of addition
1. downsampling giving an unbiased estimator

Since the last sum is over the different training instances and these are stored on different nodes, we can parallelize the computation of the gradient in SGD across multiple nodes.  Of course, we still need to maintain the running weight $\beta$ that has to be present on every node (through a broadcast variable that is updated).  Notice that SVM, Linear Regression, and Logistic Regression all have error functions that are just sums over training instances so SGD can be used for all these algorithms.
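
As a sketch of the fourth property listed above, suppose each training instance is included in the sample $S$ independently with probability $p$, and write $g_j$ for the gradient contribution of instance $j$ (the symbols $S$, $p$, and $g_j$ are introduced here only for illustration). Then the rescaled sampled sum has the full sum as its expectation:

$$\begin{align}
E\left[\frac{1}{p}\sum_{j \in S} g_j\right] &= \frac{1}{p}\sum_j P(j \in S) \, g_j \\
&= \frac{1}{p}\sum_j p \, g_j \\
&= \sum_j g_j
\end{align}$$

Without the $1/p$ factor, the sampled sum is only proportional to the full gradient in expectation, which is harmless in SGD because the constant can be absorbed into the step size.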

Spark's [implementation](http://spark.apache.org/docs/latest/mllib-optimization.html#stochastic-gradient-descent-sgd) uses a tunable minibatch fraction parameter (`miniBatchFraction`) to sample a fraction of the training data RDD on each iteration. For each iteration, the updated weights are broadcast to the executors, and the gradient contribution is calculated for each sampled data point and sent back to be aggregated.

Parallelization handles an increasing number of sampled data points quite well, since there are no interaction terms between points and each per-point calculation is independent. Controlling how the algorithm iterates to convergence is also important, and can be done with parameters for the total number of iterations and the step size.
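
The scheme described above can be sketched in plain Scala. This is not Spark's implementation: partitions are simulated with `grouped`, the step size is held constant for simplicity, and the names `ParallelSgdSketch`, `partitionGradient`, and `fit` are hypothetical. Each simulated partition samples a minibatch, computes a partial gradient sum against the current weights, and the partial sums are added together before a single weight update.

```scala
// Plain-Scala sketch of parallelized minibatch SGD for linear regression.
// Partitions are simulated with `grouped`; no Spark is required.
object ParallelSgdSketch {
  type Point = (Double, Array[Double]) // (label y_j, feature row X_j)

  // Sum of (y_j - X_j . beta) * X_j over the points of one partition
  def partitionGradient(part: Seq[Point], beta: Array[Double]): Array[Double] = {
    val grad = Array.fill(beta.length)(0.0)
    for ((y, x) <- part) {
      val residual = y - x.zip(beta).map { case (xi, bi) => xi * bi }.sum
      for (i <- grad.indices) grad(i) += residual * x(i)
    }
    grad
  }

  def fit(data: Seq[Point], numPartitions: Int, iterations: Int,
          stepSize: Double, miniBatchFraction: Double, seed: Long): Array[Double] = {
    val rng = new scala.util.Random(seed)
    val partSize = math.max(1, math.ceil(data.size.toDouble / numPartitions).toInt)
    val partitions = data.grouped(partSize).toVector
    var beta = Array.fill(data.head._2.length)(0.0)
    for (_ <- 1 to iterations) {
      // Each simulated executor samples its partition and returns a
      // partial gradient sum together with the number of sampled points.
      val partials = partitions.map { part =>
        val sample = part.filter(_ => rng.nextDouble() < miniBatchFraction)
        (partitionGradient(sample, beta), sample.size)
      }
      // Associativity of addition lets the partial sums be combined in any order.
      val (gradSum, count) = partials.reduce { (a, b) =>
        (a._1.zip(b._1).map { case (u, v) => u + v }, a._2 + b._2)
      }
      // Step along the averaged gradient of the log-likelihood.
      if (count > 0)
        beta = beta.zip(gradSum).map { case (b, g) => b + stepSize * g / count }
    }
    beta
  }
}

// Usage on noiseless synthetic data generated from y = 2*x1 - 3*x2
val sgdRng = new scala.util.Random(0)
val sgdData = Seq.fill(500) {
  val x = Array(sgdRng.nextDouble() * 2 - 1, sgdRng.nextDouble() * 2 - 1)
  (2.0 * x(0) - 3.0 * x(1), x)
}
val beta = ParallelSgdSketch.fit(sgdData, numPartitions = 4, iterations = 200,
                                 stepSize = 0.5, miniBatchFraction = 0.5, seed = 42L)
println(beta.mkString(", "))
```

In Spark, the `map` over partitions would run on the executors against a broadcast copy of `beta`, and the `reduce` would be an aggregation back to the driver.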

## ML vs. MLlib packages

Confusingly, there are two machine learning APIs in Spark, the `mllib` package based on RDDs and the `ml` package based on DataFrames. For years these have been developed somewhat in parallel, resulting in duplication and asymmetry in functionality.

With Spark 2.0+, `mllib` is in maintenance mode and will likely be deprecated in the future in favor of the DataFrame-based API, which more closely resembles libraries like scikit-learn. Below is one example of the RDD-based API; the rest of the notebook will focus on DataFrames.

In [None]:
// Note: LinearRegressionWithSGD is deprecated as of Spark 2.0
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// parameters
val trainingIterations = 10
val trainingFraction = .6

// generate data
val data = sc.parallelize(1 to 10000).map( _ =>
    LabeledPoint(
        math.random,
        Vectors.dense(math.random, math.random, math.random)
    )
)

// split the training and test sets
val (training, test) = {
    val splits = data.randomSplit(
        Array(trainingFraction, 1.0 - trainingFraction),
        seed = 42L
    )
    (splits(0).cache(), splits(1))
}

// train model
val lrmodel = LinearRegressionWithSGD.train(training, trainingIterations)

// get r2 score
val r2 = {
    val predictionAndLabels = test.map{
        case LabeledPoint(label, features) => 
            (lrmodel.predict(features), label)
    }
    new RegressionMetrics(predictionAndLabels).r2
}

In [None]:
print(r2)

If you're interested in introspecting some of these objects, the inline `<tab>` autocomplete can help. You can also call `getClass` and the reflection methods it exposes (such as `getPackage`, `getMethods`, and `getName`) to learn more about an object.

In [None]:
test.getClass.getPackage

In [None]:
test.getClass.getMethods

In [None]:
lrmodel.getClass.getName

## Spark ML
Spark ML implements the ideas of transformers, estimators, and pipelines by standardizing APIs across machine learning algorithms. This can streamline more complex workflows.

The core functionality includes:
* DataFrames - built on Spark SQL, can be created either directly or from RDDs
* Transformers - algorithms that accept a DataFrame as input and return a DataFrame as output
* Estimators - algorithms that accept a DataFrame as input and return a Transformer as output
* Pipelines - chaining together Transformers and Estimators
* Parameters - common API for specifying hyperparameters

For example, a learning algorithm can be implemented as an Estimator which trains on a DataFrame of features and returns a Transformer which can output predictions based on a test DataFrame.

Full documentation can be found [here](http://spark.apache.org/docs/latest/ml-guide.html).
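
To illustrate the common parameter API mentioned in the list above, here is a small sketch. The variable names `lr` and `overrides` are chosen for illustration, and the parameter values are arbitrary.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

// Setters return the estimator itself, so calls can be chained
val lr = new LogisticRegression().setMaxIter(10)

// Every Estimator and Transformer can describe its own parameters
println(lr.explainParams())

// A ParamMap collects parameter overrides that can be supplied at fit time
val overrides = ParamMap(lr.regParam -> 0.01, lr.maxIter -> 20)
println(overrides.get(lr.maxIter))
```

Such a `ParamMap` can be passed as a second argument to `fit`, which leaves the estimator's own settings untouched; this is the same mechanism that grid search relies on.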

In [None]:
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.sql.Row

val reviews = Seq(
    ("Prose is well-written, but style is an impediment to learning. Should be called 'Reviewing Spark,' not 'Learning Spark'", 0.0),
    ("Nice Headstart to Spark", 1.0),
    ("Start here: Excellent reference for Spark", 1.0),
    ("Insightful and so Spark-tastic!", 1.0),
    ("Good intro but wordy and lacking details in areas", 0.0),
    ("Best of the Books Currently Available", 1.0),
    ("A good resource for people interested in learning Spark", 1.0),
    ("Great Overview", 1.0)
)

val test_reviews = Seq(
    ("A decent guided tour of Spark and its major components.", 0.0),
    ("10/10 would buy again", 1.0),
    ("it is simple to follow. well organized. straight ...", 1.0),
    ("Just what you need to get started in Apache Spark.", 1.0),
    ("Very good book for learning Spark", 1.0)
)

val training = sqlContext.createDataFrame(reviews).toDF("title", "label").cache()
val test = sqlContext.createDataFrame(test_reviews).toDF("title", "label")

val tokenizer = (new Tokenizer()
    .setInputCol("title")
    .setOutputCol("words"))
val hashingTF = (new HashingTF()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features"))
val logreg = (new LogisticRegression()
    .setMaxIter(10)
    .setRegParam(0.01))

val tokens = tokenizer.transform(training)
val hashes = hashingTF.transform(tokens)
val model = logreg.fit(hashes)

// Make predictions on test documents
val test_tokens = tokenizer.transform(test)
val test_hashes = hashingTF.transform(test_tokens)

{ model.transform(test_hashes)
    .select("title", "label", "prediction")
    .collect()
    .foreach { case Row(text: String, label: Double, prediction: Double) =>
               println(s"$text -> label = $label, prediction = $prediction")
             }
}

In [None]:
// Note that if you use a Pipeline it won't have a coefficients attribute.
model.coefficients

**Exercise**: Rewrite the above using a Pipeline.

## Cross-validation and grid search

In [None]:
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.{Pipeline, PipelineModel}

In [None]:
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, logreg))

In [None]:
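/** Saving and loading examples (commented out; uncomment to run).
 *  Note that `model` above is a LogisticRegressionModel, not a PipelineModel,
 *  so a fitted pipeline is created here before saving.
pipeline.write.overwrite().save(localpath("models/unfit_pipeline"))
val fittedPipeline = pipeline.fit(training)
fittedPipeline.write.overwrite().save(localpath("models/fitted_pipeline"))
val sameModel = PipelineModel.load(localpath("models/fitted_pipeline"))
*/
println("Save/load example skipped (commented out above)")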

In [None]:
val paramGrid = (new ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, Array(10, 100, 1000))
    .addGrid(logreg.regParam, Array(0.1, 0.01, 0.001))
    .build())

In [None]:
val cv = (new CrossValidator()
    .setEstimator(pipeline)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setNumFolds(3))

*Note*: A more traditional validation set without folding is available in `TrainValidationSplit`.
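
As a rough sketch of that alternative, the following assumes the `pipeline` and `paramGrid` values defined in the cells above; the name `tvs` and the 0.75 train ratio are illustrative choices.

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.TrainValidationSplit

// Assumes `pipeline` and `paramGrid` from the cells above
val tvs = (new TrainValidationSplit()
    .setEstimator(pipeline)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.75))  // illustrative: 75% for training, 25% for validation

// Fitting works just like CrossValidator:
// val tvsModel = tvs.fit(training)
```

Each parameter combination is evaluated only once here rather than once per fold, so this is cheaper than cross-validation but gives a noisier estimate, especially on a dataset as small as this one.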

In [None]:
val cvModel = cv.fit(training)

In [None]:
{ cvModel.transform(test)
    .select("title", "label", "prediction")
    .collect()
    .foreach { case Row(text: String, label: Double, prediction: Double) =>
               println(s"$text -> label = $label, prediction = $prediction")
             }
}

In [None]:
cvModel.bestModel.asInstanceOf[PipelineModel].stages

In [None]:
{ cvModel
    .bestModel
    .asInstanceOf[PipelineModel]
    .stages(2)
    .asInstanceOf[LogisticRegressionModel]
    .coefficients
}

In [None]:
training.unpersist()

### Example algorithm: Word2Vec

In [None]:
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}

// Alternative: build the corpus from the review titles instead
// val text = (reviews ++ test_reviews).map { case (line, score) => (line.split(" "), score) }.toDF("text", "score")
val gutenberg = (sc.textFile(localpath("small_data/gutenberg/"))
    .map(line => (line.split(" "), 1))
    .toDF("text", "score"))
val w2v = new Word2Vec().setInputCol("text").setOutputCol("vectors")
val model = w2v.fit(gutenberg)
val result = model.transform(gutenberg)

In [None]:
val vectorDF = model.getVectors
model.findSynonyms("woman", 10).rdd.take(10).foreach(println)

In [None]:
vectorDF.show(5)

In [None]:
vectorDF.printSchema

If we try to convert this DataFrame to an RDD, we'll end up with an RDD of Row objects. It'll then be very difficult to cast the objects back to Vectors.

This is where Datasets come in handy.

In [None]:
try {
    val vectorDS = vectorDF.as[(String, Vector)]
} catch {
    case e: Exception => e.printStackTrace()
} finally {
    println("Huh?")
}

In [None]:
// This is just a taste of how fun it is to have two parallel, slightly different projects
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.linalg.Vectors._

In [None]:
// Can also define a case class and use that here
val vectorDS = vectorDF.as[(String, org.apache.spark.ml.linalg.Vector)]

In [None]:
// Now we don't get an RDD of Rows
val vectorRDD = vectorDS.rdd

In [None]:
val king_vec = vectorRDD.lookup("king")(0)
val queen_vec = vectorRDD.lookup("queen")(0)
val man_vec = vectorRDD.lookup("man")(0)
val woman_vec = vectorRDD.lookup("woman")(0)

In [None]:
println(king_vec)

Note: you cannot easily add these vectors! They are really meant to be used only as inputs to machine-learning algorithms. Convert them to [Breeze](https://github.com/scalanlp/breeze) vectors for basic linear algebra manipulations. Like numpy, Breeze uses low-level libraries like BLAS and LAPACK under the hood. It is not as full-fledged as numpy, though, and its type safety can be a little annoying.
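
As a small illustration of the kind of arithmetic Breeze makes available, here is a sketch on toy vectors. The helper `cosineSimilarity` and the vectors `a` and `b` are hypothetical names introduced for this example.

```scala
import breeze.linalg.{DenseVector, norm}

// Cosine similarity between two Breeze vectors: (a . b) / (|a| |b|)
def cosineSimilarity(a: DenseVector[Double], b: DenseVector[Double]): Double =
  (a dot b) / (norm(a) * norm(b))

val a = DenseVector(1.0, 2.0, 3.0)
val b = DenseVector(2.0, 4.0, 6.0)

// Element-wise addition works directly on Breeze vectors
println(a + b)

// Parallel vectors have a cosine similarity of (approximately) 1
println(cosineSimilarity(a, b))
```

Cosine similarity is a common alternative to squared distance when comparing word vectors, since it ignores vector length.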

In [None]:
import breeze.linalg._

val king = new DenseVector(king_vec.toArray)
val queen = new DenseVector(queen_vec.toArray)
val woman = new DenseVector(woman_vec.toArray)
val man = new DenseVector(man_vec.toArray)

In [None]:
println(squaredDistance(queen, king) + "\n" +
        squaredDistance(queen, woman) + "\n" +
        squaredDistance(queen, man) + "\n" +
        squaredDistance(queen, (king + man - woman)) + "\n" +
        squaredDistance(queen, (king - man + woman)))

## Feature preprocessing

In [None]:
vectorDS.show(5)

In [None]:
// Note that calling `select` will return Rows again. Be careful!
val sampleVector: org.apache.spark.ml.linalg.Vector = vectorDS.take(1)(0)._2
println(sampleVector.size)

In [None]:
import org.apache.spark.ml.feature.VectorSlicer

val firstSlicer = (new VectorSlicer()
    .setInputCol("vector")
    .setOutputCol("first")
    .setIndices(Array(0)))

val lastSlicer = (new VectorSlicer()
    .setInputCol("vector")
    .setOutputCol("last")
    .setIndices(Array(sampleVector.size - 1)))

val medSlicer = (new VectorSlicer()
    .setInputCol("vector")
    .setOutputCol("med")
    .setIndices((45 until 55).toArray))

In [None]:
val output = medSlicer.transform(
    lastSlicer.transform(
        firstSlicer.transform(vectorDS)
    )
)
output.columns

In [None]:
import org.apache.spark.ml.feature.VectorAssembler

val assembler = (new VectorAssembler()
    .setInputCols(Array("first", "last", "med"))
    .setOutputCol("features"))

val newOutput = assembler.transform(output)
newOutput.columns

In [None]:
import org.apache.spark.ml.feature.Binarizer

val binarizer = (new Binarizer()
    .setThreshold(0.015)
    .setInputCol("features")
    .setOutputCol("binFeatures"))

val binOutput = binarizer.transform(newOutput)
binOutput.select("features", "binFeatures").take(1)(0)

### Exercise: Use SVM to predict colon cancer from gene expressions
You can start getting a feel for the MLlib operations by following the [Spark docs example](https://spark.apache.org/docs/1.3.0/mllib-linear-methods.html#linear-support-vector-machines-svms) on this dataset.

#### About the data format: LibSVM
MLlib conveniently provides a data loading method, `MLUtils.loadLibSVMFile()`, for the LibSVM format. Many other languages (R, Matlab, etc.) also provide loaders for this format.  
A dataset of *n* features will have one row per datum, with the label and values of each feature organized as follows:
>{label} 1:{value} 2:{value} ... n:{value}

Take these two datapoints with six features and labels of -1 and 1 respectively as an example:
>-1.000000  1:2.080750 2:1.099070 3:0.927763 4:1.029080 5:-0.130763 6:1.265460  
1.000000  1:1.109460 2:0.786453 3:0.445560 4:-0.146323 5:-0.996316 6:0.555759 
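
To make the format concrete, here is a minimal plain-Scala sketch of parsing one such line. It is not how `MLUtils.loadLibSVMFile()` is implemented; `LibSvmLineSketch`, `SparseRecord`, and `parseLine` are hypothetical names. The indices in the file are one-based, so the sketch shifts them to zero-based positions.

```scala
object LibSvmLineSketch {
  // Parsed record: label plus parallel arrays of zero-based indices and values
  final case class SparseRecord(label: Double, indices: Array[Int], values: Array[Double])

  def parseLine(line: String): SparseRecord = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble
    val pairs = tokens.tail.map { token =>
      val Array(index, value) = token.split(":")
      (index.toInt - 1, value.toDouble) // file indices are one-based
    }
    SparseRecord(label, pairs.map(_._1), pairs.map(_._2))
  }
}

val record = LibSvmLineSketch.parseLine(
  "-1.000000  1:2.080750 2:1.099070 3:0.927763 4:1.029080 5:-0.130763 6:1.265460")
println(record.label)                 // prints -1.0
println(record.indices.mkString(",")) // prints 0,1,2,3,4,5
```

Because each value carries its own index, features equal to zero can simply be omitted from a line, which keeps sparse datasets compact.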

#### About the colon-cancer dataset
This dataset was introduced in the 1999 paper [Broad patterns of gene expression revealed by clustering analysis of tumor and normal colon tissues probed by oligonucleotide arrays.](http://www.pnas.org/content/96/12/6745.short)  

Here's the abstract of the paper:  
> *Oligonucleotide arrays can provide a broad picture of the state of the cell, by monitoring the expression level of thousands of genes at the same time. It is of interest to develop techniques for extracting useful information from the resulting data sets. Here we report the application of a two-way clustering method for analyzing a data set consisting of the expression patterns of different cell types. Gene expression in 40 tumor and 22 normal colon tissue samples was analyzed with an Affymetrix oligonucleotide array complementary to more than 6,500 human genes. An efficient two-way clustering algorithm was applied to both the genes and the tissues, revealing broad coherent patterns that suggest a high degree of organization underlying gene expression in these tissues. Coregulated families of genes clustered together, as demonstrated for the ribosomal proteins. Clustering also separated cancerous from noncancerous tissue and cell lines from in vivo tissues on the basis of subtle distributed patterns of genes even when expression of individual genes varied only slightly between the tissues. Two-way clustering thus may be of use both in classifying genes into functional groups and in classifying tissues based on gene expression.*

The colon-cancer dataset has 62 data points (40 tumor with label=0, 22 normal with label=1), 2000 features, and 2 classes (labels).

#### Exit Tickets
1. When would you use `org.apache.spark.mllib.linalg.Vector` versus `breeze.linalg.DenseVector`?
1. Why can SVM, Linear Regression, and Logistic Regression be parallelized?  How would you parallelize KMeans?


*Copyright &copy; 2015 The Data Incubator.  All rights reserved.*