
# Pyspark - Machine Learning Pipelines

#### Description
In this hands-on lab, you will deepen your knowledge of PySpark, a very popular Python library for big data analysis and modeling. Here, you will learn how to create a machine learning pipeline using the PySpark library, and how to perform metric evaluation and model tuning.

Your machine learning skills will be challenged, and by the end of this lab you should have a solid understanding of how PySpark is used in practice to build data analysis pipelines.



#### Learning Objectives
Upon completion of this lab you will be able to:
 - fit a Logistic Regression model in PySpark;
 - perform cross-validation in PySpark;
 - evaluate model performance;
 - perform inference on new, unseen data.


#### Intended Audience
This lab is intended for:
 - Those interested in performing data analysis with Python.
 - Anyone involved in data science and engineering pipelines.

#### Prerequisites
You should possess:
 - An intermediate understanding of Python.
 - Basic knowledge of SQL.
 - Basic knowledge of the Pandas library.



In [None]:
import os

# Point Spark to a Java 8 installation (adjust the path to match your environment)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# 1. Dataset Creation
We assume you have already completed the `PySpark - Preprocessing` lab, so we are not going to dive into the data preprocessing phase in this hands-on lab. If you wish to sharpen your skills on data transformation with PySpark, please check that lab.

In [None]:
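from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the CSV file; without inferSchema, every column is loaded as a string
file_path = "data/tips.csv"
tips = spark.read.csv(file_path, header=True)

# Register the DataFrame as a temporary SQL view and read it back as a table
tips.createOrReplaceTempView("tips")
tips_table = spark.table("tips")

# Tip as a percentage of the total bill (Spark implicitly casts the string columns to numbers here)
tips_table = tips_table.withColumn("perc_tips", (tips_table.tip / tips_table.total_bill) * 100)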

In [None]:
# Cast the columns we need from string to numerical types
tips_table = tips_table.withColumn("total_bill", tips_table.total_bill.cast("double"))
tips_table = tips_table.withColumn("tip", tips_table.tip.cast("double"))
tips_table = tips_table.withColumn("size", tips_table.size.cast("integer"))

In [None]:
tips_table.show()

+----------+----+------+------+---+------+----+------------------+
|total_bill| tip|   sex|smoker|day|  time|size|         perc_tips|
+----------+----+------+------+---+------+----+------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|5.9446733372572105|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|16.054158607350097|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|16.658733936220845|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 13.97804054054054|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|14.680764538430255|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 18.62396204033215|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| 22.80501710376283|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|11.607142857142858|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|13.031914893617023|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|21.853856562922868|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2| 16.65043816942551|
|     35.26| 5.0|Female|    No|Sun|Dinner|   4|14.180374361883

## Build a Machine Learning Pipeline with PySpark

Assume that our restaurant's clients can be divided into two categories based on their generosity. In particular, we say that a client is generous if they leave a tip of at least 15% of the total bill; otherwise, we call them stingy. We can easily build a labelled dataset from this information with a few PySpark transformations.

To do so, we use the `withColumn` method to create two columns:
 1. a boolean column called `is_stingy`, based on the condition `perc_tips < 15`;
 2. an integer column called `label`, which casts the boolean into a binary label: 0 if generous, 1 if stingy.
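The labeling rule can be sketched in plain Python on a few rows from the table above. This only illustrates the logic of the two `withColumn` calls; it is not PySpark code, and the helper name `label_for` is hypothetical.

```python
def label_for(total_bill: float, tip: float, threshold: float = 15.0) -> int:
    """Hypothetical helper mirroring the two withColumn calls:
    1 (stingy) if the tip is below `threshold` percent of the bill, else 0 (generous)."""
    perc_tips = tip / total_bill * 100
    is_stingy = perc_tips < threshold
    return int(is_stingy)


# (total_bill, tip) pairs taken from the first rows of tips_table
rows = [(16.99, 1.01), (10.34, 1.66), (21.01, 3.5), (23.68, 3.31)]
labels = [label_for(bill, tip) for bill, tip in rows]
print(labels)  # [1, 0, 0, 1]
```

The result agrees with the `label` column shown for the same rows.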

In [None]:
tips_table = tips_table.withColumn("is_stingy", tips_table.perc_tips < 15)
tips_table = tips_table.withColumn("label", tips_table.is_stingy.cast("integer"))

In [None]:
tips_table.show()

+----------+----+------+------+---+------+----+------------------+---------+-----+
|total_bill| tip|   sex|smoker|day|  time|size|         perc_tips|is_stingy|label|
+----------+----+------+------+---+------+----+------------------+---------+-----+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|5.9446733372572105|     true|    1|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|16.054158607350097|    false|    0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|16.658733936220845|    false|    0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 13.97804054054054|     true|    1|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|14.680764538430255|     true|    1|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 18.62396204033215|    false|    0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| 22.80501710376283|    false|    0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|11.607142857142858|     true|    1|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|13.031914893617023|     true|    1|
|   

Let's check the distribution of the label column broken down by sex:

In [None]:
by_label_sex = tips_table.groupBy("label", "sex")
by_label_sex.count().show()

+-----+------+-----+
|label|   sex|count|
+-----+------+-----+
|    1|Female|   34|
|    0|  Male|   83|
|    1|  Male|   74|
|    0|Female|   53|
+-----+------+-----+
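The raw counts are easier to compare as proportions. The plain-Python arithmetic below uses only the counts printed above and computes the share of stingy clients for each sex.

```python
# Counts copied from the groupBy("label", "sex") output above
counts = {
    ("Female", 1): 34, ("Female", 0): 53,
    ("Male", 1): 74, ("Male", 0): 83,
}

share = {}
for sex in ("Female", "Male"):
    stingy = counts[(sex, 1)]
    total = stingy + counts[(sex, 0)]
    share[sex] = stingy / total
    print(f"{sex}: {stingy}/{total} = {share[sex]:.3f} stingy")
# Female: 34/87 = 0.391 stingy
# Male: 74/157 = 0.471 stingy
```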



### Encode categorical variables using PySpark

Before fitting a machine learning model, we need to perform a few steps of data transformation, such as converting categorical features into numerical ones. To do so, we can use PySpark's `StringIndexer`, which encodes a categorical (string) column as a column of numerical indices. By default, it orders the values by frequency and assigns the lowest index, i.e. zero, to the most frequent value of that column.
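To make the frequency ordering concrete, here is a plain-Python sketch of the mapping that `StringIndexer` learns with its default ordering (`stringOrderType="frequencyDesc"`). It is an illustration, not Spark's implementation; the function name `frequency_index` is hypothetical, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def frequency_index(values):
    """Sketch of a frequency-based string index: the most frequent value gets 0.0,
    the next one 1.0, and so on. Ties are broken alphabetically in this sketch."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}


# 157 "Male" and 87 "Female" rows, matching the tips dataset
sex_column = ["Male"] * 157 + ["Female"] * 87
mapping = frequency_index(sex_column)
print(mapping)  # {'Male': 0.0, 'Female': 1.0}
```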

In [None]:
from pyspark.ml.feature import StringIndexer

We apply the `StringIndexer` to the `sex` column as follows: we pass the raw column `sex` as input and specify `sex_index` as the output column. We then call `fit` on `tips_table`, which learns the mapping from each value to its index, followed by `transform`, which adds the numerical `sex_index` column.

In [None]:
sex_indexer = StringIndexer(inputCol="sex", outputCol="sex_index")

In [None]:
df1 = sex_indexer.fit(tips_table).transform(tips_table)
df1.show()

+----------+----+------+------+---+------+----+------------------+---------+-----+---------+
|total_bill| tip|   sex|smoker|day|  time|size|         perc_tips|is_stingy|label|sex_index|
+----------+----+------+------+---+------+----+------------------+---------+-----+---------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|5.9446733372572105|     true|    1|      1.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|16.054158607350097|    false|    0|      0.0|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|16.658733936220845|    false|    0|      0.0|
|     23.68|3.31|  Male|    No|Sun|Dinner|   2| 13.97804054054054|     true|    1|      0.0|
|     24.59|3.61|Female|    No|Sun|Dinner|   4|14.680764538430255|     true|    1|      1.0|
|     25.29|4.71|  Male|    No|Sun|Dinner|   4| 18.62396204033215|    false|    0|      0.0|
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| 22.80501710376283|    false|    0|      0.0|
|     26.88|3.12|  Male|    No|Sun|Dinner|   4|11.607142857142858|    

Please note that the value `"Male"` gets index 0 because it is the most frequent, and we can easily check this:

In [None]:
tips_table.groupBy("sex").count().show()

+------+-----+
|   sex|count|
+------+-----+
|Female|   87|
|  Male|  157|
+------+-----+



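We now apply both the `StringIndexer` and the `OneHotEncoder` to `sex`, `smoker`, `day` and `time`. The former was discussed just above, while the latter deserves a few more words. In a nutshell, the `OneHotEncoder` maps a column of category indices to a new column of binary (dummy) vectors, in which each entry flags one of the categories observed in the column. Note that in Spark 3.x `OneHotEncoder` is an estimator, so it has to be fit before it can transform data; the pipeline we build below takes care of this.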

For example, with 4 categories, an input value of `2.0` would map to the output vector `[0.0, 0.0, 1.0]`. The last category is not included by default (configurable via the `dropLast` argument), because including it would make the vector entries always sum to one, and hence be linearly dependent.
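The encoding rule can be sketched in plain Python. The function names `one_hot` and `sparse_to_dense` are hypothetical; the second one expands the sparse notation `(size,[indices],[values])` that Spark uses when it prints such vectors, for instance `(1,[],[])` for a category that was dropped.

```python
def one_hot(index: int, num_categories: int, drop_last: bool = True) -> list:
    """Sketch of one-hot encoding a category index. With drop_last=True the vector
    has num_categories - 1 entries, so the last category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


def sparse_to_dense(size: int, indices: list, values: list) -> list:
    """Expand Spark's printed sparse notation (size, [indices], [values]) to a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


print(one_hot(2, 4))                   # [0.0, 0.0, 1.0]
print(one_hot(3, 4))                   # [0.0, 0.0, 0.0]  (last category dropped)
print(sparse_to_dense(1, [], []))      # [0.0]
print(sparse_to_dense(3, [1], [1.0]))  # [0.0, 1.0, 0.0]
```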

In [None]:
from pyspark.ml.feature import OneHotEncoder

In [None]:
# sex
sex_indexer = StringIndexer(inputCol="sex", outputCol="sex_index")
sex_encoder = OneHotEncoder(inputCol="sex_index", outputCol="sex_fact")

# smoker
smoker_indexer = StringIndexer(inputCol="smoker", outputCol="smoker_index")
smoker_encoder = OneHotEncoder(inputCol="smoker_index", outputCol="smoker_fact")

# day
day_indexer = StringIndexer(inputCol="day", outputCol="day_index")
day_encoder = OneHotEncoder(inputCol="day_index", outputCol="day_fact")

# time
time_indexer = StringIndexer(inputCol="time", outputCol="time_index")
time_encoder = OneHotEncoder(inputCol="time_index", outputCol="time_fact")

### Assembling Columns using VectorAssembler

Now, our goal is to combine the features `["total_bill", "tip", "time_fact", "day_fact", "smoker_fact", "sex_fact", "size", "perc_tips"]`, some numerical and some one-hot vectors, into a single vector column that we call `features`. To do so, we use the `VectorAssembler`, a transformer that combines a given list of columns into a single vector column. `VectorAssembler` takes two parameters:

*   `inputCols`: list of columns to combine into a single vector column;
*   `outputCol`: the new column that will contain the assembled vector.
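A plain-Python sketch of what the assembler does to a single row: scalar columns contribute one entry each, vector columns contribute all of their entries, in the order given by `inputCols`. The function name `assemble` is hypothetical, and the one-hot values for this row (a female non-smoker at Sunday dinner) are written out by hand, assuming the index orderings Spark learns on this dataset.

```python
def assemble(row: dict, input_cols: list) -> list:
    """Sketch of VectorAssembler: concatenate scalar columns and (dense) vector
    columns, in the order given by input_cols, into one flat feature vector."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


input_cols = ["total_bill", "tip", "time_fact", "day_fact",
              "smoker_fact", "sex_fact", "size", "perc_tips"]

# First row of the dataset, with the one-hot columns written out densely
row = {
    "total_bill": 16.99, "tip": 1.01, "size": 2, "perc_tips": 5.9446733372572105,
    "time_fact": [1.0],           # Dinner
    "day_fact": [0.0, 1.0, 0.0],  # Sun
    "smoker_fact": [1.0],         # No
    "sex_fact": [0.0],            # Female (last category, dropped)
}
features = assemble(row, input_cols)
print(len(features))  # 10
print(features[:4])   # [16.99, 1.01, 1.0, 0.0]
```

Two numerical columns, four one-hot vectors of sizes 1, 3, 1 and 1, and two more numerical columns give 10 entries in total.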


Let’s create our assembler:

In [None]:
from pyspark.ml.feature import VectorAssembler
vec_assembler = VectorAssembler(
    inputCols=["total_bill", "tip", "time_fact", "day_fact", "smoker_fact", "sex_fact",  "size", "perc_tips"], 
    outputCol="features"
    )

We would now like to use this assembler to transform the raw dataset. However, there is an issue: the original dataset does not contain, for instance, the columns `time_index` and `time_fact`, since they are produced by the `StringIndexer` and the `OneHotEncoder`, respectively. We therefore chain all these steps together with a `Pipeline` object, which runs its stages in order, fitting each one on the output of the stages before it.
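The chaining logic of a pipeline can be sketched in plain Python. All class names below are hypothetical and the sketch is simplified: for instance, `OneHotSketch` is a fixed-size transformer, whereas Spark 3.x's `OneHotEncoder` is an estimator. What it shows is why the order of the stages matters: `OneHotSketch` needs `sex_index`, which exists only after the indexer stage has run.

```python
from collections import Counter


class IndexerSketch:
    """Hypothetical estimator: fit() learns a frequency-ordered value -> index map."""

    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = Counter(r[self.input_col] for r in rows)
        ordered = sorted(counts, key=lambda v: (-counts[v], v))
        mapping = {v: float(i) for i, v in enumerate(ordered)}
        return IndexerModelSketch(self.input_col, self.output_col, mapping)


class IndexerModelSketch:
    """Hypothetical fitted model: a transformer that adds output_col to each row."""

    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [{**r, self.output_col: self.mapping[r[self.input_col]]} for r in rows]


class OneHotSketch:
    """Hypothetical transformer with a fixed vector size (no fit step). It only works
    if input_col already exists, i.e. if an earlier stage has produced it."""

    def __init__(self, input_col, output_col, size):
        self.input_col, self.output_col, self.size = input_col, output_col, size

    def transform(self, rows):
        out = []
        for r in rows:
            vector = [0.0] * self.size
            idx = int(r[self.input_col])
            if idx < self.size:  # the last category is dropped, as with dropLast=True
                vector[idx] = 1.0
            out.append({**r, self.output_col: vector})
        return out


class PipelineSketch:
    """Simplified pipeline: stages run in order, and each estimator is fit on the
    output of the stages before it."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return PipelineModelSketch(fitted)


class PipelineModelSketch:
    """Applies the fitted stages in the same order."""

    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


data = [{"sex": "Male"}, {"sex": "Female"}, {"sex": "Male"}]
pipe = PipelineSketch([
    IndexerSketch("sex", "sex_index"),
    OneHotSketch("sex_index", "sex_fact", size=1),  # 2 categories, last one dropped
])
result = pipe.fit(data).transform(data)
print(result[1])  # {'sex': 'Female', 'sex_index': 1.0, 'sex_fact': [0.0]}
```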

### Create the Data Pipeline in PySpark

In [None]:
from pyspark.ml import Pipeline
tips_pipe = Pipeline(
    stages=[
            sex_indexer, sex_encoder, 
            smoker_indexer, smoker_encoder, 
            day_indexer, day_encoder, 
            time_indexer, time_encoder, 
            vec_assembler
            ]
            )

In [None]:
piped_data = tips_pipe.fit(tips_table).transform(tips_table)

Let's take a look at the transformed dataset:

In [None]:
piped_data.show()

+----------+----+------+------+---+------+----+------------------+---------+-----+---------+-------------+------------+-------------+---------+-------------+----------+-------------+--------------------+
|total_bill| tip|   sex|smoker|day|  time|size|         perc_tips|is_stingy|label|sex_index|     sex_fact|smoker_index|  smoker_fact|day_index|     day_fact|time_index|    time_fact|            features|
+----------+----+------+------+---+------+----+------------------+---------+-----+---------+-------------+------------+-------------+---------+-------------+----------+-------------+--------------------+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|5.9446733372572105|     true|    1|      1.0|    (1,[],[])|         0.0|(1,[0],[1.0])|      1.0|(3,[1],[1.0])|       0.0|(1,[0],[1.0])|[16.99,1.01,1.0,0...|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|16.054158607350097|    false|    0|      0.0|(1,[0],[1.0])|         0.0|(1,[0],[1.0])|      1.0|(3,[1],[1.0])|       0.0|(1,[0],[1.0])|[1

The last column, `features`, combines the numerical columns with the one-hot vectors obtained by applying the `StringIndexer` and the `OneHotEncoder`, all assembled into a single vector by the `VectorAssembler`.

### Classify customers based on their "generosity"

Let us split the data following the standard 80/20 rule: 80% of the data is used to train a classifier, and the remaining 20% to test the model on unseen data.
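Note that `randomSplit` does not produce an exact 80/20 split. Conceptually, each row independently draws a uniform random number and lands in the split whose cumulative-weight range contains it, so the split sizes vary from run to run; passing a seed, e.g. `piped_data.randomSplit([.8, .2], seed=42)`, makes the split reproducible. The plain-Python sketch below illustrates the idea (the function `random_split` is hypothetical, not Spark's implementation), using 244 row ids to match the size of the tips dataset.

```python
import random


def random_split(rows, weights, seed=None):
    """Sketch of a weighted random split: each row independently draws a uniform
    number and lands in the split whose cumulative-weight range contains it."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train_ids, test_ids = random_split(list(range(244)), [0.8, 0.2], seed=42)
print(len(train_ids), len(test_ids))  # roughly 195 and 49; the exact sizes depend on the seed
```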

In [None]:
training, test = piped_data.randomSplit([.8, .2])

We use Logistic Regression to classify our clients into these two categories.
If you want to deepen your knowledge of Logistic Regression, please check our content library, which includes a few courses covering this concept; for instance, you can refer to `Building Machine Learning Pipelines with scikit-learn - Part 2`.

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol='features', labelCol='label')

We can easily fit the model by passing the training data, and then take a first look at its predictions on the training set:

In [None]:
lr_fit = lr.fit(training)
lr_fit.transform(training).select('features', 'rawPrediction', 'probability', 'prediction', 'label').show()

+--------------------+--------------------+--------------------+----------+-----+
|            features|       rawPrediction|         probability|prediction|label|
+--------------------+--------------------+--------------------+----------+-----+
|(10,[0,1,2,8,9],[...|[1227.69183676080...|           [1.0,0.0]|       0.0|    0|
|[7.25,1.0,1.0,1.0...|[-550.98960464462...|[5.10804973058571...|       1.0|    1|
|[7.25,5.15,1.0,0....|[24383.0224430444...|           [1.0,0.0]|       0.0|    0|
|[7.51,2.0,0.0,0.0...|[4959.57156545339...|           [1.0,0.0]|       0.0|    0|
|[7.56,1.44,0.0,0....|[1667.34713489663...|           [1.0,0.0]|       0.0|    0|
|[7.74,1.44,1.0,1....|[1776.49706574858...|           [1.0,0.0]|       0.0|    0|
|[8.35,1.5,0.0,0.0...|[1283.45009513032...|           [1.0,0.0]|       0.0|    0|
|[8.51,1.25,0.0,0....|[-161.43668973437...|[7.74348518224750...|       1.0|    1|
|[8.52,1.48,0.0,0....|[947.60491743479,...|           [1.0,0.0]|       0.0|    0|
|(10,[0,1,7,8,9]
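For a binary logistic regression, `rawPrediction` holds the margin `m = w·x + b` as `[-m, m]`; as far as we understand Spark's implementation, `probability` is obtained by applying the logistic (sigmoid) function to each entry, and `prediction` is 1.0 when the class-1 probability exceeds the default `threshold` of 0.5. The plain-Python sketch below (function names are hypothetical) reproduces the values printed above from the first `rawPrediction` entry.

```python
import math


def sigmoid(x: float) -> float:
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def from_raw_prediction(raw0: float, threshold: float = 0.5):
    """Map the first rawPrediction entry (-margin) of a binary logistic regression
    to (probability vector, predicted label)."""
    probability = [sigmoid(raw0), sigmoid(-raw0)]
    prediction = 1.0 if probability[1] > threshold else 0.0
    return probability, prediction


# First rawPrediction entries copied (truncated) from the output above
for raw0 in (1227.69183676080, -550.98960464462):
    print(from_raw_prediction(raw0))
# ([1.0, 0.0], 0.0) for the first row; roughly ([5.1e-240, 1.0], 1.0) for the second
```

The very large margins, and the resulting probabilities of exactly 0 or 1, already hint that the two classes are almost perfectly separable with these features.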

Qualitatively, we are not doing badly: on the rows shown, every prediction matches its label. However, this is only a qualitative check, and it is performed on the training data. We need to be much more rigorous about both how we train the model and how we evaluate it. As for training, a model's performance depends on how the data is split; cross-validation reduces this split bias by training and validating the model on several different partitions (folds) of the data. Also, keep in mind that every model is characterized by a set of hyperparameters, which are not learned during training and typically need to be tuned. We can therefore define a grid of candidate hyperparameter values and search for the best combination.
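The idea behind grid search with k-fold cross-validation can be sketched in plain Python: for every candidate, train on k-1 folds, validate on the remaining one, average the k validation scores, and keep the candidate with the highest average (appropriate for a metric such as AUC, where larger is better). All names here are hypothetical; in particular, `toy_score` stands in for "fit on the training folds and compute the metric on the validation fold", and folds are assigned round-robin for simplicity, whereas Spark assigns rows to folds randomly.

```python
def k_fold_indices(n: int, k: int):
    """Yield (train_idx, val_idx) pairs: each of the n rows is used for
    validation in exactly one of the k folds (round-robin assignment)."""
    folds = [list(range(i, n, k)) for i in range(k)]
    for i in range(k):
        val_idx = folds[i]
        train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train_idx, val_idx


def select_best(param_grid, n, k, train_and_score):
    """Average the validation metric over the k folds for each candidate and
    return the candidate with the highest average, together with that average."""
    best_params, best_avg = None, float("-inf")
    for params in param_grid:
        scores = [train_and_score(params, tr, va) for tr, va in k_fold_indices(n, k)]
        avg = sum(scores) / len(scores)
        if avg > best_avg:
            best_params, best_avg = params, avg
    return best_params, best_avg


def toy_score(params, train_idx, val_idx):
    # Hypothetical stand-in for "fit on train_idx, compute the metric on val_idx"
    return 1.0 - abs(params["regParam"] - 0.03)


toy_grid = [{"regParam": r} for r in (0.0, 0.03, 0.09)]
print(select_best(toy_grid, n=195, k=3, train_and_score=toy_score))  # ({'regParam': 0.03}, 1.0)
```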

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

In [None]:
grid = ParamGridBuilder()
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))  # regularization strength: 0.00, 0.01, ..., 0.09
grid = grid.addGrid(lr.elasticNetParam, [0, 1])  # regularization type: 0 = L2 (ridge), 1 = L1 (lasso)
grid = grid.build()
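`ParamGridBuilder.build()` returns one parameter map per combination of the values added to the grid, i.e. their Cartesian product. The plain-Python arithmetic below enumerates the same combinations and counts how many models cross-validation will train, assuming `CrossValidator`'s default of 3 folds plus one final refit of the best parameters on the whole training set.

```python
from itertools import product

# Plain-Python equivalent of the grid values above
# (np.arange(0, .1, .01) yields the 10 values 0.00, 0.01, ..., 0.09)
reg_params = [round(0.01 * i, 2) for i in range(10)]
elastic_net_params = [0, 1]

# One parameter map per combination (Cartesian product)
param_maps = [
    {"regParam": r, "elasticNetParam": a}
    for r, a in product(reg_params, elastic_net_params)
]

num_folds = 3  # CrossValidator's default numFolds
fits = len(param_maps) * num_folds + 1  # + 1 final refit of the best model
print(len(param_maps), fits)  # 20 61
```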

In [None]:
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
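By default, the evaluator reads the scores from the `rawPrediction` column and the true classes from `label`. The area under the ROC curve has an equivalent pairwise reading: it is the fraction of (positive, negative) pairs in which the positive example gets the higher score, with ties counting as half a pair. Spark computes it from the ROC curve itself; the plain-Python sketch below (function name and scores are hypothetical) uses the pairwise form only to illustrate what the number means.

```python
def auc_pairwise(scores, labels):
    """Area under the ROC curve as the fraction of (positive, negative) pairs in which
    the positive example gets the higher score; ties count as half a pair."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Class-1 scores (hypothetical values) for six examples
y_true = [0, 0, 1, 1, 0, 1]
print(auc_pairwise([0.1, 0.2, 0.8, 0.9, 0.3, 0.7], y_true))  # 1.0: every positive outranks every negative
print(auc_pairwise([0.1, 0.8, 0.2, 0.9, 0.3, 0.7], y_true))  # 6 of 9 pairs correctly ordered
```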

We initialize a `CrossValidator` instance by passing the model we wish to use (in our case, a Logistic Regression), the hyperparameter grid we wish to explore, and the evaluator whose metric is used to choose the best model. By default, `CrossValidator` splits the data into 3 folds (`numFolds=3`).

In [None]:
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)

The following snippet might take a while, since grid search cross-validation fits one model for every combination of hyperparameters and fold:

In [None]:
models = cv.fit(training)
best_lr = models.bestModel

We can now inspect the best model, which we stored in `best_lr`:

In [None]:
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_ea46406b0a92, numClasses=2, numFeatures=10


### Testing the model on unseen data 

Let us evaluate the model on new data: we take the best model, use it to transform the test set (which adds the prediction columns), and evaluate its performance.

In [None]:
test_results = best_lr.transform(test)

In [None]:
test_results.select('features', 'rawPrediction', 'probability', 'prediction', 'label').show()

+--------------------+--------------------+--------------------+----------+-----+
|            features|       rawPrediction|         probability|prediction|label|
+--------------------+--------------------+--------------------+----------+-----+
|[3.07,1.0,1.0,1.0...|[16.7379200648088...|[0.99999994619611...|       0.0|    0|
|[10.29,2.6,1.0,0....|[9.71900369344847...|[0.99993987373687...|       0.0|    0|
|[10.33,1.67,1.0,0...|[1.22517766467732...|[0.77297343848383...|       0.0|    0|
|[10.77,1.47,1.0,1...|[-1.1085218986669...|[0.24814655502182...|       1.0|    1|
|[11.02,1.98,1.0,1...|[3.12190993862046...|[0.95778751565675...|       0.0|    0|
|[11.59,1.5,1.0,1....|[-1.5681096705846...|[0.17248604013624...|       1.0|    1|
|[11.69,2.31,0.0,0...|[4.59536608620883...|[0.99000243744303...|       0.0|    0|
|[12.43,1.8,0.0,0....|[-0.3478353565796...|[0.41390743970723...|       1.0|    1|
|[12.66,2.5,1.0,0....|[4.58300563577699...|[0.98987935460645...|       0.0|    0|
|(10,[0,1,5,8,9]

In [None]:
metric_test = evaluator.evaluate(test_results)
print(f'Area Under Receiver Operating Characteristic: {metric_test}')

Area Under Receiver Operating Characteristic: 1.0


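We are perfectly predicting the test labels. Keep in mind, however, that this is expected here: the label was derived directly from `perc_tips` (itself computed from `tip` and `total_bill`), and all of these columns are included in `features`, so the classifier can recover the labeling rule exactly. In a real-world setting, this kind of target leakage would make the score overly optimistic.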

In [None]:
# ====================================
# Validation Check
# DO NOT CHANGE THIS CELL
# ====================================
with open('results/vcf_01.txt', 'w') as f:
    f.write("%s\n" % metric_test)

**End Laboratory**