# **PySpark Demonstration - Machine Learning with the Titanic Dataset**

# 1. Running PySpark in Colab & Some Basic Operations


To run Spark in Colab, we first need to install the dependencies in the Colab environment: Apache Spark (the cells below install version 3.0.1 built for Hadoop 3.2), Java 8, and findspark, which locates Spark on the system. The installation can be carried out inside the Colab notebook itself. If you are new to Spark, note that some users have reported Python compatibility issues with Spark 2.4.0, so that version is best avoided. Follow the steps below to install the dependencies:

If the download link stops working:
- go to https://downloads.apache.org/spark/
- look for the latest version of Spark and find the file with a naming structure like spark-X.X.X-bin-hadoopX.X.tgz
- change the URL on the !wget line and the filename on the !tar line to match the new URL and filename (and update SPARK_HOME further down accordingly)
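
As a small illustration of the last step, the sketch below builds the download URL, archive name, and folder name from a Spark and Hadoop version pair. The helper name `spark_download_info` is hypothetical, and the URL layout is assumed to follow the `!wget` line used in this notebook; check the directory listing if a release uses a different naming scheme.

```python
def spark_download_info(spark_version, hadoop_version):
    """Return (url, archive, folder) for a Spark binary release.

    Assumes the naming pattern spark-X.X.X-bin-hadoopX.X.tgz described above.
    """
    folder = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    archive = f"{folder}.tgz"
    url = f"https://downloads.apache.org/spark/spark-{spark_version}/{archive}"
    return url, archive, folder


url, archive, folder = spark_download_info("3.0.1", "3.2")
print(url)                   # value for the !wget line
print(archive)               # value for the !tar line
print("/content/" + folder)  # value for SPARK_HOME in Colab
```

Keeping the three strings derived from one pair of version numbers avoids the common mistake of updating the URL but not the `!tar` line or `SPARK_HOME`.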

In [1]:
# Install Pyspark & Setup our Java Environment - Takes less than a minute

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

spark-3.0.1-bin-hadoop3.2/
spark-3.0.1-bin-hadoop3.2/RELEASE
spark-3.0.1-bin-hadoop3.2/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/
spark-3.0.1-bin-hadoop3.2/examples/src/main/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/GBTExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala
spark-3.0.1-bin-hadoop3.2/examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala
spark-3.0.1-bin-hadoop3.2/example

Now that Spark and Java are installed in Colab, set the environment variables that let PySpark find them. Set the locations of Java and Spark by running the following code:

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

Run a local Spark session to test your installation:

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

We first define the Spark session using the SparkSession class, which is the entry point to all functionality in Spark.

*  **.builder**: gives access to the Builder API, which is used to configure the session.
*  **.master()**: determines where the program will run; "local[*]" sets it to run locally on all cores, while "local[1]", for example, runs it on one core. In this case, our programs will run on Google’s servers.
*  **.appName()**: optional method to name the Spark application.
*  **.getOrCreate()**: gets an existing SparkSession or creates a new one if none exists.

In [5]:
spark = SparkSession.builder.master("local[*]").appName("Spark ML example on titanic data").getOrCreate()

Load our dataset. The cell below downloads titanic.csv from GitHub into the Colab working directory.

In [6]:
!wget https://github.com/rajeevratan84/datascienceforbusiness/raw/master/titanic.csv

--2020-10-31 04:04:21--  https://github.com/rajeevratan84/datascienceforbusiness/raw/master/titanic.csv
Resolving github.com (github.com)... 13.114.40.48
Connecting to github.com (github.com)|13.114.40.48|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/rajeevratan84/datascienceforbusiness/master/titanic.csv [following]
--2020-10-31 04:04:22--  https://raw.githubusercontent.com/rajeevratan84/datascienceforbusiness/master/titanic.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 85153 (83K) [text/plain]
Saving to: ‘titanic.csv.3’


2020-10-31 04:04:22 (2.73 MB/s) - ‘titanic.csv.3’ saved [85153/85153]



In [7]:
titanic_df = spark.read.csv("titanic.csv", header=True, inferSchema=True)
titanic_df.head()

Row(pclass=1, name='Allen, Miss. Elisabeth Walton', sex='female', age=29.0, sibsp=0, parch=0, ticket='24160', fare=211.3375, cabin='B5', embarked='S', survived=1)

In [8]:
titanic_df.show(5)

+------+--------------------+------+------+-----+-----+------+--------+-------+--------+--------+
|pclass|                name|   sex|   age|sibsp|parch|ticket|    fare|  cabin|embarked|survived|
+------+--------------------+------+------+-----+-----+------+--------+-------+--------+--------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|211.3375|     B5|       S|       1|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|C22 C26|       S|       1|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|113781|  151.55|C22 C26|       S|       0|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|113781|  151.55|C22 C26|       S|       0|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|113781|  151.55|C22 C26|       S|       0|
+------+--------------------+------+------+-----+-----+------+--------+-------+--------+--------+
only showing top 5 rows



In [9]:
titanic_df.count()

1309

In [10]:
titanic_df.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- survived: integer (nullable = true)



In [11]:
titanic_df.select("Survived","Pclass","Embarked").show()

+--------+------+--------+
|Survived|Pclass|Embarked|
+--------+------+--------+
|       1|     1|       S|
|       1|     1|       S|
|       0|     1|       S|
|       0|     1|       S|
|       0|     1|       S|
|       1|     1|       S|
|       1|     1|       S|
|       0|     1|       S|
|       1|     1|       S|
|       0|     1|       C|
|       0|     1|       C|
|       1|     1|       C|
|       1|     1|       C|
|       1|     1|       S|
|       1|     1|       S|
|       0|     1|       S|
|       0|     1|       C|
|       1|     1|       C|
|       1|     1|       C|
|       0|     1|       C|
+--------+------+--------+
only showing top 20 rows



## Let's attempt some basic exploratory data analysis (EDA)

In [12]:
titanic_df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  500|
|       0|  809|
+--------+-----+



In [13]:
titanic_df.groupBy("Sex","Survived").count().show()

+------+--------+-----+
|   Sex|Survived|count|
+------+--------+-----+
|  male|       0|  682|
|female|       1|  339|
|female|       0|  127|
|  male|       1|  161|
+------+--------+-----+



In [14]:
titanic_df.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|  123|
|     3|       1|  181|
|     1|       1|  200|
|     2|       1|  119|
|     2|       0|  158|
|     3|       0|  528|
+------+--------+-----+



In [58]:
# Return a list of (column, null_count) pairs for every column that contains nulls
def null_value_count(df):
  null_columns_counts = []
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if nullRows > 0:
      null_columns_counts.append((k, nullRows))
  return null_columns_counts

In [59]:
# Calling function
null_columns_count_list = null_value_count(titanic_df)
null_columns_count_list

[('fare', 1)]

In [60]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  fare|                1|
+----------------------+-----------------+



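The age column has 263 null values. (The output above lists only fare; judging by the execution counts In [58] to In [60], those cells appear to have been re-run after the missing ages were filled in.) Before imputing the ages, let's check the mean age: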

In [18]:
mean_age = titanic_df.select(mean('Age')).collect()[0][0]
print(mean_age)

29.8811345124283


We could replace these null values with the mean age of the whole dataset. The problem is that passengers' ages vary widely: we can't assign a 4-year-old child the mean age of roughly 30 years.

Instead, we can use the Name feature. The names contain a salutation such as Mr or Mrs, so we can assign each passenger with a missing age the mean age of their salutation group.

In [19]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+
|pclass|                name|   sex|   age|sibsp|parch|  ticket|    fare|  cabin|embarked|survived|
+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|     B5|       S|       1|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|C22 C26|       S|       1|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|
|     1| Anderson, Mr. Harry|  male|  48.0|    0|    0|   19952|   26.55|    E12|       S|       1|
|     1|Andrews, Miss. Ko...|female|  63.0|    1|    0|   13502| 77.9583|     D7|       S|       1|


In [20]:
# Using the regex "([A-Za-z]+)\." we extract the salutation (stored as "Initial") from the Name column.
# It matches a run of letters (A-Z or a-z) that is immediately followed by a . (dot).
titanic_df = titanic_df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
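
To see what this pattern picks out without starting Spark, here is a minimal plain-Python sketch using the standard `re` module. The helper name `extract_title` is hypothetical; it returns an empty string when nothing matches, which mirrors how `regexp_extract` behaves. The first two names come from the data shown above, the third is made up.

```python
import re

# Same pattern as the regexp_extract call: letters followed by a literal dot.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")


def extract_title(name):
    """Return the first run of letters that is followed by a dot, or ''."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""


names = [
    "Allen, Miss. Elisabeth Walton",
    "Anderson, Mr. Harry",
    "No title here",  # hypothetical name without a salutation
]
for name in names:
    print(repr(extract_title(name)))
```

Because the search takes the first match, a name with an abbreviated middle initial after the salutation would still yield the salutation.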

In [21]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+-------+
|pclass|                name|   sex|   age|sibsp|parch|  ticket|    fare|  cabin|embarked|survived|Initial|
+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+-------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|     B5|       S|       1|   Miss|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|C22 C26|       S|       1| Master|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|   Miss|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|     Mr|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|    Mrs|
|     1| Anderson, Mr. Harry|  male|  48.0|    0|    0|   19952|   26.55|    E12|       S|       1|     Mr|
|     1|Andrews, Miss. Ko...

In [22]:
titanic_df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|    Dona|
|     Don|
|    Miss|
|Countess|
|     Col|
|    Lady|
|     Rev|
|  Master|
|     Mme|
|    Capt|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



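Some of these are variants of the common salutations rather than separate groups: Mlle (Mademoiselle) and Ms, for instance, correspond to Miss. Let's replace them with Miss and fold the other rare titles into a handful of groups in the same way.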

In [23]:
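# Restrict the replacement to the Initial column so no other string column is touched
titanic_df = titanic_df.replace(
    ['Mlle', 'Mme', 'Ms', 'Dr', 'Major', 'Dona', 'Lady', 'Countess', 'Jonkheer', 'Col', 'Rev', 'Capt', 'Sir', 'Don'],
    ['Miss', 'Miss', 'Miss', 'Mr', 'Mr', 'Mrs', 'Mrs', 'Mrs', 'Other', 'Other', 'Other', 'Mr', 'Mr', 'Mr'],
    subset=['Initial'])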
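
The two parallel lists are paired by position, which is easy to misread. As a sketch, the plain-Python snippet below zips them into a dictionary so each old title sits next to its replacement. The variable names are hypothetical; `DataFrame.replace` should also accept such a dictionary in place of the two lists, but treat that as something to confirm against the PySpark documentation for your version.

```python
old_titles = ['Mlle', 'Mme', 'Ms', 'Dr', 'Major', 'Dona', 'Lady', 'Countess',
              'Jonkheer', 'Col', 'Rev', 'Capt', 'Sir', 'Don']
new_titles = ['Miss', 'Miss', 'Miss', 'Mr', 'Mr', 'Mrs', 'Mrs', 'Mrs',
              'Other', 'Other', 'Other', 'Mr', 'Mr', 'Mr']

# Pairing by position only works if both lists have the same length.
assert len(old_titles) == len(new_titles)

title_map = dict(zip(old_titles, new_titles))
for old, new in title_map.items():
    print(f"{old:>8} -> {new}")
```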

In [24]:
titanic_df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



So now we can check the average age for each initial category

In [25]:
titanic_df.groupby('Initial').avg('Age').collect()

[Row(Initial='Miss', avg(Age)=21.83450186915888),
 Row(Initial='Other', avg(Age)=44.92307692307692),
 Row(Initial='Master', avg(Age)=5.482703773584906),
 Row(Initial='Mr', avg(Age)=32.545531197301855),
 Row(Initial='Mrs', avg(Age)=37.04624277456647)]

# **2. Let's do some Transformations** 

### Let's impute missing values in the Age feature based on the average age for each Initial

In [26]:
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Miss") & (titanic_df["Age"].isNull()), 22).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Other") & (titanic_df["Age"].isNull()), 44).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Master") & (titanic_df["Age"].isNull()), 5).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mr") & (titanic_df["Age"].isNull()), 33).otherwise(titanic_df["Age"]))
titanic_df = titanic_df.withColumn("Age",when((titanic_df["Initial"] == "Mrs") & (titanic_df["Age"].isNull()), 37).otherwise(titanic_df["Age"]))
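
The five calls above hard-code the rounded group means. As an illustration of the underlying idea, here is a plain-Python sketch of group-mean imputation that computes the means instead of typing them in. The function name and the toy rows are hypothetical and are not taken from the Titanic file.

```python
from collections import defaultdict


def impute_by_group_mean(rows, group_key, value_key):
    """Fill missing values (None) with the rounded mean of the row's group."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        if row[value_key] is not None:
            totals[row[group_key]] += row[value_key]
            counts[row[group_key]] += 1
    means = {group: totals[group] / counts[group] for group in counts}

    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows stay unchanged
        if new_row[value_key] is None and new_row[group_key] in means:
            new_row[value_key] = float(round(means[new_row[group_key]]))
        filled.append(new_row)
    return filled


# Hypothetical toy rows, not taken from the Titanic file.
rows = [
    {"Initial": "Master", "Age": 4.0},
    {"Initial": "Master", "Age": 6.0},
    {"Initial": "Master", "Age": None},
    {"Initial": "Mr", "Age": 30.0},
    {"Initial": "Mr", "Age": None},
]
for row in impute_by_group_mean(rows, "Initial", "Age"):
    print(row)
```

Rows whose group has no known ages at all are left as they are, which is a design choice of this sketch.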

In [27]:
# Check the imputation
titanic_df.filter(titanic_df.Age==5).select("Initial").show()

+-------+
|Initial|
+-------+
|   Miss|
| Master|
|   Miss|
|   Miss|
| Master|
|   Miss|
| Master|
| Master|
| Master|
| Master|
| Master|
| Master|
| Master|
+-------+



In [28]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+-------+
|pclass|                name|   sex|   Age|sibsp|parch|  ticket|    fare|  cabin|embarked|survived|Initial|
+------+--------------------+------+------+-----+-----+--------+--------+-------+--------+--------+-------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|     B5|       S|       1|   Miss|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|C22 C26|       S|       1| Master|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|   Miss|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|     Mr|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|C22 C26|       S|       0|    Mrs|
|     1| Anderson, Mr. Harry|  male|  48.0|    0|    0|   19952|   26.55|    E12|       S|       1|     Mr|
|     1|Andrews, Miss. Ko...

In [29]:
# Now let's look at Embarked

titanic_df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|  123|
|    null|    2|
|       C|  270|
|       S|  914|
+--------+-----+



The Embarked feature has only two missing values. 
The majority of passengers boarded at "S", so we can impute the missing values with "S".

In [30]:
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
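
Picking "S" by eye works here; the sketch below shows the same choice made programmatically, as the key with the largest count. The counts are copied from the groupBy output above, and the variable names are hypothetical.

```python
# Counts copied from the groupBy("Embarked") output above (nulls excluded).
embarked_counts = {"Q": 123, "C": 270, "S": 914}

# The mode is the key with the largest count.
most_common_port = max(embarked_counts, key=embarked_counts.get)
print(most_common_port)
```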

In [31]:
# Let's drop the Cabin feature as it has lots of null values
titanic_df = titanic_df.drop("Cabin")

In [32]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+
|pclass|                name|   sex|   Age|sibsp|parch|  ticket|    fare|embarked|survived|Initial|
+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|       S|       1|   Miss|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|       S|       1| Master|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|       S|       0|   Miss|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|       S|       0|     Mr|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|       S|       0|    Mrs|
|     1| Anderson, Mr. Harry|  male|  48.0|    0|    0|   19952|   26.55|       S|       1|     Mr|
|     1|Andrews, Miss. Ko...|female|  63.0|    1|    0|   13502| 77.9583|       S|       1|   Miss|


In [33]:
titanic_df.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- embarked: string (nullable = false)
 |-- survived: integer (nullable = true)
 |-- Initial: string (nullable = true)



We can create two new features, "Family_Size" and "Alone", and analyse them. Family_Size is the sum of Parch (parents/children) and SibSp (siblings/spouses). It gives us a combined figure so that we can check whether the survival rate has anything to do with the family size of the passengers.



In [34]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))
titanic_df.groupBy("Family_Size").count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  235|
|          6|   16|
|          3|   43|
|          5|   25|
|          4|   22|
|          7|    8|
|         10|   11|
|          2|  159|
|          0|  790|
+-----------+-----+



Let's create an Alone column holding 0 or 1, where 1 means the passenger was travelling alone (Family_Size is 0).

In [35]:
titanic_df = titanic_df.withColumn('Alone',lit(0))
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

In [36]:
# Sanity check to see if our column Alone is there
titanic_df.columns

['pclass',
 'name',
 'sex',
 'Age',
 'sibsp',
 'parch',
 'ticket',
 'fare',
 'embarked',
 'survived',
 'Initial',
 'Family_Size',
 'Alone']

In [37]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+-----------+-----+
|pclass|                name|   sex|   Age|sibsp|parch|  ticket|    fare|embarked|survived|Initial|Family_Size|Alone|
+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+-----------+-----+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|       S|       1|   Miss|          0|    1|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|       S|       1| Master|          3|    0|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|       S|       0|   Miss|          3|    0|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  113781|  151.55|       S|       0|     Mr|          3|    0|
|     1|Allison, Mrs. Hud...|female|  25.0|    1|    2|  113781|  151.55|       S|       0|    Mrs|          3|    0|
|     1| Anderson, Mr. Harry|  male|  48.0|    0|    0| 

Let's convert the Sex, Embarked & Initial columns from strings to numbers using StringIndexer

In [38]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["sex","embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
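
By default StringIndexer gives index 0.0 to the most frequent label, 1.0 to the next, and so on. The plain-Python sketch below imitates that ordering on a toy column; the function name `string_index` is hypothetical, and the alphabetical tie-break is an assumption rather than something shown in this notebook.

```python
from collections import Counter


def string_index(values):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical toy column with the same labels as the "sex" column.
sex = ["male", "female", "male", "male", "female"]
mapping = string_index(sex)
print(mapping)
print([mapping[v] for v in sex])
```

This matches what the notebook shows for the real data: male passengers are more numerous, so male is indexed as 0.0 and female as 1.0. The indices carry no ordering between categories, which matters more for linear models than for tree-based ones.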

In [39]:
titanic_df.show()

+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+-----------+-----+---------+--------------+-------------+
|pclass|                name|   sex|   Age|sibsp|parch|  ticket|    fare|embarked|survived|Initial|Family_Size|Alone|sex_index|embarked_index|Initial_index|
+------+--------------------+------+------+-----+-----+--------+--------+--------+--------+-------+-----------+-----+---------+--------------+-------------+
|     1|Allen, Miss. Elis...|female|  29.0|    0|    0|   24160|211.3375|       S|       1|   Miss|          0|    1|      1.0|           0.0|          1.0|
|     1|Allison, Master. ...|  male|0.9167|    1|    2|  113781|  151.55|       S|       1| Master|          3|    0|      0.0|           0.0|          3.0|
|     1|Allison, Miss. He...|female|   2.0|    1|    2|  113781|  151.55|       S|       0|   Miss|          3|    0|      1.0|           0.0|          1.0|
|     1|Allison, Mr. Huds...|  male|  30.0|    1|    2|  1

In [40]:
titanic_df.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- embarked: string (nullable = false)
 |-- survived: integer (nullable = true)
 |-- Initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- sex_index: double (nullable = false)
 |-- embarked_index: double (nullable = false)
 |-- Initial_index: double (nullable = false)



Now let's remove the columns we won't be using for our ML Classifier

In [41]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

In [48]:
titanic_df.show()

+------+------+-----+-----+--------+--------+-----------+-----+---------+--------------+-------------+
|pclass|   Age|sibsp|parch|    fare|survived|Family_Size|Alone|sex_index|embarked_index|Initial_index|
+------+------+-----+-----+--------+--------+-----------+-----+---------+--------------+-------------+
|     1|  29.0|    0|    0|211.3375|       1|          0|    1|      1.0|           0.0|          1.0|
|     1|0.9167|    1|    2|  151.55|       1|          3|    0|      0.0|           0.0|          3.0|
|     1|   2.0|    1|    2|  151.55|       0|          3|    0|      1.0|           0.0|          1.0|
|     1|  30.0|    1|    2|  151.55|       0|          3|    0|      0.0|           0.0|          0.0|
|     1|  25.0|    1|    2|  151.55|       0|          3|    0|      1.0|           0.0|          2.0|
|     1|  48.0|    0|    0|   26.55|       1|          0|    1|      0.0|           0.0|          0.0|
|     1|  63.0|    1|    0| 77.9583|       1|          1|    0|      1.0|

In [62]:
# Calling function to check for missing values one last time
# fare is missing from a 3rd class passenger
null_columns_count_list = null_value_count(titanic_df)
null_columns_count_list
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  fare|                1|
+----------------------+-----------------+



In [63]:
# Get the average fares per class
titanic_df.groupby('pclass').avg('fare').collect()

[Row(pclass=1, avg(fare)=87.50899164086687),
 Row(pclass=3, avg(fare)=13.302888700564957),
 Row(pclass=2, avg(fare)=21.1791963898917)]

In [65]:
# Fill the missing fare with the approximate 3rd-class average fare (about 13.30)
titanic_df = titanic_df.na.fill({"fare" : 13})

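Let's put all features into a vector. The label column survived must be left out of the feature list; otherwise the classifiers can simply read the answer from their input. Note that the outputs recorded in the rest of this notebook came from a run that used titanic_df.columns[1:], which skips pclass and includes survived, so the accuracies of 1 reported below almost certainly reflect that leakage rather than real predictive power.

In [66]:
# Use every column except the label as a feature
feature_cols = [c for c in titanic_df.columns if c != "survived"]
feature = VectorAssembler(inputCols=feature_cols, outputCol="features")
feature_vector = feature.transform(titanic_df)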

In [67]:
feature_vector.show()

+------+------+-----+-----+--------+--------+-----------+-----+---------+--------------+-------------+--------------------+
|pclass|   Age|sibsp|parch|    fare|survived|Family_Size|Alone|sex_index|embarked_index|Initial_index|            features|
+------+------+-----+-----+--------+--------+-----------+-----+---------+--------------+-------------+--------------------+
|     1|  29.0|    0|    0|211.3375|       1|          0|    1|      1.0|           0.0|          1.0|[29.0,0.0,0.0,211...|
|     1|0.9167|    1|    2|  151.55|       1|          3|    0|      0.0|           0.0|          3.0|[0.9167,1.0,2.0,1...|
|     1|   2.0|    1|    2|  151.55|       0|          3|    0|      1.0|           0.0|          1.0|[2.0,1.0,2.0,151....|
|     1|  30.0|    1|    2|  151.55|       0|          3|    0|      0.0|           0.0|          0.0|(10,[0,1,2,3,5],[...|
|     1|  25.0|    1|    2|  151.55|       0|          3|    0|      1.0|           0.0|          2.0|[25.0,1.0,2.0,151...|
|     1|
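
The features column above is printed in two notations. A value like `[29.0,0.0,0.0,...]` is a dense vector that lists every entry, while a value like `(10,[0,1,2,3,5],[...])` is a sparse vector written as (size, indices of non-zero entries, their values). The plain-Python sketch below expands the sparse form into the dense one; the function name and the example values are hypothetical.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a plain list."""
    dense = [0.0] * size
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# Hypothetical vector with 10 slots, of which only 3 are non-zero.
print(sparse_to_dense(10, [0, 3, 6], [23.0, 7.9, 1.0]))
```

Both notations describe the same kind of 10-element feature vector, so the classifiers treat them identically.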

Now let's split our dataset into our training and test data

In [68]:
(trainingData, testData) = feature_vector.randomSplit([0.8, 0.2],seed = 11)
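
randomSplit assigns rows by random sampling rather than cutting the table at a fixed position, so the resulting sizes are close to 80/20 but usually not exact, and the seed makes the split repeatable. The plain-Python sketch below illustrates that idea only; it is a hypothetical helper and does not reproduce the exact rows Spark would pick for seed 11.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1309))  # stand-in for the 1309 passengers
train, test = random_split(rows, 0.8, seed=11)
print(len(train), len(test))  # close to, but usually not exactly, 80/20
```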

# **3. Machine Learning in Spark using SparkML**

Spark supports several ML classifiers in SparkML, including:

*   LogisticRegression
*   DecisionTreeClassifier
*   RandomForestClassifier
*   Gradient-boosted tree classifier (GBTClassifier)
*   NaiveBayes
*   Linear Support Vector Machine (LinearSVC)

In [69]:
# Let's use Logistic Regression First
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="survived", featuresCol="features")

# Training our Model
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="survived", predictionCol="prediction", metricName="accuracy")

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       0.0|       0|(10,[0,2,3,5],[21...|
|       0.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       0.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       0.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       0.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       0.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [70]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

Accuracy of LogisticRegression is = 1
Test Error of LogisticRegression = 0 


In [71]:
# Let's try a Decision Tree Classifier 
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="survived", featuresCol="features")
dt_model = dt.fit(trainingData)
dt_prediction = dt_model.transform(testData)
dt_prediction.select("prediction", "survived", "features").show()

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       0.0|       0|(10,[0,2,3,5],[21...|
|       0.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       0.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       0.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       0.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       0.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [72]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))

Accuracy of DecisionTreeClassifier is = 1
Test Error of DecisionTreeClassifier = 0 


In [73]:
# Let's try a Random Forest Classifier 
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="survived", featuresCol="features")
rf_model = rf.fit(trainingData)
rf_prediction = rf_model.transform(testData)
rf_prediction.select("prediction", "survived", "features").show()

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       0.0|       0|(10,[0,2,3,5],[21...|
|       0.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       0.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       0.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       0.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       0.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [74]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

Accuracy of RandomForestClassifier is = 1
Test Error of RandomForestClassifier  = 0 


In [75]:
# Let's try a Gradient-boosted tree(GBT) Classifier 
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "survived", "features").show()

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       0.0|       0|(10,[0,2,3,5],[21...|
|       0.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       0.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       0.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       0.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       0.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [76]:
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifier is = %g"% (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifier = %g"% (1.0 - gbt_accuracy))

Accuracy of Gradient-boosted tree classifier is = 1
Test Error of Gradient-boosted tree classifier = 0


In [77]:
# Let's try a Naive Bayes Classifier 
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(labelCol="survived", featuresCol="features")
nb_model = nb.fit(trainingData)
nb_prediction = nb_model.transform(testData)
nb_prediction.select("prediction", "survived", "features").show()

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       1.0|       0|(10,[0,2,3,5],[21...|
|       1.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       1.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       1.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       1.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       1.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [78]:
nb_accuracy = evaluator.evaluate(nb_prediction)
print("Accuracy of NaiveBayes is  = %g"% (nb_accuracy))
print("Test Error of NaiveBayes  = %g " % (1.0 - nb_accuracy))

Accuracy of NaiveBayes is  = 0.766667
Test Error of NaiveBayes  = 0.233333 
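
To make the accuracy metric concrete, the plain-Python sketch below recomputes it by hand for the 20 (prediction, survived) pairs displayed in the NaiveBayes output above. The function name is hypothetical. On these 20 rows, 14 predictions match the label; the figure reported by the evaluator differs because it covers the whole test set, not just the displayed rows.

```python
def accuracy(predictions, labels):
    """Fraction of rows where the predicted class equals the true class."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)


# The 20 (prediction, survived) pairs displayed in the NaiveBayes output above.
predictions = [1] * 17 + [0] + [1] * 2
labels = [1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1]

acc = accuracy(predictions, labels)
print("accuracy   = %g" % acc)
print("test error = %g" % (1.0 - acc))
```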


In [79]:
# Let's try a Support Vector Machines (SVM) Classifier 

from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="survived", featuresCol="features")
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "survived", "features").show()

+----------+--------+--------------------+
|prediction|survived|            features|
+----------+--------+--------------------+
|       1.0|       1|[13.0,2.0,2.0,262...|
|       1.0|       1|[14.0,1.0,2.0,120...|
|       1.0|       1|[17.0,0.0,2.0,110...|
|       1.0|       1|[17.0,1.0,0.0,57....|
|       1.0|       1|[17.0,1.0,0.0,108...|
|       1.0|       1|[18.0,1.0,0.0,53....|
|       1.0|       1|[18.0,1.0,0.0,60....|
|       1.0|       1|[19.0,0.0,0.0,30....|
|       1.0|       1|[21.0,0.0,0.0,77....|
|       0.0|       0|(10,[0,2,3,5],[21...|
|       0.0|       0|(10,[0,3,6,8],[22...|
|       1.0|       1|[22.0,0.0,2.0,49....|
|       0.0|       0|(10,[0,3,6],[23.0...|
|       1.0|       1|[24.0,0.0,0.0,69....|
|       0.0|       0|(10,[0,1,3,5],[24...|
|       1.0|       1|[27.0,1.0,1.0,247...|
|       0.0|       0|(10,[0,3,6],[28.0...|
|       0.0|       0|(10,[0,3,6],[29.0...|
|       0.0|       0|(10,[0,1,3,5],[29...|
|       1.0|       1|[30.0,1.0,0.0,57....|
+----------

In [80]:
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Accuracy of Support Vector Machine is = %g"% (svm_accuracy))
print("Test Error of Support Vector Machine = %g " % (1.0 - svm_accuracy))

Accuracy of Support Vector Machine is = 1
Test Error of Support Vector Machine = 0 
