## Titanic - Machine Learning from Disaster Dataset using `pyspark.ml`

### I. Project background and Data Information

This is a famous dataset for machine learning. A description of the dataset can be found at the [kaggle website](https://www.kaggle.com/c/titanic/data). In the following, we apply the logistic regression model from pyspark.ml package to this dataset. The goal is to predict the survival of passagers on board titanic, and to use the pipeline and feature engineering tools from pyspark.ml. 

![Data Dictionary](https://i.ibb.co/mhY89DG/titanic-datadictionary.png)

1.1 Load the data into a Spark DataFrame `titanic`.

In [0]:
titanic = spark.read.csv("file:/databricks/driver/titanic.csv",inferSchema=True, header=True).cache()

In [0]:
# Print Schema
titanic.printSchema()

# Display first 5 rows of data
titanic.limit(5).toPandas()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


### II. Data exploration
- detect data problems
- inform the data engineering steps
- inform the feature selection

Obtain summary statistics on the dataframe, which will inform our data processing strategies

In [0]:
# obtain descriptive statistics
titanic.describe().display()

summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [0]:
# checking missing value
for c in titanic.columns:
  print(c,titanic.filter(titanic[c].isNull()).count())

PassengerId 0
Survived 0
Pclass 0
Name 0
Sex 0
Age 177
SibSp 0
Parch 0
Ticket 0
Fare 0
Cabin 687
Embarked 2


We can observe that Age, Cabin, and Embarked have missing values.

In [0]:
import pyspark.sql.functions as f

def count_sort(col):
   return titanic_cat.select(col).groupBy(col).agg(f.count(col).alias("count")).sort("count",ascending=False).limit(10)

We can observe that:

  > Continuous:
  * Age
  * Fare
  
  > Discrete:
  * SibSp
  * Parch

In [0]:
# Select the categorical columns that is needed
titanic_cat = titanic.select("PassengerId","Survived","Pclass","Sex","SibSp","Parch","Cabin","Embarked")

In [0]:
# using a for loop to apply the count_sort() function for each column and get the top 10 frequency
for c in titanic_cat.columns:
  display(count_sort(f.col(c)))

PassengerId,count
148,1
463,1
471,1
496,1
833,1
243,1
392,1
540,1
623,1
737,1


Survived,count
0,549
1,342


Pclass,count
3,491
1,216
2,184


Sex,count
male,577
female,314


SibSp,count
0,608
1,209
2,28
4,18
3,16
8,7
5,5


Parch,count
0,678
1,118
2,80
5,5
3,5
4,4
6,1


Cabin,count
C23 C25 C27,4
G6,4
B96 B98,4
D,3
C22 C26,3
F2,3
E101,3
F33,3
C65,2
B51 B53 B55,2


Embarked,count
S,644
C,168
Q,77
,0


**Distribution of numerical columns**:

In [0]:
# use built-in historgram function instead (I chose bin number of 8)
# random sample: sample(False, 0.5) - means get 50% sample

from pyspark.sql.functions import round
titanic.select(titanic['Age']).sample(False, 0.5).display()


Age
22.0
35.0
35.0
2.0
27.0
14.0
4.0
58.0
20.0
39.0


Databricks visualization. Run in Databricks to view.

### III. Feature Engineering

The goal of this step is to do necessary feature engineering. 

Here we will focus on
- dealing with missing values
- creating new columns
- converting data types

In [0]:
# select label survied and features that will be used in the model
titanic_cast = titanic.select("Survived","Age","Pclass","Sex","SibSp","fare","Parch","Embarked")

In [0]:
# check schema
titanic_cast.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Embarked: string (nullable = true)



In [0]:
# convert all numerical columns into double type becuase pySpark sometimes (in earlier Spark versions) does not work well with other numerical types.
titanic_cast = titanic_cast.withColumn("Survived",titanic_cast.Survived.cast("double")) \
        .withColumn("Pclass",titanic_cast.Pclass.cast("double")) \
        .withColumn("SibSp",titanic_cast.SibSp.cast("double")) \
        .withColumn("Parch",titanic_cast.Parch.cast("double"))
titanic_cast.printSchema()

root
 |-- Survived: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Embarked: string (nullable = true)



In [0]:
# 1. add new column AgeNA, 1 means imputed; 0 means not imputed
# 2. then set to double type
# 3. impute null age to 29.69911764705882

titanic_final = titanic_cast.withColumn("AgeNA", f.when(titanic_cast.Age.isNull(), 1).otherwise(0))

titanic_final = titanic_final.withColumn("AgeNA",titanic_final.AgeNA.cast("double")) \
                .withColumn("Age", f.when(titanic_final.Age.isNull(), 29.69911764705882).otherwise(titanic_final.Age))

# 4. drop Embarked NA
titanic_final = titanic_final.filter(titanic_final.Embarked.isNotNull())


In [0]:
# print Schema
titanic_final.printSchema()

# obtain descriptive statistics
titanic_final.describe().display()

root
 |-- Survived: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- AgeNA: double (nullable = false)



summary,Survived,Age,Pclass,Sex,SibSp,fare,Parch,Embarked,AgeNA
count,889.0,889.0,889.0,889,889.0,889.0,889.0,889,889.0
mean,0.3824521934758155,29.653446370674192,2.3115860517435323,,0.5241844769403825,32.09668087739029,0.3824521934758155,,0.1991001124859392
stddev,0.4862596883147733,12.968366309252314,0.8346997785705753,,1.103704875596923,49.69750431670795,0.8067607445174785,,0.3995482811002537
min,0.0,0.42,1.0,female,0.0,0.0,0.0,C,0.0
max,1.0,80.0,3.0,male,8.0,512.3292,6.0,S,1.0


In [0]:
# check Embarked null (checked again to see if there are still missing value)
titanic_final.filter(titanic_final.Embarked.isNull()).count()

Out[27]: 0

### IIII. Indexers and Encoders for string/categorical columns (for use with Pipeline)

String columns cannot be directly in with some of the models such as LogisticRegression. Neither are categorical columns. Our strategy is to convert string and categorical variables into a series of binary dummies. 

To convert indexed categorical values into a vector of binary indicators, we can leverage `OneHotEncoder`

Finally, we need to assemble a vector column with all the numerical features. This is achieved by `VectorAssembler`


- [StringIndexer](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.StringIndexer): for convert string labels into numerical labels (0, 1,...), ordered by label frequencies
- [OneHotEncoderEstimator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.OneHotEncoderEstimator): mapping a column of category indices to a column of binary vectors (the least frequent one will be dropped by default).
- [VectorAssembler](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler): merges multiple columns into one vector columns needed for most algorithms

In [0]:
# use stringIndexer to convert the Embarked and Sex columns

from pyspark.ml.feature import StringIndexer,VectorAssembler

si =StringIndexer(inputCols=["Embarked", "Sex"], outputCols = ["Embarked_idx", "Sex_idx"])
titanic_final_si = si.fit(titanic_final).transform(titanic_final)

titanic_final_si.limit(5).display()

Survived,Age,Pclass,Sex,SibSp,fare,Parch,Embarked,AgeNA,Embarked_idx,Sex_idx
0.0,22.0,3.0,male,1.0,7.25,0.0,S,0.0,0.0,0.0
1.0,38.0,1.0,female,1.0,71.2833,0.0,C,0.0,1.0,1.0
1.0,26.0,3.0,female,0.0,7.925,0.0,S,0.0,0.0,1.0
1.0,35.0,1.0,female,1.0,53.1,0.0,S,0.0,0.0,1.0
0.0,35.0,3.0,male,0.0,8.05,0.0,S,0.0,0.0,0.0


In [0]:
# use oneHotEncoder to convert the Sex_idx and Embarked_idx column
from pyspark.ml.feature import OneHotEncoder

ohe = OneHotEncoder(inputCols=["Embarked_idx", "Sex_idx"], outputCols=["Embarked_ohe", "Sex_ohe"])
titanic_final_ohe = ohe.fit(titanic_final_si).transform(titanic_final_si)

titanic_final_ohe.limit(5).display()

Survived,Age,Pclass,Sex,SibSp,fare,Parch,Embarked,AgeNA,Embarked_idx,Sex_idx,Embarked_ohe,Sex_ohe
0.0,22.0,3.0,male,1.0,7.25,0.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))"
1.0,38.0,1.0,female,1.0,71.2833,0.0,C,0.0,1.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(1), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())"
1.0,26.0,3.0,female,0.0,7.925,0.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())"
1.0,35.0,1.0,female,1.0,53.1,0.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())"
0.0,35.0,3.0,male,0.0,8.05,0.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))"


