# Spark Lab 2: MLLib

In this lab we will explore the [MLLib library](https://spark.apache.org/docs/1.2.1/mllib-guide.html) for machine learning in Spark. The API of this library is very similar to Scikit Learn, and it plays quite nicely with Pandas.

This lab closely follows [this blog post](https://www.mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages), so if you're lost you can have a look there for guidance.

Let's start with the usual:

    vagrant up
    vagrant ssh

You should have access to Jupyter notebook here:

    http://10.211.55.101:18888/tree
    
The problem we will solve is the prediction of [_churn rate_](https://en.wikipedia.org/wiki/Churn_rate), which is a measure of how many customers are lost over a period of time. This is an important business metric, particularly for large companies such as telecom operators.
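To make the definition concrete, here is a minimal sketch of the arithmetic. The function name `churn_rate` and the customer counts are made up for illustration; they do not come from the lab dataset.

```python
def churn_rate(customers_at_start, customers_lost):
    """Fraction of the customers present at the start of a period who left during it."""
    if customers_at_start <= 0:
        raise ValueError("customers_at_start must be positive")
    return float(customers_lost) / customers_at_start

# Hypothetical numbers: 2000 customers at the start of the month, 50 of them left.
monthly_churn = churn_rate(2000, 50)
print("Monthly churn rate: %.1f%%" % (100 * monthly_churn))
```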

We will use a dataset provided by [BigML](https://bigml.com/). The data has been copied to your VM, but can also be downloaded [here](https://bml-data.s3.amazonaws.com/churn-bigml-80.csv) and [here](https://bml-data.s3.amazonaws.com/churn-bigml-20.csv).

In [None]:
# Disable warnings, set Matplotlib inline plotting and load Pandas package
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
import pandas as pd
pd.options.display.mpl_style = 'default'

Check that the `SparkContext` (`sc`) and the `sqlContext` are available:

In [None]:
sc

In [None]:
sqlContext

## Exercise 1.a: Load the data

Let's start by loading the data. Since the input is a CSV file, we need to specify a parser (here the `com.databricks.spark.csv` format).

- Use the `sqlContext.read.load` function to load the data
    - load the bigml-80 file into a DataFrame called `CV_data`
    - load the bigml-20 file into a DataFrame called `final_test_data`
    - cache `CV_data` to speed things up

Note that you can print the schema of a DataFrame with `printSchema` if you want to.
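To get a feeling for what schema inference does, here is a rough pure-Python sketch. It only illustrates the idea (try the narrowest type first, then fall back) and is not the parser's actual implementation. The helper `infer_type` and the three-row extract are made up for the example.

```python
def infer_type(values):
    """Return 'int', 'double' or 'string' for a column of raw CSV strings."""
    for type_name, caster in (("int", int), ("double", float)):
        try:
            for value in values:
                caster(value)
            return type_name      # every value could be converted
        except ValueError:
            continue              # try the next, more permissive type
    return "string"

# Illustrative extract, not read from the lab files.
raw = "State,Account length,Total day minutes\nKS,128,265.1\nOH,107,161.6"
rows = [line.split(",") for line in raw.splitlines()]
header, body = rows[0], rows[1:]
columns = list(zip(*body))        # transpose rows into columns
schema = [(name, infer_type(col)) for name, col in zip(header, columns)]
print(schema)
```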


In [None]:
CV_data = sqlContext.read.load('file:///home/vagrant/data/churn/churn-bigml-80.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')

final_test_data = sqlContext.read.load('file:///home/vagrant/data/churn/churn-bigml-20.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
CV_data.cache()
CV_data.printSchema()

## Exercise 1.b: Quick look at the data

- use the `take` function to take the first 5 rows of the `CV_data` DataFrame and display them as a Pandas DataFrame
- use the `describe` function to get some summary statistics about the training data

In [None]:
pd.DataFrame(CV_data.take(5), columns=CV_data.columns).transpose()

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.types import IntegerType

CV_data = CV_data.withColumn("Account length", CV_data["Account length"].cast(IntegerType()))
CV_data = CV_data.withColumn("Area code", CV_data["Area code"].cast(IntegerType()))
CV_data = CV_data.withColumn("Number vmail messages", CV_data["Number vmail messages"].cast(IntegerType()))
CV_data = CV_data.withColumn("Total day minutes", CV_data["Total day minutes"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total day calls", CV_data["Total day calls"].cast(IntegerType()))
CV_data = CV_data.withColumn("Total day charge", CV_data["Total day charge"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total eve minutes", CV_data["Total eve minutes"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total eve calls", CV_data["Total eve calls"].cast(IntegerType()))
CV_data = CV_data.withColumn("Total eve charge", CV_data["Total eve charge"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total night minutes", CV_data["Total night minutes"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total night calls", CV_data["Total night calls"].cast(IntegerType()))
CV_data = CV_data.withColumn("Total night charge", CV_data["Total night charge"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total intl minutes", CV_data["Total intl minutes"].cast(DoubleType()))
CV_data = CV_data.withColumn("Total intl calls", CV_data["Total intl calls"].cast(IntegerType()))
CV_data = CV_data.withColumn("Total intl charge", CV_data["Total intl charge"].cast(DoubleType()))
CV_data = CV_data.withColumn("Customer service calls", CV_data["Customer service calls"].cast(IntegerType()))

CV_data.printSchema()

In [None]:
CV_data.describe().toPandas().transpose()

## Exercise 2: Sample inspection

Not all the features are numeric. `CV_data.dtypes` lists the name and type of each column.

- select the features that are either `int` or `double`
- use the `sample` function to get a 10% sample of the training DataFrame
- display a `pd.scatter_matrix` of the sampled data
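`CV_data.dtypes` is a plain Python list of `(column name, type name)` pairs, so the selection can be done with a list comprehension. The sketch below uses a hand-written, hypothetical excerpt of such a list so that it runs without Spark.

```python
# Hypothetical excerpt of a DataFrame's `dtypes` list.
dtypes = [
    ("State", "string"),
    ("Account length", "int"),
    ("International plan", "string"),
    ("Total day minutes", "double"),
]

NUMERIC_TYPES = ("int", "double")

# Keep only the names of the columns whose type is numeric.
numeric_features = [name for name, type_name in dtypes if type_name in NUMERIC_TYPES]
print(numeric_features)
```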

In [None]:
numeric_features = [t[0] for t in CV_data.dtypes if t[1] == 'int' or t[1] == 'double']

sampled_data = CV_data.select(numeric_features).sample(False, 0.10).toPandas()

axs = pd.scatter_matrix(sampled_data, figsize=(12, 12));

# Rotate axis labels and remove axis ticks
n = len(sampled_data.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

## Exercise 3: Feature selection

Column selection on a Spark DataFrame works differently than in Scikit Learn. For example, to drop two columns in Spark, we simply chain two `.drop(column)` calls.

- Drop the following columns:
    - State
    - Area code
    - Total day charge
    - Total eve charge
    - Total night charge
    - Total intl charge
    
Also, we can apply a function to a column with the construct:

    .withColumn('column_name', function(CV_data['column_name']))
    
Use it to transform the binary string labels to `1.0` or `0.0` in these columns:

- Churn
- International plan
- Voice mail plan

You may need these two imports:

```python
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction
```

Also, use the `.cache` function to cache your pipeline results so far.
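The label conversion itself is an ordinary Python function, which can then be wrapped in a `UserDefinedFunction`. Below is a minimal sketch of that plain function. The name `to_num` is illustrative, and the `str(...)` call is an assumption added here so that boolean values are accepted as well as strings.

```python
BINARY_MAP = {"Yes": 1.0, "No": 0.0, "True": 1.0, "False": 0.0}

def to_num(value):
    """Map a yes/no or true/false label to 1.0 or 0.0."""
    # str() lets the same lookup handle both 'True' and the boolean True.
    return BINARY_MAP[str(value)]

for label in ["Yes", "No", "True", False]:
    print("%r -> %s" % (label, to_num(label)))
```

Any value outside the four expected labels raises a `KeyError`, which is a useful early warning that a column contains something unexpected.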

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import UserDefinedFunction

binary_map = {'Yes':1.0, 'No':0.0, 'True':1.0, 'False':0.0}
toNum = UserDefinedFunction(lambda k: binary_map[k], DoubleType())

CV_data = CV_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(CV_data['Churn'])) \
    .withColumn('International plan', toNum(CV_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(CV_data['Voice mail plan'])).cache()

final_test_data = final_test_data.drop('State').drop('Area code') \
    .drop('Total day charge').drop('Total eve charge') \
    .drop('Total night charge').drop('Total intl charge') \
    .withColumn('Churn', toNum(final_test_data['Churn'])) \
    .withColumn('International plan', toNum(final_test_data['International plan'])) \
    .withColumn('Voice mail plan', toNum(final_test_data['Voice mail plan'])).cache()

As before, take 5 rows and display them with Pandas.

In [None]:
pd.DataFrame(CV_data.take(5), columns=CV_data.columns).transpose()

## Exercise 4: Train Decision Tree

It is time to build our first model using MLLib. We will use a decision tree.

- [LabeledPoint](https://spark.apache.org/docs/0.8.1/api/mllib/org/apache/spark/mllib/regression/LabeledPoint.html) represents a data point as a label plus a feature vector. Write a function that maps each row of the data to a `LabeledPoint`
- `.randomSplit` splits the data into train/test sets. Do an 80/20 split
- Train a [DecisionTree](http://spark.apache.org/docs/latest/mllib-decision-tree.html) on the training data
- Display the trained model using `print model.toDebugString()`

You may need the following imports:

```python
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
```
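Before writing the Spark version, it can help to see the two data-preparation steps on plain Python tuples. This is only a sketch: `to_labeled` and `random_split` are hypothetical helpers, the rows are synthetic, and the split assigns each row independently at random, so the resulting sizes are only approximately 80/20.

```python
import random

def to_labeled(row):
    """Split a row into (label, features): the label is the last value."""
    return row[-1], list(row[:-1])

def random_split(items, train_fraction, seed):
    """Assign each item to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        if rng.random() < train_fraction:
            train.append(item)
        else:
            test.append(item)
    return train, test

# Synthetic rows: two feature values followed by a 0.0/1.0 label.
rows = [(float(i), float(i % 3), float(i % 2)) for i in range(100)]
labeled = [to_labeled(row) for row in rows]
train, test = random_split(labeled, 0.8, seed=42)
print("train: %d rows, test: %d rows" % (len(train), len(test)))
```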

In [None]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

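def labelData(data):
    # The label is the last column; the features are all the columns before it.
    # Going through .rdd makes the map explicit on the underlying RDD of rows.
    return data.rdd.map(lambda row: LabeledPoint(row[-1], row[:-1]))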

training_data, testing_data = labelData(CV_data).randomSplit([0.8, 0.2])

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
                                     categoricalFeaturesInfo={1:2, 2:2},
                                     impurity='gini', maxBins=32)

print model.toDebugString()

In [None]:
# Map feature indices from the tree's debug string back to column names
print 'Feature 12:', CV_data.columns[12]
print 'Feature 4: ', CV_data.columns[4]

## Exercise 5: Model evaluation


The `MulticlassMetrics` class provides many evaluation metrics.

- Evaluate the model on the test data using `.predict`
- Calculate the following metrics:
    - Precision of True 
    - Precision of False
    - Recall of True    
    - Recall of False   
    - F-1 Score         
    - Confusion Matrix

- Finally, display how many rows belong to each `Churn` class

You may need the following import:

```python
from pyspark.mllib.evaluation import MulticlassMetrics
```
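To see what these metrics mean, here is a pure-Python sketch that computes them from a list of `(prediction, label)` pairs. The function `binary_metrics` and the ten example pairs are made up for illustration. The F-1 score here is the one for the positive class, which is not necessarily the value `MulticlassMetrics.fMeasure()` returns when called without a label.

```python
def binary_metrics(predictions_and_labels):
    """Compute per-class precision/recall, F-1 of the positive class and the confusion matrix."""
    tp = fp = tn = fn = 0
    for prediction, label in predictions_and_labels:
        if prediction == 1.0 and label == 1.0:
            tp += 1
        elif prediction == 1.0 and label == 0.0:
            fp += 1
        elif prediction == 0.0 and label == 0.0:
            tn += 1
        else:
            fn += 1

    def ratio(numerator, denominator):
        return float(numerator) / denominator if denominator else 0.0

    precision_true = ratio(tp, tp + fp)
    recall_true = ratio(tp, tp + fn)
    return {
        "precision_true": precision_true,
        "precision_false": ratio(tn, tn + fn),
        "recall_true": recall_true,
        "recall_false": ratio(tn, tn + fp),
        "f1_true": ratio(2 * precision_true * recall_true, precision_true + recall_true),
        # Rows are actual classes (0, 1); columns are predicted classes (0, 1).
        "confusion_matrix": [[tn, fp], [fn, tp]],
    }

# Made-up predictions: 3 true positives, 1 false positive, 2 false negatives, 4 true negatives.
pairs = [(1.0, 1.0)] * 3 + [(1.0, 0.0)] * 1 + [(0.0, 1.0)] * 2 + [(0.0, 0.0)] * 4
metrics = binary_metrics(pairs)
for name in sorted(metrics):
    print("%-16s %s" % (name, metrics[name]))
```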

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

def getPredictionsLabels(model, test_data):
    predictions = model.predict(test_data.map(lambda r: r.features))
    return predictions.zip(test_data.map(lambda r: r.label))

def printMetrics(predictions_and_labels):
    metrics = MulticlassMetrics(predictions_and_labels)
    print 'Precision of True ', metrics.precision(1)
    print 'Precision of False', metrics.precision(0)
    print 'Recall of True    ', metrics.recall(1)
    print 'Recall of False   ', metrics.recall(0)
    print 'F-1 Score         ', metrics.fMeasure()
    print 'Confusion Matrix\n', metrics.confusionMatrix().toArray()

predictions_and_labels = getPredictionsLabels(model, testing_data)

printMetrics(predictions_and_labels)

In [None]:
CV_data.groupby('Churn').count().toPandas()

## Bonus: Cross Validation

The [original blog post mentioned above](https://www.mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages) also contains code to implement cross validation. Try it and see if you understand how it's done.
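As a reminder of the idea behind k-fold cross validation, the sketch below splits row indices into folds, with each fold serving once as the validation set. The helper `k_fold_indices` is hypothetical, and it uses contiguous folds for readability, whereas a library implementation typically assigns rows to folds at random.

```python
def k_fold_indices(n_rows, n_folds):
    """Yield (train_indices, validation_indices) once per fold."""
    indices = list(range(n_rows))
    # Spread the remainder over the first folds so sizes differ by at most one.
    fold_sizes = [n_rows // n_folds + (1 if i < n_rows % n_folds else 0)
                  for i in range(n_folds)]
    start = 0
    for size in fold_sizes:
        validation = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, validation
        start += size

folds = list(k_fold_indices(10, 3))
for fold_number, (train, validation) in enumerate(folds):
    print("fold %d: train=%s validation=%s" % (fold_number, train, validation))
```

For each candidate parameter value, a model is trained on every `train` set and scored on the matching `validation` set; the parameter value with the best average score is kept.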

In [None]:
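# Downsample the majority class (Churn = 0) to roughly the size of the
# minority class, and keep every Churn = 1 row
stratified_CV_data = CV_data.sampleBy('Churn', fractions={0: 388./2278, 1: 1.0}).cache()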

stratified_CV_data.groupby('Churn').count().toPandas()
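The fractions passed to `sampleBy` can be checked with a little arithmetic. The sketch below assumes the class counts implied by the fraction above (2278 rows with Churn = 0 and 388 rows with Churn = 1). Since the sampling is random, the actual count for the downsampled class will vary around the expected value.

```python
n_negative = 2278   # assumed number of Churn = 0 rows
n_positive = 388    # assumed number of Churn = 1 rows

# Keep every positive row and only enough negative rows to match them.
fractions = {0: float(n_positive) / n_negative, 1: 1.0}

expected_negative = n_negative * fractions[0]
expected_positive = n_positive * fractions[1]

print("fraction of Churn = 0 rows kept: %.3f" % fractions[0])
print("expected rows per class: %.0f vs %.0f" % (expected_negative, expected_positive))
```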

In [None]:
training_data, testing_data = labelData(stratified_CV_data).randomSplit([0.8, 0.2])

model = DecisionTree.trainClassifier(training_data, numClasses=2, maxDepth=2,
                                     categoricalFeaturesInfo={1:2, 2:2},
                                     impurity='gini', maxBins=32)

predictions_and_labels = getPredictionsLabels(model, testing_data)
printMetrics(predictions_and_labels)

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def vectorizeData(data):
    # The label is the last column; the remaining columns form a dense feature vector
    return data.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label','features'])

vectorized_CV_data = vectorizeData(stratified_CV_data)

# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(vectorized_CV_data)

# Automatically identify categorical features and index them
featureIndexer = VectorIndexer(inputCol='features',
                               outputCol='indexedFeatures',
                               maxCategories=2).fit(vectorized_CV_data)

# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree])

# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')    

# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

CV_model = crossval.fit(vectorized_CV_data)

# Fetch best model
tree_model = CV_model.bestModel.stages[2]
print tree_model

In [None]:
vectorized_test_data = vectorizeData(final_test_data)

transformed_data = CV_model.transform(vectorized_test_data)
print evaluator.getMetricName(), 'score:', evaluator.evaluate(transformed_data)

predictions = transformed_data.select('indexedLabel', 'prediction', 'probability')
predictions.toPandas().head()