
# PySpark ML Random Forest

Example of using the PySpark ML Random Forest classifier on the Iris dataset.

Reference: 

- https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
- https://creativedata.atlassian.net/wiki/spaces/SAP/pages/83237142/Pyspark+-+Tutorial+based+on+Titanic+Dataset

## Start Spark

In [1]:
import findspark
findspark.init()  # locate the Spark installation and add pyspark to sys.path

from pyspark import SparkContext
sc = SparkContext()

from pyspark.sql import SQLContext
sqlcontext = SQLContext(sc)

import sys
print('python version:', sys.version[:31])

print('Spark Version:', sc.version)

python version: 3.6.5 |Anaconda custom (64-bit)
Spark Version: 2.3.0


## Set Up Data

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import datasets

In [77]:
# load iris 
iris = datasets.load_iris()

# convert to pandas df 
df = pd.DataFrame(iris.data, columns=iris.feature_names)

labels = pd.Categorical.from_codes(iris.target, iris.target_names)

labels_tmp = pd.DataFrame(pd.Series(labels),columns=['y'])

# add labels  
df = pd.concat([df,labels_tmp],axis=1)

print(df.shape)
df.head()

(150, 5)


|   | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | y |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |


## Spark DF

In [69]:
# convert pandas dataframe into spark dataframe 
# sdf = spark data frame 
sdf = sqlcontext.createDataFrame(df)
sdf.show(5)

+-----------------+----------------+-----------------+----------------+------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|     y|
+-----------------+----------------+-----------------+----------------+------+
|              5.1|             3.5|              1.4|             0.2|setosa|
|              4.9|             3.0|              1.4|             0.2|setosa|
|              4.7|             3.2|              1.3|             0.2|setosa|
|              4.6|             3.1|              1.5|             0.2|setosa|
|              5.0|             3.6|              1.4|             0.2|setosa|
+-----------------+----------------+-----------------+----------------+------+
only showing top 5 rows



## Spark ML 

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString

In [38]:
# convert the categorical label column y into a numeric index column y_indexed
sdf = StringIndexer(inputCol="y", outputCol="y_indexed").fit(sdf).transform(sdf)
sdf.show(5)

+-----------------+----------------+-----------------+----------------+------+---------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|     y|y_indexed|
+-----------------+----------------+-----------------+----------------+------+---------+
|              5.1|             3.5|              1.4|             0.2|setosa|      2.0|
|              4.9|             3.0|              1.4|             0.2|setosa|      2.0|
|              4.7|             3.2|              1.3|             0.2|setosa|      2.0|
|              4.6|             3.1|              1.5|             0.2|setosa|      2.0|
|              5.0|             3.6|              1.4|             0.2|setosa|      2.0|
+-----------------+----------------+-----------------+----------------+------+---------+
only showing top 5 rows
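By default `StringIndexer` orders labels by frequency, with the most frequent label getting index 0. The three Iris classes have 50 rows each, so the order among them is a tie, which would explain why setosa ends up as 2.0 here. The sketch below mimics frequency-based indexing in plain Python. It is not Spark's implementation; `frequency_index` and the sample labels are hypothetical, and ties here simply keep first-seen order.

```python
from collections import Counter


def frequency_index(labels):
    """Map each label to a float index, most frequent label first."""
    # most_common() sorts by count, descending; ties keep first-seen order.
    ordered = [label for label, _ in Counter(labels).most_common()]
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical, deliberately unbalanced labels (not the Iris data).
sample_labels = ["b", "a", "b", "c", "b", "a"]
mapping = frequency_index(sample_labels)
print(mapping)
```

With balanced classes like Iris, do not rely on a particular label getting a particular index; check the fitted indexer's labels instead.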



In [43]:
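# assemble the four measurement columns into a single "features" vector column
feature_cols = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
sdf = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(sdf)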
sdf.show(5)

+-----------------+----------------+-----------------+----------------+------+---------+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|     y|y_indexed|         features|
+-----------------+----------------+-----------------+----------------+------+---------+-----------------+
|              5.1|             3.5|              1.4|             0.2|setosa|      2.0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|setosa|      2.0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|setosa|      2.0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|setosa|      2.0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|setosa|      2.0|[5.0,3.6,1.4,0.2]|
+-----------------+----------------+-----------------+----------------+------+---------+-----------------+
only showing top 5 rows



## Train Test Split

In [47]:
# Split the data into training and test sets (about 30% held out for testing; the split is random, so sizes are approximate)
(train_set, test_set) = sdf.randomSplit([0.7, 0.3])

In [49]:
print(train_set.count(),test_set.count())

102 48
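The counts above are 102 and 48 rather than an exact 105 and 45. As far as I understand, `randomSplit` assigns each row to a split with an independent random draw, so the sizes only approximate the weights. The plain-Python sketch below illustrates that behaviour. It is not Spark's code; `bernoulli_split` and the seeds are made up for illustration.

```python
import random


def bernoulli_split(n_rows, train_fraction, seed):
    """Assign each row index to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in range(n_rows):
        # Each row lands in the training set with probability train_fraction.
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test


# 150 rows, as in the Iris data; the seeds are arbitrary illustration values.
for seed in (0, 1, 2):
    train_rows, test_rows = bernoulli_split(150, 0.7, seed)
    print(seed, len(train_rows), len(test_rows))
```

For a repeatable split in Spark itself, `randomSplit` accepts a `seed` argument, e.g. `sdf.randomSplit([0.7, 0.3], seed=42)`.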


## Model

In [68]:
# init model 
rf = RandomForestClassifier(labelCol="y_indexed", featuresCol="features",numTrees=20) 
rf 

RandomForestClassifier_4917bb46d6beb51c561f

In [50]:
# fit the model on the training set
model = rf.fit(train_set)

In [81]:
# predict the test set 
predictions = model.transform(test_set)

In [99]:
predictions.select("prediction","probability", "y_indexed", "features").show(5)

+----------+-------------+---------+-----------------+
|prediction|  probability|y_indexed|         features|
+----------+-------------+---------+-----------------+
|       2.0|[0.0,0.0,1.0]|      2.0|[4.4,2.9,1.4,0.2]|
|       2.0|[0.0,0.0,1.0]|      2.0|[4.9,3.1,1.5,0.1]|
|       2.0|[0.0,0.0,1.0]|      2.0|[5.0,3.6,1.4,0.2]|
|       2.0|[0.0,0.0,1.0]|      2.0|[5.1,3.5,1.4,0.2]|
|       2.0|[0.1,0.0,0.9]|      2.0|[5.7,4.4,1.5,0.4]|
+----------+-------------+---------+-----------------+
only showing top 5 rows
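The probabilities above are all multiples of 0.05, which fits a forest of 20 trees where each tree effectively casts one vote. A rough sketch of that averaging, assuming every tree ends in a pure leaf (in general Spark averages each tree's leaf class distribution, so this is a simplification). `vote_probabilities` and the vote list are hypothetical.

```python
from collections import Counter


def vote_probabilities(tree_votes, n_classes):
    """Turn one class vote per tree into a per-class probability vector."""
    counts = Counter(tree_votes)
    return [counts.get(c, 0) / len(tree_votes) for c in range(n_classes)]


# Hypothetical votes from 20 trees: 2 pick class 0, 18 pick class 2.
votes = [0] * 2 + [2] * 18
probs = vote_probabilities(votes, n_classes=3)

# The predicted class is the one with the highest probability.
prediction = max(range(3), key=lambda c: probs[c])
print(probs, prediction)
```

Under that assumption, a row such as `[0.1,0.0,0.9]` corresponds to 2 of 20 trees voting for class 0 and 18 voting for class 2.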



## Evaluate

In [85]:
evaluator = MulticlassClassificationEvaluator(labelCol="y_indexed", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)

Test Error = 0.0416667
Accuracy = 0.958333
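The two numbers above are consistent with 2 misclassified rows out of the 48 test rows, since 46 / 48 = 0.958333. The check below redoes that arithmetic; the count of 2 errors is inferred from the printed metrics, not read from the model.

```python
n_test = 48   # test rows, from the split counts printed earlier
n_wrong = 2   # inferred: 2 / 48 matches the reported test error

# Accuracy is the fraction of test rows predicted correctly.
accuracy = (n_test - n_wrong) / n_test

print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % accuracy)
```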


## Feature Importance

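The result below shows that index 2 (petal length) has the largest feature importance (0.4669), followed by index 3 (petal width, 0.3856). The indices follow the order of the columns passed to `VectorAssembler`.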

In [87]:
# index order: 0 = sepal length (cm), 1 = sepal width (cm), 2 = petal length (cm), 3 = petal width (cm)
model.featureImportances 

SparseVector(4, {0: 0.1353, 1: 0.0123, 2: 0.4669, 3: 0.3856})
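To make the vector easier to read, the indices can be paired with the feature names and sorted. The sketch below hard-codes the values printed above so it runs without Spark; with a live model, `model.featureImportances.toArray()` should give the same numbers as a dense array.

```python
# Feature names in the order they were passed to VectorAssembler.
feature_names = [
    "sepal length (cm)",
    "sepal width (cm)",
    "petal length (cm)",
    "petal width (cm)",
]

# Values copied from the SparseVector output above.
importances = {0: 0.1353, 1: 0.0123, 2: 0.4669, 3: 0.3856}

# Pair each index with its name and sort by importance, largest first.
ranked = sorted(
    ((feature_names[i], value) for i, value in importances.items()),
    key=lambda pair: pair[1],
    reverse=True,
)

for name, value in ranked:
    print(f"{name}: {value:.4f}")
```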

## AUC

AUC is not implemented yet. The cell below only displays the per-class probabilities that an AUC calculation would start from.

In [98]:
predictions.select('probability').show()

+---------------+
|    probability|
+---------------+
|  [0.0,0.0,1.0]|
|  [0.0,0.0,1.0]|
|  [0.0,0.0,1.0]|
|  [0.0,0.0,1.0]|
|  [0.1,0.0,0.9]|
|[0.15,0.05,0.8]|
|  [0.0,0.0,1.0]|
|  [0.0,0.0,1.0]|
|  [0.0,0.0,1.0]|
|[0.05,0.0,0.95]|
|[0.15,0.0,0.85]|
|  [0.0,0.0,1.0]|
|  [1.0,0.0,0.0]|
|[0.05,0.0,0.95]|
|  [0.9,0.1,0.0]|
|  [1.0,0.0,0.0]|
|  [0.0,1.0,0.0]|
|[0.95,0.05,0.0]|
|  [1.0,0.0,0.0]|
|[0.75,0.25,0.0]|
+---------------+
only showing top 20 rows
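One possible way to get an AUC from these probability vectors is one-vs-rest: treat each class in turn as the positive class and score rows by that class's probability. Spark's `BinaryClassificationEvaluator` covers only the two-class case, so the sketch below does the calculation in plain Python on collected rows. `binary_auc`, `one_vs_rest_auc`, and the six rows with their labels are hypothetical, not the notebook's test set.

```python
def binary_auc(scores, positives):
    """AUC as the chance that a random positive outranks a random negative."""
    pos = [s for s, is_pos in zip(scores, positives) if is_pos]
    neg = [s for s, is_pos in zip(scores, positives) if not is_pos]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for sp in pos:
        for sn in neg:
            if sp > sn:
                wins += 1.0
            elif sp == sn:
                wins += 0.5  # ties count as half a win
    return wins / (len(pos) * len(neg))


def one_vs_rest_auc(probabilities, labels, n_classes):
    """Compute one AUC per class, treating that class as the positive one."""
    return {
        c: binary_auc([p[c] for p in probabilities], [y == c for y in labels])
        for c in range(n_classes)
    }


# Hypothetical probability vectors and true label indices.
probabilities = [
    [0.0, 0.0, 1.0],
    [0.1, 0.0, 0.9],
    [1.0, 0.0, 0.0],
    [0.75, 0.25, 0.0],
    [0.0, 1.0, 0.0],
    [0.4, 0.6, 0.0],
]
labels = [2, 0, 0, 0, 1, 1]

auc = one_vs_rest_auc(probabilities, labels, n_classes=3)
print(auc)
```

The pairwise loop is quadratic in the number of rows, which is fine for a 48-row test set but would need a rank-based version for larger data.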

