# Tutorial of PySpark on Titanic Dataset

Apache Spark is an open-source distributed computing framework used for big data analysis. 
PySpark is the Python API for Apache Spark and gives access to Spark libraries such as Spark SQL and MLlib.

<img src="images/distributed-computing-with-spark-7-638.jpg" alt="distributed computing image" width ="500"/>
<img src="images/pyspark-overview.png" alt="pyspark overview image" width ="500"/>

In [None]:
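# Importing pandas_profiling adds the .profile_report() method to pandas DataFrames (used further below)
import pandas_profiling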

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName('titanic_logreg').getOrCreate()
df = spark.read.csv('titanic.csv', inferSchema = True, header = True)
df.show(5)

## Explore Data

In [None]:
df.printSchema()

In [None]:
df.columns

In [None]:
titanic_df = df.toPandas()
titanic_df.sample(5)
#note:
#"sibsp" == Number of Siblings/Spouses Aboard
#"parch" == Number of Parents/Children Aboard

In [None]:
titanic_df.info()

In [None]:
titanic_df.describe()

## Pre-processing

In [None]:
# Keep only the columns used for modelling; this drops PassengerId, Cabin, Ticket and any other unlisted column (e.g. Name)
my_col = df.select(['Survived','Pclass','Sex','Age','SibSp','Parch','Fare','Embarked'])

In [None]:
final_data = my_col.na.drop()
final_data.toPandas().info()

## Exploring Data for Insights

Plotting a numeric variable (e.g., passenger age) against a categorical variable (e.g., Pclass, the passenger class) can give insights into how the two may be related.  

In the graph below, we see the age distributions of passengers in the three passenger classes (1st, 2nd, 3rd), where passenger class may be taken as a proxy for socio-economic class.

In [None]:
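import matplotlib.pyplot as plt

# Convert to pandas once; calling toPandas() inside the loop would re-collect the data on every pass
age_class_df = final_data.toPandas()
for pclass in [1, 2, 3]:  # the three passenger classes
    age_class_df.Age[age_class_df.Pclass == pclass].plot(kind="kde")
plt.title("Age wrt Pclass")
plt.legend(("1st", "2nd", "3rd"))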

## A Deeper Dive into Data Distributions and Correlations

The titanic_df.describe() call, above, provided a quick view of summary statistics for each numeric column.  For a deeper dive, the pandas-profiling .profile_report() call, below, provides further information, such as visual summaries of distributions, plots of interactions between variables, highlighting of highly correlated variables, and Spearman, Pearson and Kendall correlation matrices. 


In [None]:
final_data.toPandas().profile_report()

## VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder

A <b>transformer</b> converts one data frame to another, oftentimes appending or combining columns. The transformers below are considered <b>feature transformers</b>, as they transform specific features (columns) in the data frame rather than the data frame as a whole.

* <b>VectorAssembler</b>: combines a given list of columns into one single vector. Often used to combine raw features and features generated by other feature transformers into one feature vector. 
* <b>StringIndexer:</b> encodes a string column of labels to a column of label indices
* <b>VectorIndexer:</b> takes an input column in vector form, decides which values are categorical and changes those values to indices. This is often used for Decision Trees and Tree Ensembles.
* <b>OneHotEncoder:</b> maps a categorical feature, represented as a label index, to a binary vector. Each entry in that vector indicates the presence of one particular category out of all the possible values of that feature.
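
To make the bullet points above concrete, here is a minimal plain-Python sketch of what indexing, one-hot encoding and assembling compute. It is an illustration, not Spark's implementation: the helper names (`fit_string_index`, `one_hot`, `assemble`) and the sample values are hypothetical. It assumes the usual Spark defaults, where the most frequent label gets index 0 and the last category is dropped from the one-hot vector.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a 0/1 vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]

def assemble(*parts):
    """Concatenate scalars and vectors into one flat feature vector."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(part)
        else:
            features.append(float(part))
    return features

# Hypothetical sample of the Embarked column
embarked = ["S", "C", "S", "Q", "S", "C"]
embark_index = fit_string_index(embarked)
embark_vec = one_hot(embark_index["C"], len(embark_index))

# Combine Pclass, the encoded port and Age into one feature vector
row = assemble(3, embark_vec, 22.0)
print(embark_index, embark_vec, row)
```

Spark stores the encoded values as sparse vectors rather than Python lists, but the information carried is the same.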

In [None]:
from pyspark.ml.feature import (VectorAssembler, StringIndexer, VectorIndexer, OneHotEncoder)

gender_indexer = StringIndexer(inputCol = 'Sex', outputCol = 'SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol = 'SexVec')

In [None]:
embark_indexer = StringIndexer(inputCol = 'Embarked', outputCol = 'EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol = 'EmbarkIndex', outputCol = 'EmbarkVec')

In [None]:
assembler = VectorAssembler(inputCols = ['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVec'], outputCol = 'features')

## What is a Pipeline?

Spark represents the machine learning workflow using components called transformers and estimators, which are linked together in a pipeline. 

As mentioned earlier, a <b>transformer</b> converts one data frame to another, oftentimes appending or combining columns. An <b>estimator</b> is an algorithm that is fit (trained) on data, producing a model for that data.

A <b>pipeline</b> is a series of pipeline stages linked together. Each pipeline stage is either a transformer or an estimator. Our pipeline below consists of 6 stages. It links the feature transformers gender_indexer, embark_indexer, gender_encoder, embark_encoder and assembler with our estimator, log_reg.

<img src="images/ml-Pipeline.png" alt= "pipeline image" width="500">
<img src="images/ml-PipelineModel.png" alt="pipelinemodel image" width="500">
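
The relationship between estimators, transformers and pipelines can be sketched in a few lines of plain Python. This is a toy model of the idea only: every class name here (`ToyIndexer`, `ToyPipeline`, and so on) is hypothetical, rows are plain dictionaries instead of Spark data frames, and the indexer orders labels alphabetically for simplicity.

```python
class ToyIndexer:
    """Estimator: learns a label-to-index mapping from one column."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        labels = sorted({row[self.input_col] for row in rows})
        mapping = {label: i for i, label in enumerate(labels)}
        return ToyIndexerModel(self.input_col, self.output_col, mapping)

class ToyIndexerModel:
    """Transformer: appends the index column using the learned mapping."""
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        return [{**row, self.output_col: self.mapping[row[self.input_col]]}
                for row in rows]

class ToyPipeline:
    """Estimator made of stages; fitting it yields a ToyPipelineModel."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fit first; plain transformers are used as they are
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # later stages see the transformed rows
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    """Transformer: applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

train_rows = [{"Sex": "male"}, {"Sex": "female"}]
model = ToyPipeline([ToyIndexer("Sex", "SexIndex")]).fit(train_rows)
out = model.transform([{"Sex": "female"}])
print(out)
```

The key point is that fitting a pipeline returns a new object made only of transformers, which can then be applied to unseen data.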

In [None]:
from pyspark.ml import Pipeline

log_reg = LogisticRegression(featuresCol = 'features', labelCol = 'Survived')

In [None]:
pipeline = Pipeline(stages = [gender_indexer, embark_indexer, 
                             gender_encoder, embark_encoder,
                             assembler, log_reg])

In [None]:
train, test = final_data.randomSplit([0.7, 0.3])

An estimator has a <b>fit()</b> method. Because a pipeline is itself an estimator, calling fit() on the train data frame passes that data through the stages of our pipeline and trains a logistic regression model. The result (we call it fit_model) is a model that we can now pass data into, in order to make predictions as to whether a passenger died or survived. 

Our fitted pipeline, fit_model, is a <b>transformer</b>. Calling its transform() method on our test data set turns a data frame of passenger information into a data frame that also includes predictions.
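
As a rough illustration of what a fitted logistic regression model does with each assembled feature vector, the sketch below computes a survival probability and a 0/1 prediction. The function name `predict_survival`, the weights and the intercept are hypothetical values chosen for the example; a real model learns its coefficients from the training data.

```python
import math

def predict_survival(features, weights, intercept, threshold=0.5):
    """Return (probability of survival, predicted label) for one feature vector."""
    # Linear combination of the features
    z = intercept + sum(w * x for w, x in zip(weights, features))
    # The sigmoid squashes z into a probability between 0 and 1
    probability = 1.0 / (1.0 + math.exp(-z))
    prediction = 1.0 if probability > threshold else 0.0
    return probability, prediction

# Hypothetical coefficients for two features: [Pclass, is_female]
weights = [-1.0, 2.5]
intercept = 0.5

prob, pred = predict_survival([1.0, 1.0], weights, intercept)
print(round(prob, 4), pred)
```

In the results data frame, these two quantities correspond to the probability and prediction columns that transform() appends.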

In [None]:
fit_model = pipeline.fit(train)

In [None]:
results = fit_model.transform(test)

In [None]:
results.select('prediction', 'Survived').toPandas().sample(10)

## Visualization

After the model is trained and tested, one way to explore the results is through plots that break the predictions down by a feature of the data, such as sex, as below.

In [None]:
import seaborn as sns
g = sns.FacetGrid(results.toPandas(), col="Sex", row="Survived", margin_titles=True)
g.map(plt.hist, "prediction",color="purple");

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

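# Named evaluator rather than eval, to avoid shadowing Python's built-in eval()
evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction', labelCol = 'Survived')
AUC = evaluator.evaluate(results)  # the default metric is the area under the ROC curve
AUC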
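
The value returned above is, by default, the area under the ROC curve. One way to read that number: it is the probability that a randomly chosen survivor receives a higher score than a randomly chosen non-survivor. The sketch below computes it directly from that definition. The function name `area_under_roc` and the sample labels and scores are hypothetical, and Spark itself derives the value from the ROC curve rather than from this pairwise loop.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical labels (1 = survived) and model scores
labels = [1, 0, 1, 0]
scores = [0.9, 0.4, 0.35, 0.2]
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.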

The predictions can also be displayed in aggregate, split by the actual outcome.

In [None]:
g = sns.FacetGrid(results.toPandas(),col='Survived')
g = g.map(sns.histplot, 'prediction')  # histplot replaces the deprecated distplot