### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers includes, for example:**
*   Name
*   Age
*   Gender


## Install and Import Libraries
Let's initialize findspark (so Python can locate the local Spark installation) and import PySpark:

In [1]:
import findspark
findspark.init()

import pyspark
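# The wildcard import shadows Python builtins such as max, sum, and round;
# later cells rely on pyspark.sql.functions.max, count, col, when, udf, and others.
from pyspark.sql.functions import *
from pyspark.sql.types import *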

## Build Spark Session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

21/10/05 07:47:23 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.153.130 instead (on interface ens33)
21/10/05 07:47:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/10/05 07:47:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/05 07:47:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/10/05 07:47:27 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


## Data Loading


You have two datasets, train and test. Read both of them:



In [3]:
train_df = spark.read.csv('train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('test.csv', header=True, inferSchema=True)


Let's work with the train dataset:

**Confirm whether this is a DataFrame:**

In [4]:
type(train_df)

pyspark.sql.dataframe.DataFrame

**Show 5 rows.**

In [5]:
train_df.show(5, truncate=False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

**Display schema for the dataset:**

In [6]:
train_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [7]:
train_df.describe().show(truncate=False)


+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|PassengerId      |Survived           |Pclass            |Name                                            |Sex   |Age               |SibSp             |Parch              |Ticket            |Fare             |Cabin|Embarked|
+-------+-----------------+-------------------+------------------+------------------------------------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|count  |891              |891                |891               |891                                             |891   |714               |891               |891                |891               |891              |204  |889     |
|mean   |446.0            |0.3838383838383838 |2.308641975308642 |nu

## EDA - Exploratory Data Analysis

**Display the number of rows in the train dataset:**

In [8]:
train_df.count()

891

**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Please save the result in a variable.**

In [9]:
survived_vs_not_survived = train_df.groupBy('Survived').agg((count('Survived')).alias('Count'))

**Display your result:**

In [10]:
survived_vs_not_survived.show()



+--------+-----+
|Survived|Count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+




**Can you display your answer as a ratio? (Hint: Divide each count by the total number of rows; built-in column arithmetic is enough, no UDF required.)**

In [11]:
survived_vs_not_survived.select(col('Survived'), (col('Count')/train_df.count()).alias('Ratio')).show()



+--------+------------------+
|Survived|             Ratio|
+--------+------------------+
|       1|0.3838383838383838|
|       0|0.6161616161616161|
+--------+------------------+
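A quick plain-Python cross-check of the ratio arithmetic, using the survivor counts printed earlier (the dictionary is just those two numbers typed in by hand, not a Spark object): each ratio is the group count divided by the total number of rows.

```python
# Survivor counts typed in from the groupBy output above (1 = survived, 0 = did not).
counts = {1: 342, 0: 549}
total = counts[0] + counts[1]  # 891 rows, the same as train_df.count()

# Each ratio is the group's count divided by the total, as in col('Count') / train_df.count().
ratios = {label: n / total for label, n in counts.items()}
print(ratios)
```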




**Can you get the number of males and females?**


In [12]:
train_df.groupBy('Sex').agg((count('Sex')).alias('Count')).show()



+------+-----+
|   Sex|Count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+




**1. What is the survival rate (the average of Survived) for each gender?**

**2. What is the number of survivors of each gender?**

(Hint: Group by the "Sex" column.)

In [13]:
train_df.groupBy(['Sex', 'Survived']).agg((count('Survived')).alias('Number of Survivors')).filter(col('Survived') == 1).show()


+------+--------+-------------------+
|   Sex|Survived|Number of Survivors|
+------+--------+-------------------+
|female|       1|                233|
|  male|       1|                109|
+------+--------+-------------------+
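The cell above answers question 2. For question 1, because Survived is coded 0/1, the average of Survived within each gender is that gender's survival rate; in PySpark this could be computed with something like `train_df.groupBy('Sex').agg(avg('Survived'))`. The sketch below does the same arithmetic in plain Python with counts already printed in this notebook (314 female and 577 male passengers; 233 and 109 survivors).

```python
# Passengers per gender (from the Sex count) and survivors per gender (from the cell above).
passengers = {"female": 314, "male": 577}
survivors = {"female": 233, "male": 109}

# For a 0/1 column, the group mean equals survivors / passengers in that group.
survival_rate = {sex: survivors[sex] / passengers[sex] for sex in passengers}
for sex, rate in survival_rate.items():
    print(f"{sex}: {rate:.3f}")
```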



**Create a temporary view:**

In [14]:
train_df.createOrReplaceTempView("train_table")

**How many people survived, and how many didn't survive? Answer with SQL:**

In [15]:
sql_statement_1 = '''select Survived, count(Survived) as Count
                    from train_table
                    group by Survived'''

spark.sql(sql_statement_1).show()


+--------+-----+
|Survived|Count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+




**Can you display the number of survivors from each gender as a ratio?**

(Hint: Group by the "Sex" column.)

**Can you do this via SQL?**

In [16]:
sql_statement_2 = '''select Sex, count(Sex) as Number_of_Survivors
                    from train_table
                    where Survived = 1
                    group by Sex'''

spark.sql(sql_statement_2).show()


+------+-------------------+
|   Sex|Number_of_Survivors|
+------+-------------------+
|female|                233|
|  male|                109|
+------+-------------------+
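The SQL cell above returns survivor counts; to turn them into ratios, divide each gender's count by the total number of survivors. The sketch below runs that query against an in-memory SQLite table as a stand-in for Spark SQL (an assumption made only so the example runs without a Spark session), with the table rebuilt from the printed counts. The `* 1.0` is there because SQLite divides integers as integers; Spark SQL's `/` already returns a fractional result, so the rest of the select should carry over to `spark.sql` with little change.

```python
import sqlite3

# Stand-in for train_table: SQLite in memory, rows rebuilt from the printed survivor
# counts (233 female, 109 male). Only the columns the query needs are included.
conn = sqlite3.connect(":memory:")
conn.execute("create table train_table (Sex text, Survived integer)")
rows = [("female", 1)] * 233 + [("male", 1)] * 109
conn.executemany("insert into train_table values (?, ?)", rows)

# Each gender's survivor count divided by the total number of survivors.
query = """
    select Sex,
           count(Sex) * 1.0 / (select count(*) from train_table where Survived = 1) as Ratio
    from train_table
    where Survived = 1
    group by Sex
"""
result = dict(conn.execute(query).fetchall())
conn.close()
print(result)
```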



**Display the ratio of passengers in each Pclass:**


In [17]:
train_df.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

In [18]:
sql_statement_3 = '''select Pclass, count(Pclass)/sum(count(*)) over() as Ratio
                    from train_table
                    group by Pclass
                    order by Pclass asc'''

spark.sql(sql_statement_3).show()

21/10/05 07:48:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+------+-------------------+
|Pclass|              Ratio|
+------+-------------------+
|     1|0.24242424242424243|
|     2|0.20650953984287318|
|     3| 0.5510662177328844|
+------+-------------------+




**Let's take a break and continue after this.**

## Data Cleaning

**First and foremost, we must merge both the train and test datasets. (Hint: The union function can do this.)**



In [19]:
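# union matches columns by position, so both DataFrames need the same columns in the
# same order; unionByName matches them by name instead.
train_and_test_df = train_df.union(test_df)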
train_and_test_df.show(truncate=False)

+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                                |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                                 |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)           |female|35.0|1  

**Display count:**

In [20]:
train_and_test_df.count()

1329

**Create a temporary view:**

In [21]:
train_and_test_df.createOrReplaceTempView('train_test_table')

**Can you count the null values in each column?**


In [22]:
# Count null cells per column; the loop variable is c so it does not hide the col() function.
nulls = train_and_test_df.select([count(when(col(c).isNull(), c)).alias(c) for c in train_and_test_df.columns])
nulls.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



**Create a DataFrame for the columns that contain null values, showing:**

1. Column
2. Number of missing values.

In [23]:
null_df = nulls.select('Age', 'Cabin', 'Embarked')
null_df.show()

+---+-----+--------+
|Age|Cabin|Embarked|
+---+-----+--------+
|265| 1021|       3|
+---+-----+--------+
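The cell above keeps the one-row, wide layout. The long layout described in the task (one row per column with its number of missing values) can be sketched in plain Python from the printed counts; with a Spark session, a list of such tuples could be passed to `spark.createDataFrame(rows, ['Column', 'Missing'])`. The share of missing rows is an extra added here for context.

```python
# Null counts per column, copied from the `nulls` output above.
null_counts = {
    "PassengerId": 0, "Survived": 0, "Pclass": 0, "Name": 0, "Sex": 0,
    "Age": 265, "SibSp": 0, "Parch": 0, "Ticket": 0, "Fare": 0,
    "Cabin": 1021, "Embarked": 3,
}
total_rows = 1329  # train_and_test_df.count()

# Long format: one (column, missing, share) tuple per column that has nulls, worst first.
missing = sorted(
    ((name, n, n / total_rows) for name, n in null_counts.items() if n > 0),
    key=lambda row: row[1],
    reverse=True,
)
for name, n, share in missing:
    print(f"{name:<10}{n:>6}{share:>8.1%}")
```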



## Preprocessing 

**Can you show me the name column from your temporary table?**

In [24]:
sql_statement_4 = '''select name
                    from train_test_table'''

spark.sql(sql_statement_4).show(truncate=False)

+-------------------------------------------------------+
|name                                                   |
+-------------------------------------------------------+
|Braund, Mr. Owen Harris                                |
|Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |
|Heikkinen, Miss. Laina                                 |
|Futrelle, Mrs. Jacques Heath (Lily May Peel)           |
|Allen, Mr. William Henry                               |
|Moran, Mr. James                                       |
|McCarthy, Mr. Timothy J                                |
|Palsson, Master. Gosta Leonard                         |
|Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)      |
|Nasser, Mrs. Nicholas (Adele Achem)                    |
|Sandstrom, Miss. Marguerite Rut                        |
|Bonnell, Miss. Elizabeth                               |
|Saundercock, Mr. William Henry                         |
|Andersson, Mr. Anders Johan                            |
|Vestrom, Miss

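**Extract each passenger's title (the word directly before a period, such as "Mr" or "Miss") from the Name column into a new Title column:**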

In [25]:
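# Raw string, so "\." reaches the regex engine as an escaped period without an invalid-escape warning in newer Python versions.
combined = train_and_test_df.withColumn('Title', regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))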
combined.createOrReplaceTempView('combined')
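To see what the pattern captures, here is the same regular expression applied with Python's `re` module to a few names from the earlier output. `regexp_extract` returns group 1 of the first match, or an empty string when nothing matches; the helper `extract_title` is a hypothetical name that mimics that behavior.

```python
import re

# The pattern from the cell above: a run of letters immediately followed by a period.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    """Return group 1 of the first match, or '' when there is none (as regexp_extract does)."""
    match = TITLE_PATTERN.search(name)
    return match.group(1) if match else ""

names = [
    "Braund, Mr. Owen Harris",
    "Cumings, Mrs. John Bradley (Florence Briggs Thayer)",
    "Heikkinen, Miss. Laina",
    "Palsson, Master. Gosta Leonard",
    "McCarthy, Mr. Timothy J",
]
print([extract_title(n) for n in names])
```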

**Count how many passengers have each title:**

In [26]:
sql_statement_5 = '''
                    select Title, count(Title) as Count
                    from combined
                    group By Title
                    '''
titles_df = spark.sql(sql_statement_5)
titles_df.show()


+--------+-----+
|   Title|Count|
+--------+-----+
|     Don|    1|
|    Miss|  257|
|Countess|    2|
|     Col|    4|
|     Rev|    9|
|    Lady|    2|
|  Master|   56|
|     Mme|    1|
|    Capt|    2|
|      Mr|  786|
|      Dr|   11|
|     Mrs|  186|
|     Sir|    2|
|Jonkheer|    2|
|    Mlle|    4|
|   Major|    3|
|      Ms|    1|
+--------+-----+



**Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles (11 occurrences or fewer), so create a dictionary that maps each of them to "rare" and every other title to itself.**

In [27]:
titles = titles_df.collect()
titles


[Row(Title='Don', Count=1),
 Row(Title='Miss', Count=257),
 Row(Title='Countess', Count=2),
 Row(Title='Col', Count=4),
 Row(Title='Rev', Count=9),
 Row(Title='Lady', Count=2),
 Row(Title='Master', Count=56),
 Row(Title='Mme', Count=1),
 Row(Title='Capt', Count=2),
 Row(Title='Mr', Count=786),
 Row(Title='Dr', Count=11),
 Row(Title='Mrs', Count=186),
 Row(Title='Sir', Count=2),
 Row(Title='Jonkheer', Count=2),
 Row(Title='Mlle', Count=4),
 Row(Title='Major', Count=3),
 Row(Title='Ms', Count=1)]

In [28]:
rare_dict = {
    row['Title']: 'rare' if row['Count'] <= 11 else row['Title']
    for row in titles
}


rare_dict

{'Don': 'rare',
 'Miss': 'Miss',
 'Countess': 'rare',
 'Col': 'rare',
 'Rev': 'rare',
 'Lady': 'rare',
 'Master': 'Master',
 'Mme': 'rare',
 'Capt': 'rare',
 'Mr': 'Mr',
 'Dr': 'rare',
 'Mrs': 'Mrs',
 'Sir': 'rare',
 'Jonkheer': 'rare',
 'Mlle': 'rare',
 'Major': 'rare',
 'Ms': 'rare'}
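The threshold in `rare_dict` can be checked by applying it to the printed title counts and re-aggregating: every title with 11 or fewer passengers (including Dr, at exactly 11) collapses into "rare", and the collapsed counts still add up to the 1,329 rows of the combined data. `build_rare_map` and `RARE_THRESHOLD` are illustrative names, not part of the notebook.

```python
# Title counts copied from titles_df above.
title_counts = {
    "Don": 1, "Miss": 257, "Countess": 2, "Col": 4, "Rev": 9, "Lady": 2,
    "Master": 56, "Mme": 1, "Capt": 2, "Mr": 786, "Dr": 11, "Mrs": 186,
    "Sir": 2, "Jonkheer": 2, "Mlle": 4, "Major": 3, "Ms": 1,
}
RARE_THRESHOLD = 11  # same cut-off as rare_dict: 11 or fewer occurrences is "rare"

def build_rare_map(counts, threshold=RARE_THRESHOLD):
    """Map each title to itself, or to 'rare' when it occurs at most `threshold` times."""
    return {t: ("rare" if n <= threshold else t) for t, n in counts.items()}

rare_map = build_rare_map(title_counts)

# Passengers per collapsed title, as the grouped SQL query after the UDF would report.
collapsed = {}
for title, n in title_counts.items():
    key = rare_map[title]
    collapsed[key] = collapsed.get(key, 0) + n
print(collapsed)
```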

**Define a function that maps a title to its collapsed value:**

In [29]:
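def impute_title(title):
    # Fall back to the title itself if it is missing from rare_dict, instead of raising KeyError inside the UDF.
    return rare_dict.get(title, title)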

**Apply the function to the "Title" column with a UDF:**

In [30]:
replaceUDF = udf(impute_title, StringType())  # StringType is also udf's default return type
combined = combined.withColumn("Title", replaceUDF(col("Title")))
combined.createOrReplaceTempView("combined")

**Display the "Title" column, then list the distinct titles by grouping on "Title":**

In [31]:
combined.select('Title').show(truncate=False)


sql_statement_6 = '''
                    select Title
                    from combined
                    group By Title
                    '''
spark.sql(sql_statement_6).show()


+------+
|Title |
+------+
|Mr    |
|Mrs   |
|Miss  |
|Mrs   |
|Mr    |
|Mr    |
|Mr    |
|Master|
|Mrs   |
|Mrs   |
|Miss  |
|Miss  |
|Mr    |
|Mr    |
|Miss  |
|Mrs   |
|Master|
|Mr    |
|Mrs   |
|Mrs   |
+------+
only showing top 20 rows




+------+
| Title|
+------+
|  rare|
|  Miss|
|Master|
|    Mr|
|   Mrs|
+------+



## **Preprocessing Age**

**Fill in the missing ages with the mean age. First, compute the mean:**

In [32]:
# The mean ignores null ages; int() truncates it toward zero rather than rounding.
mean_age = int(combined.select('Age').agg({'Age':'mean'}).collect()[0][0])
mean_age

30

**Fill the missing ages with the mean age:**

In [33]:
combined = combined.na.fill(mean_age, subset='Age')
combined.select('Age').show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|30.0|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|30.0|
|31.0|
|30.0|
+----+
only showing top 20 rows
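A plain-Python sketch of the same mean imputation on a small, made-up list of ages: the mean is taken over non-null values only, `int()` truncates it (so a mean of 29.9 would become 29, not 30), and every missing value is replaced by that number. Spark keeps Age as a double, so the sketch fills with a float as well.

```python
import math

# Hypothetical sample of ages; None plays the role of a null Age value.
ages = [22.0, 38.0, None, 35.0, None, 54.0, 2.0]

known = [a for a in ages if a is not None]
# Nulls are excluded from the mean; int() then truncates toward zero.
fill_value = float(int(math.fsum(known) / len(known)))

filled = [a if a is not None else fill_value for a in ages]
print(fill_value, filled)
```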



## **Preprocessing Embarked**

**Select Embarked, count each value, sort by count in descending order, and save the result in the combined_Embarked variable:**




In [34]:
combined_Embarked = combined.select('Embarked').groupBy('Embarked').count().orderBy('count',ascending=False)

**Show combined_Embarked:**

In [35]:
combined_Embarked.show()


+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  962|
|       C|  253|
|       Q|  111|
|    null|    3|
+--------+-----+



**Get the most frequent Embarked value from combined_Embarked:**

In [36]:
# Take the first non-null row of the count-ordered DataFrame instead of hard-coding 962.
top_Embarked = combined_Embarked.where(col('Embarked').isNotNull()).limit(1).select('Embarked')
top_Embarked.show()


+--------+
|Embarked|
+--------+
|       S|
+--------+



In [37]:
# max is pyspark.sql.functions.max here, because the wildcard import shadows the builtin.
combined_Embarked.agg(max('count')).show()



+----------+
|max(count)|
+----------+
|       962|
+----------+




**Fill the missing Embarked values with the most frequent port, 'S':**

In [38]:
combined = combined.na.fill('S', subset='Embarked')

In [50]:
combined.select('Embarked').groupBy('Embarked').count().orderBy('count',ascending=False).show()



+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  965|
|       C|  253|
|       Q|  111|
+--------+-----+
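The fill value 'S' is hard-coded above. A sketch of computing the mode (the most common non-null value) instead, on a made-up list of ports, so the fill value follows the data:

```python
from collections import Counter

# Hypothetical Embarked values; None stands for a missing port.
embarked = ["S", "C", "S", None, "Q", "S", None, "C"]

# Mode of the non-null values: the top row of a count-ordered grouping, skipping nulls.
mode_port, _ = Counter(e for e in embarked if e is not None).most_common(1)[0]
filled = [e if e is not None else mode_port for e in embarked]
print(mode_port, filled)
```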




## **Preprocessing Cabin**

**Replace each "Cabin" value with its first character (the deck letter):**



In [39]:
# substr(1, 1) keeps the first character (positions are 1-based); null cabins stay null.
combined = combined.withColumn('Cabin', col('Cabin').substr(1, 1))

**Show the result:**

In [40]:
combined.select('Cabin').show()

+-----+
|Cabin|
+-----+
| null|
|    C|
| null|
|    C|
| null|
| null|
|    E|
| null|
| null|
| null|
|    G|
|    C|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
| null|
+-----+
only showing top 20 rows



**Create the temporary view:**

In [41]:
combined.createOrReplaceTempView('combined')

**Count the passengers per "Cabin" value, ordered by count in descending order:**

In [42]:
combined.select('Cabin').groupBy('Cabin').count().orderBy('count', ascending=False).show()



+-----+-----+
|Cabin|count|
+-----+-----+
| null| 1021|
|    C|   82|
|    B|   77|
|    D|   52|
|    E|   51|
|    A|   23|
|    F|   18|
|    G|    4|
|    T|    1|
+-----+-----+




**Fill missing values with "U":**

In [43]:
combined = combined.na.fill('U', subset='Cabin')
combined.select('Cabin').show(10)

+-----+
|Cabin|
+-----+
|    U|
|    C|
|    U|
|    C|
|    U|
|    U|
|    E|
|    U|
|    U|
|    U|
+-----+
only showing top 10 rows
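The two Cabin steps (keep the first character, then fill nulls with "U") can be expressed as one small function; the sample cabin strings below are illustrative. A multi-cabin value such as "B57 B59 B63 B66" keeps only its first deck letter.

```python
# Hypothetical raw Cabin values; None is a missing cabin.
cabins = [None, "C85", None, "C123", "E46", "B57 B59 B63 B66", "T"]

def cabin_deck(cabin, missing="U"):
    """First character of the cabin (the deck letter), or `missing` for null/empty values."""
    return cabin[0] if cabin else missing

print([cabin_deck(c) for c in cabins])
```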



**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
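A plain-Python sketch of what StringIndexer and OneHotEncoder produce for one column, using the Embarked counts after imputation (S: 965, C: 253, Q: 111). The cells below use the multi-column form (`inputCols`/`outputCols`). Two defaults are modeled here as assumptions worth checking against your Spark version: equal frequencies are broken alphabetically, and OneHotEncoder's `dropLast=True` drops the last category, which is then encoded as all zeros. Spark stores these vectors sparsely; lists are used here for readability.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent first (stringOrderType='frequencyDesc').

    Equal frequencies are broken alphabetically (assumed to match recent Spark versions).
    """
    freq = Counter(values)
    ordered = sorted(freq, key=lambda label: (-freq[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, num_labels, drop_last=True):
    """Dense one-hot vector; with drop_last, the last category is all zeros."""
    size = num_labels - 1 if drop_last else num_labels
    return [1.0 if i == index else 0.0 for i in range(size)]

# Embarked after imputation: S=965, C=253, Q=111 (counts from the output above).
embarked_values = ["S"] * 965 + ["C"] * 253 + ["Q"] * 111
mapping = fit_string_indexer(embarked_values)
print(mapping)
for label, idx in mapping.items():
    print(label, one_hot(idx, len(mapping)))
```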

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

____________________________________________

**Use Pipeline to fit and transform:**

**VectorAssembler: VectorAssembler(*, inputCols=None, outputCol=None) A feature transformer that merges multiple columns into a vector column.**
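VectorAssembler concatenates its input columns, in `inputCols` order, into one vector: vector-valued columns (like the one-hot outputs) contribute all their entries and numeric columns contribute one entry each. A minimal sketch with hypothetical encoded values (`assemble` is an illustrative helper, not a Spark function):

```python
def assemble(parts):
    """Concatenate scalars and vectors (lists) into one flat feature list, in input order."""
    features = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            features.extend(float(x) for x in part)
        else:
            features.append(float(part))
    return features

# Hypothetical encoded row: Sex_OHE, Embarked_OHE, then the numeric Pclass and Age.
row = [[1.0], [1.0, 0.0], 3, 22.0]
print(assemble(row))
```

The `(638,[...])` notation in the predictions further down is Spark's sparse-vector display (size, then the indices of non-zero entries); most of the 638 entries come from one-hot encoding the nearly unique Ticket column.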



In [44]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

**Use the randomSplit function to split the data into X_train and X_test, with 80% and 20% of the rows respectively:**

In [45]:
X_train, X_test = combined.randomSplit([0.8,0.2],seed=0)
print(f"There are {X_train.count()} rows in the training set, and {X_test.count()} in the test set.")

There are 1060 rows in the training set, and 269 in the test set.
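randomSplit assigns rows to splits at random, so the split sizes only approximate the weights: 80% of 1,329 would be 1,063.2 rows, and the cell above got 1,060. The sketch below illustrates the idea with independent per-row sampling in plain Python; it is not Spark's implementation (which samples within each partition), so the same seed will not reproduce Spark's split. `random_split` is a hypothetical helper.

```python
import itertools
import random

def random_split(rows, weights, seed=0):
    """Assign every row independently to one split, with probability proportional
    to its weight. Split sizes therefore only approximate the weights."""
    rng = random.Random(seed)
    bounds = list(itertools.accumulate(weights))  # cumulative weights, e.g. [0.8, 1.0]
    total = bounds[-1]
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random() * total
        # First split whose cumulative bound exceeds u.
        idx = next(i for i, b in enumerate(bounds) if u < b)
        splits[idx].append(row)
    return splits

train, test = random_split(range(1329), [0.8, 0.2], seed=0)
print(len(train), len(test))
```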


In [48]:
categoricalColumns = [c for (c, dtype) in X_train.dtypes
                      if dtype == "string"]
categoricalColumns

['Name', 'Sex', 'Ticket', 'Cabin', 'Embarked', 'Title']

In [51]:
categoricalColumns.remove("Name")
categoricalColumns

['Sex', 'Ticket', 'Cabin', 'Embarked', 'Title']

In [52]:
indexOutputColumns = [x + "_Index" for x in categoricalColumns]
indexOutputColumns

['Sex_Index', 'Ticket_Index', 'Cabin_Index', 'Embarked_Index', 'Title_Index']

In [53]:
oheOutputColumns = [x + "_OHE" for x in categoricalColumns]
oheOutputColumns

['Sex_OHE', 'Ticket_OHE', 'Cabin_OHE', 'Embarked_OHE', 'Title_OHE']

In [54]:
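stringIndexer = StringIndexer(inputCols=categoricalColumns,
                              outputCols=indexOutputColumns,
                              # 'skip' drops rows whose value was not seen during fit,
                              # e.g. X_test rows with a Ticket absent from X_train
                              handleInvalid='skip')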

oheEncoder = OneHotEncoder(inputCols=indexOutputColumns,
                          outputCols=oheOutputColumns)

In [55]:
numericColumns = [field for (field, dataType) in X_train.dtypes
                  if dataType in ('double', 'int') and field not in ('Survived', 'PassengerId')]
numericColumns

['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']

In [56]:
assemblerInputs = oheOutputColumns + numericColumns
assemblerInputs

['Sex_OHE',
 'Ticket_OHE',
 'Cabin_OHE',
 'Embarked_OHE',
 'Title_OHE',
 'Pclass',
 'Age',
 'SibSp',
 'Parch',
 'Fare']

In [57]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

**Build a RandomForestClassifier model, use the pipeline to fit and transform the data, then display the "prediction", "Survived", and "features" columns:**

In [58]:
rf = RandomForestClassifier(featuresCol='features', labelCol='Survived', predictionCol='prediction', maxDepth=5)
pipeline = Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,rf])

In [59]:
pipelineModel = pipeline.fit(X_train)


In [60]:
predDF = pipelineModel.transform(X_test)

In [61]:
predDF.select('features','Survived','prediction').show(5)

21/10/05 07:50:22 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+--------+----------+
|            features|Survived|prediction|
+--------------------+--------+----------+
|(638,[0,119,619,6...|       0|       0.0|
|(638,[381,619,628...|       1|       0.0|
|(638,[480,619,627...|       0|       0.0|
|(638,[309,621,627...|       1|       1.0|
|(638,[0,382,619,6...|       1|       0.0|
+--------------------+--------+----------+
only showing top 5 rows



**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 

In [62]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MCE = MulticlassClassificationEvaluator(predictionCol='prediction',
                                        labelCol='Survived',
                                        metricName='accuracy')

In [63]:
accuracy = MCE.evaluate(predDF)
accuracy

0.7346938775510204
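Accuracy is the share of rows where `prediction` equals `Survived`; on the five rows shown earlier it would be 3/5. The reported value also says something about how many rows were scored: 0.7346938775510204 is 36/49, so the evaluator saw a multiple of 49 rows rather than all 269 rows of X_test. That is consistent with `handleInvalid='skip'` in the StringIndexer, the only stage configured to drop rows, which removes test rows whose value (most likely the nearly unique Ticket) was not seen during fitting. A plain-Python sketch of the metric, with `accuracy` as an illustrative helper:

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = len([1 for y, p in zip(labels, predictions) if float(y) == p])
    return correct / len(labels)

# The five rows printed by predDF.show(5): true Survived labels and model predictions.
survived = [0, 1, 0, 1, 1]
prediction = [0.0, 0.0, 0.0, 1.0, 0.0]
print(accuracy(survived, prediction))  # 3 of 5 correct
```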