<img width="200" style="float:left" 
     src="https://upload.wikimedia.org/wikipedia/commons/f/f3/Apache_Spark_logo.svg" />

<img style="float:left" src="https://storage.googleapis.com/kaggle-competitions/kaggle/3136/logos/header.png" />

# Sections
* [Description](#0)
* [1. Setup](#1)
  * [1.1 Start Hadoop](#1.1)  
  * [1.2 Search for Spark Installation](#1.2)
  * [1.3 Create SparkSession](#1.3)
* [2. Lab](#2)
  * [2.1 Check Lab Files](#2.1)
  * [2.2 Data Information](#2.2)
* [3. Data Preparation](#3)
  * [3.1 Data Cleansing](#3.1)
  * [3.2 Feature Engineering](#3.2)
* [4. Model Training](#4)
* [5. Model Evaluation](#5)
* [6. Model Selection](#6)
* [7. Model Persistence](#7)
* [8. Model Loading](#8)
* [9. Challenge](#9)
* [10. Tear Down](#10)
  * [10.1 Stop Hadoop](#10.1)

<a id='0'></a>
## Description
<p>
<div>The goal for this lab is:</div>
<ul>    
    <li>Practice Spark's Machine Learning API</li>
</ul>
</p>

The goal is to create a machine learning model that predicts whether a passenger would survive or not, so this is a classification problem.

<a id='1'></a>
## 1. Setup

Since we are going to process data stored in HDFS, let's start the service.

<a id='1.1'></a>
### 1.1 Start Hadoop

Start Hadoop

Open a terminal and execute
```sh
hadoop-start.sh
```

<a id='1.2'></a>
### 1.2 Search for Spark Installation 
This step is only needed in our course environment; other Spark environments might not need it.

In [None]:
import findspark
findspark.init()

To improve how data is displayed, I'll set up Pandas accordingly.

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth', None)

<a id='1.3'></a>
### 1.3 Create SparkSession

By setting this environment variable we can include extra libraries in our Spark cluster.<br/>

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/hive3/lib/hive-hcatalog-core-3.1.2.jar pyspark-shell'

Time to create the SparkSession which we'll use to send our Spark code:

In [None]:
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Titanic - Analytics - MLlib")
    .config("spark.sql.warehouse.dir","hdfs://localhost:9000/warehouse")
    .enableHiveSupport()
    .getOrCreate())

<a id='2'></a>
## 2. Lab

<a id='2.1'></a>
### 2.1 Check Lab Files

To complete this lab, you need to have previously uploaded the datasets into HDFS.<br/>

Check that you have the data ready in HDFS:

http://localhost:50070/explorer.html#/datalake/raw/kaggle/titanic/

<a id='2.2'></a>
### 2.2 Data Information
We have the following information about the dataset:
<table style="float:left">
<tbody>
<tr><th><b>Variable</b></th><th><b>Definition</b></th><th><b>Key</b></th></tr>
<tr>
<td>survival</td>
<td>Survival</td>
<td>0 = No, 1 = Yes</td>
</tr>
<tr>
<td>pclass</td>
<td>Ticket class</td>
<td>1 = 1st, 2 = 2nd, 3 = 3rd</td>
</tr>
<tr>
<td>sex</td>
<td>Sex</td>
<td></td>
</tr>
<tr>
<td>Age</td>
<td>Age in years</td>
<td></td>
</tr>
<tr>
<td>sibsp</td>
<td># of siblings / spouses aboard the Titanic</td>
<td></td>
</tr>
<tr>
<td>parch</td>
<td># of parents / children aboard the Titanic</td>
<td></td>
</tr>
<tr>
<td>ticket</td>
<td>Ticket number</td>
<td></td>
</tr>
<tr>
<td>fare</td>
<td>Passenger fare</td>
<td></td>
</tr>
<tr>
<td>cabin</td>
<td>Cabin number</td>
<td></td>
</tr>
<tr>
<td>embarked</td>
<td>Port of Embarkation</td>
<td>C = Cherbourg, Q = Queenstown, S = Southampton</td>
</tr>
</tbody>
</table>

<a id='3'></a>
## 3. Data Preparation

<a id='3.1'></a>
### 3.1 Data Cleansing

In [None]:
titanic_raw = (spark.read
                    .option("inferSchema", "true")
                    .option('header', 'true')
                    .csv("hdfs://localhost:9000/datalake/raw/kaggle/titanic/")
                    .cache())

In [None]:
titanic_raw.limit(5).toPandas()

In [None]:
titanic_raw.printSchema()

Let's do some *Exploratory Data Analysis* to understand our data a bit better

In [None]:
passengers_count = titanic_raw.count()
print(f"Total number of passengers: {passengers_count}")

**Summary of data**:

In [None]:
titanic_raw.summary().toPandas()

Let's analyze the number of **passengers who survived**:

In [None]:
titanic_raw.groupBy("Survived").count().toPandas()

342 out of the 891 passengers survived.

Let's dig into **specific information about survivors** by exploring some more data.

The survival rate can be related to different features of the dataset such as *sex*, *port of embarkation*, *age*, ...

Let's analyze the **survival rate using feature sex**:

In [None]:
titanic_raw.groupBy("Sex","Survived").count().toPandas()

Even though there were more males than females on the ship, **females survived at a much higher rate than males**.

Let's analyze the **survival rate using feature pclass**:

In [None]:
titanic_raw.groupBy("Pclass","Survived").count().toPandas()

It's clear that passengers in *pclass #1* were given much higher priority than passengers in *pclass #3*; **even though pclass #3 had more passengers, its survival rate was very low.**
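The `groupBy(...).count()` cells above return raw counts, so the rates themselves still have to be derived. Here is a minimal sketch of that arithmetic in plain Python (no Spark needed). The per-sex counts are the ones commonly reported for the Kaggle training set and are an assumption here, so check them against the output of your own cell:

```python
from collections import defaultdict

# (group, survived, count) rows, shaped like groupBy("Sex", "Survived").count().
# Assumed counts: verify them against the cell output above.
rows = [
    ("female", 1, 233),
    ("female", 0, 81),
    ("male", 1, 109),
    ("male", 0, 468),
]

def survival_rates(rows):
    """Return {group: survivors / total} from (group, survived, count) rows."""
    totals = defaultdict(int)
    survivors = defaultdict(int)
    for group, survived, count in rows:
        totals[group] += count
        if survived == 1:
            survivors[group] += count
    return {group: survivors[group] / totals[group] for group in totals}

rates = survival_rates(rows)
for group, rate in rates.items():
    print(f"{group}: {rate:.1%}")
```

The same function works for the `Pclass` counts: only the group values in `rows` change.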

In [None]:
titanic_df = titanic_raw

### Null Values

Let's **check if there are null values** that we need to remove before moving forward.

There are **two ways of coming up with the number of null values**:

In [None]:
from pyspark.sql.functions import isnull, when, count, col

# Option 1
titanic_df.select([count(when(isnull(c), c)).alias(c) for c in titanic_df.columns]).toPandas()

The other way is to use `summary()` and compare each column's count with the total number of rows:

In [None]:
# Option 2
titanic_df.summary().toPandas()

There are 3 features with missing data.

**Cabin** feature has 687 (891 - 204) null values.

**Embarked** feature has 2 (891 - 889) null values.

**Age** feature has 177 (891 - 714) null values.
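To make the decision easier, the counts above can be turned into fractions of the dataset. This is a small sketch in plain Python that uses only the numbers already quoted in this section:

```python
total_rows = 891  # total number of passengers, from the count above

# Non-null counts per column, as reported by summary()
non_null = {"Cabin": 204, "Embarked": 889, "Age": 714}

# Missing values are the difference between the total and the non-null count
missing = {name: total_rows - count for name, count in non_null.items()}
missing_fraction = {name: m / total_rows for name, m in missing.items()}

for name in missing:
    print(f"{name}: {missing[name]} missing ({missing_fraction[name]:.1%})")
```

Roughly three quarters of the Cabin values are missing, about a fifth of the Age values, and almost none of the Embarked values.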

Let's decide what to do

#### Cabin
Since there are so many missing values, we will drop this feature.

In [None]:
titanic_df = titanic_df.drop("Cabin")

#### Embarked
The Embarked feature has only two missing values. Let's check the values within Embarked:



In [None]:
titanic_df.groupBy("Embarked").count().toPandas()

The majority of passengers boarded at "S", so we can impute the missing values with "S".

In [None]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})

#### Age
This is going to be a tricky one :) In some scenarios you can replace a missing value with the mean, median, or mode of the dataset. Spark provides a feature for this job called <a href="https://spark.apache.org/docs/latest/ml-features.html#imputer">Imputer</a>.

But this approach won't work well here... **you might end up assigning a 4-year-old kid the average age of 29**.

We'll try to come up with a representative number based on other features. The **Name feature seems to be a good one** for calculating an average age for missing values.

**Names contain a title such as Mr or Mrs**; the average age of each group will be closer to the real missing value... let's go for it:
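To see why a per-title average is a better guess than a single global average, here is a toy sketch in plain Python. The records are made up for illustration and are not taken from the dataset:

```python
from statistics import mean

# Toy (title, age) records, made up for illustration; None marks a missing age.
passengers = [
    ("Mr", 30.0), ("Mr", 40.0), ("Mr", None),
    ("Master", 4.0), ("Master", 6.0), ("Master", None),
]

known = [(title, age) for title, age in passengers if age is not None]

# One average for everybody: children and adults get the same value
global_mean = mean(age for _, age in known)

# One average per title, computed only from the known ages
group_mean = {
    title: mean(age for t, age in known if t == title)
    for title in {t for t, _ in known}
}

# Fill each missing age with the average of its own title group
imputed = [
    (title, age if age is not None else group_mean[title])
    for title, age in passengers
]

print(f"global mean: {global_mean}")
print(f"per-title means: {group_mean}")
```

With the global mean, the child with a missing age would be assigned 20 years. With the per-title mean, it gets 5, which is much closer to the rest of its group.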

In [None]:
from pyspark.sql.functions import regexp_extract
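# Raw string so that "\." reaches the regex engine unchanged (avoids an invalid escape warning)
titanic_df = titanic_df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))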

The regular expression `([A-Za-z]+)\.` extracts the words we're looking for: **a run of letters (A-Z or a-z) followed by a . (dot)**.
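The pattern can be tried outside Spark with Python's `re` module. This is only a sketch: it assumes that Python's regex engine and the Java one used by Spark behave the same for this simple pattern, and the helper name `extract_title` is made up for this example:

```python
import re

# Same pattern as in the Spark cell, written as a raw string
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return the first run of letters followed by a dot, or '' if there is none."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

# Sample names in the "Surname, Title. First names" style used by the dataset
for name in ["Braund, Mr. Owen Harris", "Heikkinen, Miss. Laina", "No title here"]:
    print(repr(extract_title(name)))
```

Like `regexp_extract`, the helper returns an empty string when nothing matches.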

In [None]:
titanic_df.limit(5).toPandas()

In [None]:
titanic_df.select("Initial").distinct().sort("Initial").toPandas()

Some Initials, such as Mlle or Mme, are rare variants of more common titles. I will map them to Miss and group the other uncommon values in a similar way.

In [None]:
titanic_df = titanic_df.replace(
               ['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])

In [None]:
titanic_df.select("Initial").distinct().toPandas()

Let's impute the missing values in the Age feature based on the average age per Initial.

1. Calculate the average value based on the initials and create a DataFrame with those values:

In [None]:
avg_age_df = (titanic_df.groupby('Initial').avg('Age')
                        .withColumnRenamed("avg(Age)","Age"))
avg_age_df.toPandas()

2. Let's create a temporary DataFrame with the rows that have no age (Age field with a null value), dropping the Age column:

In [None]:
titanic_df_noage = titanic_df.where(col("Age").isNull()).drop("Age")
titanic_df_noage.limit(1).toPandas()

3. Add an Age field to the previous DataFrame by joining it with the average age DataFrame:

In [None]:
titanic_df_noage_with_avg = titanic_df_noage.join(avg_age_df, "Initial")
titanic_df_noage_with_avg.limit(1).toPandas()

4. Take the original titanic_df, **keep records/rows with a non-null age** and add the previous DataFrame to the result via a union transformation:

In [None]:
titanic_df_fixed = (titanic_df.where(col("Age").isNotNull())
                      .unionByName(titanic_df_noage_with_avg))

titanic_df_fixed.where(col("Age").isNull()).count()

In [None]:
titanic_df = titanic_df_fixed

#### Family_size and Alone
Let's create a couple of features called **Family_size** and **Alone**, which might bring some insights on survival rate and the size of the families.

**Family_size** is the total number of *parch (parents/children)* and *sibsp (siblings/spouses)* per row. **Alone** will be a flag set when the size of the family equals 0.

In [None]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [None]:
titanic_df.groupBy("Family_Size").count().toPandas()

In [None]:
from pyspark.sql.functions import lit
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(lit(0)))

In [None]:
titanic_df.columns

<a id='3.2'></a>
### 3.2. Feature Engineering
It's time to convert data into a suitable format for machine learning algorithms.<br/> 
First, let's get rid of columns with unique values that don't contribute to a person's survival probability.

In [None]:
from pyspark.sql.functions import countDistinct

titanic_df.select([countDistinct(c).alias(c) for c in titanic_df.columns]).toPandas()

In [None]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Initial")

In [None]:
titanic_df.printSchema()

Let's cast all numerical values to doubles

In [None]:
titanic_df = titanic_df.select(col('Survived').cast('double'),
                              col('Pclass').cast('double'),
                              col('Sex'),
                              col('Age').cast('double'),
                              col('SibSp').cast('double'),
                              col('Parch').cast('double'),
                              col('Fare').cast('double'),
                              col('Embarked'),
                              col('Family_Size').cast('double'),
                              col('Alone').cast('double')
                             )

In [None]:
titanic_df.printSchema()

#### 3.2.2 Feature Transformation

We need to **translate the values in string columns into  numerical values**.

In order to do so, we are going to *encode* categorical values using:<br/>

[StringIndexer](https://spark.apache.org/docs/latest/ml-features#stringindexer) <br/>
[OneHotEncoder](https://spark.apache.org/docs/latest/ml-features#onehotencoder)
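Before running the Spark transformers, it may help to see what the two encodings produce. The sketch below mimics them in plain Python on toy values. It assumes Spark's default settings as I understand them: `StringIndexer` gives index 0 to the most frequent label, and `OneHotEncoder` drops the last category. The function names are made up for this example:

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """One-hot encode an index; the last category is dropped and becomes all zeros."""
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0
    return vector

embarked = ["S", "C", "S", "Q", "S", "C"]  # toy values, not the real column
mapping = fit_string_index(embarked)
encoded = {label: one_hot_drop_last(i, len(mapping)) for label, i in mapping.items()}

print(mapping)
print(encoded)
```

Dropping the last category keeps the encoded columns from being linearly dependent. Spark stores the result as a sparse vector instead of a Python list.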

In [None]:
label_column = "Survived"

categoricalCols = [field for (field, dataType) in titanic_df.dtypes if dataType == "string" and field != label_column]
numericCols = [field for (field, dataType) in titanic_df.dtypes if dataType == "double" and field != label_column]

print (f"categorical columns: {categoricalCols}")
print (f"numerical columns: {numericCols}")

In [None]:
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]

print (f"StringIndexer column names: {indexOutputCols}")
print (f"OHE column names: {oheOutputCols}")

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

oheEncoder = OneHotEncoder(inputCols=indexOutputCols,outputCols=oheOutputCols)

Checking StringIndexer

In [None]:
temp_df = stringIndexer.fit(titanic_df).transform(titanic_df)
temp_df.toPandas()

Checking OneHotEncoder

In [None]:
oheEncoder.fit(temp_df).transform(temp_df).toPandas()

In [None]:
assemblerInputs = oheOutputCols + numericCols
print("Feature columns: ",assemblerInputs)

#### 3.2.3 Feature Assembling

It's finally time to **assemble the features in one single vector**, which is what the algorithm will expect, by using something called [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=vectorassembler#pyspark.ml.feature.VectorAssembler).

As the **"Survived" variable** is the one we want to predict, **all the other variables** will be considered to build the **list with required features**:

In [None]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol="features")

In [None]:
from pyspark.ml import Pipeline

test_pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler])
features_df = test_pipeline.fit(titanic_df).transform(titanic_df)
features_df.limit(2).toPandas()

<a id='4'></a>
## 4. Model Training

Here is the list of classification algorithms from Spark ML that we are going to try:

<ul>
<li>LogisticRegression</li>
<li>DecisionTreeClassifier</li>
<li>RandomForestClassifier</li>
<li>Gradient-boosted tree classifier</li>
<li>NaiveBayes</li>
<li>Linear Support Vector Machine</li>
</ul>

There are some points in the machine learning workflow where randomness takes place, for example when splitting the data into sets and in some ML algorithms like RandomForest.

In order to make our experiments reproducible and always get the same results with the same data, no matter how many times we execute our code, we are going to use a seed.

Any integer can be used as a seed; here we pick a small prime number.
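The effect of a seed is easy to check outside Spark. This is a minimal sketch with Python's `random` module; the function name is made up for this example:

```python
import random

def shuffled_ids(seed, n=10):
    """Shuffle the ids 0..n-1 with a private generator initialised from `seed`."""
    rng = random.Random(seed)
    ids = list(range(n))
    rng.shuffle(ids)
    return ids

first = shuffled_ids(11)
second = shuffled_ids(11)

# Same seed, same order: the "random" shuffle is fully reproducible
print(first == second)
```

Spark's `randomSplit` and the tree-based classifiers accept a `seed` argument for the same reason.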

In [None]:
seed=11

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import LinearSVC

lr = LogisticRegression(labelCol="Survived", featuresCol="features")
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features",seed=seed)
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features",maxDepth=10,seed=seed)
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10,seed=seed)
nb = NaiveBayes(labelCol="Survived", featuresCol="features")
svm = LinearSVC(labelCol="Survived", featuresCol="features")

classifiers = [lr,dt,rf,gbt,nb,svm]
classifiers

Let's create a pipeline for every classifier

In [None]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]
pipelines

<a id='5'></a>
## 5. Model Evaluation
We're going to evaluate our classification model by using [MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.MulticlassClassificationEvaluator.html)

We're going to use the accuracy metric.
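Accuracy is the fraction of rows whose prediction equals the label. The sketch below computes it, along with the confusion counts, in plain Python. The labels and predictions are made up for illustration:

```python
from collections import Counter

def confusion_counts(labels, predictions):
    """Count (label, prediction) pairs; (1, 0) is a survivor predicted as a non-survivor."""
    return Counter(zip(labels, predictions))

def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Toy labels and predictions, made up for illustration
labels = [0, 0, 0, 1, 1, 1, 1, 0]
predictions = [0, 0, 1, 1, 1, 0, 1, 0]

counts = confusion_counts(labels, predictions)
score = accuracy(labels, predictions)
print(dict(counts))
print(score)
```

Six of the eight toy predictions are correct, so the accuracy is 0.75. Accuracy alone can be misleading when one class dominates, which is why the confusion counts are worth a look too.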

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Survived",  metricName="accuracy")

<a id='6'></a>
## 6. Model Selection

Now that the data is all set, let's split it into training and test sets. We can use an 80-20 ratio.

In [None]:
(trainingData, testData) = titanic_df.randomSplit([0.8,0.2],seed=seed)

It's good practice to keep the same distribution of 0's and 1's in the training and test sets, and it is especially critical in unbalanced/skewed datasets. This is called a **Stratified Train-Test Split**.
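The idea can be sketched in plain Python: split each class separately with the same ratio, then put the pieces back together. The toy dataset and the function names are made up for this example. Unlike Spark's `randomSplit`, which samples each row independently and so returns approximate sizes, this sketch cuts each class at an exact position:

```python
import random
from collections import defaultdict

def stratified_split(rows, label_of, train_ratio=0.8, seed=11):
    """Split rows into train/test so each label keeps roughly the same share in both."""
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for row in rows:
        by_label[label_of(row)].append(row)

    train, test = [], []
    for label in sorted(by_label):
        group = by_label[label]
        rng.shuffle(group)
        cut = int(len(group) * train_ratio)
        train.extend(group[:cut])
        test.extend(group[cut:])
    return train, test

def positive_share(rows):
    """Fraction of rows whose label is 1."""
    return sum(label for _, label in rows) / len(rows)

# Toy dataset of (id, label) rows: 62 labelled 0 and 38 labelled 1 (made-up proportions)
rows = [(i, 0) for i in range(62)] + [(i, 1) for i in range(62, 100)]
train, test = stratified_split(rows, label_of=lambda row: row[1])

print(len(train), len(test))
print(round(positive_share(train), 2), round(positive_share(test), 2))
```

Both parts end up with about 38% of positive labels, the same share as the full toy dataset.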

In [None]:
(trainingData1, testData1) = titanic_df.where("Survived=0").randomSplit([0.8,0.2],seed=seed)
(trainingData2, testData2) = titanic_df.where("Survived=1").randomSplit([0.8,0.2],seed=seed)

trainingData = trainingData1.unionByName(trainingData2)
testData = testData1.unionByName(testData2)

Let's train all the classifiers

In [None]:
models = [pipeline.fit(trainingData) for pipeline in pipelines]
models

Let's evaluate all the models

In [None]:
names = []
values = [] 
for model in models:
    prediction_df = model.transform(testData)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df = pd.DataFrame(data)
df.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df

The best model is **RandomForestClassificationModel**

In [None]:
best_model=df.iloc[0]['model']

Confusion matrix

In [None]:
best_model.transform(testData).groupby("Survived").pivot("prediction").count().toPandas()

<a id='7'></a>
## 7. Model Persistence
Spark provides functionality to save the model/pipeline so that we can use it later for inference (batch or streaming)

In [None]:
modelPath = "hdfs://localhost:9000/model-registry/titanic-survival-classifier"
best_model.write().overwrite().save(modelPath)

Check directory contents

http://localhost:50070/explorer.html#/model-registry/titanic-survival-classifier/


<a id='8'></a>
## 8. Model Loading

In [None]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [None]:
predictions = savedModel.transform(testData)
predictions.select("features", "Survived", "prediction").limit(200).toPandas()

<a id='9'></a>
## 9. Challenge

Can you improve this model?

Try to find and remove outliers.

Try new features or drop existing features.

Try different feature transformations. 

Try different feature scalers.

Try different algorithms and parameters.

Try cross-validation or train-validation split with grid parameters
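For the last suggestion, it helps to know how much work a grid search implies. The plain-Python sketch below shows how a parameter grid expands into candidates and how k-fold cross-validation partitions the rows. The grid values and function names are hypothetical; in Spark the equivalent tools live in `pyspark.ml.tuning` (`ParamGridBuilder`, `CrossValidator`, `TrainValidationSplit`):

```python
from itertools import product

def expand_grid(param_values):
    """Return every combination of parameter values as a list of dicts."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*param_values.values())]

def k_fold_indices(n_rows, k):
    """Yield (train_indices, validation_indices) for each of the k folds."""
    indices = list(range(n_rows))
    for fold in range(k):
        validation = indices[fold::k]
        train = [i for i in indices if i % k != fold]
        yield train, validation

# Hypothetical grid for a random forest; the values are only examples
grid = expand_grid({"maxDepth": [5, 10], "numTrees": [20, 50, 100]})
folds = list(k_fold_indices(n_rows=10, k=3))

print(f"{len(grid)} candidates x {len(folds)} folds = {len(grid) * len(folds)} fits")
```

Every candidate is trained once per fold, so the cost grows quickly with the size of the grid. A train-validation split trains each candidate only once, at the price of a noisier estimate.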

<p style="color:white">https://www.kaggle.com/startupsci/titanic-data-science-solutions</p>

<a id='10'></a>
## 10. Tear Down

Once we complete the lab, we can stop all the services.

<a id='10.1'></a>
### 10.1 Stop Hadoop

Stop Hadoop

Open a terminal and execute
```sh
hadoop-stop.sh
```