<img src="http://data.freehdw.com/ships-titanic-vehicles-best.jpg"  Width="800">




This kernel is a tutorial for getting started with PySpark, using the Titanic dataset. Let's get started.


### Kernel Goals
<a id="aboutthiskernel"></a>
***
There are three primary goals of this kernel.
- <b>Provide a tutorial</b> for someone who is starting out with PySpark.
- <b>Do an exploratory data analysis (EDA)</b> of the Titanic dataset with visualizations and storytelling.
- <b>Predict</b>: Use machine learning classification models to predict a passenger's chance of survival.

### What is Spark, anyway?
Spark is a platform for cluster computing. Spark lets us spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up data makes it easier to work with very large datasets because each node only works with a small amount of data.
As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster.

Deciding whether or not Spark is the best solution for your problem takes some experience, but you can consider questions like:
* Is my data too big to work with on a single machine?
* Can my calculations be easily parallelized?
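To make the "split the data, compute in parallel, combine" idea concrete, here is a toy sketch in plain Python. It is not Spark: threads stand in for worker nodes, and the helper names (`split_into_chunks`, `partial_sum`) and the handful of fare values are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(values, n_chunks):
    """Split a list into n_chunks contiguous parts of roughly equal size."""
    size, remainder = divmod(len(values), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(values[start:end])
        start = end
    return chunks


def partial_sum(chunk):
    """The work one 'node' does on its own subset of the data."""
    return sum(chunk)


# A few illustrative fare values (not the full dataset).
fares = [7.25, 71.2833, 7.925, 53.1, 8.05, 8.4583, 51.8625, 21.075]

# Each worker only sees its own chunk.
chunks = split_into_chunks(fares, n_chunks=3)
with ThreadPoolExecutor(max_workers=3) as pool:
    partials = list(pool.map(partial_sum, chunks))

# The partial results are combined into the final answer.
total = sum(partials)
print(chunks)
print(total)
```

A sum parallelizes easily because the partial results can be combined in any order. Calculations where each step depends on the previous one are much harder to spread over nodes.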



In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('../input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

../input/titanic/test.csv
../input/titanic/train.csv


In [3]:
## installing pyspark
!pip install pyspark



The first step in using Spark is connecting to a cluster. In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the master, that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called workers. The master sends the workers data and calculations to run, and they send their results back to the master.

We definitely don't need a cluster for the Titanic dataset. Besides, the syntax for running locally and for running on a cluster is pretty similar. To start working with Spark DataFrames, we first have to create a SparkSession object, which sits on top of a SparkContext. We can think of the SparkContext as the connection to the cluster and the SparkSession as the interface to that connection. Let's create a SparkSession.

# Beginner Tutorial
This part is solely for beginners. I recommend starting from here to get a good understanding of the flow. 

In [4]:
## creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tutorial').getOrCreate()

24/05/24 19:44:46 WARN Utils: Your hostname, Masums-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.139 instead (on interface en0)
24/05/24 19:44:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/24 19:44:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Let's read the dataset. 

In [5]:
df_train = spark.read.csv('../input/titanic/train.csv', header = True, inferSchema=True)
df_test = spark.read.csv('../input/titanic/test.csv', header = True, inferSchema=True)

In [6]:
titanic_train = df_train.alias("titanic_train")

In [7]:
## So, what is df_train?
type(df_train)

pyspark.sql.dataframe.DataFrame

In [8]:
## As you can see it's a Spark dataframe. Let's take a look at the preview of the dataset. 
df_train.show(truncate=False)

+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                                   |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+-------------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                                |male  |22.0|1    |0    |A/5 21171       |7.25   |NULL |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                                 |female|26.0|0    |0    |STON/O2. 3101282|7.925  |NULL |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)           |female|35.0|1  

In [9]:
## It looks a bit messi. See what I did there? ;) Anyway, how about using .toPandas() for a change?
df_train.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
888,889,0,3,"""Johnston, Miss. Catherine Helen """"Carrie""""""",female,,1,2,W./C. 6607,23.4500,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C


In [12]:
## how about a summary. 
result = df_train.describe().toPandas()

In [13]:
result

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [14]:
# getting the total row count
df_train.count()

891

In [15]:
# We can also convert a pandas dataframe to a Spark dataframe. Here is how we do it.
print(f"Before: {type(result)}")
spark_temp = spark.createDataFrame(result)
print(f"After: {type(spark_temp)}")

Before: <class 'pandas.core.frame.DataFrame'>
After: <class 'pyspark.sql.dataframe.DataFrame'>


In [16]:
# pyspark version
spark_temp.show()

                                                                                

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                NULL|  NULL| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [18]:
# pandas version
result

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [19]:
# Cool, Let's print the schema of the df using .printSchema()
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [20]:
# similar approach
df_train.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

Data in the real world is rarely this clean. We often have to define our own schema and apply it when reading the data. We will come back to that later. Since we are talking about schemas, are you wondering whether you can use SQL with Spark? Yes, you can.

One of the biggest advantages of Spark is that you can run SQL commands to do analysis. If you are like that nifty co-worker of mine, you will probably want to use SQL with Spark. Let's do an example.

In [23]:
## First, we need to register a sql temporary view.
df_train.createOrReplaceTempView("mytable")

## Then, we use spark.sql and write sql inside it, which returns a spark Dataframe.  
result = spark.sql("SELECT * FROM mytable ORDER BY Fare DESC LIMIT 10")
result.show(truncate=False)

+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+--------+---------------+--------+
|PassengerId|Survived|Pclass|Name                                     |Sex   |Age |SibSp|Parch|Ticket  |Fare    |Cabin          |Embarked|
+-----------+--------+------+-----------------------------------------+------+----+-----+-----+--------+--------+---------------+--------+
|680        |1       |1     |Cardeza, Mr. Thomas Drake Martinez       |male  |36.0|0    |1    |PC 17755|512.3292|B51 B53 B55    |C       |
|259        |1       |1     |Ward, Miss. Anna                         |female|35.0|0    |0    |PC 17755|512.3292|NULL           |C       |
|738        |1       |1     |Lesurer, Mr. Gustave J                   |male  |35.0|0    |0    |PC 17755|512.3292|B101           |C       |
|89         |1       |1     |Fortune, Miss. Mabel Helen               |female|23.0|3    |2    |19950   |263.0   |C23 C25 C27    |S       |
|28         |0       |1    

Similarly we can also register another sql temp view. 

In [24]:
df_test.createOrReplaceTempView("df_test")

Now that we have registered two tables within this Spark session, are you wondering how we can see which ones are registered?

In [25]:
spark.catalog.listTables()

[Table(name='df_test', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mytable', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [26]:
# similarly
spark.sql("SHOW views").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         | mytable|       true|
+---------+--------+-----------+



In [30]:
# or 
spark.sql("SHOW tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|         |  df_test|       true|
|         |  mytable|       true|
+---------+---------+-----------+



In [31]:
# We can also create spark dataframe out of these tables using spark.table
temp_table = spark.table("df_test")
print(type(temp_table))
temp_table.show(5)

<class 'pyspark.sql.dataframe.DataFrame'>
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| NULL|       S|
|        894|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| NULL|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| NULL|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| NULL|       S|
+-----------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [32]:
# Pretty cool. We will dive deeper into SQL later.
# Let's go back to the dataframe and do some nitty-gritty stuff.
# What if we want the column names only?
df_train.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [33]:
# What about just a column?
df_train['Age']

Column<'Age'>

In [36]:
# similarly 
df_train.Age

Column<'Age'>

In [37]:
type(df_train['Age'])

pyspark.sql.column.Column

In [39]:
# Well, that's not what we pandas users expected.
# Yes, in order to get the values of a column we need to use select().
# df_train.select(df_train['Age']).show() works as well
df_train.select('Age').show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|NULL|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|NULL|
|31.0|
|NULL|
+----+
only showing top 20 rows



In [44]:
# similarly...
df_train[['Age']].show()

+----+
| Age|
+----+
|22.0|
|38.0|
|26.0|
|35.0|
|35.0|
|NULL|
|54.0|
| 2.0|
|27.0|
|14.0|
| 4.0|
|58.0|
|20.0|
|39.0|
|14.0|
|55.0|
| 2.0|
|NULL|
|31.0|
|NULL|
+----+
only showing top 20 rows



In [40]:
## What if we want multiple columns?
df_train.select(['Age', 'Fare']).show()

+----+-------+
| Age|   Fare|
+----+-------+
|22.0|   7.25|
|38.0|71.2833|
|26.0|  7.925|
|35.0|   53.1|
|35.0|   8.05|
|NULL| 8.4583|
|54.0|51.8625|
| 2.0| 21.075|
|27.0|11.1333|
|14.0|30.0708|
| 4.0|   16.7|
|58.0|  26.55|
|20.0|   8.05|
|39.0| 31.275|
|14.0| 7.8542|
|55.0|   16.0|
| 2.0| 29.125|
|NULL|   13.0|
|31.0|   18.0|
|NULL|  7.225|
+----+-------+
only showing top 20 rows



In [45]:
# similarly 
df_train[['Age', 'Fare']].show()

+----+-------+
| Age|   Fare|
+----+-------+
|22.0|   7.25|
|38.0|71.2833|
|26.0|  7.925|
|35.0|   53.1|
|35.0|   8.05|
|NULL| 8.4583|
|54.0|51.8625|
| 2.0| 21.075|
|27.0|11.1333|
|14.0|30.0708|
| 4.0|   16.7|
|58.0|  26.55|
|20.0|   8.05|
|39.0| 31.275|
|14.0| 7.8542|
|55.0|   16.0|
| 2.0| 29.125|
|NULL|   13.0|
|31.0|   18.0|
|NULL|  7.225|
+----+-------+
only showing top 20 rows



In [46]:
# or 
df_train[df_train.Age, 
         df_train.Fare].show()

+----+-------+
| Age|   Fare|
+----+-------+
|22.0|   7.25|
|38.0|71.2833|
|26.0|  7.925|
|35.0|   53.1|
|35.0|   8.05|
|NULL| 8.4583|
|54.0|51.8625|
| 2.0| 21.075|
|27.0|11.1333|
|14.0|30.0708|
| 4.0|   16.7|
|58.0|  26.55|
|20.0|   8.05|
|39.0| 31.275|
|14.0| 7.8542|
|55.0|   16.0|
| 2.0| 29.125|
|NULL|   13.0|
|31.0|   18.0|
|NULL|  7.225|
+----+-------+
only showing top 20 rows



As you can see, the PySpark dataframe syntax is pretty simple and offers several ways to do the same thing. Which syntax is the best choice depends on what we are trying to accomplish. I will discuss this more as we go on. Now let's see how we can access a row.

In [47]:
df_train.head(1)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')]

In [48]:
type(df_train.head(1))

list

In [51]:
## returns a list. let's get the item in the list
row = df_train.head(1)[0]
row

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

In [52]:
type(row)

pyspark.sql.types.Row

In [53]:
## row can be converted into dict using .asDict()
row.asDict()

{'PassengerId': 1,
 'Survived': 0,
 'Pclass': 3,
 'Name': 'Braund, Mr. Owen Harris',
 'Sex': 'male',
 'Age': 22.0,
 'SibSp': 1,
 'Parch': 0,
 'Ticket': 'A/5 21171',
 'Fare': 7.25,
 'Cabin': None,
 'Embarked': 'S'}

In [54]:
## Then the value can be accessed from the row dictionary.
row.asDict()['PassengerId']

1

In [55]:
## similarly
row.asDict()['Name']

'Braund, Mr. Owen Harris'

In [56]:
## Let's say we want to change the name of a column. We can use withColumnRenamed.
# df.withColumnRenamed('existing name', 'new name')
df_train.withColumnRenamed("Age", "newA").limit(5).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|newA|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [57]:
# Let's say we want to modify a column, for example by adding $20 to every fare.
## df.withColumn('existing column', <Column expression>): the second argument must be a Column, such as df_train['Fare'] + 20
## so this will not work: df.withColumn('Fare', 'Fare' + 20).show()
df_train.withColumn('Fare', df_train['Fare']+20).limit(5).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|  27.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|91.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282| 27.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   73.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|  28.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

This change isn't permanent, since we are not assigning the result to a variable.

In [38]:
## let's say we want to get the average fare.
# we will use the "mean" function from pyspark.sql.functions(this is where all the functions are stored) and
# collect the data using ".collect()" instead of using .show()
# collect returns a list so we need to get the value from the list using index

In [39]:
from pyspark.sql.functions import mean
fare_mean = df_train.select(mean("Fare")).collect()
fare_mean[0][0]

32.2042079685746

In [40]:
fare_mean = fare_mean[0][0]
fare_mean

32.2042079685746

#### Filter

In [41]:
# What if we want to filter the data and see all fares above the average?
# There are two approaches: we can use SQL syntax by passing a string,
# or use the dataframe (Column) approach.
df_train.filter("Fare > 32.20" ).limit(3).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+



In [42]:
# similarly 
df_train[df_train.Fare > 32.20].limit(3).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+



In [43]:
# or we can use the dataframe approach
df_train.filter(df_train['Fare'] > fare_mean).limit(3).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|PC 17599|71.2833|  C85|       C|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|  113803|   53.1| C123|       S|
|          7|       0|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|   17463|51.8625|  E46|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-------+-----+--------+



In [44]:
## What if we want to filter by multiple columns.
# passengers with a below-average fare whose sex is male
temp_df = df_train.filter((df_train['Fare'] < fare_mean) &
          (df_train['Sex'] ==  'male')
         )
temp_df.show(5)

+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|   Ticket|  Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|A/5 21171|  7.25| NULL|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|   373450|  8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|male|NULL|    0|    0|   330877|8.4583| NULL|       Q|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|   349909|21.075| NULL|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|20.0|    0|    0|A/5. 2151|  8.05| NULL|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
only showing top 5 rows



In [45]:
# similarly 
df_train[(df_train.Fare < fare_mean) & 
         (df_train.Sex == "male")].show(5)

+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|   Ticket|  Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|A/5 21171|  7.25| NULL|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|   373450|  8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|male|NULL|    0|    0|   330877|8.4583| NULL|       Q|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|   349909|21.075| NULL|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|20.0|    0|    0|A/5. 2151|  8.05| NULL|       S|
+-----------+--------+------+--------------------+----+----+-----+-----+---------+------+-----+--------+
only showing top 5 rows



In [46]:
# passengers with a below-average fare who are not male
filter1_less_than_mean_fare = df_train['Fare'] < fare_mean
filter2_sex_not_male = df_train['Sex'] != "male"
df_train.filter((filter1_less_than_mean_fare) &
                (filter2_sex_not_male)).show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| NULL|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736|30.0708| NULL|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|         PP 9549|   16.7|   G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0|      

In [47]:
# We can also apply the filters one after another
# passengers with a below-average fare who are not male
# creating filters
filter1_less_than_mean_fare = df_train['Fare'] < fare_mean
filter2_sex_not_male = df_train['Sex'] != "male"
# applying filters
df_train.filter(filter1_less_than_mean_fare).filter(filter2_sex_not_male).show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| NULL|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736|30.0708| NULL|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|         PP 9549|   16.7|   G6|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0|      

In [48]:
# We can also filter by using built-in Column methods.
# between
df_train.select("PassengerId", "Fare").filter(df_train.Fare.between(10,40)).show()

+-----------+-------+
|PassengerId|   Fare|
+-----------+-------+
|          8| 21.075|
|          9|11.1333|
|         10|30.0708|
|         11|   16.7|
|         12|  26.55|
|         14| 31.275|
|         16|   16.0|
|         17| 29.125|
|         18|   13.0|
|         19|   18.0|
|         21|   26.0|
|         22|   13.0|
|         24|   35.5|
|         25| 21.075|
|         26|31.3875|
|         31|27.7208|
|         34|   10.5|
|         39|   18.0|
|         40|11.2417|
|         42|   21.0|
+-----------+-------+
only showing top 20 rows



In [49]:
df_train.select("PassengerID", df_train.Fare.between(10,40)).show()

+-----------+-------------------------------+
|PassengerID|((Fare >= 10) AND (Fare <= 40))|
+-----------+-------------------------------+
|          1|                          false|
|          2|                          false|
|          3|                          false|
|          4|                          false|
|          5|                          false|
|          6|                          false|
|          7|                          false|
|          8|                           true|
|          9|                           true|
|         10|                           true|
|         11|                           true|
|         12|                           true|
|         13|                          false|
|         14|                           true|
|         15|                          false|
|         16|                           true|
|         17|                           true|
|         18|                           true|
|         19|                     

In [50]:
# contains
df_train.select("PassengerId", "Name").filter(df_train.Name.contains("Mr")).show()

+-----------+--------------------+
|PassengerId|                Name|
+-----------+--------------------+
|          1|Braund, Mr. Owen ...|
|          2|Cumings, Mrs. Joh...|
|          4|Futrelle, Mrs. Ja...|
|          5|Allen, Mr. Willia...|
|          6|    Moran, Mr. James|
|          7|McCarthy, Mr. Tim...|
|          9|Johnson, Mrs. Osc...|
|         10|Nasser, Mrs. Nich...|
|         13|Saundercock, Mr. ...|
|         14|Andersson, Mr. An...|
|         16|Hewlett, Mrs. (Ma...|
|         18|Williams, Mr. Cha...|
|         19|Vander Planke, Mr...|
|         20|Masselmani, Mrs. ...|
|         21|Fynney, Mr. Joseph J|
|         22|Beesley, Mr. Lawr...|
|         24|Sloper, Mr. Willi...|
|         26|Asplund, Mrs. Car...|
|         27|Emir, Mr. Farred ...|
|         28|Fortune, Mr. Char...|
+-----------+--------------------+
only showing top 20 rows



In [51]:
# startswith 
df_train.select("PassengerID", 'Sex').filter(df_train.Sex.startswith("fe")).show()

+-----------+------+
|PassengerID|   Sex|
+-----------+------+
|          2|female|
|          3|female|
|          4|female|
|          9|female|
|         10|female|
|         11|female|
|         12|female|
|         15|female|
|         16|female|
|         19|female|
|         20|female|
|         23|female|
|         25|female|
|         26|female|
|         29|female|
|         32|female|
|         33|female|
|         39|female|
|         40|female|
|         41|female|
+-----------+------+
only showing top 20 rows



In [52]:
# endswith
df_train.select("PassengerID", 'Ticket').filter(df_train.Ticket.endswith("50")).show()

+-----------+----------+
|PassengerID|    Ticket|
+-----------+----------+
|          5|    373450|
|         28|     19950|
|         89|     19950|
|        256|      2650|
|        342|     19950|
|        439|     19950|
|        537|    113050|
|        641|    350050|
|        671|     29750|
|        672|F.C. 12750|
|        685|     29750|
|        768|    364850|
|        807|    112050|
+-----------+----------+



In [53]:
# isin
df_train[df_train.PassengerId.isin([1,2,3])].show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+



In [54]:
# like
df_train[df_train.Name.like("Br%")].show()

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| NULL|       S|
|        195|       1|     1|Brown, Mrs. James...|female|44.0|    0|    0| PC 17610|27.7208|   B4|       C|
|        222|       0|     2|Bracken, Mr. James H|  male|27.0|    0|    0|   220367|   13.0| NULL|       S|
|        478|       0|     3|Braund, Mr. Lewis...|  male|29.0|    1|    0|     3460| 7.0458| NULL|       S|
|        615|       0|     3|Brocklebank, Mr. ...|  male|35.0|    0|    0|   364512|   8.05| NULL|       S|
|        671|       1|     2|Brown, Mrs. Thoma...|female|40.0|    1|    1|    29750|   39.0| NULL|       S|
|        685|       0|     2

In [55]:
# substr
df_train.select(df_train.Name.substr(1,5)).show()

+---------------------+
|substring(Name, 1, 5)|
+---------------------+
|                Braun|
|                Cumin|
|                Heikk|
|                Futre|
|                Allen|
|                Moran|
|                McCar|
|                Palss|
|                Johns|
|                Nasse|
|                Sands|
|                Bonne|
|                Saund|
|                Ander|
|                Vestr|
|                Hewle|
|                Rice,|
|                Willi|
|                Vande|
|                Masse|
+---------------------+
only showing top 20 rows
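
Note that `substr(1, 5)` takes a 1-based start position and a length, unlike Python slicing. As a rough plain-Python analogue (the helper name `spark_substr` is made up for this sketch and only covers start positions of 1 or more):

```python
def spark_substr(text, start_pos, length):
    """Plain-Python analogue of Column.substr(startPos, length) for start_pos >= 1."""
    begin = start_pos - 1          # convert the 1-based position to a 0-based index
    return text[begin:begin + length]

print(spark_substr("Braund, Mr. Owen Harris", 1, 5))
```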



In [56]:
# similarly 
df_train[[df_train.Name.substr(1,5)]].show()

+---------------------+
|substring(Name, 1, 5)|
+---------------------+
|                Braun|
|                Cumin|
|                Heikk|
|                Futre|
|                Allen|
|                Moran|
|                McCar|
|                Palss|
|                Johns|
|                Nasse|
|                Sands|
|                Bonne|
|                Saund|
|                Ander|
|                Vestr|
|                Hewle|
|                Rice,|
|                Willi|
|                Vande|
|                Masse|
+---------------------+
only showing top 20 rows



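One caveat with `substr`: bracket indexing with a single Column, `df_train[<Column>]`, is treated as a filter, so it only works when the Column is boolean (as with `isin` and `like` above). `substr` returns a string column rather than a boolean, so the following line fails. Select the expression instead, either with `select` or with the list form `df_train[[...]]` shown above.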

In [57]:
# df_train[df_train.Name.substr(1,5)].show()

#### GroupBy

In [58]:
## Let's group by Pclass and take the mean of every numeric column; the average fare per class is in avg(Fare).
df_train.groupBy("Pclass").mean().toPandas()

Unnamed: 0,Pclass,avg(PassengerId),avg(Survived),avg(Pclass),avg(Age),avg(SibSp),avg(Parch),avg(Fare)
0,1,461.597222,0.62963,1.0,38.233441,0.416667,0.356481,84.154687
1,3,439.154786,0.242363,3.0,25.14062,0.615071,0.393075,13.67555
2,2,445.956522,0.472826,2.0,29.87763,0.402174,0.380435,20.662183


In [59]:
## let's just look at the Pclass and avg(Fare)
df_train.groupBy("Pclass").mean().select('Pclass', 'avg(Fare)').show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [60]:
# Alternative way
df_train.groupBy("Pclass").mean("Fare").show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [61]:
## What if we just want the average of all fares? We can use .agg directly on the DataFrame.
df_train.agg({'Fare':'mean'}).show()

+----------------+
|       avg(Fare)|
+----------------+
|32.2042079685746|
+----------------+



In [62]:
## Another way to do this is to import the "mean" function from pyspark.sql.functions.
from pyspark.sql.functions import mean
df_train.select(mean("Fare")).show()

+----------------+
|       avg(Fare)|
+----------------+
|32.2042079685746|
+----------------+



In [63]:
## We can also combine groupBy with .agg to get the same per-class result.
temp = df_train.groupBy("Pclass")
temp.agg({"Fare": 'mean'}).show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+



In [64]:
# What if we want to format the results?
# For example:
# To rename a column, use the .alias() method.
# To show a number with only two decimals, use format_number.
from pyspark.sql.functions import format_number
temp = df_train.groupBy("Pclass")
temp = temp.agg({"Fare": 'mean'})
temp.select('Pclass', format_number("avg(Fare)", 2).alias("average fare")).show()

+------+------------+
|Pclass|average fare|
+------+------------+
|     1|       84.15|
|     3|       13.68|
|     2|       20.66|
+------+------------+
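
For readers new to group-wise aggregation, here is a plain-Python sketch of what "group by a key, average a value, then format to two decimals" computes. The rows are hypothetical (not the real Titanic data) and `mean_by_key` is a made-up helper. Like `format_number`, the formatting step produces a string rather than a number.

```python
from collections import defaultdict

# Hypothetical (Pclass, Fare) pairs, not the real Titanic rows.
rows = [(1, 80.0), (1, 90.0), (3, 7.25), (3, 8.05), (2, 13.0)]

def mean_by_key(pairs):
    """Return {key: mean of values} for an iterable of (key, value) pairs."""
    totals = defaultdict(lambda: [0.0, 0])   # key -> [running sum, count]
    for key, value in pairs:
        totals[key][0] += value
        totals[key][1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

averages = mean_by_key(rows)
for pclass, avg in sorted(averages.items()):
    print(pclass, f"{avg:.2f}")   # two-decimal string, similar in spirit to format_number
```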



#### OrderBy
Spark offers several ways to sort a DataFrame with `orderBy`. Let's look at some of them.

In [65]:
## What if I want to order by Fare in ascending order. 
df_train.orderBy("Fare").limit(20).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,278,0,2,"""Parkes, Mr. Francis """"Frank""""""",male,,0,0,239853,0.0,,S
1,303,0,3,"Johnson, Mr. William Cahoone Jr",male,19.0,0,0,LINE,0.0,,S
2,180,0,3,"Leonard, Mr. Lionel",male,36.0,0,0,LINE,0.0,,S
3,272,1,3,"Tornquist, Mr. William Henry",male,25.0,0,0,LINE,0.0,,S
4,264,0,1,"Harrison, Mr. William",male,40.0,0,0,112059,0.0,B94,S
5,482,0,2,"""Frost, Mr. Anthony Wood """"Archie""""""",male,,0,0,239854,0.0,,S
6,414,0,2,"Cunningham, Mr. Alfred Fleming",male,,0,0,239853,0.0,,S
7,467,0,2,"Campbell, Mr. William",male,,0,0,239853,0.0,,S
8,598,0,3,"Johnson, Mr. Alfred",male,49.0,0,0,LINE,0.0,,S
9,634,0,1,"Parr, Mr. William Henry Marsh",male,,0,0,112052,0.0,,S


In [66]:
# similarly
df_train.orderBy(df_train.Fare.asc()).show()

+-----------+--------+------+--------------------+----+----+-----+-----+------+------+-----------+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|Ticket|  Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------+------+-----------+--------+
|        303|       0|     3|Johnson, Mr. Will...|male|19.0|    0|    0|  LINE|   0.0|       NULL|       S|
|        278|       0|     2|"Parkes, Mr. Fran...|male|NULL|    0|    0|239853|   0.0|       NULL|       S|
|        272|       1|     3|Tornquist, Mr. Wi...|male|25.0|    0|    0|  LINE|   0.0|       NULL|       S|
|        264|       0|     1|Harrison, Mr. Wil...|male|40.0|    0|    0|112059|   0.0|        B94|       S|
|        482|       0|     2|"Frost, Mr. Antho...|male|NULL|    0|    0|239854|   0.0|       NULL|       S|
|        180|       0|     3| Leonard, Mr. Lionel|male|36.0|    0|    0|  LINE|   0.0|       NULL|       S|
|        414|       0|     2

In [67]:
# What about descending order?
# bracket notation: df_train.orderBy(df_train['Fare'].desc()).limit(5).show()
# dot notation
df_train.orderBy(df_train.Fare.desc()).limit(5).show()

+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|  Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+--------+-----------+--------+
|        738|       1|     1|Lesurer, Mr. Gust...|  male|35.0|    0|    0|PC 17755|512.3292|       B101|       C|
|        680|       1|     1|Cardeza, Mr. Thom...|  male|36.0|    0|    1|PC 17755|512.3292|B51 B53 B55|       C|
|        259|       1|     1|    Ward, Miss. Anna|female|35.0|    0|    0|PC 17755|512.3292|       NULL|       C|
|        439|       0|     1|   Fortune, Mr. Mark|  male|64.0|    1|    4|   19950|   263.0|C23 C25 C27|       S|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|   19950|   263.0|C23 C25 C27|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+--------+-----

In [68]:
df_train.filter(df_train.Embarked.isNull()).count()

2

In [69]:
df_train.select('PassengerID','Embarked').orderBy(df_train.Embarked.asc_nulls_first()).show()

+-----------+--------+
|PassengerID|Embarked|
+-----------+--------+
|         62|    NULL|
|        830|    NULL|
|        204|       C|
|         74|       C|
|         66|       C|
|         97|       C|
|         65|       C|
|         98|       C|
|         31|       C|
|        112|       C|
|         35|       C|
|        115|       C|
|         40|       C|
|        119|       C|
|         44|       C|
|        123|       C|
|         53|       C|
|        126|       C|
|         58|       C|
|        129|       C|
+-----------+--------+
only showing top 20 rows



In [70]:
df_train.select('PassengerID','Embarked').orderBy(df_train.Embarked.asc_nulls_last()).tail(5)

[Row(PassengerID=887, Embarked='S'),
 Row(PassengerID=888, Embarked='S'),
 Row(PassengerID=889, Embarked='S'),
 Row(PassengerID=62, Embarked=None),
 Row(PassengerID=830, Embarked=None)]
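
The two null-ordering variants above can be mimicked in plain Python with a two-part sort key. This is only an illustrative sketch on a hypothetical list of ports, with `None` standing in for null; the function names mirror the Spark methods but are defined here.

```python
ports = ["S", None, "C", "Q", None, "C"]   # hypothetical Embarked values

def asc_nulls_first(values):
    # Key is (is_not_null, value): False sorts before True, so None entries lead.
    return sorted(values, key=lambda v: (v is not None, v or ""))

def asc_nulls_last(values):
    # Key is (is_null, value): None entries get True and therefore trail.
    return sorted(values, key=lambda v: (v is None, v or ""))

print(asc_nulls_first(ports))
print(asc_nulls_last(ports))
```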

In [71]:
## How do we deal with missing values?
# df.na.drop(how="any" or "all", thresh=<minimum number of non-null values needed to keep a row>)
df_train.na.drop(how="any").limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
1,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
2,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
3,11,1,3,"Sandstrom, Miss. Marguerite Rut",female,4.0,1,1,PP 9549,16.7,G6,S
4,12,1,1,"Bonnell, Miss. Elizabeth",female,58.0,0,0,113783,26.55,C103,S
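
To make the `how` and `thresh` options concrete, here is a plain-Python sketch of the row-dropping rule as I understand it: `how="any"` drops a row with at least one null, `how="all"` drops a row only when every value is null, and `thresh`, when given, keeps rows with at least that many non-null values and takes precedence over `how`. The function `drop_nulls` and the sample rows are hypothetical.

```python
def drop_nulls(rows, how="any", thresh=None):
    """Sketch of the na.drop rule; rows are dicts with None for nulls."""
    kept = []
    for row in rows:
        non_null = sum(value is not None for value in row.values())
        if thresh is not None:
            keep = non_null >= thresh      # thresh takes precedence over how
        elif how == "any":
            keep = non_null == len(row)    # drop if any value is null
        elif how == "all":
            keep = non_null > 0            # drop only if every value is null
        else:
            raise ValueError("how must be 'any' or 'all'")
        if keep:
            kept.append(row)
    return kept

# Hypothetical rows, not taken from the dataset.
rows = [
    {"Age": 22.0, "Cabin": None, "Embarked": "S"},
    {"Age": 38.0, "Cabin": "C85", "Embarked": "C"},
    {"Age": None, "Cabin": None, "Embarked": None},
]

print(len(drop_nulls(rows, how="any")), len(drop_nulls(rows, how="all")))
```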


# Advanced Tutorial


### Spark Catalog

In [72]:
# If you have used Spark for a while, this is a good time to learn about the Spark Catalog.
# You can also skip this section, since it is independent of what follows.

In [73]:
# list all the databases in the catalog.
spark.catalog.listDatabases()

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/Users/masumrumi/Projects/data_science/pyspark_titanic/working/spark-warehouse')]

In [74]:
# get the name of the current database
spark.catalog.currentDatabase()

'default'

In [75]:
## lists tables
spark.catalog.listTables()

[Table(name='df_test', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mytable', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [76]:
# add a table to the catalog
df_train.createOrReplaceTempView("df_train")

In [77]:
# list tables
spark.catalog.listTables()

[Table(name='df_test', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='df_train', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='mytable', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [78]:
# Caching
# cached table "df_train"
spark.catalog.cacheTable("df_train")

In [79]:
# checks if the table is cached
spark.catalog.isCached("df_train")

True

In [80]:
spark.catalog.isCached("df_test")

False

In [81]:
# let's cache df_test as well
spark.catalog.cacheTable("df_test")

In [82]:
spark.catalog.isCached("df_test")

True

In [83]:
# let's uncache df_train
spark.catalog.uncacheTable("df_train")

In [84]:
spark.catalog.isCached("df_train")

False

In [85]:
spark.catalog.isCached("df_test")

True

In [86]:
# How about clearing all cached tables at once. 
spark.catalog.clearCache()

In [87]:
spark.catalog.isCached("df_train")

False

In [88]:
# creating a global temp view
df_train.createGlobalTempView("df_train")

In [89]:
# listing all views in global_temp
spark.sql("SHOW VIEWS IN global_temp;").show()

+-----------+--------+-----------+
|  namespace|viewName|isTemporary|
+-----------+--------+-----------+
|global_temp|df_train|       true|
|           | df_test|       true|
|           |df_train|       true|
|           | mytable|       true|
+-----------+--------+-----------+



In [90]:
# dropping the global temp view.
spark.catalog.dropGlobalTempView("df_train")

True

In [91]:
# checking that global temp view is dropped.
spark.sql("SHOW VIEWS IN global_temp;").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         |df_train|       true|
|         | mytable|       true|
+---------+--------+-----------+



In [92]:
spark.catalog.dropTempView("df_train")

True

In [93]:
# checking that the session temp view df_train is dropped as well.
spark.sql("SHOW VIEWS IN global_temp;").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         | mytable|       true|
+---------+--------+-----------+



In [94]:
spark.sql("SHOW VIEWS").show()

+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
|         | df_test|       true|
|         | mytable|       true|
+---------+--------+-----------+



## Dealing with Missing Values
### Cabin

In [95]:
# filling the null values in cabin with "N".
# df.fillna(value, subset=[]);
df_train = df_train.na.fill('N', subset=['Cabin'])
df_test = df_test.na.fill('N', subset=['Cabin'])

### Fare

In [96]:
## how do we find out the rows with missing values?
# we can use .where(condition) with .isNull()
df_test.where(df_test['Fare'].isNull()).show()

+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|              Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+
|       1044|     3|Storey, Mr. Thomas|male|60.5|    0|    0|  3701|NULL|    N|       S|
+-----------+------+------------------+----+----+-----+-----+------+----+-----+--------+



Here, we could fill the missing value with the average of the entire **Fare** column. However, for the sake of learning and practice, we will try something more targeted. The passenger with the missing fare has **Pclass** ***3***, **Sex** ***male*** and **Embarked** ***S***, so we take the average fare of the test-set passengers who share those three values.

In [97]:
missing_value = df_test.filter(
    (df_test['Pclass'] == 3) &
    (df_test.Embarked == 'S') &
    (df_test.Sex == "male")
)
## Fill the null Fare with the mean Fare of that subset (mean skips null values).
df_test = df_test.na.fill(
    missing_value.select(mean('Fare')).collect()[0][0],
    subset=['Fare']
)
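
The same idea, stripped of Spark, looks roughly like this: compute the mean of the known fares within a matching group, then use it wherever the fare is missing. The rows and the helper `fill_fare_with_group_mean` are hypothetical and only illustrate the logic.

```python
from statistics import mean

# Hypothetical test-set rows; None marks the missing fare.
passengers = [
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": 7.0},
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": 8.0},
    {"Pclass": 1, "Sex": "female", "Embarked": "C", "Fare": 80.0},
    {"Pclass": 3, "Sex": "male", "Embarked": "S", "Fare": None},
]

def fill_fare_with_group_mean(rows, pclass, sex, embarked):
    """Return new rows where every missing Fare is replaced by the group's mean Fare."""
    known = [
        r["Fare"] for r in rows
        if r["Pclass"] == pclass and r["Sex"] == sex
        and r["Embarked"] == embarked and r["Fare"] is not None
    ]
    group_mean = mean(known)
    # dict(r, Fare=...) copies the row, so the input list is left untouched.
    return [dict(r, Fare=group_mean) if r["Fare"] is None else r for r in rows]

filled = fill_fare_with_group_mean(passengers, 3, "male", "S")
print(filled[3]["Fare"])
```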

In [98]:
# Checking
df_test.where(df_test['Fare'].isNull()).show()

+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



### Embarked

In [99]:
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+
|         62|       1|     1| Icard, Miss. Amelie|female|38.0|    0|    0|113572|80.0|  B28|    NULL|
|        830|       1|     1|Stone, Mrs. Georg...|female|62.0|    0|    0|113572|80.0|  B28|    NULL|
+-----------+--------+------+--------------------+------+----+-----+-----+------+----+-----+--------+



In [100]:
## Replacing the null values in the Embarked column with 'C'.
## Note: 'C' is not the mode of this column; the most frequent value in the training set is 'S'.
df_train = df_train.na.fill('C', subset=['Embarked'])

In [101]:
## checking
df_train.where(df_train['Embarked'].isNull()).show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+



In [102]:
df_test.where(df_test.Embarked.isNull()).show()

+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



## Feature Engineering
### Cabin

In [103]:
## A decorator that lets the same function run on plain Python values or, when any argument is a Column, as a PySpark UDF.
from typing import Callable
from pyspark.sql import Column
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType, ArrayType, DataType
class py_or_udf:
    def __init__(self, returnType : DataType=StringType()):
        self.spark_udf_type = returnType
        
    def __call__(self, func : Callable):
        def wrapped_func(*args, **kwargs):
            if any([isinstance(arg, Column) for arg in args]) or \
                any([isinstance(vv, Column) for vv in kwargs.values()]):
                return udf(func, self.spark_udf_type)(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        return wrapped_func

    
@py_or_udf(returnType=StringType())
def first_char(col):
    return col[0]
    

In [104]:
df_train = df_train.withColumn('Cabin', first_char(df_train['Cabin']))

In [105]:
df_test = df_test.withColumn('Cabin', first_char(df_test['Cabin']))

In [106]:
df_train.limit(5).toPandas()

                                                                                

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,N,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,N,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,N,S


We can use PySpark's ***groupBy*** function to get the mean fare for each cabin letter.

In [107]:
df_train.groupBy('Cabin').mean("Fare").show()

+-----+------------------+
|Cabin|         avg(Fare)|
+-----+------------------+
|    F| 18.69679230769231|
|    E|46.026693749999986|
|    T|              35.5|
|    B|113.50576382978724|
|    D| 57.24457575757576|
|    C|100.15134067796612|
|    A|39.623886666666664|
|    N|  19.1573253275109|
|    G|          13.58125|
+-----+------------------+



Now, these means can help us estimate the unknown cabins: we compare the fare in each row with an unknown cabin against the means above. Let's write a simple function that assigns a cabin letter based on the fare.

In [108]:
@py_or_udf(returnType=StringType())
def cabin_estimator(i):
    """Estimate a cabin letter from the fare, using fare ranges based on the mean fare per cabin letter."""
    a = 0
    if i<16:
        a = "G"
    elif i>=16 and i<27:
        a = "F"
    elif i>=27 and i<38:
        a = "T"
    elif i>=38 and i<47:
        a = "A"
    elif i>= 47 and i<53:
        a = "E"
    elif i>= 53 and i<54:
        a = "D"
    elif i>=54 and i<116:
        a = 'C'
    else:
        a = "B"
    return a
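
As an aside, the chain of range checks above can also be written as a lookup over sorted boundaries. The sketch below is a plain-Python alternative (not the notebook's UDF) that uses the same cut-offs; the names `FARE_BOUNDS`, `CABIN_LETTERS` and `cabin_from_fare` are made up for this example.

```python
from bisect import bisect_right

# Same cut-offs as cabin_estimator: each bound is the inclusive start of the next letter's range.
FARE_BOUNDS = [16, 27, 38, 47, 53, 54, 116]
CABIN_LETTERS = ["G", "F", "T", "A", "E", "D", "C", "B"]

def cabin_from_fare(fare):
    """Return the cabin letter whose fare range contains the given fare."""
    # bisect_right counts how many bounds are <= fare, which is the letter's index.
    return CABIN_LETTERS[bisect_right(FARE_BOUNDS, fare)]

print(cabin_from_fare(7.25))
```

Keeping the boundaries in one list makes it easier to check that the ranges are contiguous and to adjust them later.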

In [109]:
## separating the rows where Cabin == 'N'; remember, we used 'N' for null.
df_withN = df_train.filter(df_train['Cabin'] == 'N')
df2 = df_train.filter(df_train['Cabin'] != 'N')

## replacing 'N' using the cabin_estimator function.
df_withN = df_withN.withColumn('Cabin', cabin_estimator(df_withN['Fare']))

# putting the dataframe back together. 
df_train = df_withN.union(df2).orderBy('PassengerId') 

In [110]:
#let's do the same for test set
df_testN = df_test.filter(df_test['Cabin'] == 'N')
df_testNoN = df_test.filter(df_test['Cabin'] != 'N')
df_testN = df_testN.withColumn('Cabin', cabin_estimator(df_testN['Fare']))
df_test = df_testN.union(df_testNoN).orderBy('PassengerId')

### Name

In [111]:
## creating UDF functions
@py_or_udf(returnType=IntegerType())
def name_length(name):
    return len(name)


@py_or_udf(returnType=StringType())
def name_length_group(size):
    a = ''
    if (size <=20):
        a = 'short'
    elif (size <=35):
        a = 'medium'
    elif (size <=45):
        a = 'good'
    else:
        a = 'long'
    return a

In [112]:
## getting the name length from name. 
df_train = df_train.withColumn("name_length", name_length(df_train['Name']))

## grouping based on name length. 
df_train = df_train.withColumn("nLength_group", name_length_group(df_train['name_length']))

In [113]:
## Let's do the same for test set. 
df_test = df_test.withColumn("name_length", name_length(df_test['Name']))

df_test = df_test.withColumn("nLength_group", name_length_group(df_test['name_length']))

### Title

In [114]:
## this function helps getting the title from the name. 
@py_or_udf(returnType=StringType())
def get_title(name):
    return name.split('.')[0].split(',')[1].strip()

df_train = df_train.withColumn("title", get_title(df_train['Name']))
df_test = df_test.withColumn('title', get_title(df_test['Name']))

In [115]:
## a function that merges rare and variant titles into a few common groups
@py_or_udf(returnType=StringType())
def fuse_title1(feature):
    """
    Map rare titles to 'rare' and the variants Ms, Mlle and Mme to Miss or Mrs.
    """
    if feature in ['the Countess','Capt','Lady','Sir','Jonkheer','Don','Major','Col', 'Rev', 'Dona', 'Dr']:
        return 'rare'
    elif feature in ['Ms', 'Mlle']:
        return 'Miss'
    elif feature == 'Mme':
        return 'Mrs'
    else:
        return feature

In [116]:
df_train = df_train.withColumn("title", fuse_title1(df_train["title"]))

In [117]:
df_test = df_test.withColumn("title", fuse_title1(df_test['title']))

In [118]:
print(df_train.toPandas()['title'].unique())
print(df_test.toPandas()['title'].unique())

['Mr' 'Mrs' 'Miss' 'Master' 'rare']
['Mr' 'Mrs' 'Miss' 'Master' 'rare']
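
Because the title logic is ordinary string handling, it can be checked in plain Python before it is wrapped as a UDF. The sketch below mirrors the two steps above. The function names are made up, and the `None` fallback for names without a comma is an added assumption that the original `get_title` does not have.

```python
RARE_TITLES = {"the Countess", "Capt", "Lady", "Sir", "Jonkheer", "Don",
               "Major", "Col", "Rev", "Dona", "Dr"}

def extract_title(name):
    """Take the text between the first comma and the first dot, e.g. 'Mr'."""
    parts = name.split(".")[0].split(",")
    # Assumption: return None instead of raising when the name has no comma.
    return parts[1].strip() if len(parts) > 1 else None

def normalise_title(title):
    """Collapse rare titles and French/alternative forms into common groups."""
    if title in RARE_TITLES:
        return "rare"
    if title in ("Ms", "Mlle"):
        return "Miss"
    if title == "Mme":
        return "Mrs"
    return title

print(normalise_title(extract_title("Palsson, Master. Gosta Leonard")))
```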


### family_size

In [119]:
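## family_size here is SibSp + Parch, i.e. the number of relatives aboard; the passenger is not counted,
## so a passenger travelling alone has family_size 0.
df_train = df_train.withColumn("family_size", df_train['SibSp']+df_train['Parch'])
df_test = df_test.withColumn("family_size", df_test['SibSp']+df_test['Parch'])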

In [120]:
## bin the family size. 
@py_or_udf(returnType=StringType())
def family_group(size):
    """
    This function groups families (loner, small, large) based on family size.
    """
    
    a = ''
    if (size <= 1):
        a = 'loner'
    elif (size <= 4):
        a = 'small'
    else:
        a = 'large'
    return a

In [121]:
df_train = df_train.withColumn("family_group", family_group(df_train['family_size']))
df_test = df_test.withColumn("family_group", family_group(df_test['family_size']))


### is_alone

In [122]:
@py_or_udf(returnType=IntegerType())
def is_alone(num):
    if num<2:
        return 1
    else:
        return 0

In [123]:
df_train = df_train.withColumn("is_alone", is_alone(df_train['family_size']))
df_test = df_test.withColumn("is_alone", is_alone(df_test["family_size"]))

### ticket

In [124]:
## dropping ticket column
df_train = df_train.drop('ticket')
df_test = df_test.drop("ticket")

### calculated_fare

In [125]:
from pyspark.sql.functions import expr, col, when, coalesce, lit

In [126]:
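## when/otherwise works like an if/else statement:
# when(condition, value_if_true).otherwise(value_if_false)
# The division yields null when family_size is 0 (Spark's behavior with ANSI mode off), so those rows keep the original Fare.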
df_train = df_train.withColumn(
    "calculated_fare", 
    when((col("Fare")/col("family_size")).isNull(), col('Fare'))
    .otherwise((col("Fare")/col("family_size"))))

In [127]:
df_test = df_test.withColumn(
    "calculated_fare", 
    when((col("Fare")/col("family_size")).isNull(), col('Fare'))
    .otherwise((col("Fare")/col("family_size"))))
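
In plain Python the same rule has to guard against the zero explicitly, since dividing by zero raises an error there instead of producing a null. A minimal sketch, with a hypothetical helper name:

```python
def fare_per_relative(fare, family_size):
    """Fare divided by family_size, falling back to the fare itself when the division is undefined."""
    if fare is None:
        return None            # a missing fare stays missing
    if not family_size:        # family_size of 0 (or None): keep the original fare
        return fare
    return fare / family_size

print(fare_per_relative(30.0, 3))
```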

### fare_group

In [128]:
@py_or_udf(returnType=StringType())
def fare_group(fare):
    """
    This function creates a fare group based on the fare provided
    """
    
    a= ''
    if fare <= 4:
        a = 'Very_low'
    elif fare <= 10:
        a = 'low'
    elif fare <= 20:
        a = 'mid'
    elif fare <= 45:
        a = 'high'
    else:
        a = "very_high"
    return a

In [129]:
df_train = df_train.withColumn("fare_group", fare_group(col("Fare")))
df_test = df_test.withColumn("fare_group", fare_group(col("Fare")))

# That's all for today. Let's come back tomorrow to learn how to apply machine learning with PySpark.

In [130]:
# Binarizing, Bucketing & Encoding

In [131]:
train = spark.read.csv('../input/titanic/train.csv', header = True, inferSchema=True)
test = spark.read.csv('../input/titanic/test.csv', header = True, inferSchema=True)

In [132]:
train.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [133]:
# Binarizing
from pyspark.ml.feature import Binarizer
# Cast the column to double before binarizing
train = train.withColumn('SibSp', train['SibSp'].cast('double'))
# Create the binarizing transformer (named binarizer to avoid shadowing the built-in bin)
binarizer = Binarizer(threshold=0.0, inputCol='SibSp', outputCol='SibSpBin')
# Apply the transform: values above the threshold become 1.0, the rest 0.0
train = binarizer.transform(train)

In [134]:
train.select('SibSp', 'SibSpBin').show(10)

+-----+--------+
|SibSp|SibSpBin|
+-----+--------+
|  1.0|     1.0|
|  1.0|     1.0|
|  0.0|     0.0|
|  1.0|     1.0|
|  0.0|     0.0|
|  0.0|     0.0|
|  0.0|     0.0|
|  3.0|     1.0|
|  0.0|     0.0|
|  1.0|     1.0|
+-----+--------+
only showing top 10 rows
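
The rule behind the output above is simple: a value strictly greater than the threshold maps to 1.0, anything else to 0.0. A one-function plain-Python sketch (the name `binarize` is made up):

```python
def binarize(value, threshold=0.0):
    """Return 1.0 when value is strictly above the threshold, otherwise 0.0."""
    return 1.0 if value > threshold else 0.0

print([binarize(v) for v in [1.0, 0.0, 3.0]])
```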



In [135]:
# Bucketing
from pyspark.ml.feature import Bucketizer
# We are going to bucket the fare column
# Define the split
splits = [0,4,10,20,45, float('Inf')]

# Create bucketing transformer
buck = Bucketizer(splits=splits, inputCol='Fare', outputCol='FareB')

# Apply transformer
train = buck.transform(train)

In [136]:
train.toPandas().head(10)

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SibSpBin,FareB
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1.0,0,A/5 21171,7.25,,S,1.0,1.0
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1.0,0,PC 17599,71.2833,C85,C,1.0,4.0
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0.0,0,STON/O2. 3101282,7.925,,S,0.0,1.0
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1.0,0,113803,53.1,C123,S,1.0,4.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0.0,0,373450,8.05,,S,0.0,1.0
5,6,0,3,"Moran, Mr. James",male,,0.0,0,330877,8.4583,,Q,0.0,1.0
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0.0,0,17463,51.8625,E46,S,0.0,4.0
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3.0,1,349909,21.075,,S,1.0,3.0
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0.0,2,347742,11.1333,,S,0.0,2.0
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1.0,0,237736,30.0708,,C,1.0,3.0
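
To see how the `FareB` values come about, here is a plain-Python sketch of the bucketing rule as I understand it: each bucket covers `[lower, upper)`, except the last one, which also includes its upper bound. The function `bucket_index` is hypothetical, and raising an error for out-of-range values is an assumption of this sketch.

```python
from bisect import bisect_right

SPLITS = [0, 4, 10, 20, 45, float("inf")]   # same split points as above

def bucket_index(value, splits=SPLITS):
    """Return the index (as a float) of the bucket [splits[i], splits[i+1]) containing value."""
    if not (splits[0] <= value <= splits[-1]):
        raise ValueError(f"{value} is outside the split range")
    # bisect_right - 1 finds the last split <= value; clamp so the top bound lands in the last bucket.
    return float(min(bisect_right(splits, value) - 1, len(splits) - 2))

print([bucket_index(f) for f in [7.25, 71.2833, 21.075, 11.1333]])
```

Note that these buckets close on the left, so a fare of exactly 4 falls in bucket 1, whereas the `fare_group` UDF earlier uses `<=` and would put it in the lowest group.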


In [137]:
# One Hot Encoding
# it is a two step process
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# Create indexer transformer for Sex Column

# Step 1: Create indexer for texts
stringIndexer = StringIndexer(inputCol='Sex', outputCol='SexIndex')

# Fit the indexer to learn the label-to-index mapping
model = stringIndexer.fit(train)

# Apply transform
indexed = model.transform(train)

In [138]:
# Step 2: One Hot Encode
# Create encoder transformer
encoder = OneHotEncoder(inputCol='SexIndex', outputCol='Sex_Vec')

# fit model
model = encoder.fit(indexed)

# apply transform
encoded_df = model.transform(indexed)

encoded_df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,SibSpBin,FareB,SexIndex,Sex_Vec
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1.0,0,A/5 21171,7.25,,S,1.0,1.0,0.0,(1.0)
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1.0,0,PC 17599,71.2833,C85,C,1.0,4.0,1.0,(0.0)
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0.0,0,STON/O2. 3101282,7.925,,S,0.0,1.0,1.0,(0.0)
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1.0,0,113803,53.1,C123,S,1.0,4.0,1.0,(0.0)
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0.0,0,373450,8.05,,S,0.0,1.0,0.0,(1.0)
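
The two steps can be sketched in plain Python to show why `male` becomes index 0.0 and why `Sex_Vec` has a single entry. This assumes the default settings as I understand them: labels are indexed by descending frequency, and the encoder drops the last category so the vector has one slot fewer than the number of categories. The helper names and the sample list are hypothetical.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index: most frequent label gets 0.0 (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a dense one-hot list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

sexes = ["male", "female", "female", "male", "male"]   # hypothetical sample
mapping = fit_string_index(sexes)
print(mapping, [one_hot(mapping[s], len(mapping)) for s in sexes])
```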


<div class="alert alert-info">
    <h1>Resources</h1>
    <ul>
        <li><a href="https://docs.databricks.com/spark/latest/spark-sql/udf-python.html">User-defined functions - Python</a></li>
        <li><a href="https://medium.com/@ayplam/developing-pyspark-udfs-d179db0ccc87">Developing PySpark UDFs</a></li>
    </ul>
        <h1>Credits</h1>
    <ul>
        <li>To DataCamp, from which I have learned so much.</li>
        <li>To Jose Portilla, an amazing teacher, for all of his resources.</li>
    </ul>
    
</div>

<div class="alert alert-info">
<h4>If you would like to discuss any other projects or just chat about data science topics, I'll be more than happy to connect with you on:</h4>
    <ul>
        <li><a href="https://www.linkedin.com/in/masumrumi/"><b>LinkedIn</b></a></li>
        <li><a href="https://github.com/masumrumi"><b>Github</b></a></li>
        <li><a href="https://masumrumi.com/"><b>masumrumi.com</b></a></li>
        <li><a href="https://www.youtube.com/channel/UC1mPjGyLcZmsMgZ8SJgrfdw"><b>Youtube</b></a></li>
    </ul>

<p>This kernel will always be a work in progress. I will incorporate new data science concepts as I learn them with each update. If you have any ideas or suggestions about this notebook, please let me know. Any feedback about further improvements would be genuinely appreciated.</p>

<h1>If you have come this far, Congratulations!!</h1>

<h1>If this notebook helped you in any way or you liked it, please upvote and/or leave a comment!! :)</h1></div>

<div class="alert alert-info">
    <h1>Versions</h1>
    <ul>
        <li>Version 16</li>
    </ul>
    
</div>

<div class="alert alert-danger">
    <h1>Work Area</h1>
</div>

### Other DataFrame Methods

In [139]:
df_train.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|Heikkinen, Miss.

In [140]:
# agg
df_train.agg({"Age" : "min"}).show()

+--------+
|min(Age)|
+--------+
|    0.42|
+--------+



In [141]:
# agg
from pyspark.sql import functions as F
df_train.groupBy("Sex").agg(
    F.min("Age").alias("min_age"),
    F.max("Age").alias("max_age")).show()

+------+-------+-------+
|   Sex|min_age|max_age|
+------+-------+-------+
|female|   0.75|   63.0|
|  male|   0.42|   80.0|
+------+-------+-------+



In [142]:
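# colRegex
# select every column except Sex: the possessive (Sex)?+ consumes "Sex" and leaves nothing for .+ to match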
df_train.select(df_train.colRegex("`(Sex)?+.+`")).show(5)

+-----------+--------+------+--------------------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|Heikkinen, Miss. ...|26.0|    0|    0|  7.925|    G

In [143]:
# distinct
df_train[['Pclass', 'Sex']].distinct().show()

+------+------+
|Pclass|   Sex|
+------+------+
|     2|female|
|     3|  male|
|     1|  male|
|     3|female|
|     1|female|
|     2|  male|
+------+------+



In [144]:
# another way
# dropDuplicates
df_train[['Pclass', 'Sex']].dropDuplicates().show()

+------+------+
|Pclass|   Sex|
+------+------+
|     2|female|
|     3|  male|
|     1|  male|
|     3|female|
|     1|female|
|     2|  male|
+------+------+



In [145]:
# beware: with a subset, dropDuplicates keeps one whole row per distinct Pclass and discards the rest,
# which is rarely what we want
df_train.dropDuplicates(subset=['Pclass']).show()

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|30.0708|    T|       C|         35|       medium|  Mrs|          1|       loner|       1|        30.0708|      high|
|          1|       0|     3|Braund, Mr. Owen
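
To see why deduplicating on a subset can surprise, here is a plain-Python sketch of the idea (it does not call Spark). The sample rows and the helper name `drop_duplicates_subset` are made up for illustration. The sketch keeps the first row it sees per key, whereas Spark makes no promise about which row survives.

```python
# Plain-Python model of dropDuplicates(subset=[...]); the sample rows and
# the helper name are hypothetical and only illustrate the behaviour.
rows = [
    {"PassengerId": 1, "Pclass": 3, "Sex": "male"},
    {"PassengerId": 2, "Pclass": 1, "Sex": "female"},
    {"PassengerId": 3, "Pclass": 3, "Sex": "female"},
    {"PassengerId": 4, "Pclass": 1, "Sex": "female"},
]


def drop_duplicates_subset(rows, subset):
    """Keep one full row per distinct combination of the `subset` columns."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[col] for col in subset)
        if key not in seen:  # the first row for this key wins in this sketch
            seen.add(key)
            kept.append(row)
    return kept


deduped = drop_duplicates_subset(rows, ["Pclass"])
for row in deduped:
    print(row)
```

Only one passenger per class remains, and every other column simply carries whatever that one passenger had.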

In [146]:
# drop_duplicates()
# drop_duplicates() is an alias of dropDuplicates()
df_train[['Pclass', 'Sex']].drop_duplicates().show()

+------+------+
|Pclass|   Sex|
+------+------+
|     2|female|
|     3|  male|
|     1|  male|
|     3|female|
|     1|female|
|     2|  male|
+------+------+



In [147]:
# drop
# dropping a column
df_train.drop('Name').show(5)

+-----------+--------+------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|female|26.0|    0|    0|  7.925|    G|       S|         22|       medium| Miss|          0|       loner|       1|        

In [148]:
# drop
# dropping multiple columns
df_train.drop("Name", "Survived").show(5)

+-----------+------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Pclass|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|     3|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|     1|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|     3|female|26.0|    0|    0|  7.925|    G|       S|         22|       medium| Miss|          0|       loner|       1|          7.925|       low|
|          4|     1|female|35.0|  

In [149]:
# dropna
df_train.dropna(how="any", subset=["Age"]).count()

714

In [150]:
#similarly
df_train.na.drop(how="any", subset=['Age']).count()

714

In [151]:
# exceptAll
# temp dataframes
df1 = spark.createDataFrame(
        [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b",  3), ("c", 4)], ["C1", "C2"])
df2 = spark.createDataFrame([("a", 1),("a", 1), ("b", 3)], ["C1", "C2"])

In [152]:
df1.show()

                                                                                

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  a|  1|
|  a|  2|
|  b|  3|
|  c|  4|
+---+---+



In [153]:
df2.show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+



In [154]:
df1.exceptAll(df2).show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  2|
|  c|  4|
+---+---+



In [155]:
# intersect
df1.intersect(df2).show()

+---+---+
| C1| C2|
+---+---+
|  b|  3|
|  a|  1|
+---+---+



In [156]:
# intersectAll
# intersectAll preserves the duplicates. 
df1.intersectAll(df2).show()

+---+---+
| C1| C2|
+---+---+
|  a|  1|
|  a|  1|
|  b|  3|
+---+---+
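
The three set operations above differ only in how they treat duplicate rows. A rough way to reason about them is with multisets; the sketch below models that with `collections.Counter` in plain Python. It is an illustration of the semantics, not how Spark computes the result, and the variable names are assumptions.

```python
from collections import Counter

# Same rows as df1 and df2 above, written as plain tuples.
rows1 = [("a", 1), ("a", 1), ("a", 1), ("a", 2), ("b", 3), ("c", 4)]
rows2 = [("a", 1), ("a", 1), ("b", 3)]

c1, c2 = Counter(rows1), Counter(rows2)

# exceptAll: subtract multiplicities, dropping rows whose count reaches zero.
except_all = sorted((c1 - c2).elements())

# intersect: rows present in both inputs, with duplicates removed.
intersect = sorted(set(rows1) & set(rows2))

# intersectAll: keep each row min(count in df1, count in df2) times.
intersect_all = sorted((c1 & c2).elements())

print(except_all)     # [('a', 1), ('a', 2), ('c', 4)]
print(intersect)      # [('a', 1), ('b', 3)]
print(intersect_all)  # [('a', 1), ('a', 1), ('b', 3)]
```

The printed lists match the three tables above, up to row order.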



In [157]:
# Returns True if the collect() and take() methods can be run locally
df_train.isLocal()

False

In [158]:
## fillna
df_train.fillna("N", subset=['Cabin']).show()

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group| title|family_size|family_group|is_alone|   calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|    Mr|          1|       loner|       1|              7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|   Mrs|          1|       loner|       1|           71.2833| very_high|
|          3|       1|   

In [159]:
# similarly
# DataFrame.na.fill() is an alias of DataFrame.fillna()
df_train.na.fill("N", subset=['Cabin']).show()

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group| title|family_size|family_group|is_alone|   calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|    Mr|          1|       loner|       1|              7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|   Mrs|          1|       loner|       1|           71.2833| very_high|
|          3|       1|   

In [160]:
age_mean = df_train.agg({"Age": "mean"}).collect()[0][0]

In [161]:
age_mean

29.69911764705882

In [162]:
df_train.fillna({"Age": age_mean, "Cabin": "N"})[['Age', "Cabin"]].show(10)

+-----------------+-----+
|              Age|Cabin|
+-----------------+-----+
|             22.0|    G|
|             38.0|    C|
|             26.0|    G|
|             35.0|    C|
|             35.0|    G|
|29.69911764705882|    G|
|             54.0|    E|
|              2.0|    F|
|             27.0|    G|
|             14.0|    T|
+-----------------+-----+
only showing top 10 rows
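
The mean-then-fill pattern above can be sketched in plain Python to make the two steps explicit. The ages below are hypothetical and `None` stands in for a Spark NULL. The mean is taken over the non-null values only, which is also how Spark's mean aggregate treats NULLs.

```python
# Hypothetical ages; None plays the role of a Spark NULL.
ages = [22.0, 38.0, None, 35.0, None]

# Step 1: average only the values that are present.
known = [a for a in ages if a is not None]
age_mean = sum(known) / len(known)

# Step 2: replace each missing value with that mean, leaving the rest untouched.
filled = [age_mean if a is None else a for a in ages]

print(age_mean)
print(filled)
```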



In [163]:
# first
df_train.first()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Fare=7.25, Cabin='G', Embarked='S', name_length=23, nLength_group='medium', title='Mr', family_size=1, family_group='loner', is_alone=1, calculated_fare=7.25, fare_group='low')

In [164]:
def f(passenger):
    print(passenger.Name)

In [165]:
# foreach
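# foreach runs f on the executors, so the names are printed to the executors' stdout
# (the terminal when running locally) rather than returned to the driver.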
df_train.foreach(f)

Braund, Mr. Owen Harris
Cumings, Mrs. John Bradley (Florence Briggs Thayer)
Heikkinen, Miss. Laina
Futrelle, Mrs. Jacques Heath (Lily May Peel)
Allen, Mr. William Henry
Moran, Mr. James
McCarthy, Mr. Timothy J
Palsson, Master. Gosta Leonard
Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)
Nasser, Mrs. Nicholas (Adele Achem)
Sandstrom, Miss. Marguerite Rut
Bonnell, Miss. Elizabeth
Saundercock, Mr. William Henry
Andersson, Mr. Anders Johan
Vestrom, Miss. Hulda Amanda Adolfina
Hewlett, Mrs. (Mary D Kingcome) 
Rice, Master. Eugene
Williams, Mr. Charles Eugene
Vander Planke, Mrs. Julius (Emelia Maria Vandemoortele)
Masselmani, Mrs. Fatima
Fynney, Mr. Joseph J
Beesley, Mr. Lawrence
"McGowan, Miss. Anna ""Annie"""
Sloper, Mr. William Thompson
Palsson, Miss. Torborg Danira
Asplund, Mrs. Carl Oscar (Selma Augusta Emilia Johansson)
Emir, Mr. Farred Chehab
Fortune, Mr. Charles Alexander
"O'Dwyer, Miss. Ellen ""Nellie"""
Todoroff, Mr. Lalio
Uruchurtu, Don. Manuel E
Spencer, Mrs. William Augustus 

In [166]:
# freqItems
# this function is meant for exploratory data analysis.
df_train.freqItems(cols=["Cabin"]).show()

+--------------------+
|     Cabin_freqItems|
+--------------------+
|[D, A, B, E, T, C...|
+--------------------+



In [167]:
# groupBy
# pandas value_counts() equivalent. 
df_train.groupBy("Fare").count().orderBy("count", ascending=False).show()

+------+-----+
|  Fare|count|
+------+-----+
|  8.05|   43|
|  13.0|   42|
|7.8958|   38|
|  7.75|   34|
|  26.0|   31|
|  10.5|   24|
| 7.925|   18|
| 7.775|   16|
|   0.0|   15|
|7.2292|   15|
| 26.55|   15|
|  7.25|   13|
|7.8542|   13|
|8.6625|   13|
| 7.225|   12|
|   9.5|    9|
|  16.1|    9|
| 24.15|    8|
|  15.5|    8|
|31.275|    7|
+------+-----+
only showing top 20 rows



In [168]:
df_train.groupBy(['Sex', 'Pclass']).count().show()

+------+------+-----+
|   Sex|Pclass|count|
+------+------+-----+
|  male|     3|  347|
|female|     3|  144|
|female|     1|   94|
|female|     2|   76|
|  male|     2|  108|
|  male|     1|  122|
+------+------+-----+



In [169]:
# hint
# a broadcast hint only takes effect as part of a join, hence the warning in the output
df_train.hint("broadcast").show()

23/10/26 20:23:34 WARN HintErrorLogger: A join hint (strategy=broadcast) is specified but it is not part of a join relation.


+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group| title|family_size|family_group|is_alone|   calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+------------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|    Mr|          1|       loner|       1|              7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|   Mrs|          1|       loner|       1|           71.2833| very_high|
|          3|       1|   

In [170]:
# isStreaming
# Returns True if this DataFrame contains one or more sources that continuously return data as it arrives.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.isStreaming.html
df_train.isStreaming

False

In [171]:
# sort/orderBy
df_train.sort('Survived', ascending = False).show()

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group| title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+---------------+----------+
|        124|       1|     2| Webber, Miss. Susan|female|32.5|    0|    0|   13.0|    E|       S|         19|        short|  Miss|          0|       loner|       1|           13.0|       mid|
|         33|       1|     3|Glynn, Miss. Mary...|female|NULL|    0|    0|   7.75|    G|       Q|         24|       medium|  Miss|          0|       loner|       1|           7.75|       low|
|        299|       1|     1|Saalfeld, M

In [172]:
# randomSplit
# randomly splits the DataFrame into one DataFrame per weight.
# weights are normalized if they do not sum to 1, so [1.0, 2.0] targets roughly 1/3 and 2/3 of the rows.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.randomSplit.html
splits = df_train.randomSplit([1.0, 2.0], seed=42)

In [173]:
splits[0].count()

315

In [174]:
splits[0].show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group| title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+------+-----------+------------+--------+---------------+----------+
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|    C|       S|         44|         good|   Mrs|          1|       loner|       1|           53.1| very_high|
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 21.075|    F|       S|         30|       medium|Master|          4|       small|       0|        5.26875|      high|
|         17|       0|     3|Rice, Maste

In [175]:
splits[1].count()

576
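
The two counts (315 and 576) add up to 891 rows but are not exactly one third and two thirds. The plain-Python sketch below shows why, under the assumption that each row is assigned independently by a random draw against the normalized weights. It uses Python's own random generator, so its counts will not reproduce Spark's; the variable names are illustrative.

```python
import random

weights = [1.0, 2.0]
total = sum(weights)
fractions = [w / total for w in weights]  # normalized so they sum to 1

# Cumulative upper bound of each split's slice of the interval [0, 1).
bounds = []
running = 0.0
for frac in fractions:
    running += frac
    bounds.append(running)
bounds[-1] = 1.0  # guard against floating-point round-off

n_rows = 891  # 315 + 576, the two counts shown above
rng = random.Random(42)  # Python's generator, not Spark's
sizes = [0] * len(weights)
for _ in range(n_rows):
    draw = rng.random()
    # The row goes to the first split whose upper bound exceeds the draw.
    split = next(i for i, upper in enumerate(bounds) if draw < upper)
    sizes[split] += 1

print(fractions)
print(sizes)
```

Because every row is placed by chance, the split sizes only approximate the target fractions, just as in the Spark output above.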

In [176]:
splits[1].show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|Heikkinen, Miss.

In [177]:
# replace
df_train.replace("male", "Man").show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|   Man|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|Heikkinen, Miss.

In [178]:
# similarly
df_train.na.replace("male", "Man").show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Cabin|Embarked|name_length|nLength_group|title|family_size|family_group|is_alone|calculated_fare|fare_group|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-----+--------+-----------+-------------+-----+-----------+------------+--------+---------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|   Man|22.0|    1|    0|   7.25|    G|       S|         23|       medium|   Mr|          1|       loner|       1|           7.25|       low|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|    C|       C|         51|         long|  Mrs|          1|       loner|       1|        71.2833| very_high|
|          3|       1|     3|Heikkinen, Miss.

In [179]:
# cube
# the following Stack Overflow answer explains cube better than the official Spark documentation.
# https://stackoverflow.com/questions/37975227/what-is-the-difference-between-cube-rollup-and-groupby-operators
df = spark.createDataFrame([("foo", 1), ("foo", 2), ("bar", 2), ("bar", 2)]).toDF("x", "y")
df.show()

+---+---+
|  x|  y|
+---+---+
|foo|  1|
|foo|  2|
|bar|  2|
|bar|  2|
+---+---+



In [180]:
temp_df.show()

+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|         Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+---------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0|      A/5 21171|   7.25| NULL|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|         373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|male|NULL|    0|    0|         330877| 8.4583| NULL|       Q|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|         349909| 21.075| NULL|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|20.0|    0|    0|      A/5. 2151|   8.05| NULL|       S|
|         14|       0|     3|Andersson, Mr. An...|male|39.0|    1|    5|         347082| 31.275| NULL|  

In [181]:
df.cube("x", "y").count().show()

+----+----+-----+
|   x|   y|count|
+----+----+-----+
| foo|   1|    1|
|NULL|NULL|    4|
|NULL|   1|    1|
| foo|NULL|    2|
| foo|   2|    1|
|NULL|   2|    3|
| bar|   2|    2|
| bar|NULL|    2|
+----+----+-----+



Here is what cube returns
```
// +----+----+-----+     
// |   x|   y|count|
// +----+----+-----+
// | foo|   1|    1|   <- count of records where x = foo AND y = 1
// | foo|   2|    1|   <- count of records where x = foo AND y = 2
// | bar|   2|    2|   <- count of records where x = bar AND y = 2
// |null|null|    4|   <- total count of records
// |null|   2|    3|   <- count of records where y = 2
// |null|   1|    1|   <- count of records where y = 1
// | bar|null|    2|   <- count of records where x = bar
// | foo|null|    2|   <- count of records where x = foo
// +----+----+-----+
```

In [182]:
# rollup
df.rollup("x", "y").count().show()

+----+----+-----+
|   x|   y|count|
+----+----+-----+
| foo|   1|    1|
|NULL|NULL|    4|
| foo|NULL|    2|
| foo|   2|    1|
| bar|   2|    2|
| bar|NULL|    2|
+----+----+-----+



Here is what rollup returns
```
// +----+----+-----+
// |   x|   y|count|
// +----+----+-----+
// | foo|null|    2|   <- count where x is fixed to foo
// | bar|   2|    2|   <- count where x is fixed to bar and y is fixed to  2
// | foo|   1|    1|   ...
// | foo|   2|    1|   ...
// |null|null|    4|   <- count where no column is fixed
// | bar|null|    2|   <- count where x is fixed to bar
// +----+----+-----+
```
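
One way to see the difference between the two is through grouping sets: cube counts over every subset of the grouping columns, while rollup counts only over the prefixes `()`, `(x)` and `(x, y)`. The plain-Python sketch below reproduces both results for the small `x`/`y` example; the helper `grouped_counts` is hypothetical and this is not how Spark evaluates the query.

```python
from collections import Counter
from itertools import combinations

# Same rows as the small x/y DataFrame above.
rows = [("foo", 1), ("foo", 2), ("bar", 2), ("bar", 2)]
n_cols = 2  # positions 0 and 1 stand for columns x and y


def grouped_counts(rows, grouping_sets):
    """Count rows per grouping set; None marks a column that is not fixed."""
    counts = Counter()
    for kept in grouping_sets:
        for row in rows:
            key = tuple(v if i in kept else None for i, v in enumerate(row))
            counts[key] += 1
    return counts


# cube: every subset of the grouping columns.
cube_sets = [set(c) for r in range(n_cols + 1)
             for c in combinations(range(n_cols), r)]
# rollup: only the prefixes (), (x), (x, y).
rollup_sets = [set(range(r)) for r in range(n_cols + 1)]

cube = grouped_counts(rows, cube_sets)
rollup = grouped_counts(rows, rollup_sets)

print(len(cube), len(rollup))  # 8 6
print(cube[(None, 2)])         # 3
```

The two extra cube rows are the ones where only `y` is fixed, which rollup never produces because `(y)` is not a prefix of `(x, y)`.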

In [183]:
# sameSemantics
# Returns True when the logical query plans inside both DataFrames are equal and therefore return same results.
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sameSemantics.html
df1 = spark.range(10)
df2 = spark.range(10)

In [184]:
df1.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [185]:
df2.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [186]:
df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id * 2))

True

In [187]:
df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col1", df2.id + 2))

False

In [188]:
# still True: cosmetic differences such as the output column name (col1 vs. col0) are ignored
df1.withColumn("col1", df1.id * 2).sameSemantics(df2.withColumn("col0", df2.id * 2))

True

In [189]:
df_train.schema

StructType([StructField('PassengerId', IntegerType(), True), StructField('Survived', IntegerType(), True), StructField('Pclass', IntegerType(), True), StructField('Name', StringType(), True), StructField('Sex', StringType(), True), StructField('Age', DoubleType(), True), StructField('SibSp', IntegerType(), True), StructField('Parch', IntegerType(), True), StructField('Fare', DoubleType(), True), StructField('Cabin', StringType(), True), StructField('Embarked', StringType(), False), StructField('name_length', IntegerType(), True), StructField('nLength_group', StringType(), True), StructField('title', StringType(), True), StructField('family_size', IntegerType(), True), StructField('family_group', StringType(), True), StructField('is_alone', IntegerType(), True), StructField('calculated_fare', DoubleType(), True), StructField('fare_group', StringType(), True)])

In [190]:
df_train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- name_length: integer (nullable = true)
 |-- nLength_group: string (nullable = true)
 |-- title: string (nullable = true)
 |-- family_size: integer (nullable = true)
 |-- family_group: string (nullable = true)
 |-- is_alone: integer (nullable = true)
 |-- calculated_fare: double (nullable = true)
 |-- fare_group: string (nullable = true)

