This kernel is a tutorial for getting started with PySpark using the Titanic dataset. Let's get started. 


### Kernel Goals
<a id="aboutthiskernel"></a>
***
There are three primary goals of this kernel.
- <b>Provide a tutorial</b> for someone who is starting out with PySpark.
- <b>Do an exploratory data analysis (EDA)</b> of the Titanic dataset with visualizations and storytelling.  
- <b>Predict</b>: Use machine learning classification models to predict passengers' chances of survival.

### What is Spark, anyway?
Spark is a platform for cluster computing. Spark lets us spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up the data makes it easier to work with very large datasets because each node only works with a small amount of data.
As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster, particularly those that split cleanly into independent pieces.
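
To make the idea concrete, here is a minimal sketch in plain Python (not Spark). It assumes that threads can stand in for worker nodes, and the function names are made up for this illustration. Each "worker" computes a partial sum and count on its own slice of the data, and the partial results are then combined into a mean.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(values, num_partitions):
    """Split a list into roughly equal chunks, one per (pretend) worker."""
    return [values[i::num_partitions] for i in range(num_partitions)]


def partial_sum_and_count(partition):
    """The work each worker does independently on its own partition."""
    return sum(partition), len(partition)


def parallel_mean(values, num_partitions=4):
    partitions = split_into_partitions(values, num_partitions)
    # Threads stand in for worker nodes; Spark would use separate machines.
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partials = list(pool.map(partial_sum_and_count, partitions))
    # The small partial results are combined into the final answer.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


# A handful of illustrative fare values.
fares = [7.25, 71.2833, 7.925, 53.1, 8.05, 8.4583, 51.8625, 21.075]
print(parallel_mean(fares))
```

A mean parallelizes easily because sums and counts can be merged in any order. Calculations where every step depends on the previous one benefit much less.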

Deciding whether or not Spark is the best solution for your problem takes some experience, but you can consider questions like:
* Is my data too big to work with on a single machine?
* Can my calculations be easily parallelized?



In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('../input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
## installing pyspark
!pip install pyspark

The first step in using Spark is connecting to a cluster. In practice, the cluster will be hosted on a remote machine that's connected to all the other nodes. There will be one computer, called the master, that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called workers. The master sends the workers data and calculations to run, and they send their results back to the master.

We definitely don't need a cluster with many nodes for the Titanic dataset. Besides, the syntax for running locally and for running on a cluster is pretty similar. To start working with Spark DataFrames, we first have to create a SparkSession object from a SparkContext. We can think of the SparkContext as the connection to the cluster and the SparkSession as the interface to that connection. Let's create a SparkSession. 

# Beginner Tutorial
This part is solely for beginners. I recommend starting here to get a good understanding of the flow. 

In [3]:
## creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tutorial').getOrCreate()

Let's read the dataset. 

In [4]:
df = spark.read.csv('../input/titanic/train.csv', header = True, inferSchema=True)
df_test = spark.read.csv('../input/titanic/test.csv', header = True, inferSchema=True)

In [5]:
## So, what is df?
type(df)

In [6]:
## As you can see it's a Spark dataframe. Let's take a look at the preview of the dataset. 
df.show()

In [7]:
## It looks a bit messi. See what I did there? ;). Anyway, how about using .toPandas() for a change?
df.toPandas()

In [8]:
# I use toPandas() a ridiculous amount, as you will see in this kernel. 
# It is just convenient and easier on my eyes. 
## In addition, if you know pandas, this can be very helpful 
## for checking your work.
## Keep in mind that toPandas() collects the whole dataframe onto the driver, so use it only on small data.
## How about a summary?
result = df.describe().toPandas()

In [9]:
result

In [10]:
# We can also convert a pandas dataframe to a Spark dataframe. Here is how we do it. 
print(f"Before: {type(result)}")
spark_temp = spark.createDataFrame(result)
print(f"After: {type(spark_temp)}")

In [11]:
# Cool, Let's print the schema of the df using .printSchema()
df.printSchema()

In [12]:
# similar approach
df.dtypes

Data in the real world is not this clean. We often have to create our own schema and apply it. We will cover that in more detail in the future. Since we are talking about schemas, are you wondering whether you can use SQL with Spark? Yes, you can. 

One of the biggest advantages of Spark is that you can run SQL commands to do analysis. If you are like that nifty co-worker of mine, you would probably want to use SQL with Spark. Let's do an example. 

In [13]:
## First, we need to register a SQL temporary view.
df.createOrReplaceTempView("mytable")

## Then, we use spark.sql and write sql inside it. 
result = spark.sql("SELECT * FROM mytable ORDER BY Fare DESC LIMIT 10")
result.toPandas()

Similarly, we can register another SQL temp view. 

In [14]:
df_test.createOrReplaceTempView("df_test")

Now that we have registered two tables within this Spark session, how can we see which ones are registered?

In [15]:
spark.catalog.listTables()

In [16]:
# We can also create a Spark dataframe out of these tables. Here is how we do it.
temp_table = spark.table("df_test")
print(type(temp_table))
temp_table.show(5)

In [17]:
# Pretty cool. We will dive deeper into SQL later. 
# Let's go back to the dataframe and do some nitty-gritty stuff. 
# What if we want the column names only?
df.columns

In [18]:
# What about just a column?
df['Age']

In [19]:
type(df['Age'])

In [20]:
# Well, that's not what we pandas users would expect. 
# df['Age'] returns a Column object, not the data. To see the values we need to use select().  
# df.select(df['Age']).show()
df.select('Age').show()

In [21]:
## What if we want multiple columns?
df.select(['Age', 'Fare']).show()

In [22]:
# that's more like it. 
# What about accessing a row
df.head(1)

In [23]:
type(df.head(1))

In [24]:
## head(1) returns a list. Let's get the item in the list.
row = df.head(1)[0]
row

In [25]:
type(row)

In [26]:
## row can be converted into dict using .asDict()
row.asDict()

In [27]:
## Then the value can be accessed from the row dictionary. 
row.asDict()['PassengerId']

In [28]:
## similarly
row.asDict()['Name']

In [29]:
## Let's say we want to change the name of a column. We can use withColumnRenamed.
# df.withColumnRenamed('existing name', 'new name')
df.withColumnRenamed("Age", "newA").limit(5).toPandas()

In [30]:
# Let's say we want to modify a column, in this case by adding $20 to every fare. 
## df.withColumn('existing column', <expression built from a Column, e.g. df['column']>)
## so not df.withColumn('Fare', 'Fare' +20).show(), because 'Fare' is just a string there
df.withColumn('Fare', df['Fare']+20).limit(5).show()

In [31]:
## Let's say we want to get the average fare.
# We will use the "mean" function from pyspark.sql.functions (this is where the built-in functions are stored) and
# collect the data using ".collect()" instead of using .show().
# collect() returns a list of Row objects, so we need to index into it to get the value.

In [32]:
from pyspark.sql.functions import mean
fare_mean = df.select(mean("Fare")).collect()
fare_mean[0][0]

In [33]:
fare_mean = fare_mean[0][0]
fare_mean

#### Filter

In [34]:
# What if we want to filter the data and see all data points above the average fare? 
# There are two approaches: we can pass a SQL expression as a string
# (32.20 is the mean fare from above, rounded),
# or use the dataframe approach. 
df.filter("Fare > 32.20" ).limit(3).show()

In [35]:
# or we can use the dataframe approach
df.filter(df['Fare']> fare_mean).limit(3).show()

In [36]:
## What if we want to filter by multiple columns?
# Passengers with a below-average fare and a Pclass equal to 3
df.filter((df['Fare'] < fare_mean) &
          (df['Pclass'] ==  3)
         ).show(10)

In [37]:
# Passengers with a below-average fare who are not male
filter1_less_than_mean_fare = df['Fare'] < fare_mean
filter2_sex_not_male = df['Sex'] != "male"
df.filter((filter1_less_than_mean_fare) &
          (filter2_sex_not_male)).show(10)

In [38]:
# We can also chain the filters one after another.
# Passengers with a below-average fare who are not male
# Creating filters
filter1_less_than_mean_fare = df['Fare'] < fare_mean
filter2_sex_not_male = df['Sex'] != "male"
# applying filters
df.filter(filter1_less_than_mean_fare).filter(filter2_sex_not_male).show(10)

#### GroupBy

In [39]:
## Let's group by Pclass and look at the average fare (along with the averages of the other numeric columns). 
df.groupBy("Pclass").mean().toPandas()

In [40]:
## let's just look at the Pclass and avg(Fare)
df.groupBy("Pclass").mean().select('Pclass', 'avg(Fare)').show()

In [41]:
# Alternative way
df.groupBy("Pclass").mean("Fare").show()

In [42]:
## What if we want just the average of all fares? We can use .agg with the dataframe. 
df.agg({'Fare':'mean'}).show()

In [43]:
## Another way to do this is by importing the "mean" function from pyspark.sql.functions
from pyspark.sql.functions import mean
df.select(mean("Fare")).show()

In [44]:
## we can also combine the few previous approaches to get similar results. 
temp = df.groupBy("Pclass")
temp.agg({"Fare": 'mean'}).show()

In [45]:
# What if we want to format the results? 
# For example,
# I want to rename the column. This can be done using the .alias() method.  
# I want to format the number with only two decimals. This can be done using "format_number".
from pyspark.sql.functions import format_number
temp = df.groupBy("Pclass")
temp = temp.agg({"Fare": 'mean'})
temp.select('Pclass', format_number("avg(Fare)", 2).alias("average fare")).show()

In [46]:
## What if I want to order by Fare in ascending order?
df.orderBy("Fare").limit(20).toPandas()

In [47]:
## What about descending order
df.orderBy(df['Fare'].desc()).limit(5).toPandas()

In [48]:
## How do we deal with missing values?
# df.na.drop(how=("any"/"all"), thresh=(1,2,3,4,5...))
df.na.drop(how="any").limit(5).toPandas()
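
The `how` and `thresh` options are easy to mix up, so here is a small plain-Python sketch of the row-dropping rule as I understand it from the PySpark documentation: `how="any"` drops a row with at least one null, `how="all"` drops a row only when every value is null, and `thresh` (when given) keeps rows with at least that many non-null values and overrides `how`. The `drop_rows` helper and the sample rows are made up for illustration.

```python
def drop_rows(rows, how="any", thresh=None):
    """Mimic the row-dropping rule of df.na.drop on a list of dicts."""
    kept = []
    for row in rows:
        non_null = sum(value is not None for value in row.values())
        if thresh is not None:
            keep = non_null >= thresh      # thresh overrides `how`
        elif how == "any":
            keep = non_null == len(row)    # drop if any value is null
        else:                              # how == "all"
            keep = non_null > 0            # drop only if every value is null
        if keep:
            kept.append(row)
    return kept


# Illustrative rows, with None playing the role of null.
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": None, "Cabin": None, "Embarked": None},
    {"Age": 38.0, "Cabin": "C85", "Embarked": "C"},
]

print(len(drop_rows(rows, how="any")))
print(len(drop_rows(rows, how="all")))
print(len(drop_rows(rows, thresh=2)))
```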

# Advanced Tutorial
## Dealing with Missing Values
### Cabin

In [49]:
# Filling the null values in Cabin with "N".
# df.na.fill(value, subset=[...]) does the same thing as df.fillna(value, subset=[...])
df = df.na.fill('N', subset=['Cabin'])
df_test = df_test.na.fill('N', subset=['Cabin'])

### Fare

In [50]:
## How do we find the rows with missing values?
# We can use .where(condition) with .isNull()
df_test.where(df_test['Fare'].isNull()).show()

Here, we could take the average of the whole **Fare** column to fill in the null value. However, for the sake of learning and practice, we will try something else: we take the average fare of the passengers whose **Pclass** is ***3***, **Sex** is ***male*** and **Embarked** is ***S***, the same values as the row with the missing fare shown above.

In [51]:
missing_value = df_test.filter(
    (df_test['Pclass'] == 3) &
    (df_test.Embarked == 'S') &
    (df_test.Sex == "male")
)
## Filling in the null value in the Fare column with the mean fare of that group. 
df_test = df_test.na.fill(
    missing_value.select(mean('Fare')).collect()[0][0],
    subset=['Fare']
)
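
The same logic can be checked on a tiny example in plain Python. This is only a sketch: the helper names are hypothetical and the rows are made-up toy values, not taken from the dataset. Like the Spark code above, it fills every missing fare with the mean of one chosen subgroup.

```python
def subgroup_mean_fare(rows, pclass, sex, embarked):
    """Mean fare of the rows matching the given group, ignoring missing fares."""
    fares = [
        r["Fare"]
        for r in rows
        if r["Pclass"] == pclass
        and r["Sex"] == sex
        and r["Embarked"] == embarked
        and r["Fare"] is not None
    ]
    return sum(fares) / len(fares)


def fill_missing_fare(rows, value):
    """Return new rows where a missing Fare is replaced by `value`."""
    return [dict(r, Fare=value) if r["Fare"] is None else r for r in rows]


# Toy rows for illustration only.
toy_rows = [
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": 8.0},
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": 10.0},
    {"Pclass": 1, "Sex": "female", "Embarked": "C", "Fare": 80.0},
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": None},
]

group_mean = subgroup_mean_fare(toy_rows, 3, "male", "S")
filled = fill_missing_fare(toy_rows, group_mean)
print(filled[3]["Fare"])
```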

In [52]:
# Checking
df_test.where(df_test['Fare'].isNull()).show()

### Embarked

In [53]:
df.where(df['Embarked'].isNull()).show()

In [54]:
## Replacing the null values in the Embarked column with 'C'. 
## Note that 'C' is a manual choice here, not the mode; 'S' is the most frequent value in this column.
df = df.na.fill('C', subset=['Embarked'])

In [55]:
## checking
df.where(df['Embarked'].isNull()).show()

In [56]:
df_test.where(df_test.Embarked.isNull()).show()

## Feature Engineering
### Cabin

In [57]:
## This code creates a decorator that lets the same function work on plain Python values and on PySpark columns (as a UDF).
from typing import Callable
from pyspark.sql import Column
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType, DataType
class py_or_udf:
    def __init__(self, returnType : DataType=StringType()):
        self.spark_udf_type = returnType
        
    def __call__(self, func : Callable):
        def wrapped_func(*args, **kwargs):
            if any([isinstance(arg, Column) for arg in args]) or \
                any([isinstance(vv, Column) for vv in kwargs.values()]):
                return udf(func, self.spark_udf_type)(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        return wrapped_func

    
@py_or_udf(returnType=StringType())
def first_char(col):
    return col[0]
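
If the wrapper above looks dense, the dispatch idea can be tried without Spark. The sketch below assumes a hypothetical `FakeColumn` class standing in for `pyspark.sql.Column`, and a hypothetical `py_or_deferred` decorator. Instead of building a real UDF it just returns a marker tuple, so it only shows when each branch is taken.

```python
from typing import Callable


class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column, used only in this sketch."""

    def __init__(self, name):
        self.name = name


def py_or_deferred(func: Callable):
    """Run `func` directly on plain values, defer it when given a FakeColumn."""

    def wrapped(*args, **kwargs):
        values = list(args) + list(kwargs.values())
        column_names = [v.name for v in values if isinstance(v, FakeColumn)]
        if column_names:
            # The real wrapper returns udf(func, returnType)(*args, **kwargs) here.
            return ("deferred", func.__name__, column_names)
        return func(*args, **kwargs)

    return wrapped


@py_or_deferred
def first_char(value):
    return value[0]


print(first_char("C85"))                # plain Python call
print(first_char(FakeColumn("Cabin")))  # column-style call
```

The benefit of this design is that the same function can be unit-tested with ordinary strings and then applied to dataframe columns unchanged.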
    

In [58]:
df = df.withColumn('Cabin', first_char(df['Cabin']))

In [59]:
df_test = df_test.withColumn('Cabin', first_char(df_test['Cabin']))

In [60]:
df.limit(5).toPandas()

We can use PySpark's ***groupBy*** function to get the mean fare for each cabin letter.

In [61]:
df.groupBy('Cabin').mean("Fare").show()

Now, these means can help us estimate the unknown cabins if we compare the fare of each unknown-cabin row with the means above. Let's write a simple function that assigns a cabin letter based on the fare. 

In [62]:
@py_or_udf(returnType=StringType())
def cabin_estimator(i):
    """Estimate the cabin letter from the fare, using the mean fare of each cabin letter"""
    a = 0
    if i<16:
        a = "G"
    elif i>=16 and i<27:
        a = "F"
    elif i>=27 and i<38:
        a = "T"
    elif i>=38 and i<47:
        a = "A"
    elif i>= 47 and i<53:
        a = "E"
    elif i>= 53 and i<54:
        a = "D"
    elif i>=54 and i<116:
        a = 'C'
    else:
        a = "B"
    return a
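
As an aside, a chain of range checks like this can also be written as a threshold lookup. The sketch below is an alternative formulation in plain Python, not the method used in this kernel. It assumes the same cut-off values as `cabin_estimator`, and the names `FARE_BOUNDS`, `CABIN_LETTERS` and `cabin_from_fare` are made up here.

```python
from bisect import bisect_right

# Upper bounds (exclusive) of each fare range, in ascending order,
# copied from the if/elif chain in cabin_estimator.
FARE_BOUNDS = [16, 27, 38, 47, 53, 54, 116]
# One letter per range; the last letter covers fares of 116 and above.
CABIN_LETTERS = ["G", "F", "T", "A", "E", "D", "C", "B"]


def cabin_from_fare(fare):
    """Return the cabin letter whose fare range contains `fare`."""
    # bisect_right counts how many bounds are <= fare, which is the range index.
    return CABIN_LETTERS[bisect_right(FARE_BOUNDS, fare)]


for fare in (10, 16, 53.5, 54, 200):
    print(fare, cabin_from_fare(fare))
```

Keeping the bounds and letters in two lists makes it easier to see at a glance that the ranges do not overlap or leave gaps.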

In [63]:
## Separating the rows where Cabin == 'N'. Remember, we used 'N' for null. 
df_withN = df.filter(df['Cabin'] == 'N')
df2 = df.filter(df['Cabin'] != 'N')

## Replacing 'N' using the cabin_estimator function. 
df_withN = df_withN.withColumn('Cabin', cabin_estimator(df_withN['Fare']))

# Putting the dataframe back together. 
df = df_withN.union(df2).orderBy('PassengerId') 

In [64]:
# Let's do the same for the test set
df_testN = df_test.filter(df_test['Cabin'] == 'N')
df_testNoN = df_test.filter(df_test['Cabin'] != 'N')
df_testN = df_testN.withColumn('Cabin', cabin_estimator(df_testN['Fare']))
df_test = df_testN.union(df_testNoN).orderBy('PassengerId')

### Name

In [65]:
## creating UDF functions
@py_or_udf(returnType=IntegerType())
def name_length(name):
    return len(name)


@py_or_udf(returnType=StringType())
def name_length_group(size):
    a = ''
    if (size <=20):
        a = 'short'
    elif (size <=35):
        a = 'medium'
    elif (size <=45):
        a = 'good'
    else:
        a = 'long'
    return a

In [66]:
## Getting the name length from the name. 
df = df.withColumn("name_length", name_length(df['Name']))

## Grouping based on name length. 
df = df.withColumn("nLength_group", name_length_group(df['name_length']))

In [67]:
## Let's do the same for the test set. 
df_test = df_test.withColumn("name_length", name_length(df_test['Name']))

df_test = df_test.withColumn("nLength_group", name_length_group(df_test['name_length']))

### Title

In [68]:
## This function extracts the title from the name. 
@py_or_udf(returnType=StringType())
def get_title(name):
    return name.split('.')[0].split(',')[1].strip()

df = df.withColumn("title", get_title(df['Name']))
df_test = df_test.withColumn('title', get_title(df_test['Name']))
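
To see what the chained `split` calls do, here is the same parsing in plain Python. This is a sketch: `extract_title` mirrors `get_title` above, while `safe_title` is a hypothetical variant I added to show one way of handling names that do not follow the "Surname, Title. Given names" pattern.

```python
def extract_title(name):
    """Same parsing steps as get_title, spelled out one at a time."""
    before_first_dot = name.split(".")[0]        # "Braund, Mr"
    after_comma = before_first_dot.split(",")[1]  # " Mr"
    return after_comma.strip()                    # "Mr"


def safe_title(name):
    """Hypothetical variant that returns None for names without a comma."""
    before_first_dot = name.split(".")[0]
    parts = before_first_dot.split(",")
    if len(parts) < 2:
        return None
    return parts[1].strip()


print(extract_title("Braund, Mr. Owen Harris"))
print(extract_title("Heikkinen, Miss. Laina"))
print(safe_title("NoCommaHere"))
```

`extract_title` raises an `IndexError` when the name has no comma, which is worth knowing before reusing it on other data.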

In [69]:
## A function that merges rare and equivalent titles in the title column
@py_or_udf(returnType=StringType())
def fuse_title1(feature):
    """
    Map rare titles to 'rare' and merge equivalent titles (e.g. 'Mlle' -> 'Miss')
    """
    if feature in ['the Countess','Capt','Lady','Sir','Jonkheer','Don','Major','Col', 'Rev', 'Dona', 'Dr']:
        return 'rare'
    elif feature in ['Ms', 'Mlle']:
        return 'Miss'
    elif feature == 'Mme':
        return 'Mrs'
    else:
        return feature

In [70]:
df = df.withColumn("title", fuse_title1(df["title"]))

In [71]:
df_test = df_test.withColumn("title", fuse_title1(df_test['title']))

In [72]:
print(df.toPandas()['title'].unique())
print(df_test.toPandas()['title'].unique())

### family_size

In [73]:
df = df.withColumn("family_size", df['SibSp']+df['Parch'])
df_test = df_test.withColumn("family_size", df_test['SibSp']+df_test['Parch'])

In [74]:
## Bin the family size. 
@py_or_udf(returnType=StringType())
def family_group(size):
    """
    This function groups families (loner, small, large) based on family size
    """
    
    a = ''
    if (size <= 1):
        a = 'loner'
    elif (size <= 4):
        a = 'small'
    else:
        a = 'large'
    return a

In [75]:
df = df.withColumn("family_group", family_group(df['family_size']))
df_test = df_test.withColumn("family_group", family_group(df_test['family_size']))


### is_alone

In [76]:
@py_or_udf(returnType=IntegerType())
def is_alone(num):
    if num<2:
        return 1
    else:
        return 0

In [77]:
df = df.withColumn("is_alone", is_alone(df['family_size']))
df_test = df_test.withColumn("is_alone", is_alone(df_test["family_size"]))

### ticket

In [78]:
## Dropping the Ticket column
df = df.drop('Ticket')
df_test = df_test.drop("Ticket")

### calculated_fare

In [79]:
from pyspark.sql.functions import expr, col, when, coalesce, lit

In [80]:
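## Here I am using something similar to an if/else statement:
# when(condition, value_when_condition_met).otherwise(value_otherwise)
# Fare / family_size is null when family_size is 0 (with ANSI mode off, dividing by zero returns null), so we fall back to Fare.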
df = df.withColumn(
    "calculated_fare", 
    when((col("Fare")/col("family_size")).isNull(), col('Fare'))
    .otherwise((col("Fare")/col("family_size"))))
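
The null handling here is subtle, so below is a plain-Python sketch of what I believe the expression computes. It assumes Spark's non-ANSI behaviour, where dividing by zero (or by null) yields null; with ANSI mode enabled, division by zero raises an error instead. `spark_divide` and `calculated_fare` are hypothetical helper names, and `None` plays the role of null.

```python
def spark_divide(numerator, denominator):
    """Division under the assumed null rules: null in -> null out, x / 0 -> null."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return numerator / denominator


def calculated_fare(fare, family_size):
    """Mirror of when(ratio.isNull(), Fare).otherwise(ratio)."""
    per_person = spark_divide(fare, family_size)
    return fare if per_person is None else per_person


print(calculated_fare(30.0, 3))   # fare split across three people
print(calculated_fare(7.25, 0))   # family_size of 0 falls back to the fare
print(calculated_fare(None, 2))   # a missing fare stays missing
```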

In [81]:
df_test = df_test.withColumn(
    "calculated_fare", 
    when((col("Fare")/col("family_size")).isNull(), col('Fare'))
    .otherwise((col("Fare")/col("family_size"))))

### fare_group

In [82]:
@py_or_udf(returnType=StringType())
def fare_group(fare):
    """
    This function creates a fare group based on the fare provided
    """
    
    a= ''
    if fare <= 4:
        a = 'Very_low'
    elif fare <= 10:
        a = 'low'
    elif fare <= 20:
        a = 'mid'
    elif fare <= 45:
        a = 'high'
    else:
        a = "very_high"
    return a

In [83]:
df = df.withColumn("fare_group", fare_group(col("Fare")))
df_test = df_test.withColumn("fare_group", fare_group(col("Fare")))

# That's all for today. Let's come back tomorrow, when we will learn how to apply machine learning with PySpark.

In [84]:
# Binarizing, Bucketing & Encoding

In [85]:
train = spark.read.csv('../input/titanic/train.csv', header = True, inferSchema=True)
test = spark.read.csv('../input/titanic/test.csv', header = True, inferSchema=True)

In [86]:
train.show()

In [87]:
# Binarizing
from pyspark.ml.feature import Binarizer
# Cast the data type to double
train = train.withColumn('SibSp', train['SibSp'].cast('double'))
# Create the binarizing transformer (values above the threshold become 1.0, the rest 0.0)
# Named "binarizer" so that it does not shadow Python's built-in bin()
binarizer = Binarizer(threshold=0.0, inputCol='SibSp', outputCol='SibSpBin')
# Apply the transform
train = binarizer.transform(train)

In [88]:
train.select('SibSp', 'SibSpBin').show(10)

In [89]:
# Bucketing
from pyspark.ml.feature import Bucketizer
# We are going to bucket the Fare column
# Define the splits (the boundaries between buckets)
splits = [0,4,10,20,45, float('Inf')]

# Create bucketing transformer
buck = Bucketizer(splits=splits, inputCol='Fare', outputCol='FareB')

# Apply transformer
train = buck.transform(train)
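
It helps to know which bucket a boundary value lands in. As far as I understand the Bucketizer documentation, a bucket defined by splits x and y holds values in [x, y), except the last bucket, which also includes its upper bound. The plain-Python sketch below reproduces that rule; `bucket_index` is a made-up helper, and it returns an int where Spark would output a double.

```python
from bisect import bisect_right

splits = [0, 4, 10, 20, 45, float("inf")]


def bucket_index(value, splits):
    """Index of the bucket [splits[i], splits[i+1]) that contains `value`."""
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the range covered by the splits")
    # bisect_right counts the splits <= value; subtracting 1 gives the bucket.
    # The min() keeps the top boundary itself inside the last bucket.
    return min(bisect_right(splits, value) - 1, len(splits) - 2)


for fare in (0, 3.99, 4, 7.25, 45, 512.3292):
    print(fare, bucket_index(fare, splits))
```

Note the difference from the `fare_group` function earlier, which uses `<=`: a fare of exactly 4 is 'Very_low' there, but falls into the second bucket here.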

In [90]:
train.toPandas().head(10)

In [91]:
# One Hot Encoding
# It is a two-step process
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# Create indexer transformer for the Sex column

# Step 1: Create an indexer that maps each text label to a numeric index
stringIndexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

# Fit the indexer
model = stringIndexer.fit(train)

# Apply transform
indexed = model.transform(train)

In [92]:
# Step 2: One Hot Encode
# Create encoder transformer
encoder = OneHotEncoder(inputCol='SexIndex', outputCol='Sex_Vec')

# fit model
model = encoder.fit(indexed)

# apply transform
encoded_df = model.transform(indexed)

encoded_df.toPandas().head()
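
The two steps can be imitated in plain Python to see what ends up in `SexIndex` and `Sex_Vec`. This is a sketch under a few assumptions: StringIndexer's default ordering gives index 0 to the most frequent label (ties broken alphabetically here), OneHotEncoder drops the last category by default, and dense lists stand in for Spark's sparse vectors. The helper names are made up.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each label to an index, most frequent label first."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """One-hot vector for `index`; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Illustrative labels only.
sexes = ["male", "female", "male"]
mapping = fit_string_index(sexes)
print(mapping)
for sex in sexes:
    print(sex, one_hot(mapping[sex], len(mapping)))
```

With only two categories and the last one dropped, the encoded vector has a single element, which is why `Sex_Vec` looks so short.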

<div class="alert alert-info">
    <h1>Resources</h1>
    <ul>
        <li><a href="https://docs.databricks.com/spark/latest/spark-sql/udf-python.html">User-defined functions - Python</a></li>
        <li><a href="https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87">Developing PySpark UDFs</a></li>
    </ul>
        <h1>Credits</h1>
    <ul>
        <li>To DataCamp, I have learned so much from DataCamp.</li>
        <li>To Jose Portilla, Such an amazing teacher with all of his resources</li>
    </ul>
    
</div>

<div class="alert alert-info">
<h4>If you would like to discuss any other projects or just have a chat about data science topics, I'll be more than happy to connect with you on:</h4>
    <ul>
        <li><a href="https://www.linkedin.com/in/masumrumi/"><b>LinkedIn</b></a></li>
        <li><a href="https://github.com/masumrumi"><b>Github</b></a></li>
        <li><a href="https://masumrumi.com/"><b>masumrumi.com</b></a></li>
    </ul>

<p>This kernel will always be a work in progress. I will incorporate new data science concepts as I learn them with each update. If you have any ideas or suggestions about this notebook, please let me know. Any feedback about further improvements would be genuinely appreciated.</p>

<h1>If you have come this far, Congratulations!!</h1>

<h1>If this notebook helped you in any way or you liked it, please upvote and/or leave a comment!! :)</h1></div>

<div class="alert alert-info">
    <h1>Versions</h1>
    <ul>
        <li>Version 16</li>
    </ul>
    
</div>