### V. Assemble feature columns into a feature vector (for use with the pipeline)

In [0]:
from pyspark.ml.feature import VectorAssembler

# remove the dependent variable cnt
va = VectorAssembler(inputCols=titanic_final_ohe.drop('sex','Embarked','Embarked_idx','Sex_idx','Survived').columns, outputCol="features")

### VI: Model

##### Logistic Regression

##### 6.1 set up the model

In [0]:
# Create logistic regression estimator
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features",labelCol="Survived")

In [0]:
from pyspark.ml import Pipeline
# define pipeline
pl = Pipeline(stages = [si, ohe, va, lr])

##### 6.2 Split data into training and testing

In [0]:
# split data
train, test = titanic_final.randomSplit([0.7,0.3],seed=0)

print("train {} test {}".format(train.count(), test.count()))

train 618 test 271


##### 6.3 fit to model

In [0]:
# train the training set with predefined pipline
plmodel = pl.fit(train)

In [0]:
# fitted model on the test data
results = plmodel.transform(test)

In [0]:
from pyspark.ml.classification import LogisticRegression
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(plmodel.stages[3].coefficients))
print("Intercept: " + str(plmodel.stages[3].intercept))

Coefficients: [-0.034537817836000746,-1.0247165780235168,-0.24085965933583653,0.0017497630720823196,-0.1435968947744598,-0.056046921051805344,-0.07096776976685322,0.24968987411260246,-2.641291131192507]
Intercept: 4.615451431020967


