# Spark - Machine Learning Fundamentals
In this courselet, we are going to explore the basics in the use of Spark as a tool to train a Machine Learning model. By the end of this courselet, you should be able to:

- Recognize the fundamentals in training a ML model using Spark
- Identify two different ML algorithms
- Identify the main libraries and documentation to perform your tasks

This courselet presupposes a foundational understanding of fundamental machine learning processes and the Spark framework. It is designed primarily to introduce coding within these contexts, rather than to focus exclusively on the development of rigorously accurate models.

For this courselet, we will use taxi trips reported to the City of Chicago in 2020. This data is publicly available through the [Chicago Data Portal](https://data.cityofchicago.org/en/Transportation/Taxi-Trips-2020/r2u4-wwk3/about_data) If you previously covered the Exploratory Data Analysis with Spark courselet, you should be familiar with this dataset. 

In this courselet, we are going to explore the following cases:

- **Regression:** We are going to try to estimate the fare price of a trip, given a collection of features based on location, temporality, and trip duration.
- **Clustering:** We are going to segmentate our trips in 10 different clusters, using the coordinates and temporality components as our clustering features.

We are using the data in [Parquet format](https://parquet.apache.org/), given the several advantages of this format.

### Getting Started

As a very first step, we will start by initiating our Spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("ML Process") \
    .getOrCreate()

In [2]:
# We start by loading the data and printing the schema
df = spark.read.parquet("data/chicago-taxi-2020.parquet",header=True, inferSchema=True)
df.printSchema() 

root
 |-- Trip ID: string (nullable = true)
 |-- Taxi ID: string (nullable = true)
 |-- Trip Start Timestamp: string (nullable = true)
 |-- Trip End Timestamp: string (nullable = true)
 |-- Trip Seconds: double (nullable = true)
 |-- Trip Miles: double (nullable = true)
 |-- Pickup Census Tract: double (nullable = true)
 |-- Dropoff Census Tract: double (nullable = true)
 |-- Pickup Community Area: double (nullable = true)
 |-- Dropoff Community Area: double (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Tips: double (nullable = true)
 |-- Tolls: double (nullable = true)
 |-- Extras: double (nullable = true)
 |-- Trip Total: double (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Pickup Centroid Latitude: double (nullable = true)
 |-- Pickup Centroid Longitude: double (nullable = true)
 |-- Pickup Centroid Location: string (nullable = true)
 |-- Dropoff Centroid Latitude: double (nullable = true)
 |-- Dropoff Centroid

In [3]:
# A display of the first few rows
df.limit(5).toPandas()

Unnamed: 0,Trip ID,Taxi ID,Trip Start Timestamp,Trip End Timestamp,Trip Seconds,Trip Miles,Pickup Census Tract,Dropoff Census Tract,Pickup Community Area,Dropoff Community Area,...,Extras,Trip Total,Payment Type,Company,Pickup Centroid Latitude,Pickup Centroid Longitude,Pickup Centroid Location,Dropoff Centroid Latitude,Dropoff Centroid Longitude,Dropoff Centroid Location
0,16c7456d99031528c238bd02f40df5ab9bdf9778,88d3be8c1334607f62a8c058f680dd7fbb57826ec7408d...,01/01/2020 12:00:00 AM,01/01/2020 12:00:00 AM,60.0,0.0,,,,,...,9.0,12.27,Credit Card,Top Cab Affiliation,,,,,,
1,472eef1d5c7a5e5ee033f673942f367dc71869f1,199fa05b63204aa1c620c161c5cebe43b0909b1ee99864...,01/01/2020 12:00:00 AM,01/01/2020 12:30:00 AM,1740.0,0.0,,,,43.0,...,0.0,13.0,Cash,Blue Ribbon Taxi Association Inc.,,,,41.761578,-87.572782,POINT (-87.5727819867 41.7615779081)
2,031a4d882fb3315a490d0b5c358c945ad9b9856d,aabecb47e958f99860a3b4d01f14d53644ac26126d9519...,01/01/2020 12:00:00 AM,01/01/2020 12:15:00 AM,720.0,0.7,,,8.0,8.0,...,5.0,14.8,Credit Card,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.899602,-87.633308,POINT (-87.6333080367 41.899602111)
3,3c416c246829dfb3f78cc93cbc7dfecdd379be15,ba106251fbb2b52177138ccbb8a1327a83c89470c240b2...,01/01/2020 12:00:00 AM,01/01/2020 12:15:00 AM,720.0,0.8,,,8.0,8.0,...,2.0,9.75,Cash,Taxi Affiliation Services,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.899602,-87.633308,POINT (-87.6333080367 41.899602111)
4,3c0a22971ae070ce35e0c03f1b8e92fe7aa840cd,1f1970d8e52c7e2575aeb68eb3b6ab0e21c77b728267df...,01/01/2020 12:00:00 AM,01/01/2020 12:00:00 AM,556.0,0.77,,,8.0,8.0,...,0.0,6.75,Cash,Sun Taxi,41.899602,-87.633308,POINT (-87.6333080367 41.899602111),41.899602,-87.633308,POINT (-87.6333080367 41.899602111)


### Exploratory Analysis

We briefly explore our data. We might want to start by analyzing the features that will be part of our regression model and displaying the missingness rate per column in the dataframe.

In [4]:
# List of continuous variables for our regression analysis
continuous = ["Trip Seconds", "Trip Miles", "Fare", "Tips", "Tolls", "Extras", "Trip Total"]
df_cont = df.select(continuous)
df_cont_summary = df_cont.describe()

df_cont_summary.show()

+-------+------------------+-----------------+------------------+------------------+--------------------+------------------+-----------------+
|summary|      Trip Seconds|       Trip Miles|              Fare|              Tips|               Tolls|            Extras|       Trip Total|
+-------+------------------+-----------------+------------------+------------------+--------------------+------------------+-----------------+
|  count|           3887483|          3889002|           3888700|           3888700|             3888700|           3888700|          3888700|
|   mean| 874.7527292080763|3.677313007297242|15.701397827037205|1.4626049785276904|0.002017751433641...|1.0959525856970214|18.36848553758443|
| stddev|1784.5672882912645|6.099401337576302| 88.83487861203875|2.8588320586587246|  0.2049892615858179|23.893356793019223|92.44467251419816|
|    min|               0.0|              0.0|               0.0|               0.0|                 0.0|               0.0|              0.0|

In [5]:
# Missingness rate per column
from pyspark.sql.functions import col, count, when, lit
total_count = df.count()
missingness_rate = df.select([((count(when(col(c).isNull(), c)) / lit(total_count))).alias(c) for c in df.columns])

missingness_rate.toPandas() # Using toPandas method to make it look nicely

Unnamed: 0,Trip ID,Taxi ID,Trip Start Timestamp,Trip End Timestamp,Trip Seconds,Trip Miles,Pickup Census Tract,Dropoff Census Tract,Pickup Community Area,Dropoff Community Area,...,Extras,Trip Total,Payment Type,Company,Pickup Centroid Latitude,Pickup Centroid Longitude,Pickup Centroid Location,Dropoff Centroid Latitude,Dropoff Centroid Longitude,Dropoff Centroid Location
0,0.0,5.2e-05,0.0,0.000148,0.000398,8e-06,0.541555,0.544762,0.071719,0.094838,...,8.5e-05,8.5e-05,0.0,0.0,0.071538,0.071538,0.071538,0.093113,0.093113,0.093113


### Data pre-processing and feature engineering

**Regression Case**

For our regression modeling, our data pre-processing will go as follows:
1. We will start by creating a sub-df in which we'll exclusively keep those features that are part of our analysis
2. We will remove outliers from the *Fare* column (our target feature) by using the [1.5xIQR rule](https://www.khanacademy.org/math/statistics-probability/summarizing-quantitative-data/box-whisker-plots/a/identifying-outliers-iqr-rule#:~:text=A%20commonly%20used%20rule%20says,3%20%2B%201.5%20%E2%8B%85%20IQR%20%E2%80%8D%20.)
3. We will extract the hour and the day of the week from *Trip Start Timestamp*
4. We will encode the hour, day of the week and community area of the pick up to treat them as categories for the model
5. We will place all of our explanatory features into a vector column using [VectorAssembler](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)
6. We will reduce the number of features using Principal Component Analysis
7. We will create our final dataframe, keeping the relevant features with *Fare*, our target column.

**Vector Assembler** 

An important component of our data preparation pipeline will be the transformation of our dataframees into a column vector representations through the use of the VectorAssembler class. By transforming our dataframe into this representation, as PySpark algorithms need data to be represented like that in order to achieve an efficiente parallel processing.

In [6]:
# Select regression features
regression_features = ["Trip Start Timestamp", "Trip Seconds", "Trip Miles", "Pickup Community Area", "Fare"]
df_reg = df.select(*regression_features).dropna(how='any', subset=regression_features) # We make sure to drop any missing values

In [7]:
# Removing outliers from our target column
from pyspark.sql.functions import col, lit

quantiles = df_reg.approxQuantile("Fare", [0.25, 0.75], 0.05) # https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.approxQuantile.html
Q1, Q3 = quantiles

IQR = Q3 - Q1 #Calculate IQR
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

df_reg = df_reg.filter((col("Fare") >= lit(lower_bound)) & (col("Fare") <= lit(upper_bound))) # We remove the outliers using the bounds

In [8]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import to_timestamp, hour, dayofweek

# We start by converting the column's format to timestamp and extracting hour and day of the week
df_reg =  df_reg.withColumn("Trip Start Timestamp", 
                            to_timestamp("Trip Start Timestamp", 'MM/dd/yyyy hh:mm:ss a')) \
                .withColumn("pickup_hour", hour("Trip Start Timestamp")) \
                .withColumn("pickup_day_of_week", dayofweek("Trip Start Timestamp"))

# We have to convert the new columns into string format to index them
df_reg = df_reg.withColumn("pickup_hour", df_reg["pickup_hour"].cast("string")) \
               .withColumn("pickup_day_of_week", df_reg["pickup_day_of_week"].cast("string"))

# Now we index and encode the new columns, along with Pickup Community Area
hour_indexer = StringIndexer(inputCol="pickup_hour", 
                             outputCol="pickup_hour_indexed")
day_of_week_indexer = StringIndexer(inputCol="pickup_day_of_week", 
                                    outputCol="pickup_day_of_week_indexed")
community_area_indexer = StringIndexer(inputCol="Pickup Community Area", 
                                       outputCol="Pickup Community Area Index")

hour_encoder = OneHotEncoder(inputCols=["pickup_hour_indexed"], 
                             outputCols=["pickup_hour_vec"])
day_of_week_encoder = OneHotEncoder(inputCols=["pickup_day_of_week_indexed"], 
                                    outputCols=["pickup_day_of_week_vec"])
community_area_encoder = OneHotEncoder(inputCols=["Pickup Community Area Index"], 
                                       outputCols=["pickup_community_area_vec"])

# We create a transformations (pipeline https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.Pipeline.html)
pipeline_reg = Pipeline(stages=[hour_indexer, day_of_week_indexer, 
                            community_area_indexer, hour_encoder, 
                            day_of_week_encoder, community_area_encoder])

# Now we execute the pipeline
df_reg = pipeline_reg.fit(df_reg).transform(df_reg)


In [9]:
# Now we transform our features of interest into a single column vector called "features" 
reg_input_cols = ["Trip Seconds", "Trip Miles", "pickup_hour_vec", "pickup_day_of_week_vec", "pickup_community_area_vec"]
reg_assembler = VectorAssembler(inputCols=reg_input_cols, outputCol="features")
df_reg = reg_assembler.transform(df_reg)

Now, to reduce the dimensionality and only keep the most important features, we will apply [Principal Component Analysis]() to reduce the number of features and only keep 3 components.

In [10]:
from pyspark.ml.feature import PCA

# We apply PCA on the "features" column
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pcaModel = pca.fit(df_reg)
df_reg_pca = pcaModel.transform(df_reg)

In [11]:
# Our final data only includes our components vector and the target column (Fare)
reg_data= df_reg_pca.select(col("pcaFeatures").alias("features"), col("Fare"))

**Clustering Case**

For our regression modeling, our data pre-processing will go as follows:

1. We will start by creating a sub-df in which we'll exclusively keep those features that are part of our analysis
2. We will extract the hour and the day of the week from *Trip Start Timestamp*
3. We will encode the hour and day of the week to treat them as categories
4. We will place all of our explanatory features into a vector column using [VectorAssembler](https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html)l data.

In [12]:
# Select clustering features 
clustering_features = ["Trip Start Timestamp", "Pickup Centroid Latitude", "Pickup Centroid Longitude"]
df_clust = df.select(*clustering_features).dropna(how='any', subset=clustering_features) # We make sure to drop any missing values

In [13]:
# The following lines represent a similar process as the one we did before for regression
df_clust =  df_clust.withColumn("Trip Start Timestamp", 
                            to_timestamp("Trip Start Timestamp", 'MM/dd/yyyy hh:mm:ss a')) \
                .withColumn("pickup_hour", hour("Trip Start Timestamp")) \
                .withColumn("pickup_day_of_week", dayofweek("Trip Start Timestamp"))

df_clust = df_clust.withColumn("pickup_hour", df_clust["pickup_hour"].cast("string")) \
               .withColumn("pickup_day_of_week", df_clust["pickup_day_of_week"].cast("string"))

pipeline_clust = Pipeline(stages=[hour_indexer, day_of_week_indexer, 
                            hour_encoder, day_of_week_encoder])

df_clust = pipeline_clust.fit(df_clust).transform(df_clust)

In [14]:
clust_input_cols = ["pickup_hour_vec", "pickup_day_of_week_vec", "Pickup Centroid Latitude", "Pickup Centroid Longitude"]
clust_assembler = VectorAssembler(inputCols=clust_input_cols, outputCol="features")
df_clust = clust_assembler.transform(df_clust)

### Training

**Regression**

We proceed to train our model. First, we will split the data into our training and testing datasets at a 80/20 distribution.

In [15]:
# We split the data into training and testing
train, test = reg_data.randomSplit([0.8, 0.2], seed=42)

In [16]:
# Now we train the model
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features", labelCol="Fare")
lr_model = lr.fit(train)

**Clustering**

Given the nature of cluster analysis, splitting is not necessary. We set the number of clusters to 10 and run the algorithm.

In [17]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(10).setSeed(123)
cluster_model = kmeans.fit(df_clust)

### Evaluation

**Regression** 

To evaluate the performance of our regression model, we calculate the [Root Mean Square Error](https://en.wikipedia.org/wiki/Root-mean-square_deviation).

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

# We first make create a new df with a new columns named "prediction", using our LR Model
predictions = lr_model.transform(test)

# Now we create a evaluator, setting the column, the
reg_evaluator = RegressionEvaluator(labelCol="Fare", predictionCol="prediction", metricName="rmse")

# Calculate RMSE
rmse = reg_evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 4.272231860406048


**Clustering**

For clustering, we are evaluating by using the [Silhouette Score](https://en.wikipedia.org/wiki/Silhouette_(clustering)#:~:text=The%20silhouette%20ranges%20from%20%E2%88%921,poorly%20matched%20to%20neighboring%20clusters.).

In [19]:
from pyspark.ml.evaluation import ClusteringEvaluator

clusters = cluster_model.transform(df_clust)

# Evaluate the model
clust_evaluator = ClusteringEvaluator()

# Calculate Silhoutte Score
silhouette = clust_evaluator.evaluate(clusters)
print(f"Silhouette Score: {silhouette}")

Silhouette Score: 0.3096852624096427


### Saving the model for deployment

Once we are comfortable with the results of a model, and we want to make it availble for production deployment, we can store the model through the *.save()* method, to eventually load it in the production environment using the *.load()* method.

In [None]:
# Choosing the current path for storing. This can be any path
path = "the/path/to/lr_model"
lr_model.save(path)

In [None]:
# Now, in a production environment, we could load the model like this

model_path = "the/path/to/lr_model"
model = LinearRegressionModel.load(model_path)

### Explore More

The extensive collection of algorithms, framweworks and utilities that PySpark offers for Machine Learning tasks can be found in the following links:

- [MLlib (DataFrame-based)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)
- [MLlib (RDD-based)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.mllib.html)