### Importing Libraries

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as fun

### Building a Spark Session

In [2]:
spark = SparkSession.builder.appName('myApp').getOrCreate()

### Data Loading

In [3]:
train = spark.read.csv('train.csv', header=True, inferSchema=True)
test = spark.read.csv('test.csv', header=True, inferSchema=True)

### Show 5 rows

In [4]:
train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

### Display schema for the dataset

In [5]:
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Statistical summary

In [6]:
train.summary().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

## EDA - Exploratory Data Analysis

### Display count for the train dataset

In [7]:
train.count()

891

### How many people survived, and how many didn't survive? 

In [8]:
n_survived = train.where('Survived = 1').count()
n_not_survived = train.where('Survived = 0').count()

### Display your result

In [9]:
print(n_survived,n_not_survived)

342 549


### The Ratio of survival (using UDF)

In [10]:
total = train.count()
grouped = train.groupby('Survived').count()

@udf(returnType=FloatType()) 
def ratio(x):
    return x/total

grouped.select('Survived',ratio(fun.col('count'))).show(truncate=False)

+--------+------------+
|Survived|ratio(count)|
+--------+------------+
|1       |0.3838384   |
|0       |0.61616164  |
+--------+------------+



### The number of males and females?

In [11]:
n_males = train.where('Sex = "male"').count()
n_females = train.where('Sex = "female"').count()
print(n_males, n_females)

577 314


### What is the average number of survivors of each gender?

In [12]:
train.select('Sex','Survived').groupBy("Sex").avg("Survived").show()

+------+-------------------+
|   Sex|      avg(Survived)|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



### What is the number of survivors of each gender?

In [13]:
train.select('Sex','Survived').where('Survived = 1').groupBy("Sex").count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  233|
|  male|  109|
+------+-----+



### Creating a temporary view

In [14]:
train.createOrReplaceTempView('myView')

### How many people survived, and how many didn't survive? (By SQL)

In [15]:
spark.sql("""SELECT Survived, COUNT(Survived) AS nSurvived
          FROM myView 
          GROUP BY Survived""").show()

+--------+---------+
|Survived|nSurvived|
+--------+---------+
|       1|      342|
|       0|      549|
+--------+---------+



### The number of survivors from each gender as a ratio (By SQL)

In [16]:
spark.sql("""
SELECT Sex,sum(Survived)/count(Survived) AS survivalRatio
FROM myView
GROUP BY Sex
""").show()

+------+-------------------+
|   Sex|      survivalRatio|
+------+-------------------+
|female| 0.7420382165605095|
|  male|0.18890814558058924|
+------+-------------------+



### Displaying a ratio for "p-class": SUM(Survived)/count for p-class

In [17]:
spark.sql("""
SELECT Pclass,sum(Survived)/count(Survived) AS survivalRatio
FROM myView
GROUP BY Pclass
""").show()

+------+-------------------+
|Pclass|      survivalRatio|
+------+-------------------+
|     1| 0.6296296296296297|
|     3|0.24236252545824846|
|     2|0.47282608695652173|
+------+-------------------+



### Data Cleaning

### Merging both the train and test datasets

In [18]:
all_data = train.union(test)
all_data.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

### Display count

In [19]:
all_data.count()

1329

### The number of null values in each column

In [20]:
all_data.select([fun.count(fun.when(fun.isnull(c),c)).alias(c) for c in all_data.columns]).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|265|    0|    0|     0|   0| 1021|       3|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



### Extracting only columns of null values alone and showing them

In [21]:
null_data = all_data.select([fun.count(fun.when(fun.isnull(c),c)).alias(c) for c in all_data.columns])
columns = []
for c in null_data.columns :
    if null_data.select(c).collect()[0][0]>0 :
        columns.append(c)
null_data = null_data.select(columns)
null_data.show()

+---+-----+--------+
|Age|Cabin|Embarked|
+---+-----+--------+
|265| 1021|       3|
+---+-----+--------+



## Preprocessing 

### Creating a Temporary view

In [22]:
train.createOrReplaceTempView('myView2')