##### 6.4 Evaluate model performance

In [0]:
display(results.limit(5))

Survived,Age,Pclass,Sex,SibSp,fare,Parch,Embarked,AgeNA,Embarked_idx,Sex_idx,Embarked_ohe,Sex_ohe,features,rawPrediction,probability,prediction
0.0,1.0,3.0,male,4.0,39.6875,1.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 9, values -> List(1.0, 3.0, 4.0, 39.6875, 1.0, 0.0, 1.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(2.2430868320394843, -2.2430868320394843))","Map(vectorType -> dense, length -> 2, values -> List(0.904052549008072, 0.09594745099192803))",0.0
0.0,2.0,3.0,female,3.0,27.9,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(2.0, 3.0, 3.0, 27.9, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.4403039136662299, 0.4403039136662299))","Map(vectorType -> dense, length -> 2, values -> List(0.39166855509357174, 0.6083314449064283))",1.0
0.0,2.0,3.0,male,4.0,39.6875,1.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 9, values -> List(2.0, 3.0, 4.0, 39.6875, 1.0, 0.0, 1.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(2.2776246498754844, -2.2776246498754844))","Map(vectorType -> dense, length -> 2, values -> List(0.9070068906498442, 0.09299310935015581))",0.0
0.0,9.0,3.0,female,3.0,27.9,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(9.0, 3.0, 3.0, 27.9, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(-0.19853918881422405, 0.19853918881422405))","Map(vectorType -> dense, length -> 2, values -> List(0.4505276039251695, 0.5494723960748304))",1.0
0.0,9.0,3.0,female,4.0,31.275,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(9.0, 3.0, 4.0, 31.275, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(0.036415020153334865, -0.036415020153334865))","Map(vectorType -> dense, length -> 2, values -> List(0.509102749166052, 0.490897250833948))",0.0


In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluate the acu with BinaryClassificationEvaluator
# evaluate the accuracy and f1 with MultiClassificationEvaluator
accuracy = MulticlassClassificationEvaluator(labelCol='Survived', metricName="accuracy")
auc = BinaryClassificationEvaluator(labelCol='Survived', metricName="areaUnderROC")
f1 = MulticlassClassificationEvaluator(labelCol='Survived', metricName="f1")

