# Training a Classification Model on the Titanic Dataset using Spark

## Load Libraries and Begin a Spark Session

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [34]:
spark

## Load Source Data

In [35]:
csv = spark.read.csv('./titanic_dataset.csv', inferSchema=True, header=True)
csv.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

## Preprocess Data

In [36]:
data = csv.select(
    'Pclass', 
    'Sex',
    'SibSp', 
    'Parch', 
    'Age', 
    'Fare', 
    col('Survived').alias('label')
)
data.show(10)

+------+------+-----+-----+----+-------+-----+
|Pclass|   Sex|SibSp|Parch| Age|   Fare|label|
+------+------+-----+-----+----+-------+-----+
|     3|  male|    1|    0|22.0|   7.25|    0|
|     1|female|    1|    0|38.0|71.2833|    1|
|     3|female|    0|    0|26.0|  7.925|    1|
|     1|female|    1|    0|35.0|   53.1|    1|
|     3|  male|    0|    0|35.0|   8.05|    0|
|     3|  male|    0|    0|null| 8.4583|    0|
|     1|  male|    0|    0|54.0|51.8625|    0|
|     3|  male|    3|    1| 2.0| 21.075|    0|
|     3|female|    0|    2|27.0|11.1333|    1|
|     2|female|    1|    0|14.0|30.0708|    1|
+------+------+-----+-----+----+-------+-----+
only showing top 10 rows



In [37]:
# Impute nulls in Age column with the median.

median_age = data.approxQuantile("Age", [0.5], 0.25)[0]
data = data.withColumn(
    "Age",
    when(col("Age").isNull(), median_age).otherwise(col("Age"))
)
data.show(10)

+------+------+-----+-----+----+-------+-----+
|Pclass|   Sex|SibSp|Parch| Age|   Fare|label|
+------+------+-----+-----+----+-------+-----+
|     3|  male|    1|    0|22.0|   7.25|    0|
|     1|female|    1|    0|38.0|71.2833|    1|
|     3|female|    0|    0|26.0|  7.925|    1|
|     1|female|    1|    0|35.0|   53.1|    1|
|     3|  male|    0|    0|35.0|   8.05|    0|
|     3|  male|    0|    0|21.0| 8.4583|    0|
|     1|  male|    0|    0|54.0|51.8625|    0|
|     3|  male|    3|    1| 2.0| 21.075|    0|
|     3|female|    0|    2|27.0|11.1333|    1|
|     2|female|    1|    0|14.0|30.0708|    1|
+------+------+-----+-----+----+-------+-----+
only showing top 10 rows



In [38]:
null_counts = data.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in data.columns]
).collect()[0].asDict()

null_counts

{'Pclass': 0,
 'Sex': 0,
 'SibSp': 0,
 'Parch': 0,
 'Age': 0,
 'Fare': 0,
 'label': 0}

## Splitting the data

In [39]:
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 593  Testing Rows: 298


## Define the pipeline

In [40]:
# converts string values to indexes for categorical features
strIdx = StringIndexer(inputCol = "Sex", outputCol = "SexIdx")

# combines categorical features into a single vector
catVect = VectorAssembler(inputCols = ["SexIdx", "Pclass"], outputCol="catFeatures")

# creates indexes for a vector of categorical features
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

# creates a vector of continuous numeric features
numVect = VectorAssembler(inputCols = ["SibSp", "Parch", "Age", "Fare"], outputCol="numFeatures")

# normalizes continuous numeric features
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")

# creates a vector of categorical and continuous features
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")

# trains a classification model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=100, regParam=0.3)


pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

## Fit the pipeline to training data

In [41]:
pipelineModel = pipeline.fit(train)
pipelineModel

PipelineModel_fbeb60fe3f7a

## Test the predictions

In [48]:
prediction = pipelineModel.transform(test)
predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(100, truncate=False)

+----------------------------------------------------------------------------+----------+---------+
|features                                                                    |prediction|trueLabel|
+----------------------------------------------------------------------------+----------+---------+
|[1.0,0.0,0.0,0.0,0.19577783362653933,0.16883675574220638]                   |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.2586076903744659,0.15458810467956932]                    |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.2963056044232219,0.13526459159462315]                    |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.37170143252073384,0.060507970265993034]                  |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.37170143252073384,0.11111839809247648]                   |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.37170143252073384,0.18249984580226933]                   |1.0       |1        |
|[1.0,0.0,0.0,0.0,0.37170143252073384,0.20772776566317125]                   |1.0       |1        |


