In [66]:
import os
from pathlib import Path
from dotenv import load_dotenv
import findspark
env_path = Path(os.getenv("HOME")) / "data_engineering_zoomcamp_2025" / "5_batch" / ".env"
load_dotenv(dotenv_path=env_path)
findspark.init()

In [67]:
import pyspark
from pyspark.sql import SparkSession

In [68]:
print(pyspark.__version__)

3.3.2


In [69]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [70]:
spark.version

'3.3.2'

In [71]:
from pyspark.sql import types

In [72]:
df = spark.read \
    .option("header", "true") \
    .parquet('./data/homework/yellow_tripdata_2024-10.parquet')

In [73]:
df.head(10)

[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 12, 20), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 2, 25, 25), passenger_count=1, trip_distance=2.2, RatecodeID=1, store_and_fwd_flag='N', PULocationID=48, DOLocationID=236, payment_type=1, fare_amount=14.2, extra=3.5, mta_tax=0.5, tip_amount=3.8, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=23.0, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 2, 4, 46), tpep_dropoff_datetime=datetime.datetime(2

In [74]:
output_path = "./data/pq/homework"


df \
    .repartition(4) \
    .write.parquet(output_path, mode='overwrite')

                                                                                

In [75]:
from pyspark.sql import functions as F

In [76]:
df = df \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [77]:
df = df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime))\
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime))

In [78]:
df \
    .filter(df.pickup_date == '2024-10-15')\
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')\
    .count()

125567

In [79]:
df = df \
    .withColumn('trip_duration_hours', (F.unix_timestamp(df.dropoff_datetime) - F.unix_timestamp(df.pickup_datetime))/3600)

In [80]:
df.orderBy(F.col('trip_duration_hours').desc()).show()

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+-------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_date|dropoff_date|trip_duration_hours|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+-------------------+
|       2|2024-10-16 15:03:49|2024-10-23 09:40:53|           

In [81]:
zone_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True),
])

df_zones = spark.read \
    .option("header", "true") \
    .schema(zone_schema)\
    .csv('./data/homework/taxi_zone_lookup.csv')

In [82]:
output_path = "./data/pq/homework/zones"

df_zones \
    .repartition(1) \
    .write.parquet(output_path, mode='overwrite')

In [83]:
df_zones = spark.read.parquet('./data/pq/homework/zones/')

In [95]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [88]:
df_result = df.join(df_zones, df.PULocationID == df_zones.LocationID)

In [96]:
df_result\
    .groupBy('Zone')\
    .count()\
    .show()

+--------------------+------+
|                Zone| count|
+--------------------+------+
|           Homecrest|   263|
|              Corona|   496|
|    Bensonhurst West|   312|
|          Douglaston|    74|
|      Newark Airport|   555|
|Charleston/Totten...|     4|
|          Mount Hope|   339|
|      Pelham Parkway|   178|
|East Concourse/Co...|   683|
|         Marble Hill|    73|
|           Rego Park|   471|
|Upper East Side S...|191011|
|       Dyker Heights|   172|
|   Kew Gardens Hills|   245|
|       Rikers Island|     2|
|     Jackson Heights|  1760|
|             Bayside|   117|
|      Yorkville West| 69757|
|TriBeCa/Civic Center| 53604|
|          Highbridge|   426|
+--------------------+------+
only showing top 20 rows