print(accuracy.evaluate(results))
print(auc.evaluate(results))
print(f1.evaluate(results))


0.8339483394833949
0.8772224754957838
0.8328755708063246


##### Random Forest

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)

# build pipeline of rf
rfpl = Pipeline(stages = [si, ohe, va, rf])

# train the training set with predefined pipline
plmodel_rf = rfpl.fit(train)

# fitted model on the test data
rf_results = plmodel_rf.transform(test)

# report performance metrics
print(accuracy.evaluate(rf_results))
print(auc.evaluate(rf_results))
print(f1.evaluate(rf_results))

0.8081180811808119
0.872891497606565
0.8074465495085243


In [0]:
# define the paramGrid

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder().addGrid(rf.maxDepth,[5,7]).addGrid(rf.numTrees,[20,30]).build()

# create the crossvalidator
cv = CrossValidator(estimator=rf, evaluator=auc, estimatorParamMaps=paramGrid)

# build pipeline and have cv as part of it
pipeline = Pipeline(stages = [si, ohe, va, cv])

# train the pipeline
pipelineModel = pipeline.fit(train)

# fitted model on the test data
rfcv_results = pipelineModel.transform(test)

# predictions results (shows 5 rows of the results)
display(rfcv_results.limit(5))

# report performance metrics
print(accuracy.evaluate(rfcv_results))
print(auc.evaluate(rfcv_results))
print(f1.evaluate(rfcv_results))

Survived,Age,Pclass,Sex,SibSp,fare,Parch,Embarked,AgeNA,Embarked_idx,Sex_idx,Embarked_ohe,Sex_ohe,features,rawPrediction,probability,prediction
0.0,1.0,3.0,male,4.0,39.6875,1.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 9, values -> List(1.0, 3.0, 4.0, 39.6875, 1.0, 0.0, 1.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(14.549486624486626, 5.450513375513375))","Map(vectorType -> dense, length -> 2, values -> List(0.7274743312243312, 0.27252566877566875))",0.0
0.0,2.0,3.0,female,3.0,27.9,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(2.0, 3.0, 3.0, 27.9, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(17.01424658674659, 2.9857534132534136))","Map(vectorType -> dense, length -> 2, values -> List(0.8507123293373294, 0.14928767066267065))",0.0
0.0,2.0,3.0,male,4.0,39.6875,1.0,S,0.0,0.0,0.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(0), values -> List(1.0))","Map(vectorType -> dense, length -> 9, values -> List(2.0, 3.0, 4.0, 39.6875, 1.0, 0.0, 1.0, 0.0, 1.0))","Map(vectorType -> dense, length -> 2, values -> List(14.549486624486626, 5.450513375513375))","Map(vectorType -> dense, length -> 2, values -> List(0.7274743312243312, 0.27252566877566875))",0.0
0.0,9.0,3.0,female,3.0,27.9,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(9.0, 3.0, 3.0, 27.9, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(17.206670829170832, 2.7933291708291708))","Map(vectorType -> dense, length -> 2, values -> List(0.8603335414585415, 0.1396664585414585))",0.0
0.0,9.0,3.0,female,4.0,31.275,2.0,S,0.0,0.0,1.0,"Map(vectorType -> sparse, length -> 2, indices -> List(0), values -> List(1.0))","Map(vectorType -> sparse, length -> 1, indices -> List(), values -> List())","Map(vectorType -> dense, length -> 9, values -> List(9.0, 3.0, 4.0, 31.275, 2.0, 0.0, 1.0, 0.0, 0.0))","Map(vectorType -> dense, length -> 2, values -> List(16.456670829170832, 3.543329170829171))","Map(vectorType -> dense, length -> 2, values -> List(0.8228335414585415, 0.17716645854145852))",0.0


0.8487084870848709
0.8581319808525198
0.8459081527473851


In [0]:
# View the best model parameters
print(pipelineModel.stages[3].bestModel.explainParams())

bootstrap: Whether bootstrap samples are used when building trees. (default: True)
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the featur