<a href="https://colab.research.google.com/github/onemarcos/Data-Science-Projects/blob/main/Spark-Titanic/spark_titanic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# installing dependencies 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
!tar xf spark-3.3.0-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
# setting necessary variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

# importing pyspark
import findspark
findspark.init('spark-3.3.0-bin-hadoop3')

In [10]:
# starting a spark session
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
sc = SparkSession.builder.master('local[*]').getOrCreate()

# loading csv data
df = sc.read.csv("train.csv", inferSchema=True, header=True)

# checking schema data types
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [None]:
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [None]:
df.count()

891

In [12]:
df.groupBy("Sex","Survived").count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  468|
|female|       1|  233|
|female|       0|   81|
|  male|       1|  109|
+------+--------+-----+



In [5]:
df.select("PassengerId","Sex","Age","Ticket").show(5)

+-----------+------+----+----------------+
|PassengerId|   Sex| Age|          Ticket|
+-----------+------+----+----------------+
|          1|  male|22.0|       A/5 21171|
|          2|female|38.0|        PC 17599|
|          3|female|26.0|STON/O2. 3101282|
|          4|female|35.0|          113803|
|          5|  male|35.0|          373450|
+-----------+------+----+----------------+
only showing top 5 rows



In [6]:
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [7]:
df.select("Pclass").distinct().show()

+------+
|Pclass|
+------+
|     1|
|     3|
|     2|
+------+



In [8]:
from pyspark.sql import functions as F

# Counting and Removing Null values
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [11]:
mean_age = df.select(mean('Age')).collect()[0][0]
print(mean_age)

29.69911764705882


In [13]:
df.select("Name").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



Assigning the mean values of Mr and Mrs to the respective groups
Using the Regex ""[A-Za-z]+)." we extract the initials from the Name. It looks for strings which lie between A-Z or a-z and followed by a .(dot).

In [14]:
df = df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))

In [15]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Initial|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|     Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|    Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|   Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|    Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|     Mr|
|          6|   

In [16]:
df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



In [17]:
# Fixing misspelled Initials like Mlle or Mme that stand for Miss
df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])


In [18]:
df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



In [19]:
# Imputing missing values in age feature based on average age of Initials
df.groupby('Initial').avg('Age').collect()

[Row(Initial='Miss', avg(Age)=21.86),
 Row(Initial='Other', avg(Age)=45.888888888888886),
 Row(Initial='Master', avg(Age)=4.574166666666667),
 Row(Initial='Mr', avg(Age)=32.73960880195599),
 Row(Initial='Mrs', avg(Age)=35.981818181818184)]

In [20]:
df = df.withColumn("Age",when((df["Initial"] == "Miss") & (df["Age"].isNull()), 22).otherwise(df["Age"]))
df = df.withColumn("Age",when((df["Initial"] == "Other") & (df["Age"].isNull()), 46).otherwise(df["Age"]))
df = df.withColumn("Age",when((df["Initial"] == "Master") & (df["Age"].isNull()), 5).otherwise(df["Age"]))
df = df.withColumn("Age",when((df["Initial"] == "Mr") & (df["Age"].isNull()), 33).otherwise(df["Age"]))
df = df.withColumn("Age",when((df["Initial"] == "Mrs") & (df["Age"].isNull()), 36).otherwise(df["Age"]))

In [21]:
df.filter(df.Age==46).select("Initial").show()

+-------+
|Initial|
+-------+
|     Mr|
|     Mr|
|     Mr|
+-------+



In [22]:
df.select("Age").show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|33.0|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|33.0|
|31.0|
|36.0|
+----+
only showing top 20 rows



In [23]:
#Embarked feature has only two missining values
df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [25]:
# Dropping unnecessary columns
df = df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [26]:
# Puting all features into vector
feature = VectorAssembler(inputCols=df.columns[1:],outputCol="features")
feature_vector= feature.transform(df)

In [27]:
feature_vector.show()

