# PySpark Challenge: Titanic

In [1]:
# create SparkContext

import pyspark
from pyspark import SparkContext
sc =SparkContext()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/31 19:02:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# create sqlContext

from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)



In [3]:
# read csv into PySpark df

df = sqlContext.read.option("inferSchema",True).option("header",True).csv("titanic_dataset.csv")

### Data Exploration

In [4]:
# see dtypes

df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
# .head()

df.show(5, truncate = False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

In [6]:
df.describe().show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

                                                                                

In [7]:
df.crosstab('sex', 'survived').sort("sex_survived").show()

+------------+---+---+
|sex_survived|  0|  1|
+------------+---+---+
|      female| 81|233|
|        male|468|109|
+------------+---+---+



In [8]:
df.groupBy('parch').count().sort('parch').show()

+-----+-----+
|parch|count|
+-----+-----+
|    0|  678|
|    1|  118|
|    2|   80|
|    3|    5|
|    4|    4|
|    5|    5|
|    6|    1|
+-----+-----+



In [9]:
df.groupBy('sibsp').count().sort('sibsp').show()

+-----+-----+
|sibsp|count|
+-----+-----+
|    0|  608|
|    1|  209|
|    2|   28|
|    3|   16|
|    4|   18|
|    5|    5|
|    8|    7|
+-----+-----+



In [10]:
# find nans

from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [11]:
# find null

from pyspark.sql.functions import isnull

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



### Data Prep

In [12]:
# drop cabin due to ~so many~ nulls

df = df.drop('Cabin')

In [13]:
# drop name - not necessary for prediction

df = df.drop('Name')

In [14]:
# drop ticket - not necessary for prediction

df = df.drop('Ticket')

In [15]:
# drop PassengerID - not necessary for prediction

df = df.drop('PassengerID')

In [16]:
df.show(5)

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 5 rows



In [17]:
# Impute missing values for Age

from pyspark.ml.feature import Imputer

# instantiate
imputer = Imputer()

# set input and output columns
imputer.setInputCols(['Age'])
imputer.setOutputCols(['age_imputed'])

# apply imputer
model = imputer.fit(df)
df = model.transform(df)

In [18]:
# find null

from pyspark.sql.functions import isnull

df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------+------+---+---+-----+-----+----+--------+-----------+
|Survived|Pclass|Sex|Age|SibSp|Parch|Fare|Embarked|age_imputed|
+--------+------+---+---+-----+-----+----+--------+-----------+
|       0|     0|  0|177|    0|    0|   0|       2|          0|
+--------+------+---+---+-----+-----+----+--------+-----------+



In [19]:
# drop age (we only need age_imputed)

df = df.drop('Age')

In [20]:
# Encode Sex

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="Sex", outputCol="sex_encoded")
model = stringIndexer.fit(df)
df = model.transform(df)

In [21]:
# drop now rendundant sex column

df = df.drop('Sex')

In [22]:
df.show(10)

+--------+------+-----+-----+-------+--------+-----------------+-----------+
|Survived|Pclass|SibSp|Parch|   Fare|Embarked|      age_imputed|sex_encoded|
+--------+------+-----+-----+-------+--------+-----------------+-----------+
|       0|     3|    1|    0|   7.25|       S|             22.0|        0.0|
|       1|     1|    1|    0|71.2833|       C|             38.0|        1.0|
|       1|     3|    0|    0|  7.925|       S|             26.0|        1.0|
|       1|     1|    1|    0|   53.1|       S|             35.0|        1.0|
|       0|     3|    0|    0|   8.05|       S|             35.0|        0.0|
|       0|     3|    0|    0| 8.4583|       Q|29.69911764705882|        0.0|
|       0|     1|    0|    0|51.8625|       S|             54.0|        0.0|
|       0|     3|    3|    1| 21.075|       S|              2.0|        0.0|
|       1|     3|    0|    2|11.1333|       S|             27.0|        1.0|
|       1|     2|    1|    0|30.0708|       C|             14.0|        1.0|

In [25]:
# drop two rows with null values in embarked

df = df.na.drop(subset=["Embarked"])

In [26]:
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

+--------+------+-----+-----+----+--------+-----------+-----------+
|Survived|Pclass|SibSp|Parch|Fare|Embarked|age_imputed|sex_encoded|
+--------+------+-----+-----+----+--------+-----------+-----------+
|       0|     0|    0|    0|   0|       0|          0|          0|
+--------+------+-----+-----+----+--------+-----------+-----------+



In [27]:
# Encode Embarked

from pyspark.ml.feature import OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="embarked_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="embarked_encoded", outputCol="embarked_vec").fit(indexed)
df = encoder.transform(indexed)

In [28]:
# drop columns embarked and embarked_encoded, as we only wish to keep vectorized embarked data

df = df.drop('embarked')
df = df.drop('embarked_encoded')

In [29]:
df.show(5)

+--------+------+-----+-----+-------+-----------+-----------+-------------+
|Survived|Pclass|SibSp|Parch|   Fare|age_imputed|sex_encoded| embarked_vec|
+--------+------+-----+-----+-------+-----------+-----------+-------------+
|       0|     3|    1|    0|   7.25|       22.0|        0.0|(3,[0],[1.0])|
|       1|     1|    1|    0|71.2833|       38.0|        1.0|(3,[1],[1.0])|
|       1|     3|    0|    0|  7.925|       26.0|        1.0|(3,[0],[1.0])|
|       1|     1|    1|    0|   53.1|       35.0|        1.0|(3,[0],[1.0])|
|       0|     3|    0|    0|   8.05|       35.0|        0.0|(3,[0],[1.0])|
+--------+------+-----+-----+-------+-----------+-----------+-------------+
only showing top 5 rows