### Showing the "name" column from your temporary table

In [23]:
spark.sql("""
SELECT Name
FROM myView2
""").show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



### Using Regex for Pattern matching to create "Title" column

In [24]:
import pyspark.sql.functions as F
all_data = all_data.withColumn('Title',F.regexp_extract(F.col("Name"),"([A-Za-z]+)\.",1))
all_data.createOrReplaceTempView('combined')

### Displaying "Title" column and count "Title" column

In [25]:
titles = spark.sql("""
SELECT Title,count(Title) as counts
FROM combined
GROUP BY Title
ORDER BY counts
""")
titles.show()

+--------+------+
|   Title|counts|
+--------+------+
|     Mme|     1|
|      Ms|     1|
|     Don|     1|
|    Capt|     2|
|    Lady|     2|
|     Sir|     2|
|Jonkheer|     2|
|Countess|     2|
|   Major|     3|
|     Col|     4|
|    Mlle|     4|
|     Rev|     9|
|      Dr|    11|
|  Master|    56|
|     Mrs|   186|
|    Miss|   257|
|      Mr|   786|
+--------+------+



#### We can see that Dr, Rev, Major, Col, Mlle, Capt, Don, Jonkheer, Countess, Ms, Sir, Lady, and Mme are really rare titles, so we'll create a Dictionary and set their values to "rare"

In [26]:
titles_data = titles.collect()
titles_map = {}
for i in range(titles.count()):
    if titles_data[i][1]<=11:
        titles_map[titles_data[i][0]] = 'rare'
    else:
        titles_map[titles_data[i][0]] = titles_data[i][0]

In [27]:
titles_map

{'Don': 'rare',
 'Mme': 'rare',
 'Ms': 'rare',
 'Countess': 'rare',
 'Lady': 'rare',
 'Capt': 'rare',
 'Sir': 'rare',
 'Jonkheer': 'rare',
 'Major': 'rare',
 'Col': 'rare',
 'Mlle': 'rare',
 'Rev': 'rare',
 'Dr': 'rare',
 'Master': 'Master',
 'Mrs': 'Mrs',
 'Miss': 'Miss',
 'Mr': 'Mr'}

### Using a UDF to do the task

In [28]:
@udf(returnType=StringType()) 
def impute_title(title):
    return titles_map[title]# Title_map is your dictionary. please change this name with your dictionary name.

### Applying the function on "Title" column using UDF

In [29]:
all_data = all_data.withColumn("Title", impute_title(F.col("Title")))
all_data.createOrReplaceTempView('myView3')

### Displaying "Title" from table and group by "Title" column

In [30]:
spark.sql("""SELECT count(*) as count,Title
             FROM myView3
             GROUP BY Title
             ORDER BY count
          """).show()

+-----+------+
|count| Title|
+-----+------+
|   44|  rare|
|   56|Master|
|  186|   Mrs|
|  257|  Miss|
|  786|    Mr|
+-----+------+



## **Preprocessing Age**

### Based on the "age" column mean, we will fill in the missing age values

In [31]:
mean_age = all_data.select(fun.mean(fun.col('Age'))).collect()[0][0]

### Fill missing with "age" mean

In [32]:
all_data = all_data.na.fill(mean_age,subset=['Age'])
all_data.show()

+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex|               Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+------------------+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|              22.0|    1|    0|       A/5 21171|   7.25| null|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|              38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|              26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|              35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia..

## **Preprocessing Embarked**

### Selecting "Embarked" column, counting them, ordering by count Desc, and saving in a variable

In [33]:
grouped_Embarked = spark.sql("""SELECT count(*) as count,Embarked
             FROM myView3
             GROUP BY Embarked
             ORDER BY Embarked Desc
          """)

### Showing the result

In [34]:
grouped_Embarked.show()

+-----+--------+
|count|Embarked|
+-----+--------+
|  962|       S|
|  111|       Q|
|  253|       C|
|    3|    null|
+-----+--------+



### Filling missing values with max 'S' of grouped_Embarked

