In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Spark_HW') \
    .getOrCreate()

25/03/03 17:46:40 WARN Utils: Your hostname, MichaelAngelo resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/03 17:46:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/03 17:46:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/03 17:46:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
df_yellow = spark.read.parquet("/mnt/c/Users/mab03/Desktop/DataEngineeringZoomcamp2025/Homework/Module5_Spark/yellow_tripdata_2024-10.parquet")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [4]:
# Repartition into 4 partitions
df_yellow.repartition(4)
# Save into parquet files
df_yellow.write.parquet('yellow/2024/10/')

                                                                                

In [12]:
# Find the approximate file size
!cd yellow/2024/10 && ls -lh

total 75M
-rwxrwxrwx 1 mab-linux mab-linux    0 Mar  3 17:51 _SUCCESS
-rwxrwxrwx 1 mab-linux mab-linux 2.1K Mar  3 17:51 part-00000-23f81cb1-4a4c-40f9-9680-6257be7c93a5-c000.snappy.parquet
-rwxrwxrwx 1 mab-linux mab-linux  21M Mar  3 17:51 part-00002-23f81cb1-4a4c-40f9-9680-6257be7c93a5-c000.snappy.parquet
-rwxrwxrwx 1 mab-linux mab-linux  21M Mar  3 17:51 part-00006-23f81cb1-4a4c-40f9-9680-6257be7c93a5-c000.snappy.parquet
-rwxrwxrwx 1 mab-linux mab-linux  21M Mar  3 17:51 part-00010-23f81cb1-4a4c-40f9-9680-6257be7c93a5-c000.snappy.parquet
-rwxrwxrwx 1 mab-linux mab-linux  14M Mar  3 17:51 part-00013-23f81cb1-4a4c-40f9-9680-6257be7c93a5-c000.snappy.parquet


In [19]:
# Inspect the data
df_yellow.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [53]:
# Create a temporary view to use in spark sql
df_yellow.createOrReplaceTempView('yellow_trips')

In [23]:
count_sql = spark.sql("""
    SELECT count(*) from yellow_trips
    where DATE(tpep_pickup_datetime) = "2024-10-15"
""")
count_sql.show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [26]:
from pyspark.sql import functions as F

In [39]:
trips_time_hours = df_yellow \
    .withColumn("diff_hours",(F.unix_timestamp("tpep_dropoff_datetime")-F.unix_timestamp("tpep_pickup_datetime"))/3600)
longest_trip = trips_time_hours \
    .select(F.max("diff_hours").alias("Longest trip (Hours)"))
longest_trip.show()

+--------------------+
|Longest trip (Hours)|
+--------------------+
|  162.61777777777777|
+--------------------+



In [44]:
# Define standard schema
from pyspark.sql import types
schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True), 
    types.StructField('Borough', types.StringType(), True), 
    types.StructField('Zone', types.StringType(), True), 
    types.StructField('service_zone', types.StringType(), True)]
)

In [45]:
# Read taxi_zone_lookup csv
df_taxi_zones = spark.read \
    .option("header", True) \
    .schema(schema) \
    .csv("/mnt/c/Users/mab03/Desktop/DataEngineeringZoomcamp2025/Homework/Module5_Spark/taxi_zone_lookup.csv")

df_taxi_zones.schema

StructType([StructField('LocationID', IntegerType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [46]:
df_taxi_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [54]:
# Create temporary view
df_taxi_zones.createOrReplaceTempView('taxi_zones')

In [64]:
yellow_taxi_zones = spark.sql("""
    with t_zones as(
        select * from taxi_zones
        where Borough != 'Unknown'
    ),
    pickup_dropoff_zones as(
        SELECT 
            t_pu.zone as pickup_zone,
            t_do.zone as dropoff_zone
        from yellow_trips as y
        inner join t_zones as t_pu 
        on t_pu.locationid = y.PULocationID
        inner join t_zones as t_do
        on t_do.locationid = y.DOLocationID
    )
        SELECT 
            pickup_zone,
            count(pickup_zone) as frequency
        from pickup_dropoff_zones
        group by pickup_zone
        order by frequency asc
""")

yellow_taxi_zones.show(5)

+--------------------+---------+
|         pickup_zone|frequency|
+--------------------+---------+
|Governor's Island...|        1|
|       Rikers Island|        2|
|       Arden Heights|        2|
|         Jamaica Bay|        3|
| Green-Wood Cemetery|        3|
+--------------------+---------+
only showing top 5 rows