## Evaluating the model

### Basic Metrics

In [49]:
tp = float(predicted.filter("prediction == 1.0 AND trueLabel == 1").count())
fp = float(predicted.filter("prediction == 1.0 AND trueLabel == 0").count())
tn = float(predicted.filter("prediction == 0.0 AND trueLabel == 0").count())
fn = float(predicted.filter("prediction == 0.0 AND trueLabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
acc = (tp + tn) / (tp + tn + fp + fn)
metrics = [
    ("TP", tp),
    ("FP", fp),
    ("TN", tn),
    ("FN", fn),
    ("Accuracy", acc),
    ("Precision", pr),
    ("Recall", re),
    ("F1", 2*pr*re/(re+pr))
]

for metric in metrics:
    print(metric[0], metric[1])

TP 55.0
FP 5.0
TN 181.0
FN 57.0
Accuracy 0.7919463087248322
Precision 0.9166666666666666
Recall 0.49107142857142855
F1 0.6395348837209303


### Area under ROC 

In [50]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(prediction)
print ("AUR = ", aur)

AUR =  0.8405097926267276


In [51]:
prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=False)

+--------------------------------------------+----------------------------------------+----------+---------+
|rawPrediction                               |probability                             |prediction|trueLabel|
+--------------------------------------------+----------------------------------------+----------+---------+
|[-0.7971267639840626,0.7971267639840626]    |[0.3106404671578376,0.6893595328421624] |1.0       |1        |
|[-0.7432661064531597,0.7432661064531597]    |[0.3222903494762376,0.6777096505237624] |1.0       |1        |
|[-0.6945151789868745,0.6945151789868745]    |[0.3330294030263333,0.6669705969736667] |1.0       |1        |
|[-0.5419338205156712,0.5419338205156712]    |[0.367737841105456,0.6322621588945441]  |1.0       |1        |
|[-0.6191320617672343,0.6191320617672343]    |[0.34997887615459816,0.6500211238454019]|1.0       |1        |
|[-0.7280132243624037,0.7280132243624037]    |[0.3256308629816599,0.6743691370183401] |1.0       |1        |
|[-0.76649444507790

## Hyperparameter Tuning

In [54]:
paramGrid = ParamGridBuilder().addGrid(
    lr.regParam, [0.3, 0.1]
).addGrid(
    lr.maxIter, [100, 50, 20, 10, 5]
).addGrid(
    lr.threshold, [0.5, 0.4, 0.3]
).build()

cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(), estimatorParamMaps=paramGrid, 
                    numFolds=2)

model = cv.fit(train)

newPrediction = model.transform(test)
newPredicted = prediction.select("features", "prediction", "trueLabel")
newPredicted.show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.0,...|       1.0|        1|
|[1.0,0.0,0.0,0.16...|       1.0|        1|
|[1.0,0.0,0.0,0.16...|       1.0|        1|
|[1.0,0.0,0.0,0.33...|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0|        1|
|[1.0,0.0,0.125,0....|       1.0

In [55]:
# Recalculate basic metrics
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
acc2 = (tp2 + tn2) / (tp2 + tn2 + fp2 + fn2)
metrics = [
    ("TP", tp2),
    ("FP", fp2),
    ("TN", tn2),
    ("FN", fn2),
    ("Accuracy", acc2),
    ("Precision", pr2),
    ("Recall", re2),
    ("F1", 2*pr2*re2/(re2+pr2))
]

for metric in metrics:
    print(metric[0], metric[1])

TP 70.0
FP 18.0
TN 168.0
FN 42.0
Accuracy 0.7986577181208053
Precision 0.7954545454545454
Recall 0.625
F1 0.7


In [56]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator.evaluate(prediction)
print( "AUR2 = ", aur2)

AUR2 =  0.8405097926267276


## Summary

We have successfully built a logistic regression classifier for the titanic dataset to predict whether a passenger will survive or not.

The final metrics have been acheived after hyperparameter tuning via 2 - fold cross validation.