<h1><center>Introduction to Google Colab and PySpark</center></h1>

## Table Of Contents:
<ol>
<li><a href="#objective">Objective</a></li>
<li><a href="#prerequisite">Prerequisite</a></li>
<li><a href="#notes-from-the-author">Notes from the Author</a></li>
<li><a href="#big-data-pyspark-and-colaboratory">Big data, PySpark and Colaboratory</a>
    <ol>
        <li><a href="#big-data">Big data</a></li>
        <li><a href="#pyspark">PySpark</a></li>
        <li><a href="#colaboratory">Colaboratory</a></li>
    </ol>
</li>
<li><a href="#jupyter-notebook-basics">Jupyter Notebook Basics</a>
    <ol>
        <li><a href="#code-cells">Code cells</a></li>
        <li><a href="#text-cells">Text cells</a></li>
        <li><a href="#access-to-the-shell">Access to the shell</a></li>
        <li><a href="#installing-spark">Installing Spark</a></li>
    </ol>
</li>
<li><a href="#exploring-the-dataset">Exploring the Dataset</a>
    <ol>
        <li><a href="#loading-the-dataset">Loading the Dataset</a></li>
        <li><a href="#viewing-the-dataframe">Viewing the Dataframe</a></li>
        <li><a href="#viewing-dataframe-columns">Viewing Dataframe Columns</a></li>
        <li><a href="#dataframe-schema">Dataframe Schema</a>
          <ul>
            <li><a href="#implicit-schema-inference">Inferring Schema Implicitly</a></li>
            <li><a href="#explicit-schema-inference">Defining Schema Explicitly</a></li>
          </ul>
        </li>
    </ol>
</li>
<li><a href="#dataframe-operations-on-columns">DataFrame Operations on Columns</a>
    <ol>
        <li><a href="#selecting-columns">Selecting Columns</a></li>
        <li><a href="#selecting-multiple-columns">Selecting Multiple Columns</a></li>
        <li><a href="#adding-new-columns">Adding New Columns</a></li>
        <li><a href="#renaming-columns">Renaming Columns</a>
        <li><a href="#grouping-by-columns">Grouping By Columns</a>
        <li><a href="#removing-columns">Removing Columns</a>
    </ol>
</li>
<li><a href="#dataframe-operations-on-rows">DataFrame Operations on Rows</a>
    <ol>
        <li><a href="#filtering-rows">Filtering Rows</a></li>
        <li><a href="#get-distinct-rows">Get Distinct Rows</a></li>
        <li><a href="#sorting-rows">Sorting Rows</a></li>
        <li><a href="#union-dataframes">Union Dataframes</a>
    </ol>
</li>
<li><a href="#common-data-manipulation-functions">Common Data Manipulation Functions</a>
    <ol>
        <li><a href="#string-functions">String Functions</a></li>
        <li><a href="#numeric-functions">Numeric Functions</a></li>
        <li><a href="#operations-on-date">Operations on Date</a></li>
    </ol>
</li>
<li><a href="#joins-in-pyspark">Joins in PySpark</a></li>
<li><a href="#spark-sql">Spark SQL</a></li>
<li><a href="#rdd">RDD</a></li>
<li><a href="#user-defined-functions-udf">User-Defined Functions (UDF)</a></li>
<li><a href="#common-questions">Common Questions</a>
    <ol>
        <li><a href="#recommended-ide">Recommended IDE</a></li>
        <li><a href="#submitting-a-spark-job">Submitting a Spark Job</a></li>
        <li><a href="#creating-dataframes">Creating Dataframes</a></li>
        <li><a href="#drop-duplicates">Drop Duplicates</a></li>
        <li><a href="#fine-tuning-a-pyspark-job">Fine Tuning a PySpark Job</a>
          <ul>
            <li><a href="#emr-sizing">EMR Sizing</a></li>
            <li><a href="#spark-configurations">Spark Configurations</a></li>
            <li><a href="#job-tuning">Job Tuning</a>
            <li><a href="#best-practices">Best Practices</a>
          </ul>
        </li>
    </ol>
</li>
</ol>

<a id='objective'></a>
## Objective
The objective of this notebook is to:
><li>Give a proper understanding about the different PySpark functions available. </li>
><li>A short introduction to Google Colab, as that is the platform on which this notebook is written on. </li>

Once you complete this notebook, you should be able to write pyspark programs in an efficent way. The ideal way to use this is by going through the examples given and then trying them on Colab. At the end there are a few hands on questions which you can use to evaluate yourself.

<a id='prerequisite'></a>
## Prerequisite
><li>Although some theory about pyspark and big data will be given in this notebook, I recommend everyone to read more about it and have a deeper understanding on how the functions get executed and the relevance of big data in the current scenario.
><li>A good understanding on python will be an added bonus.

<a id='notes-from-the-author'></a>
## Notes from the Author