In [35]:
all_data=all_data.na.fill(value="S",subset=["Embarked"])
all_data.createOrReplaceTempView('myView4')
grouped_Embarked=spark.sql("""SELECT count(*) as count,Embarked
             FROM myView4
             GROUP BY Embarked
             ORDER BY Embarked Desc
          """)
grouped_Embarked.show()

+-----+--------+
|count|Embarked|
+-----+--------+
|  965|       S|
|  111|       Q|
|  253|       C|
+-----+--------+



## **Preprocessing Cabin**

### Replacing "cabin" column with first char from the string

In [36]:
first_letter = udf(lambda z: z[0] )
all_data=all_data.withColumn("Cabin", all_data.Cabin.substr(0, 1))

### Showing the result

In [37]:
all_data.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|   Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|  Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S| Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|  Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|   Mr|
+-----------+--------+------+---

### Creating the temporary view

In [38]:
all_data.createOrReplaceTempView('myView5')

### Counting each cabin and ordering them by cabin name descending

In [39]:
spark.sql("""SELECT count(*) as count,Cabin
             FROM myView5
             GROUP BY Cabin
             ORDER BY Cabin Desc
          """).show()

+-----+-----+
|count|Cabin|
+-----+-----+
|    1|    T|
|    4|    G|
|   18|    F|
|   51|    E|
|   52|    D|
|   82|    C|
|   77|    B|
|   23|    A|
| 1021| null|
+-----+-----+



### Filling missing values with "U"

In [40]:
all_data = all_data.na.fill(value="U",subset=["Cabin"])
all_data.createOrReplaceTempView('myView6')
spark.sql("""SELECT count(*) as count,Cabin
             FROM myView6
             GROUP BY Cabin
             ORDER BY Cabin Desc
          """).show()

+-----+-----+
|count|Cabin|
+-----+-----+
| 1021|    U|
|    1|    T|
|    4|    G|
|   18|    F|
|   51|    E|
|   52|    D|
|   82|    C|
|   77|    B|
|   23|    A|
+-----+-----+



### Using String indexer and OneHotEncoder for the data 

In [41]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = [field for (field, dataType) in all_data.dtypes
                   if dataType == "string"]
indexOutputCols = [x + "_Index" for x in categoricalCols]

oheOutputCols = [x + "_OHE" for x in categoricalCols]

In [42]:
stringIndexer = StringIndexer(inputCols=categoricalCols,
                             outputCols=indexOutputCols,
                             handleInvalid='skip')
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
                          outputCols=oheOutputCols)

numericCols = [field for (field,dataType) in all_data.dtypes
              if ((dataType!='string') & (field!='Survived') & (field!='crew'))]

### Using Vector Assembler to combine Features

In [43]:
from pyspark.ml.feature import VectorAssembler

assemblerInputs = oheOutputCols + numericCols

vecAssembler = VectorAssembler(inputCols=assemblerInputs,outputCol='features')

### Use randomSplit function and split data to trainDF, and testDF with 80% and 20% Consecutive

In [44]:
trainDF, testDF = all_data.randomSplit([.8,.2],seed=3)

### Building a RandomForestClassifier model and using pipeline to fit and transform then display "prediction, Survived, features" columns

In [45]:
from pyspark.ml.classification import RandomForestClassifier 

randomforest = RandomForestClassifier(featuresCol='features',labelCol='Survived')

In [46]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [stringIndexer,oheEncoder,vecAssembler,randomforest])

In [47]:
predictions = pipeline.fit(trainDF).transform(testDF)

predictions.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|(1378,[629,761,12...|
|       0.0|       1|(1378,[506,761,12...|
|       1.0|       1|(1378,[528,1085,1...|
|       0.0|       0|(1378,[445,761,10...|
|       0.0|       0|(1378,[526,761,11...|
+----------+--------+--------------------+
only showing top 5 rows



### Using MulticlassClassificationEvaluator to get the log Loss

In [48]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="logLoss")
print('log Loss is',evaluator.evaluate(predictions))

log Loss is 0.536439913851112
