## Pavanish Kumar, PhD
Contact me __[linkedIn](https://www.linkedin.com/in/kumar-pavanish-5340a224/)__

#### This tutorial aims to demonstrate some of the Spark/PySpark ML APIs.

With the Spark 2.x release, the DataFrame-based machine learning module, referred to as spark.ml, became the primary ML API. The previous library, spark.mllib, was built on the RDD API, whereas spark.ml is built on the DataFrame API, which makes its API cleaner, faster and easier to use. The best thing about using Spark for machine learning is that the same code you wrote for a small dataset can be scaled to a massive dataset. No extra effort is required to build a parallel implementation; everything is abstracted away by the Spark framework. So you can build the model on your laptop in local Spark mode (a better alternative is to use a Databricks Community Edition cluster) using a sample dataset, and deploy the same code on a large Spark cluster.

#### PySpark ML has APIs for the most commonly used machine learning algorithms, e.g.:
- Linear regression
- Logistic regression
- Collaborative filtering (ALS)
- K-means
- PCA
- Decision tree classifier
- Random forest classifier
- Gradient-boosted tree classifier
- Multilayer perceptron classifier
- Linear Support Vector Machine
- and more...

The tutorial is organised in the following sections:
  1. Data description
  2. Machine Learning workflow
  3. Improving prediction model: Parameter tuning
  4. Extracting the best prediction model

#### 1. Data Description

For this demo I will use the forest cover type data from the UCI Machine Learning Repository.

 __Context__ <br>
This dataset contains tree observations from four areas of the Roosevelt National Forest in Colorado. All observations are cartographic variables from 30 meter x 30 meter sections of forest. 

__Content__ <br>
The dataset includes information on tree type, shadow coverage, distance to nearby landmarks (roads, etc.), soil type, and local topography.
In the original data, Wilderness_Area and Soil_Type were one-hot encoded.

The original data had a separate column for each soil type, from Soil_Type1 to Soil_Type40, with value 0 or 1.
Similarly, Wilderness_Area had four columns, Wilderness_Area1 to Wilderness_Area4, with value 0 or 1.

In this format the data takes more storage space than necessary, so I have transformed it back to one variable each for soil type (with 40 levels) and wilderness area (with 4 levels). The transformed data has 12 columns, including the true label column, Cover_Type, which comes last. The transformed data can be found here: https://github.com/pavanish/spark/tree/master/data


__The task here is to build a prediction model on 11 features to predict the cover type__

#### 2. Machine Learning workflow
To build a prediction model we need:
  > 1. Prediction algorithm
  > 2. Training data (as this is a classification, i.e. supervised, problem)
  > 3. Build the model
  > 4. Model evaluation

So, for this dataset let's try logistic regression and a decision tree as prediction models.
To access these algorithms, let's import them from pyspark.

#### _Machine Learning workflow_ :
> - Prediction algorithms

We first need to choose appropriate algorithms to build the model and import them from the pyspark ml module.
As our task is to classify the multiclass cover type, the following two simple classification algorithms can be tried:
> Logistic regression <br>
> Decision tree

So let's import these algorithms first.

In [8]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Let's read and explore the data first
Spark 2.x has a built-in API for reading CSV files,
which can be accessed from the Spark session, available as spark.

In [10]:
# read the csv file
# inferSchema=True infers the data types by reading the data; alternatively an explicit schema can be passed via the schema argument
# inferSchema is less efficient as it needs an extra pass over the data to work out the data types; for really big data in a real application, providing an explicit schema is the better choice
# as the first line of the data is the header, let's set header to True

forest_cover=spark.read.csv("/FileStore/tables/cover_type_uncode_csv",
                            inferSchema=True,header=True)

# spark.read.csv returns the data as a dataframe

__Persisting the data in memory__ <br>
If we are going to refer to the data many times, it is prudent to cache it in memory for faster access.
Machine learning algorithms use iterative updates, so caching the data will make the application run faster and more efficiently.
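
To make the "mark now, materialize later" behaviour concrete, here is a loose analogy in plain Python. It is not Spark code: `ToyDataset` and its methods are hypothetical names invented for this sketch, and Spark's real caching is far more involved. It only shows that a cache call can be a cheap marker and that the first action does the actual work.

```python
class ToyDataset:
    """Hypothetical stand-in for a lazily evaluated dataset (not a Spark class)."""

    def __init__(self, loader):
        self._loader = loader      # function that produces the rows
        self._use_cache = False    # set by cache(), which does no work itself
        self._cached = None        # filled in by the first action
        self.loads = 0             # how many times the loader actually ran

    def cache(self):
        # Only records the intent to cache; nothing is read here.
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached
        self.loads += 1
        rows = list(self._loader())
        if self._use_cache:
            self._cached = rows
        return rows

    def count(self):
        # An "action": forces the rows to be produced.
        return len(self._rows())


data = ToyDataset(lambda: range(1000)).cache()
loads_after_cache = data.loads   # cache() alone has not triggered a load
first = data.count()             # the first action reads the data and stores it
second = data.count()            # the second action reuses the stored copy
print(loads_after_cache, first, second, data.loads)
```

The loader runs once even though count() is called twice, which is the saving we are after when an iterative algorithm revisits the same data many times.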

In [12]:
# this step alone will not cache the data: Spark is lazy and does not act on your command until you force it with one of the action methods
# df.cache() is lazy: it only marks the dataframe for caching and does no work by itself

forest_cover.cache()

In [13]:
# to be sure that the data is cached, chain an action to it, e.g. the .count() method
forest_cover.cache().count()

In [14]:
# let's dive in and look at the data structure using df.show(n), n => number of rows to display. By default n=20
forest_cover.show(5)

# there are 13 columns in total; the first column _c0 is useless. It is there because I transformed and saved the data using R and forgot to set row.names=FALSE. Anyway, it's an opportunity to discuss one more method => df.drop('colname')


In [15]:
forest_cover.columns # get the names of the columns

In [16]:
forest_cover = forest_cover.drop('_c0') # let's drop the artifact '_c0' column using the df.drop() method

In [17]:
forest_cover.columns # sure enough, _c0 is gone now

In [18]:
forest_cover.count() # let's see the total number of observations

coverType is the label we need to predict, so let's see how many categories the label variable coverType has. <br>
The groupBy method can be called on the dataframe to group on the coverType variable, followed by count and sort.

In [20]:
forest_cover.groupBy('coverType').count().sort('count',ascending=False).show()

In [21]:
# it's time to check the data types using df.printSchema()
# for every column it shows colname: dataType (nullable = true)
# e.g. Elevation: integer (nullable = true)
# the datatype for soil is integer as the soil type is coded as a 4 digit number; we need to change it to StringType
# we will also change coverType from integer to StringType
forest_cover.printSchema()

In [22]:
# we can change the datatype using the df.withColumn() api
# df.withColumn takes two parameters, 1. a column name and 2. a column expression, and returns a new dataframe with that column added (or replaced, if the name already exists)
# below, 'label' is the name of the new column; the column is referred to as df.colname and chained with the .cast() transformation
forest_cover=forest_cover.withColumn('label',forest_cover.coverType.cast('String'))
forest_cover=forest_cover.withColumn('soil',forest_cover.soil.cast('String'))

# as we now have coverType as label, let's drop coverType from the dataframe
forest_cover = forest_cover.drop('coverType')


In [23]:
# let's check the schema again
forest_cover.printSchema()

#### _Machine Learning workflow_ :
> - Get training data

So far we have imported the pyspark functions and prediction algorithms from the pyspark ml api, and read, explored and transformed the data. <br>
Now let's go to step 2 of the Machine Learning workflow and get the training data for training the prediction model. <br>
pyspark provides an easy function to split the data randomly in a desired proportion. We will split the original data in an 80:20 ratio: <br>
80% for training, 20% for testing.
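
Why does a random split give only approximately 80% and 20%? The plain-Python sketch below gives the intuition, assuming each row is assigned to a split independently with probability proportional to its weight. The function `toy_random_split` is a hypothetical helper written for this illustration; Spark's own implementation differs in detail. Note that df.randomSplit() also accepts a seed argument if you want a reproducible split.

```python
import random


def toy_random_split(rows, weights, seed=42):
    """Assign each row independently to a split, with probability proportional to its weight."""
    total = float(sum(weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            # Only reached if floating-point rounding leaves the draw unassigned.
            splits[-1].append(row)
    return splits


train_toy, test_toy = toy_random_split(range(10000), [0.8, 0.2])
print(len(train_toy), len(test_toy))
```

The two sizes always add up to the number of rows, but the individual sizes land near 8000 and 2000 rather than exactly on them.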

In [25]:
splits = forest_cover.randomSplit([0.8, 0.2])  # df.randomSplit() takes a list of proportions as input
train = splits[0] # splits is a list of dataframes; the first one (0.8, i.e. 80%) is the training set
test = splits[1]  # the second one (0.2, i.e. 20%) is the test set

# splitting is random and approximate
# let's see how many observations there are in each of the train and test datasets
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Now the data is ready, let's move on to assemble the components of the prediction model and implement it. <br>
pyspark provides the various methods required for building a prediction model pipeline.
In fact, along with these methods it also provides a Pipeline API which assembles the components of the pipeline and thus ensures the data flows through the pipeline in a consistent manner. pyspark provides a consistent API across the different algorithms, which makes it really easy to try many different ml algorithms with a small change in code. It gives a plug-and-play modular framework. <br>

Each ml algorithm expects a feature vector as input for training the model, and __VectorAssembler__ does just that. It takes the input columns as a list and returns the feature vector. <br>
We also need to transform the categorical features using __StringIndexer__, which takes a categorical feature as input column and returns the indexed feature.
We will use StringIndexer to index the soil and wilderness columns. We need to index the label as well.


The data has two columns of categorical features: soil (40 categories) and wilderness (4 categories).

#### _Machine Learning workflow_
> - Build the model

To build the prediction model a few components are needed, let's import these from pyspark ml. <br>
The following components are needed to build the model: 
  > __Pipeline__, __VectorAssembler__, __StringIndexer__, __VectorIndexer__<br>

  __Pipeline__ => It strings together the stages of transformation; in this case the pipeline has the following stages: stages=[labelIndexer,strIdx1,strIdx2, catVect,catIdx, numVect, featVect, lr] <br>
  __StringIndexer__ => It indexes a categorical or string column into a numeric column. <br>
  __VectorIndexer__ => It identifies the categorical features inside a vector column (those with at most maxCategories distinct values, 20 by default) and indexes them. Note that it does not one-hot encode: the 40 soil categories are not spread over a vector of 40 features (that would be the job of OneHotEncoder) <br>
  __VectorAssembler__ => It takes the feature columns as a list and returns the feature vector
  
  ##### Stages in our model building to predict coverType
  > labelIndexer = this stage indexes the label into a numeric type <br>
  > strIdx1 and strIdx2 = these index the categorical features <br>
  > catVect = the two indexed categorical features are assembled into a vector using VectorAssembler <br>
  > catIdx = VectorIndexer indexes the categorical features in that vector <br>
  > numVect = the numeric features are assembled into a vector using VectorAssembler <br>
  > featVect = both the categorical and numeric features are tied together into a single feature vector, again using VectorAssembler <br>
  > lr/dt = the algorithm from pyspark; algorithm-specific model parameters can be specified here <br>
  > lr_pipeline = at the end we pass the stages of transformation to Pipeline, which ensures the train and test data flow through the transformations in a consistent manner <br>

Pipeline returns an __estimator__, and once we fit it with the training data it returns a __transformer__ <br>

lr_pipeline/dt_pipeline is an __estimator__
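
The estimator/transformer split can feel abstract, so here is a toy version in plain Python. `ToyCenterer` and `ToyCentererModel` are hypothetical classes made up for this sketch and are not part of pyspark; they only mimic the pattern: fit() learns something from the training data and hands back an object whose transform() applies it, unchanged, to any other data.

```python
class ToyCenterer:
    """Hypothetical estimator: learns the mean of the training values."""

    def fit(self, values):
        return ToyCentererModel(sum(values) / float(len(values)))


class ToyCentererModel:
    """Hypothetical transformer: subtracts the mean learned during fit()."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        return [v - self.mean for v in values]


train_values = [1.0, 2.0, 3.0]
test_values = [4.0, 5.0]

centerer = ToyCenterer()                        # estimator: nothing learned yet
centerer_model = centerer.fit(train_values)     # transformer: holds the training mean
centered_test = centerer_model.transform(test_values)  # test data reuses the training mean
print(centerer_model.mean, centered_test)
```

The test values are centred with the mean of the training values, not their own. That is the consistency a fitted pipeline gives us for every stage at once.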

In [28]:

from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml import Pipeline

In [29]:

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
strIdx1 = StringIndexer(inputCol = "wilderness", outputCol = "waIdx")
strIdx2 = StringIndexer(inputCol = "soil", outputCol = "soilIdx")
catVect = VectorAssembler(inputCols = ["waIdx","soilIdx"], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ["Elevation","Aspect","Slope",
                                       "Horizontal_Distance_To_Hydrology",
                                       "Vertical_Distance_To_Hydrology",
                                       "Horizontal_Distance_To_Roadways",
                                       "Hillshade_9am",
                                       "Hillshade_Noon",
                                       "Horizontal_Distance_To_Fire_Points"], outputCol="numFeatures")
#minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures") 
featVect = VectorAssembler(inputCols=["idxCatFeatures", "numFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="features",maxIter=10,regParam=0.3)
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features",maxBins= 50)
lr_pipeline = Pipeline(stages=[labelIndexer,strIdx1,strIdx2, catVect,catIdx, numVect, featVect, lr])
dt_pipeline = Pipeline(stages=[labelIndexer,strIdx1,strIdx2, catVect,catIdx, numVect, featVect, dt])

# you can see how easy it is to use two machine learning methods: we just change the last stage of the pipeline from lr to dt

In [31]:
# let's fit the decision tree

dtModel = dt_pipeline.fit(train)
print("decision tree Pipeline complete!")

In [32]:
# fit the logistic regression model
# the .fit method returns a transformer, so lrModel is a transformer
lrModel = lr_pipeline.fit(train) 
print("logistic regression Pipeline complete!")

In [33]:
# let's predict on the test data using the transform method of the model returned after fitting
lr_prediction = lrModel.transform(test)
dt_prediction = dtModel.transform(test)

# the transform method adds a prediction column to the test dataset

#### _Machine Learning workflow_ : 
> - Model evaluation

Once the model is built and trained, we can use it to predict unseen data. 
To evaluate how the prediction model performs we need an appropriate metric.
For our example we can use accuracy as the metric for model evaluation.
The transform method of the model adds the prediction column to the test data, which can be used to calculate the accuracy. pyspark has some built-in evaluators which can be used to measure model performance.<br>
As the task is to predict one of the 7 labels (coverType: 1,2,3,4,5,6,7), we need MulticlassClassificationEvaluator, which we already imported in the code cell above (In [8]). It takes labelCol, predictionCol and metricName as input and returns an evaluator, whose .evaluate method computes the metric from a dataframe that has the columns named by labelCol and predictionCol.
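
For intuition, accuracy is just the fraction of rows where the prediction equals the true label. The plain-Python sketch below computes it on a handful of made-up values; `toy_accuracy` is a hypothetical helper for illustration and is not how the pyspark evaluator is implemented.

```python
def toy_accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / float(len(labels))


# made-up indexed labels and predictions, only to exercise the function
labels_toy = [0.0, 1.0, 2.0, 0.0, 1.0]
preds_toy = [0.0, 1.0, 0.0, 0.0, 2.0]

acc = toy_accuracy(labels_toy, preds_toy)
print("accuracy = %g, test error = %g" % (acc, 1.0 - acc))
```

Three of the five toy rows match, so the accuracy is 0.6 and the test error, as in the cell that follows, is one minus that.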

In [35]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dt_prediction)
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Decision tree Test Error = %g " % (1.0 - dt_accuracy))
print("Logistic regression Test Error = %g " % (1.0 - lr_accuracy))
print("Decision tree prediction accuracy = %g " % dt_accuracy) 
print("Logistic regression prediction accuracy = %g " % lr_accuracy)

Accuracy for logistic regression is around 59%, while accuracy for the decision tree is 10 percentage points higher at 69%. <br>
This does not mean that logistic regression is a bad algorithm; parameter tuning is probably needed. <br>
It only suggests that with these initial, untuned parameters the decision tree works better for this dataset.

In [37]:
# Confusion matrix can be created using crosstab function.
dt_prediction.crosstab("indexedLabel","prediction").show()

#### Let's explore the prediction results in a bit more detail

In [39]:
## let's select only the required columns, i.e. prediction and indexedLabel
## for explanation let's also get the features and label columns using the df.select('colname') method
## features has 11 values (we had 11 feature columns in the dataset) assembled into a vector using VectorAssembler
## label is the true label, indexedLabel is the label transformed by StringIndexer in the labelIndexer stage. There is something important to discuss about StringIndexer, hold on for a while

#predicted = lr_prediction.select("features", "prediction", "indexedLabel","label")
predicted = dt_prediction.select("features", "prediction", "indexedLabel","label")
predicted.show(10, truncate=False)
predicted.groupBy('prediction').count().show() 
# this shows just a few observations and it seems the prediction was not good: just compare the prediction column to indexedLabel. In the initial 10 observations every 4 is predicted as 2 and every 2 is predicted as 6; only the last observation is predicted correctly
# Note: the rows shown are not guaranteed to be the same on every run (the split is random), so if you run the code again you may see a different set of rows. If we group by the prediction column we find that label 4 is not predicted at all.


In [40]:
predicted.groupBy('label').count().show() # observe the real label distribution

In [41]:
predicted.groupBy('indexedLabel').count().show() ## this shows the number of observations that belong to each class of the indexed label

# now is a good time to discuss StringIndexer

#### StringIndexer

**_StringIndexer_** takes a label column of string type and maps it to numeric indices. 
In this conversion it orders the labels by their frequency
and maps them to indices from 0 to the number of labels minus 1,
with 0 assigned to the most frequent label.

E.g., let's say our labels are of string type, Apple, Banana, Orange and Mango,
 with their respective counts as 50, 500, 300 and 400:

"Apple":50,"Banana":500,"Orange":300,"Mango":400 : original labels with total counts


StringIndexer will reorder them to 
**"Banana":500,"Mango":400,"Orange":300,"Apple":50 => ("Banana","Mango","Orange","Apple")**

and map them to ("Banana":0,"Mango":1,"Orange":2,"Apple":3), 
so the indexedLabel values are => **(0,1,2,3)**



##### In the forest cover data the labels are cover types, labelled as 1,2,3,4,5,6,7
You can look at the frequency of these labels (In [40]) with predicted.groupBy('label').count().show()

Order of the labels by frequency: "2","1","3","7","6","5","4"

StringIndexer will map them to "2":0,"1":1,"3":2,"7":3,"6":4,"5":5,"4":6, so real label 2 is mapped to 0, 1 to 1, 3 to 2 and so on...
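
The frequency-ordered mapping can be reproduced in a few lines of plain Python. This is only a sketch of the idea using the fruit counts from the example above; `toy_string_index` is a hypothetical helper, and the alphabetical tie-break is an assumption of this sketch rather than a statement about what Spark does with equal counts.

```python
from collections import Counter


def toy_string_index(values):
    """Map each distinct value to an index, most frequent value first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken by value so the result is deterministic.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


fruit = ["Apple"] * 50 + ["Banana"] * 500 + ["Orange"] * 300 + ["Mango"] * 400
fruit_index = toy_string_index(fruit)
print(fruit_index)
```

Banana, the most frequent label, gets index 0 and Apple, the rarest, gets index 3. For the same reason cover type 2 ends up as indexedLabel 0 in our data.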

#### 3. Improving prediction model: Parameter tuning
 The initial analysis suggested that the decision tree might be a better prediction algorithm for this data. 
 Let's fine-tune the model further and evaluate whether the prediction accuracy improves.
 pyspark provides built-in functionality for selecting the best model, which makes parameter tuning an easy task.
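
Before building the grid it helps to see how quickly the number of models grows. The plain-Python sketch below enumerates every combination of the four decision tree parameters and the two values each that are tried later in this section. It uses itertools rather than the pyspark ParamGridBuilder, so `toy_grid` is just an illustrative dictionary, and the order of combinations here is not claimed to match the order Spark uses.

```python
from itertools import product

# the parameter values tried later in this section, written as a plain dictionary
toy_grid = {
    "impurity": ["gini", "entropy"],
    "maxDepth": [1, 20],
    "maxBins": [40, 300],
    "minInfoGain": [0.0, 0.05],
}

names = list(toy_grid)
combinations = [
    dict(zip(names, values))
    for values in product(*(toy_grid[name] for name in names))
]

print(len(combinations))   # one model is trained per combination
print(combinations[0])
```

Four parameters with two values each give 2 x 2 x 2 x 2 = 16 combinations. Adding a third value to just one parameter would already push that to 24, so keep the grid small when each fit is expensive.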

In [44]:
# let's import the api for parameter tuning
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [45]:
# Let's tune the parameters of the decision tree and find the best model
# the ParamGridBuilder() api in spark makes it easier to tune combinations of parameters by building a grid of parameters to train on
# parameters can be added to the grid using the ParamGridBuilder.addGrid method
# as the model will be trained for various combinations of parameters, we need to pass an evaluator to assess the prediction accuracy of each model
# we will also use TrainValidationSplit, which evaluates each combination on a single held-out validation split (unlike k-fold cross validation)

paramGrid = ParamGridBuilder().addGrid(
    dt.impurity, ["gini", "entropy"]).addGrid(
    dt.maxDepth, [1, 20]).addGrid(
    dt.maxBins, [40, 300]).addGrid(
    dt.minInfoGain, [0.0, 0.05] ).build()

MulticlassEval = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",predictionCol="prediction",metricName="accuracy")

tvs = TrainValidationSplit(estimator=dt_pipeline, evaluator=MulticlassEval, 
                           estimatorParamMaps=paramGrid, trainRatio=0.9) # pass it the ml pipeline, evaluator and paramGrid; trainRatio=0.9 means it will use 90% of the data for training and 10% for validation


model = tvs.fit(train) # this will take a while to run as it is training 16 models (4 parameters with 2 values each, 2^4 = 16). On a Databricks community cluster it took around 9 minutes, so run it and take a break

# model now holds the final best model, which can be used to predict on new data

In [46]:
prediction = model.transform(test)

In [47]:
# Let's see the accuracy after parameter tuning
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(prediction)
print("Test Error = %g " % (1.0 - accuracy))
print ("Decision tree prediction accuracy = %g " % accuracy) 

In [48]:
## ooooo.... such a dramatic improvement in prediction accuracy
## it jumps to 92% from 70%

In [49]:
#You can get the confusion matrix using the crosstab function
prediction.crosstab("indexedLabel","prediction").show()

# the true label (indexedLabel) runs along the rows and the prediction along the columns
# so with the improved model, class 5 (1916 observations) is correctly predicted as 5 in 1388 observations
# with the initial naive model, only 13 observations were correctly predicted as 5

#### 4. Extracting the best prediction model
Finally we want to get the parameters of the best model selected by the TrainValidationSplit class. <br>
TrainValidationSplit is an estimator, and when its fit method is called with training data it returns a transformer. <br>
The final transformer here is __model__, created by calling the fit method on the tvs object (In [45]).<br>

model.bestModel gives the best model evaluated on the parameter grid; .stages on the best model returns the list of stages. The classification model is the last element of the stages, which can be accessed in python using [-1] indexing.

In [51]:
model.bestModel.stages

In [53]:
bestModel = model.bestModel.stages[-1]
bestModel

PySpark wraps the underlying Java objects. To work with a Java TrainValidationSplitModel as a python object, pyspark uses the @classmethod def _from_java(cls, java_stage):<br>
and, in the other direction, def _to_java(self): returns a Java object which can be used to access the parameters<br>

The detailed code is here __[spark ml github](https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py)__

In [55]:
#Let's get the parameters of the best trained model
print(bestModel._java_obj.getImpurity())
print(bestModel._java_obj.getMaxDepth())
print(bestModel._java_obj.getMinInfoGain())
print(bestModel._java_obj.getMaxBins())

In [57]:
# this gives the parameter grid that got evaluated. We trained on combinations of 2 values each for impurity('gini','entropy'), maxDepth(1,20), maxBins(40,300), minInfoGain(0.0,0.05)
# 4 parameters with 2 values each give a total of 16 combinations, which means 16 models were evaluated to choose the best model
model.extractParamMap()

# model.extractParamMap gives the params of the tuning model, including the parameter grid whose combinations were trained sequentially


In [58]:
# model.validationMetrics gives the evaluation metrics in a list
# as 16 models were trained it returns a list of 16 values
# to find out which model worked best, np.argmax can be used; it returns 14, and as python is 0-indexed that is actually the 15th model

import numpy as np

print(np.argmax(model.validationMetrics))
model.validationMetrics
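
An index on its own is not very informative, so it is worth mapping it back to the parameter combination it stands for. The plain-Python sketch below shows the idea on a made-up list of four metrics and four parameter dictionaries. All values are invented for illustration and are not results from this notebook. In pyspark the fitted tuning model should expose the grid through getEstimatorParamMaps(), in the same order as validationMetrics, but treat that as an assumption to check against your Spark version.

```python
# made-up validation accuracies, one per parameter combination (not real results)
toy_metrics = [0.48, 0.71, 0.69, 0.90]
toy_param_maps = [
    {"maxDepth": 1, "impurity": "gini"},
    {"maxDepth": 20, "impurity": "gini"},
    {"maxDepth": 1, "impurity": "entropy"},
    {"maxDepth": 20, "impurity": "entropy"},
]

# index of the largest metric, the plain-python equivalent of np.argmax
best_index = max(range(len(toy_metrics)), key=lambda i: toy_metrics[i])
best_params = toy_param_maps[best_index]
print("best model is number", best_index + 1, "with", best_params)
```

Because the metrics and the parameter maps are kept in the same order, the position of the best metric is all that is needed to look up the winning combination.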

For an explanation of decision trees, logistic regression and their parameters you can look at Wikipedia or read the excellent book __The Elements of 
Statistical Learning__ //web.stanford.edu/~hastie/ElemStatLearn/. The book is made freely available by its generous authors, __Trevor Hastie, Robert Tibshirani, Jerome Friedman__. 

If you are a data scientist and new to Spark then pick up this book: __Learning Spark__ - O'Reilly Media. Although it's a bit outdated and covers Spark 1.x, it's good for the fundamentals. <br>
For machine learning in Spark you can get this O'Reilly book: __Advanced Analytics with Spark, 2nd Edition__. However, the book uses the Scala language. This tutorial is somewhat based on one of the examples from this book.

#### __Tell me and I forget. Teach me and I remember. Involve me and I learn__

> _Benjamin Franklin_