In [31]:
assemblerInputs = ['Pclass', 'SibSp', 'Parch', 'Fare', 'age_imputed', 'sex_encoded', 'embarked_vec']

In [32]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
df = assembler.transform(df)

In [33]:
df.show(5)

+--------+------+-----+-----+-------+-----------+-----------+-------------+--------------------+
|Survived|Pclass|SibSp|Parch|   Fare|age_imputed|sex_encoded| embarked_vec|            features|
+--------+------+-----+-----+-------+-----------+-----------+-------------+--------------------+
|       0|     3|    1|    0|   7.25|       22.0|        0.0|(3,[0],[1.0])|[3.0,1.0,0.0,7.25...|
|       1|     1|    1|    0|71.2833|       38.0|        1.0|(3,[1],[1.0])|[1.0,1.0,0.0,71.2...|
|       1|     3|    0|    0|  7.925|       26.0|        1.0|(3,[0],[1.0])|[3.0,0.0,0.0,7.92...|
|       1|     1|    1|    0|   53.1|       35.0|        1.0|(3,[0],[1.0])|[1.0,1.0,0.0,53.1...|
|       0|     3|    0|    0|   8.05|       35.0|        0.0|(3,[0],[1.0])|(9,[0,3,4,6],[3.0...|
+--------+------+-----+-----+-------+-----------+-----------+-------------+--------------------+
only showing top 5 rows



In [34]:
# transform to dense vector
from pyspark.ml.linalg import DenseVector
input_data = df.rdd.map(lambda x: (x["Survived"], DenseVector(x["features"])))

### Fit and Prediction - LR

In [35]:
# Split the data into train and test sets
train_data, test_data = df.randomSplit([.8,.2],seed=808)

In [36]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="Survived",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [37]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.28245622608820803,-0.06590005996362967,0.033547311904343645,0.002397111864947558,-0.008231932747059299,0.9936720840407006,-0.1861644324436586,0.22654112571168128,0.03577423765134244]
Intercept: 0.07838496194652372


In [38]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [40]:
selected = predictions.select("survived", "prediction", "probability")
selected.show(20)

+--------+----------+--------------------+
|survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.70156355193123...|
|       0|       0.0|[0.70300903057448...|
|       0|       0.0|[0.59478640032728...|
|       0|       0.0|[0.54430737577575...|
|       0|       0.0|[0.67049834347800...|
|       0|       0.0|[0.58187795902624...|
|       0|       0.0|[0.61898819410207...|
|       0|       0.0|[0.54981276166594...|
|       0|       0.0|[0.68853944905941...|
|       0|       0.0|[0.68853944905941...|
|       0|       0.0|[0.74316737238060...|
|       0|       0.0|[0.70000628457327...|
|       0|       0.0|[0.72363703581669...|
|       0|       0.0|[0.60186795409532...|
|       0|       0.0|[0.70725003832886...|
|       0|       0.0|[0.72235086827777...|
|       0|       0.0|[0.73855509207932...|
|       0|       0.0|[0.61273559127477...|
|       0|       0.0|[0.75128422672661...|
|       0|       0.0|[0.79551789111193...|
+--------+-

In [42]:
cm = predictions.select("survived", "prediction")

In [44]:
cm.groupby('survived').agg({'survived': 'count'}).show()

+--------+---------------+
|survived|count(survived)|
+--------+---------------+
|       1|             61|
|       0|            107|
+--------+---------------+



In [47]:
# accuracy
cm.filter(cm.survived == cm.prediction).count() / cm.count()

0.7857142857142857

In [49]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("survived", "prediction")
    acc = cm.filter(cm.survived == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

Model accuracy: 78.571%


### Fit and Prediction - Random Forest

In [63]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')

# Fit the data to the model
RFModel = rf.fit(train_data)

In [64]:
predictions_rf = RFModel.transform(test_data)

In [65]:
selected = predictions_rf.select("survived", "prediction", "probability")
selected.show(20)

+--------+----------+--------------------+
|survived|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.80884837119048...|
|       0|       0.0|[0.72414880521785...|
|       0|       0.0|[0.73590509766430...|
|       0|       0.0|[0.69470968777086...|
|       0|       0.0|[0.71565489530881...|
|       0|       0.0|[0.70866309572426...|
|       0|       0.0|[0.65000012396856...|
|       0|       0.0|[0.57941082209155...|
|       0|       0.0|[0.81984058468933...|
|       0|       0.0|[0.81984058468933...|
|       0|       0.0|[0.85378678394870...|
|       0|       0.0|[0.85187910910478...|
|       0|       0.0|[0.85378678394870...|
|       0|       0.0|[0.78596085493210...|
|       0|       0.0|[0.82739129625827...|
|       0|       0.0|[0.82739129625827...|
|       0|       0.0|[0.82739129625827...|
|       0|       0.0|[0.74924035293360...|
|       0|       0.0|[0.79453288698681...|
|       0|       0.0|[0.90234438272336...|
+--------+-

In [67]:
cm_rf = predictions_rf.select("survived", "prediction")

In [68]:
cm_rf.groupby('survived').agg({'survived': 'count'}).show()

+--------+---------------+
|survived|count(survived)|
+--------+---------------+
|       1|             61|
|       0|            107|
+--------+---------------+



In [69]:
accuracy_m(model = RFModel)

Model accuracy: 79.167%