+--------+------+----+-----+-----+-------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|            features|
+--------+------+----+-----+-----+-------+--------------------+
|       0|     3|22.0|    1|    0|   7.25|[3.0,22.0,1.0,0.0...|
|       1|     1|38.0|    1|    0|71.2833|[1.0,38.0,1.0,0.0...|
|       1|     3|26.0|    0|    0|  7.925|[3.0,26.0,0.0,0.0...|
|       1|     1|35.0|    1|    0|   53.1|[1.0,35.0,1.0,0.0...|
|       0|     3|35.0|    0|    0|   8.05|[3.0,35.0,0.0,0.0...|
|       0|     3|33.0|    0|    0| 8.4583|[3.0,33.0,0.0,0.0...|
|       0|     1|54.0|    0|    0|51.8625|[1.0,54.0,0.0,0.0...|
|       0|     3| 2.0|    3|    1| 21.075|[3.0,2.0,3.0,1.0,...|
|       1|     3|27.0|    0|    2|11.1333|[3.0,27.0,0.0,2.0...|
|       1|     2|14.0|    1|    0|30.0708|[2.0,14.0,1.0,0.0...|
|       1|     3| 4.0|    1|    1|   16.7|[3.0,4.0,1.0,1.0,...|
|       1|     1|58.0|    0|    0|  26.55|[1.0,58.0,0.0,0.0...|
|       0|     3|20.0|    0|    0|   8.0

In [28]:
# Spliting it into training and test
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)

# Modelling

## LogisticRegression

In [29]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
# Training algo
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,22.0,0.0,0.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       1.0|       0|[1.0,30.0,0.0,0.0...|
|       1.0|       0|[1.0,31.0,1.0,0.0...|
|       1.0|       0|(5,[0,1],[1.0,33.0])|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,37.0,1.0,0.0...|
|       1.0|       0|[1.0,38.0,0.0,1.0...|
|       1.0|       0|[1.0,45.0,1.0,0.0...|
|       1.0|       0|[1.0,47.0,0.0,0.0...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|[1.0,62.0,0.0,0.0...|
|       0.0|       0|[1.0,65.0,0.0,0.0...|
|       0.0|       0|[1.0,71.0,0.0,0.0...|
|       1.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [30]:
# Evaluating accuracy of LogisticRegression
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

Accuracy of LogisticRegression is = 0.691489
Test Error of LogisticRegression = 0.308511 


## Decision Tree Classifier

In [31]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "Survived", "features").show()


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,22.0,0.0,0.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       1.0|       0|[1.0,30.0,0.0,0.0...|
|       1.0|       0|[1.0,31.0,1.0,0.0...|
|       0.0|       0|(5,[0,1],[1.0,33.0])|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,37.0,1.0,0.0...|
|       1.0|       0|[1.0,38.0,0.0,1.0...|
|       1.0|       0|[1.0,45.0,1.0,0.0...|
|       0.0|       0|[1.0,47.0,0.0,0.0...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|[1.0,62.0,0.0,0.0...|
|       0.0|       0|[1.0,65.0,0.0,0.0...|
|       0.0|       0|[1.0,71.0,0.0,0.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [32]:
# Evaluating accuracy of DecisionTreeClassifier
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))

Accuracy of DecisionTreeClassifier is = 0.691489
Test Error of DecisionTreeClassifier = 0.308511 


## Random Forest Classifier

In [34]:
from pyspark.ml.classification import RandomForestClassifier
rf = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,22.0,0.0,0.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       1.0|       0|[1.0,30.0,0.0,0.0...|
|       1.0|       0|[1.0,31.0,1.0,0.0...|
|       0.0|       0|(5,[0,1],[1.0,33.0])|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,37.0,1.0,0.0...|
|       1.0|       0|[1.0,38.0,0.0,1.0...|
|       1.0|       0|[1.0,45.0,1.0,0.0...|
|       0.0|       0|[1.0,47.0,0.0,0.0...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|[1.0,62.0,0.0,0.0...|
|       0.0|       0|[1.0,65.0,0.0,0.0...|
|       0.0|       0|[1.0,71.0,0.0,0.0...|
|       0.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [35]:
# Evaluating accuracy of RandomForestClassifier
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

Accuracy of RandomForestClassifier is = 0.691489
Test Error of RandomForestClassifier  = 0.308511 


## Gradient-boosted tree classifier

In [36]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,22.0,0.0,0.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       1.0|       0|[1.0,30.0,0.0,0.0...|
|       1.0|       0|[1.0,31.0,1.0,0.0...|
|       0.0|       0|(5,[0,1],[1.0,33.0])|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,37.0,1.0,0.0...|
|       1.0|       0|[1.0,38.0,0.0,1.0...|
|       1.0|       0|[1.0,45.0,1.0,0.0...|
|       0.0|       0|[1.0,47.0,0.0,0.0...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|[1.0,62.0,0.0,0.0...|
|       0.0|       0|[1.0,65.0,0.0,0.0...|
|       0.0|       0|[1.0,71.0,0.0,0.0...|
|       1.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [37]:
# Evaluate accuracy of Gradient-boosted
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifie is = %g"% (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifie %g"% (1.0 - gbt_accuracy))

Accuracy of Gradient-boosted tree classifie is = 0.68617
Test Error of Gradient-boosted tree classifie 0.31383


## Naive Bayes

In [39]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="Survived", featuresCol="features")
nb_model = nb.fit(trainingData)
nb_prediction = nb_model.transform(testData)
nb_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,22.0,0.0,0.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|[1.0,29.0,0.0,0.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|[1.0,30.0,0.0,0.0...|
|       1.0|       0|[1.0,31.0,1.0,0.0...|
|       0.0|       0|(5,[0,1],[1.0,33.0])|
|       0.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,33.0,0.0,0.0...|
|       1.0|       0|[1.0,37.0,1.0,0.0...|
|       1.0|       0|[1.0,38.0,0.0,1.0...|
|       1.0|       0|[1.0,45.0,1.0,0.0...|
|       0.0|       0|[1.0,47.0,0.0,0.0...|
|       1.0|       0|[1.0,58.0,0.0,2.0...|
|       0.0|       0|[1.0,62.0,0.0,0.0...|
|       0.0|       0|[1.0,65.0,0.0,0.0...|
|       0.0|       0|[1.0,71.0,0.0,0.0...|
|       1.0|       0|[2.0,19.0,1.0,1.0...|
+----------

In [40]:
# Evaluating accuracy of Naive Bayes
nb_accuracy = evaluator.evaluate(nb_prediction)
print("Accuracy of NaiveBayes is  = %g"% (nb_accuracy))
print("Test Error of NaiveBayes  = %g " % (1.0 - nb_accuracy))

Accuracy of NaiveBayes is  = 0.723404
Test Error of NaiveBayes  = 0.276596 
