<a href="https://colab.research.google.com/github/nv-hiep/pyspark/blob/main/Spark_SQL_DataFrame_Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# install Java (OpenJDK 8)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download Spark (change the version number if needed)
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

# extract the Spark archive into the current folder
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

# point JAVA_HOME and SPARK_HOME at the installed folders
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark
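If `findspark.init()` or the session start fails later, a wrong path in `JAVA_HOME` or `SPARK_HOME` is one likely cause. The check below is a minimal sketch; `missing_paths` is a hypothetical helper name, not part of findspark or PySpark. It reports which variables are unset or point to a folder that does not exist:

```python
import os


def missing_paths(names=("JAVA_HOME", "SPARK_HOME"), env=None):
    """Return the variables that are unset or do not point to an existing directory."""
    env = os.environ if env is None else env
    return [name for name in names
            if not env.get(name) or not os.path.isdir(env[name])]


print(missing_paths())  # an empty list means both folders exist
```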

In [None]:
# Optional: PySpark can also be installed from PyPI with the command below (not needed when findspark uses the SPARK_HOME set above).
# !pip install pyspark

Start a local Spark session to test your installation:

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # evaluate DataFrames eagerly so notebook cells display them as tables
spark

# Connect to Kaggle

Ref: https://galhever.medium.com/how-to-import-data-from-kaggle-to-google-colab-8160caa11e2

Install the Kaggle API client, which is used to download the data.

In [None]:
!pip install kaggle



Next, upload the credentials of your Kaggle account. On your Kaggle account page, click “Create New API Token”; if you already have a token, you can click “Expire API Token” first and then create a new one.

Kaggle then downloads a kaggle.json file containing your credentials. Save it on your computer and upload it to Colab with the cell below:

In [None]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"kiemhiep","key":"<redacted>"}'}

The Kaggle API client expects the file at ~/.kaggle/kaggle.json, so create that folder, copy the file into it, and restrict its permissions to the owner:

In [None]:
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
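The three shell commands above can also be written in plain Python, which may be handy outside a notebook. A minimal sketch, assuming the token was uploaded to the current folder as `kaggle.json`; `install_kaggle_token` is a hypothetical helper name:

```python
import shutil
import stat
from pathlib import Path


def install_kaggle_token(src="kaggle.json", home=None):
    """Copy a Kaggle API token to <home>/.kaggle/kaggle.json and make it owner-only.

    Hypothetical helper mirroring `mkdir -p`, `cp` and `chmod 600`.
    """
    home = Path(home) if home is not None else Path.home()
    target_dir = home / ".kaggle"
    target_dir.mkdir(parents=True, exist_ok=True)  # mkdir -p ~/.kaggle
    target = target_dir / "kaggle.json"
    shutil.copyfile(src, target)                   # cp kaggle.json ~/.kaggle/
    target.chmod(stat.S_IRUSR | stat.S_IWUSR)      # chmod 600: read/write for the owner only
    return target
```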

Next, open the Kaggle page of the dataset you want to import, click the three vertical dots on the right side of the screen, choose “Copy API command”, and paste the command into a cell:

In [None]:
# !kaggle datasets download -d jessicali9530/celeba-dataset
!kaggle datasets download -d hesh97/titanicdataset-traincsv

Downloading titanicdataset-traincsv.zip to /content
  0% 0.00/22.0k [00:00<?, ?B/s]
100% 22.0k/22.0k [00:00<00:00, 21.0MB/s]


In [None]:
!pwd

/content


Let’s see the imported files:

In [None]:
!ls

celeba-dataset.zip  sample_data		       spark-3.1.1-bin-hadoop3.2.tgz
kaggle.json	    spark-3.1.1-bin-hadoop3.2  titanicdataset-traincsv.zip


The last step is to extract the downloaded archive into a folder named files:

In [None]:
import zipfile

# extract the downloaded archive into ./files; the with block closes the file automatically
with zipfile.ZipFile('titanicdataset-traincsv.zip', 'r') as zip_ref:
    zip_ref.extractall('files')
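Before relying on the extracted files, it can help to check what the archive contains. A small sketch with the standard-library `zipfile` module; `extract_archive` is a hypothetical helper name, and the `with` block closes the archive even if extraction fails:

```python
import zipfile


def extract_archive(zip_path, dest="files"):
    """Extract zip_path into dest and return the names of the archive members."""
    with zipfile.ZipFile(zip_path, "r") as zf:
        names = zf.namelist()  # member names stored in the archive
        zf.extractall(dest)
    return names
```

For the Titanic archive, the returned list should match what `!ls` later shows inside `files`.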

In [None]:
!ls

files	     sample_data		spark-3.1.1-bin-hadoop3.2.tgz
kaggle.json  spark-3.1.1-bin-hadoop3.2	titanicdataset-traincsv.zip


In [None]:
! rm -rf titanicdataset-traincsv.zip

In [None]:
!ls

files	     sample_data		spark-3.1.1-bin-hadoop3.2.tgz
kaggle.json  spark-3.1.1-bin-hadoop3.2


In [None]:
%cd files

/content/files


In [None]:
!ls

train.csv


That’s it! Now your data is ready and you can start working on it.

# Load data

In [None]:
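# Load the CSV into a DataFrame, taking column names from the header row and inferring column types.
# An absolute path is safest: Spark resolves relative paths against the JVM's working directory
# (/content here, where the session was started), not the folder selected with %cd.
titanic_df = spark.read.csv('/content/files/train.csv', header=True, inferSchema=True)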
titanic_df

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [None]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
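With `inferSchema=True`, Spark goes through the data once more to choose a type for each column; empty fields are read as null. That is why `Fare` and `Age` come out as `double` (some values are fractional) while `Cabin` stays `string`. The plain-Python sketch below imitates the idea with only three candidate types; it is a simplified illustration, not Spark's actual inference code, which also considers types such as long, decimal, timestamp and boolean:

```python
def _fits(raw, kind):
    """Return True if the raw CSV text can be parsed as the given type."""
    if kind == "string":
        return True  # any text is a valid string
    try:
        if kind == "integer":
            int(raw)
        else:
            float(raw)
    except ValueError:
        return False
    return True


def infer_type(values):
    """Infer 'integer', 'double' or 'string' for one column of raw CSV fields.

    Simplified illustration: empty fields count as null and are skipped, and the
    type is widened integer -> double -> string until every value fits.
    """
    order = ["integer", "double", "string"]
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"  # this sketch's fallback for an all-null column
    level = 0
    for raw in non_empty:
        while not _fits(raw, order[level]):
            level += 1  # widen to the next, more general type
    return order[level]


print(infer_type(["1", "2", "3"]), infer_type(["7.25", "71.2833"]), infer_type(["", "C85", "C123"]))
# integer double string
```

Passing an explicit schema through the `schema` parameter of `spark.read.csv` skips the inference pass altogether.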



In [None]:
titanic_df.limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S


In [None]:
titanic_df.where( (titanic_df['Age'] > 25) & (titanic_df['Survived'] == 1) ).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| null|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
|         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|      

In [None]:
titanic_df.filter( (titanic_df['Age'] > 25) & (titanic_df['Survived'] == 1) ).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742|11.1333| null|       S|
|         12|       1|     1|Bonnell, Miss. El...|female|58.0|    0|    0|          113783|  26.55| C103|       S|
|         16|       1|     2|Hewlett, Mrs. (Ma...|female|55.0|    0|    0|      
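`where` is an alias for `filter`, so the two cells return the same rows. Each condition needs its own parentheses because Python's `&` binds more tightly than `>` and `==`. Rows with a missing `Age` are dropped as well: in Spark SQL a comparison with null yields null, which a filter treats as false. The plain-Python sketch below mimics that logic on the first ten passengers printed earlier (keeping only ID, Survived and Age); it illustrates the semantics and is not how Spark evaluates the filter:

```python
# (PassengerId, Survived, Age) for the first ten rows printed above; None marks a missing Age
rows = [
    (1, 0, 22.0), (2, 1, 38.0), (3, 1, 26.0), (4, 1, 35.0), (5, 0, 35.0),
    (6, 0, None), (7, 0, 54.0), (8, 0, 2.0), (9, 1, 27.0), (10, 1, 14.0),
]


def survivors_over(rows, min_age):
    """Keep IDs with Age > min_age and Survived == 1; a missing Age never matches."""
    return [pid for pid, survived, age in rows
            if age is not None and age > min_age and survived == 1]


print(survivors_over(rows, 25))  # [2, 3, 4, 9]
```

The result matches the first four IDs in the Spark output above.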

In [None]:
titanic_df.agg( {'Fare': 'avg'} ).show()

+----------------+
|       avg(Fare)|
+----------------+
|32.2042079685746|
+----------------+



In [None]:
titanic_df.groupBy('Pclass').agg({'Fare':'avg'}).orderBy('Pclass', ascending=True).show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     2| 20.66218315217391|
|     3|13.675550101832997|
+------+------------------+



In [None]:
titanic_df.groupBy('Pclass').agg({'Fare':'avg'}).show()

+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     3|13.675550101832997|
|     2| 20.66218315217391|
+------+------------------+
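Without `orderBy`, the order of the groups is not guaranteed (compare the two outputs above), so sort explicitly whenever the order matters. The sketch below reproduces "average per group, then sort by key" in plain Python on the ten sample rows printed earlier; because it uses only ten passengers, its averages differ from the full-dataset figures above. `avg_by_group` is a hypothetical helper name:

```python
from collections import defaultdict

# (Pclass, Fare) for the first ten rows printed earlier
sample = [
    (3, 7.25), (1, 71.2833), (3, 7.925), (1, 53.1), (3, 8.05),
    (3, 8.4583), (1, 51.8625), (3, 21.075), (3, 11.1333), (2, 30.0708),
]


def avg_by_group(pairs):
    """Return [(key, mean)] sorted by key, like groupBy(key).agg(avg).orderBy(key)."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [running sum, count]
    for key, value in pairs:
        if value is None:
            # nulls are skipped, as Spark's avg does; unlike Spark, a group whose
            # values are all null is omitted here rather than returned with a null average
            continue
        totals[key][0] += value
        totals[key][1] += 1
    return sorted((key, s / n) for key, (s, n) in totals.items())


for pclass, fare in avg_by_group(sample):
    print(pclass, round(fare, 4))
```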



In [None]:
titanic_df.where( titanic_df['Age'] > 30. ).agg({'Fare':'avg'}).show()

+-----------------+
|        avg(Fare)|
+-----------------+
|42.35290983606555|
+-----------------+



In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [None]:
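def _round(x):
  # int() truncates toward zero; pass nulls (None) through instead of raising a TypeError
  return int(x) if x is not None else None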

In [None]:
round_float_down_udf = udf(_round, IntegerType())

In [None]:
titanic_df.select(
    titanic_df.PassengerId.alias('ID'),
    titanic_df.Fare,
    round_float_down_udf(titanic_df.Fare).alias('RoundedFare'),
).show()

+---+-------+-----------+
| ID|   Fare|RoundedFare|
+---+-------+-----------+
|  1|   7.25|          7|
|  2|71.2833|         71|
|  3|  7.925|          7|
|  4|   53.1|         53|
|  5|   8.05|          8|
|  6| 8.4583|          8|
|  7|51.8625|         51|
|  8| 21.075|         21|
|  9|11.1333|         11|
| 10|30.0708|         30|
| 11|   16.7|         16|
| 12|  26.55|         26|
| 13|   8.05|          8|
| 14| 31.275|         31|
| 15| 7.8542|          7|
| 16|   16.0|         16|
| 17| 29.125|         29|
| 18|   13.0|         13|
| 19|   18.0|         18|
| 20|  7.225|          7|
+---+-------+-----------+
only showing top 20 rows
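Python's `int()` truncates toward zero, so `_round` rounds down only for non-negative inputs, which holds for fares. If a true floor is wanted, `math.floor` is the safer choice. The hypothetical `round_down` below also passes nulls through, because Spark hands a Python UDF `None` for null inputs; it could be wrapped the same way, e.g. `udf(round_down, IntegerType())`. For this particular job, the built-in `pyspark.sql.functions.floor` avoids the overhead of a Python UDF.

```python
import math


def round_down(x):
    """Floor a float to an int; None (a Spark null) passes through unchanged."""
    if x is None:
        return None
    return math.floor(x)


print(int(-2.5), round_down(-2.5))  # -2 -3
```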



In [None]:
# register the DataFrame as a temporary view named Titanic so it can be queried with spark.sql
titanic_df.createOrReplaceTempView("Titanic")

In [None]:
spark.sql('Select * from Titanic').limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S


In [None]:
spark.sql('SELECT COUNT(PassengerId), Pclass FROM Titanic GROUP BY Pclass;').limit(5)

count(PassengerId),Pclass
216,1
491,3
184,2


In [None]:
spark.sql('SELECT COUNT(PassengerId), Pclass FROM Titanic GROUP BY Pclass ORDER BY COUNT(PassengerId) ASC;').limit(5)

count(PassengerId),Pclass
184,2
216,1
491,3


In [None]:
spark.sql('SELECT COUNT(PassengerId), Pclass FROM Titanic GROUP BY Pclass ORDER BY COUNT(PassengerId) DESC;').limit(5)

count(PassengerId),Pclass
491,3
216,1
184,2
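The same GROUP BY / ORDER BY pattern can be tried without Spark. The sketch below swaps in Python's built-in `sqlite3` module (a different SQL engine, used here only to illustrate the query), with an in-memory table standing in for the temporary view, and loads just the ten sample passengers, so its counts are not the full-dataset figures above. Aliasing the count (`AS n`) also gives the result column a readable name:

```python
import sqlite3

# (PassengerId, Pclass) for the first ten rows printed earlier
sample = [(1, 3), (2, 1), (3, 3), (4, 1), (5, 3),
          (6, 3), (7, 1), (8, 3), (9, 3), (10, 2)]

conn = sqlite3.connect(":memory:")  # in-memory table stands in for the temp view
conn.execute("CREATE TABLE Titanic (PassengerId INTEGER, Pclass INTEGER)")
conn.executemany("INSERT INTO Titanic VALUES (?, ?)", sample)

query = """
    SELECT COUNT(PassengerId) AS n, Pclass
    FROM Titanic
    GROUP BY Pclass
    ORDER BY n DESC
"""
result = conn.execute(query).fetchall()
print(result)  # [(6, 3), (3, 1), (1, 2)]
conn.close()
```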