This tutorial was made using Google Colab so the code you see here is meant to run on a colab notebook. <br>
It goes through basic [PySpark Functions](https://spark.apache.org/docs/latest/api/python/index.html) and a short introduction on how to use [Colab](https://colab.research.google.com/notebooks/basic_features_overview.ipynb). <br>
If you want to view my colab notebook for this particular tutorial, you can view it [here](https://colab.research.google.com/drive/1G894WS7ltIUTusWWmsCnF_zQhQqZCDOc). The viewing experience and readability is much better there. <br>
If you want to try out things with this notebook as a base, feel free to download it from my repo [here](https://github.com/jacobceles/knowledge-repo/blob/master/pyspark/Colab%20and%20PySpark.ipynb) and then use it with jupyter notebook.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [9]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark
print(spark.version)

3.5.1


In [39]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-02-23 15:45:27--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.154.99.220, 18.154.99.225, 18.154.99.177, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.154.99.220|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-02-23 15:45:27 (185 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [40]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')

df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [55]:
df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [41]:
import os
import glob
df_repartitioned = df.repartition(4)

df_repartitioned.write.mode("overwrite").parquet("output_data")
output_dir = "output_data"

parquet_files = glob.glob(os.path.join(output_dir, "*.parquet"))

total_size = sum(os.path.getsize(f) for f in parquet_files)
num_files = len(parquet_files)

avg_size = total_size / num_files if num_files > 0 else 0

print(f"Avg size: {avg_size / (1024 * 1024):.2f} MB")


Avg size: 23.04 MB


In [42]:
df[(df["tpep_pickup_datetime"]>="2024-10-15") & (df["tpep_pickup_datetime"]<"2024-10-16")].count()



128893

In [43]:
df.count()

3833771

In [58]:
from pyspark.sql.functions import col, max, unix_timestamp

pnt = df.orderBy(df.trip_distance.desc()).limit(1)
df = df.withColumn("duration_seconds", unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime"))



In [61]:
df.agg(max("duration_seconds")).collect()[0][0]/3600

162.61777777777777

In [51]:
from datetime import datetime
temp.orderBy(temp.duration_seconds.desc()).limit(10)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,duration_seconds
2,2024-11-14 18:30:00,2024-11-14 18:41:41,1,2.07,1,N,162,142,1,12.8,1.0,0.5,3.56,0.0,1.0,21.36,2.5,0.0,1731128698.5
2,2024-11-01 16:49:14,2024-11-02 16:25:32,1,1.99,1,N,238,142,1,13.5,2.5,0.5,3.0,0.0,1.0,23.0,2.5,0.0,1730084043.1794446
2,2024-11-01 23:45:57,2024-11-01 23:59:43,1,5.23,1,N,138,95,1,23.3,6.0,0.5,3.0,0.0,1.0,35.55,0.0,1.75,1730024887.2341666
2,2024-10-31 23:44:08,2024-11-01 23:35:16,1,0.44,1,N,79,114,2,7.2,1.0,0.5,0.0,0.0,1.0,12.2,2.5,0.0,1730023444.2644444
2,2024-10-31 23:32:18,2024-11-01 23:26:13,1,1.41,1,N,113,137,1,17.0,1.0,0.5,2.2,0.0,1.0,24.2,2.5,0.0,1730022901.4616666
2,2024-10-31 23:26:12,2024-11-01 23:25:02,2,2.87,1,N,137,144,1,21.9,1.0,0.5,0.0,0.0,1.0,26.9,2.5,0.0,1730022830.5633333
2,2024-10-31 23:28:04,2024-11-01 23:13:10,2,3.94,1,N,24,141,1,21.2,1.0,0.5,5.24,0.0,1.0,31.44,2.5,0.0,1730022118.5322225
2,2024-10-31 23:39:07,2024-11-01 23:11:37,1,4.83,1,N,48,152,1,25.4,1.0,0.5,6.08,0.0,1.0,36.48,2.5,0.0,1730022025.3480556
2,2024-10-31 23:15:09,2024-11-01 23:08:25,1,1.48,1,N,170,79,2,10.7,1.0,0.5,0.0,0.0,1.0,15.7,2.5,0.0,1730021833.7475
2,2024-10-31 23:09:33,2024-11-01 22:56:44,1,1.23,1,N,186,170,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0,1730021132.8408334


In [62]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-02-23 16:02:19--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 18.154.99.47, 18.154.99.220, 18.154.99.225, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|18.154.99.47|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv.1’


2025-02-23 16:02:19 (14.4 MB/s) - ‘taxi_zone_lookup.csv.1’ saved [12331/12331]



In [72]:
db = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')
db.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [77]:
from pyspark.sql.functions import col, asc

df.groupBy("PULocationID").count().orderBy(asc("count")).limit(1).show()

+------------+-----+
|PULocationID|count|
+------------+-----+
|         105|    1|
+------------+-----+



In [78]:
db_filtered = db.filter(col("LocationID").isNotNull() & col("LocationID").isin([105]))
db_filtered.show()

+----------+---------+--------------------+------------+
|LocationID|  Borough|                Zone|service_zone|
+----------+---------+--------------------+------------+
|       105|Manhattan|Governor's Island...| Yellow Zone|
+----------+---------+--------------------+------------+

