### **Problem** 
**Our goal is to create a predictive model that can answer the following question:**

**What kind of people had a better chance of surviving?**

**Data about passengers:**
*   Name
*   Age
*   Gender


## Install and Import Libraries
Let's install PySpark:

In [None]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 48 kB/s
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 47.8 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=c84dc718df9d35ef49861fe803b2ffa6e2ec905d502ef51ba3a0bdcd4512ccbf
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


## Build Spark Session

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Big Data").getOrCreate()

## Data Loading


There are two datasets, train and test. Read both of them:



In [None]:
# The CSV files are expected under /content (Colab's default working directory)
train = spark.read.csv("/content/train.csv", header=True, inferSchema=True)
test = spark.read.csv("/content/test.csv", header=True, inferSchema=True)

Let's work with the train dataset:

**Confirm that `train` is a DataFrame:**

In [None]:
type(train)

pyspark.sql.dataframe.DataFrame

**Show the first 5 rows:**

In [None]:
train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

**Display the schema of the dataset:**

In [None]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



**Statistical summary:**

In [None]:
train.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

**Display the number of rows in the train dataset:**

In [None]:
train_count = train.count()
print(train_count)

891


**Can you answer this question:** 

**How many people survived, and how many didn't survive?** 

**Save the result in a variable.**

In [None]:
# Answer by code

survived_groupped_df = train.groupBy("Survived").count()

**Display your result:**

In [None]:
survived_groupped_df.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display your answer as a ratio? (Hint: use a UDF, though any method works.)**

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
# UDF: divide each group's count by the total number of rows, rounded to 2 decimals
getRatio = F.udf(lambda x: round(x/train_count,2), DoubleType())
survived_groupped_df = survived_groupped_df.withColumn("Ratio", getRatio('count'))

In [None]:
survived_groupped_df.show()

+--------+-----+-----+
|Survived|count|Ratio|
+--------+-----+-----+
|       1|  342| 0.38|
|       0|  549| 0.62|
+--------+-----+-----+
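The UDF only divides each group's count by the total row count and rounds to two decimals. The plain-Python sketch below reproduces that arithmetic from the counts in the table above (`survived_counts` and `ratios` are illustrative names, not part of the notebook). In PySpark, a built-in column expression such as `F.round(F.col("count") / train_count, 2)` should produce the same column without a Python UDF; treat that as a suggested alternative rather than the notebook's method.

```python
# Counts copied from the survived_groupped_df output above (Survived -> count).
survived_counts = {1: 342, 0: 549}
total = sum(survived_counts.values())  # 891, the same value as train_count

# Same arithmetic as the getRatio UDF: count / total, rounded to 2 decimals.
ratios = {label: round(n / total, 2) for label, n in survived_counts.items()}
print(ratios)  # {1: 0.38, 0: 0.62}
```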



**Can you get the number of males and females?**


In [None]:
gender_grouped_df=train.groupBy("Sex").count()
gender_grouped_df.show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



In [None]:
# Reuse the getRatio UDF defined earlier: count / total rows, rounded to 2 decimals
gender_grouped_df = gender_grouped_df.withColumn("Ratio of Gender", getRatio('count'))

In [None]:
gender_grouped_df.show()

+------+-----+---------------+
|   Sex|count|Ratio of Gender|
+------+-----+---------------+
|female|  314|           0.35|
|  male|  577|           0.65|
+------+-----+---------------+



**1. What is the survival rate (the mean of Survived) for each gender?**

**2. How many passengers of each gender survived?**

(Hint: group by the "Sex" column; any method works.)

In [None]:
train.groupBy("Sex").agg(F.mean('Survived'), F.sum('Survived')).show()

+------+-------------------+-------------+
|   Sex|      avg(Survived)|sum(Survived)|
+------+-------------------+-------------+
|female| 0.7420382165605095|          233|
|  male|0.18890814558058924|          109|
+------+-------------------+-------------+
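Because Survived is coded as 0/1, its mean is the fraction of 1s, which is why `avg(Survived)` reads directly as a survival rate (and why `SUM(Survived)/count(1)` in SQL gives the same number). A small plain-Python check that rebuilds each gender's labels from the counts above (233 of 314 females and 109 of 577 males survived; the gender totals come from the earlier `gender_grouped_df` table):

```python
def survival_rate(labels):
    # Mean of a 0/1 column = share of 1s = survival rate.
    return sum(labels) / len(labels)

# Rebuild each gender's labels from the counts shown in the tables above.
female = [1] * 233 + [0] * (314 - 233)
male = [1] * 109 + [0] * (577 - 109)

print(survival_rate(female), survival_rate(male))
```

The two totals also add up: 233 + 109 = 342, the number of survivors found earlier.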



**Create a temporary view:**

In [None]:
train.createOrReplaceTempView("train")

**How many people survived, and how many didn't? Answer with SQL:**

In [None]:
spark.sql("SELECT Survived, count(1) AS count FROM train GROUP BY Survived").show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



**Can you display the survival ratio for each gender?**

(Hint: group by the "Sex" column; any method works.)

**Can you do this via SQL?**

In [None]:
spark.sql("SELECT Sex, round(SUM(Survived)/count(1),2) as ratio  FROM train GROUP BY Sex").show()

+------+-----+
|   Sex|ratio|
+------+-----+
|female| 0.74|
|  male| 0.19|
+------+-----+



**Display the survival ratio for each passenger class (`Pclass`), i.e. SUM(Survived)/count(1) per class:**


In [None]:
spark.sql("SELECT Pclass, round(SUM(Survived)/count(1),2) as ratio  FROM train GROUP BY Pclass").show()

+------+-----+
|Pclass|ratio|
+------+-----+
|     1| 0.63|
|     3| 0.24|
|     2| 0.47|
+------+-----+



**Let's take a break and continue after this.**

## Data Cleaning

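**First, merge the train and test datasets into a single DataFrame. (Hint: the union function can do this. `union` matches columns by position, so both datasets must have the same columns in the same order; `unionByName` matches them by name instead.)**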



In [None]:
combined = train.union(test)
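`DataFrame.union` appends rows by column position, while `unionByName` lines columns up by name. The difference only shows when the two inputs list their columns in a different order. The sketch below uses two hypothetical mini-tables (plain Python lists, not Spark) to show what each strategy would produce; the helper names are illustrative only.

```python
# Hypothetical mini-tables: (column names, rows). The second lists its columns
# in a different order, which is exactly where positional union goes wrong.
left_cols = ["PassengerId", "Survived", "Age"]
left_rows = [(1, 0, 22.0)]
right_cols = ["PassengerId", "Age", "Survived"]
right_rows = [(2, 38.0, 1)]

def union_by_position(a_cols, a_rows, b_cols, b_rows):
    # Like DataFrame.union: rows are appended as-is (here only the column count is checked).
    if len(a_cols) != len(b_cols):
        raise ValueError("both tables need the same number of columns")
    return a_cols, a_rows + b_rows

def union_by_name(a_cols, a_rows, b_cols, b_rows):
    # Like DataFrame.unionByName: reorder b's values to follow a's column order.
    order = [b_cols.index(c) for c in a_cols]
    return a_cols, a_rows + [tuple(row[i] for i in order) for row in b_rows]

print(union_by_position(left_cols, left_rows, right_cols, right_rows)[1])
print(union_by_name(left_cols, left_rows, right_cols, right_rows)[1])
```

With positional union, the second passenger ends up with Survived = 38.0 and Age = 1; matching by name keeps the values in the right columns.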

**Display count:**

In [None]:
combined.count()

1329

**Can you find the number of null values in each column?**


In [None]:
# Count the nulls in each column; every count() call launches a separate Spark job
null_columns = []
for col_name in combined.columns:
    null_values = combined.where(F.col(col_name).isNull()).count()
    if null_values > 0:
        null_columns.append((col_name, null_values))
print(null_columns)

[('Age', 265), ('Cabin', 1021), ('Embarked', 3)]
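The loop above scans the data once per column. The same counts can be gathered in a single pass; in PySpark, a commonly used single-job form is `combined.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in combined.columns])`, since `F.when` yields null when the condition is false and `F.count` counts only non-null values. The plain-Python sketch below illustrates the single-pass idea on a few hypothetical rows (the third row is made up for illustration).

```python
from collections import Counter

def count_nulls(rows, columns):
    """Count None values per column in a single pass over the rows."""
    counts = Counter()
    for row in rows:
        for col in columns:
            if row.get(col) is None:  # a missing key is treated as null too
                counts[col] += 1
    # Keep only columns that actually have missing values, like null_columns above.
    return [(col, counts[col]) for col in columns if counts[col] > 0]

# Hypothetical rows shaped like the passenger data.
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": 38.0, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]
print(count_nulls(rows, ["Age", "Cabin", "Embarked"]))  # [('Age', 1), ('Cabin', 2), ('Embarked', 1)]
```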


**Create a DataFrame of the null counts with two columns:**

1. Column name
2. Number of missing values

In [None]:
spark.createDataFrame(null_columns, ['column', 'missing_value']).show()

+--------+-------------+
|  column|missing_value|
+--------+-------------+
|     Age|          265|
|   Cabin|         1021|
|Embarked|            3|
+--------+-------------+



## Preprocessing 

**Create a temporary view:**

In [None]:
combined.createOrReplaceTempView("combined")

**Can you show the "Name" column from your temporary view?**

In [None]:
spark.sql("SELECT Name FROM combined").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



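**Extract each passenger's title (the word followed by a period, e.g. "Mr" in "Moran, Mr. James") into a new Title column:**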

In [None]:
# Raw string so that "\." reaches the regex engine as a literal period
combined = combined.withColumn('Title', F.regexp_extract(F.col("Name"), r"([A-Za-z]+)\.", 1))
combined.createOrReplaceTempView('combined')
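`regexp_extract` returns group 1 of the first match of `([A-Za-z]+)\.`, that is, the first run of letters immediately followed by a period, and an empty string when nothing matches. A plain-Python sketch of the same extraction with `re` (the helper name `extract_title` is illustrative), applied to two complete names from the output above:

```python
import re

# Same pattern as the regexp_extract call: a run of letters followed by a literal period.
TITLE_RE = re.compile(r"([A-Za-z]+)\.")

def extract_title(name):
    # regexp_extract returns "" when nothing matches; mirror that here.
    match = TITLE_RE.search(name)
    return match.group(1) if match else ""

for name in ["Moran, Mr. James", "Rice, Master. Eugene"]:
    print(name, "->", extract_title(name))
```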

**Count the passengers for each title:**

In [None]:
spark.sql("SELECT Title,count(1)  FROM combined GROUP BY Title").show()

+--------+--------+
|   Title|count(1)|
+--------+--------+
|     Don|       1|
|    Miss|     257|
|Countess|       2|
|     Col|       4|
|     Rev|       9|
|    Lady|       2|
|  Master|      56|
|     Mme|       1|
|    Capt|       2|
|      Mr|     786|
|      Dr|      11|
|     Mrs|     186|
|     Sir|       2|
|Jonkheer|       2|
|    Mlle|       4|
|   Major|       3|
|      Ms|       1|
+--------+--------+



**Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are rare titles, so create a dictionary that maps each of them to "Rare" and keeps the common titles (Mr, Mrs, Miss, Master) unchanged.**

In [None]:
titles_map = {
 'Capt': 'Rare',
 'Col': 'Rare',
 'Don': 'Rare',
 'Dona': 'Rare',
 'Dr': 'Rare',
 'Jonkheer': 'Rare',
 'Lady': 'Rare',
 'Major': 'Rare',
 'Master': 'Master',
 'Miss': 'Miss',
 'Mlle': 'Rare',
 'Mme': 'Rare',
 'Mr': 'Mr',
 'Mrs': 'Mrs',
 'Ms': 'Rare',
 'Rev': 'Rare',
 'Sir': 'Rare',
 'Countess': 'Rare'
}

**Define a function that looks up each title in the dictionary:**

In [None]:
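def impute_title(title):
    # titles_map is the dictionary defined above; rename it here if yours differs.
    # Assumption: titles missing from the dictionary (e.g. "" when the regex found no title)
    # fall back to 'Rare' instead of failing the Spark job with a KeyError.
    return titles_map.get(title, 'Rare')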

**Apply the function to the Title column with a UDF:**

In [None]:
title_map_func = F.udf(impute_title, StringType())
combined = combined.withColumn('Title', title_map_func('Title'))

**Register the view again and list the distinct titles (group by Title):**

In [None]:
combined.createOrReplaceTempView('combined')
spark.sql("SELECT Title FROM combined GROUP BY Title").show()

+------+
| Title|
+------+
|  Miss|
|Master|
|    Mr|
|   Mrs|
|  Rare|
+------+
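The grouped output lists the five groups but not their sizes. Those sizes follow directly from the per-title counts shown earlier; this plain-Python sketch applies the same grouping rule as `titles_map` (common titles kept, every other title in the table becomes "Rare") to those counts. The names `title_counts`, `COMMON`, and `group_title` are illustrative only.

```python
from collections import Counter

# Title counts copied from the "SELECT Title, count(1)" output above.
title_counts = {
    "Don": 1, "Miss": 257, "Countess": 2, "Col": 4, "Rev": 9, "Lady": 2,
    "Master": 56, "Mme": 1, "Capt": 2, "Mr": 786, "Dr": 11, "Mrs": 186,
    "Sir": 2, "Jonkheer": 2, "Mlle": 4, "Major": 3, "Ms": 1,
}

# Same rule as titles_map for these titles: four common ones kept, the rest "Rare".
COMMON = {"Mr", "Mrs", "Miss", "Master"}

def group_title(title):
    return title if title in COMMON else "Rare"

group_counts = Counter()
for title, n in title_counts.items():
    group_counts[group_title(title)] += n

print(dict(group_counts))
```

All 13 rare titles together cover only 44 of the 1329 passengers, which supports merging them into one group.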



## **Preprocessing Age**

**Compute the mean age, which will be used to fill in the missing age values:**

In [None]:
round(spark.sql("SELECT AVG(Age) FROM combined").collect()[0][0])

30

**Fill the missing ages with the rounded mean age (30):**

In [None]:
combined = combined.fillna(30, subset=['Age'])
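SQL `AVG` ignores NULLs, so the mean above is computed over known ages only, and `fillna(..., subset=['Age'])` replaces only the missing entries. A plain-Python sketch of that mean imputation on a hypothetical sample (the first five ages shown earlier plus two made-up missing values); unlike the notebook, it does not round the mean.

```python
# Hypothetical sample: five ages from the rows shown earlier plus two missing values.
ages = [22.0, 38.0, None, 26.0, 35.0, None, 35.0]

# Like SQL AVG: skip missing values when computing the mean.
known = [a for a in ages if a is not None]
age_mean = sum(known) / len(known)

# Like fillna(value, subset=['Age']): only the missing entries are replaced.
filled = [age_mean if a is None else a for a in ages]
print(age_mean, filled)
```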

## **Preprocessing Embarked**

**Count the passengers for each Embarked value, order by count descending, and save the result in the grouped_Embarked variable:**




In [None]:
grouped_Embarked = spark.sql("SELECT Embarked,count(1) as count_it FROM combined GROUP BY Embarked ORDER BY count_it DESC")

**Show grouped_Embarked:**

In [None]:
grouped_Embarked.show()

+--------+--------+
|Embarked|count_it|
+--------+--------+
|       S|     962|
|       C|     253|
|       Q|     111|
|    null|       3|
+--------+--------+



**Get the most frequent port (the mode), which is the first row of grouped_Embarked:**

In [None]:
embarked_mode = grouped_Embarked.collect()[0][0]

In [None]:
print(embarked_mode)

S


**Fill the missing values with the mode, 'S':**

In [None]:
combined = combined.fillna(embarked_mode, subset=['Embarked'])
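Taking `grouped_Embarked.collect()[0][0]` works here because null is not the most frequent group; adding `WHERE Embarked IS NOT NULL` to the query would guarantee that. The plain-Python sketch below rebuilds the column from the counts above and computes the mode over non-null values only before filling (`port_counts` is an illustrative name).

```python
from collections import Counter

# Embarked values rebuilt from the grouped_Embarked counts; None stands for the 3 nulls.
embarked = ["S"] * 962 + ["C"] * 253 + ["Q"] * 111 + [None] * 3

# Count only non-null ports, so a null can never be chosen as the mode.
port_counts = Counter(p for p in embarked if p is not None)
embarked_mode = port_counts.most_common(1)[0][0]

filled = [embarked_mode if p is None else p for p in embarked]
print(embarked_mode, Counter(filled))
```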

## **Preprocessing Cabin**

**Replace each Cabin value with its first character (e.g. "C85" becomes "C"):**



In [None]:
# substr is 1-based: keep 1 character starting at position 1 (nulls stay null)
combined = combined.withColumn("Cabin", combined.Cabin.substr(1, 1))

**Show the result:**

In [None]:
combined.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|    Mr|
|          6|       0|  

**Create the temporary view:**

In [None]:
combined.createOrReplaceTempView('combined')

**Count the passengers for each Cabin letter, ordered by count descending:**

In [None]:
groupped_cabin = spark.sql("SELECT Cabin,count(1) as count_it FROM combined GROUP BY Cabin ORDER BY count_it DESC")
groupped_cabin.show()

+-----+--------+
|Cabin|count_it|
+-----+--------+
| null|    1021|
|    C|      82|
|    B|      77|
|    D|      52|
|    E|      51|
|    A|      23|
|    F|      18|
|    G|       4|
|    T|       1|
+-----+--------+



**Fill missing values with "U":**

In [None]:
combined = combined.fillna('U', subset=['Cabin'])
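Together, the two Cabin steps turn every value into a single letter: the substring keeps the first character, a null cabin stays null, and `fillna('U', ...)` then moves the nulls into a separate "U" (unknown) group. A plain-Python sketch of the combined effect (`cabin_letter` is an illustrative helper):

```python
def cabin_letter(cabin):
    # Keep the first character; a missing cabin becomes the "U" (unknown) group.
    return cabin[:1] if cabin is not None else "U"

for raw in ["C85", "C123", None]:
    print(raw, "->", cabin_letter(raw))
```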

**StringIndexer: A label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, we cast it to string and index the string values. The indices are in [0, numLabels). By default, this is ordered by label frequencies so the most frequent label gets index 0. The ordering behavior is controlled by setting stringOrderType. Its default value is ‘frequencyDesc’.**

**StringIndexer(inputCol=None, outputCol=None)**
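A plain-Python sketch of the default `frequencyDesc` ordering: count each label, sort by descending count, and number the labels from 0. The alphabetical tie-break follows a note in the Spark documentation and should be treated as an assumption. The Embarked counts used below are those after the 'S' fill (962 + 3 = 965); in the pipeline, the indexer is actually fit on `X_train` only, so its counts would differ.

```python
from collections import Counter

def string_index(values):
    """Map labels to indices: most frequent first, ties broken alphabetically (assumed)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Embarked after filling the nulls with 'S'.
embarked = ["S"] * 965 + ["C"] * 253 + ["Q"] * 111
print(string_index(embarked))  # {'S': 0.0, 'C': 1.0, 'Q': 2.0}
```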

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler,OneHotEncoder
from pyspark.ml import Pipeline

In [None]:
categoricalCols = [field for (field, dataType) in combined.dtypes
                   if dataType == "string"]
categoricalCols
            

['Name', 'Sex', 'Ticket', 'Cabin', 'Embarked', 'Title']

In [None]:
indexOutputCols = [x + "_Index" for x in categoricalCols]
indexOutputCols
oheOutputCols = [x + "_OHE" for x in categoricalCols]
oheOutputCols

['Name_OHE', 'Sex_OHE', 'Ticket_OHE', 'Cabin_OHE', 'Embarked_OHE', 'Title_OHE']

**OneHotEncoder(inputCols=None, outputCols=None)**

A one-hot encoder that maps a column of category indices to a column of binary vectors, with at most a single one-value per row that indicates the input category index. For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
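The `dropLast` behaviour described above can be sketched in plain Python with dense lists (Spark itself emits sparse vectors, but the entries are the same). This reproduces the 5-category example:

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a dense 0/1 list, like OneHotEncoder."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # with drop_last, the last category maps to the all-zeros vector
        vector[int(index)] = 1.0
    return vector

print(one_hot(2.0, 5))  # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4.0, 5))  # [0.0, 0.0, 0.0, 0.0]
```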

In [None]:
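# handleInvalid='keep' puts labels unseen during fit (e.g. the near-unique Name and
# Ticket values of X_test rows) into an extra index; 'skip' would drop those rows.
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='keep')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)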

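**VectorAssembler: a feature transformer that merges multiple columns into a single vector column. Signature: VectorAssembler(*, inputCols=None, outputCol=None, handleInvalid='error')**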



In [None]:
numericCols = [field for (field, dataType) in combined.dtypes
               if dataType == 'double' and field != 'Survived']
numericCols

['Age', 'Fare']

In [None]:
assemblerInputs = oheOutputCols + numericCols
assemblerInputs

['Name_OHE',
 'Sex_OHE',
 'Ticket_OHE',
 'Cabin_OHE',
 'Embarked_OHE',
 'Title_OHE',
 'Age',
 'Fare']
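`VectorAssembler` concatenates its input columns, in the order given, into one flat feature vector: vector columns contribute all of their entries and numeric columns one entry each. A plain-Python sketch with a hypothetical row (Sex has 2 categories, so with `dropLast` its one-hot vector has a single entry):

```python
def assemble(row, input_cols):
    """Concatenate scalars and vectors (lists) from input_cols into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):
            features.extend(value)  # vector column: append every entry
        else:
            features.append(float(value))  # numeric column: append a single entry
    return features

# Hypothetical row with a 1-entry Sex_OHE vector plus the two numeric columns.
row = {"Sex_OHE": [1.0], "Age": 22.0, "Fare": 7.25}
print(assemble(row, ["Sex_OHE", "Age", "Fare"]))  # [1.0, 22.0, 7.25]
```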

In [None]:
# Assemble the one-hot encoded categorical columns together with the numeric columns
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')

**Use the randomSplit function to split the data into X_train and X_test, with 80% and 20% of the rows respectively:**

In [None]:
X_train, X_test = combined.randomSplit([0.8, 0.2],seed = 11)
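`randomSplit` assigns rows to splits by random sampling, so 80/20 is only approximate and the exact sizes depend on the seed and on the data's partitioning. The plain-Python sketch below shows the general idea (normalise the weights, draw a seeded uniform number per row, pick the split whose range contains it). It is not Spark's exact algorithm, so it will not reproduce the same rows.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by a seeded uniform draw; sizes are only approximate."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            if r < bound or i == len(bounds) - 1:  # last split also absorbs rounding error
                splits[i].append(row)
                break
    return splits

train_part, test_part = random_split(list(range(1329)), [0.8, 0.2], seed=11)
print(len(train_part), len(test_part))
```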

**Pipeline: ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.**

**Build a RandomForestClassifier model, use the pipeline to fit on X_train and transform X_test, then display the prediction, Survived, and features columns:**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol='features', labelCol='Survived')
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, rf])

# Fit all stages on the training split, then apply the fitted model to the test split
model = pipeline.fit(X_train)
predictions = model.transform(X_test)

combined.show()  # preview of the cleaned data (Cabin nulls are now 'U')
predictions.select("prediction", "Survived", "features").show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|    U|       S|    Mr|
|          6|       0|  

**Use MulticlassClassificationEvaluator and set the "labelCol" to "Survived",  "predictionCol" to "prediction", "metricName" to "accuracy"** 
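With `metricName="accuracy"`, the score is the fraction of rows whose `prediction` equals `Survived`. A plain-Python sketch of that metric on hypothetical values (predictions are doubles, as Spark emits them, compared with integer labels):

```python
def accuracy(predictions, labels):
    """Fraction of rows where the predicted class equals the true label."""
    if len(predictions) != len(labels) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)

# Hypothetical predictions against hypothetical Survived labels.
print(accuracy([1.0, 0.0, 0.0, 1.0], [1, 0, 1, 1]))  # 0.75
```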

In [2]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
print("Accuracy : " + str(evaluator.evaluate(predictions)))


In [1]:
test = test.drop('Survived')
