In [167]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from datetime import date
from pyspark.sql.functions import col, expr, rank, unix_timestamp, round, to_date, lit, count
from pyspark.sql.window import Window

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/03 16:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
## Question 1
pyspark.__version__

'3.5.4'

In [None]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

In [None]:
df = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')

In [None]:
df_partition = df.repartition(4)

In [None]:
## Question 2
df_partition.write.parquet('yellow_tripdata/2024/10/')

In [9]:
df = spark.read.parquet('yellow_tripdata/2024/10/')

In [133]:
df.dtypes

[('VendorID', 'int'),
 ('tpep_pickup_datetime', 'timestamp_ntz'),
 ('tpep_dropoff_datetime', 'timestamp_ntz'),
 ('passenger_count', 'bigint'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'bigint'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('payment_type', 'bigint'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'double'),
 ('Airport_fee', 'double')]

In [139]:
df_q3 = df.filter(to_date(df.tpep_pickup_datetime) == '2024-10-15')

In [141]:
## Question 3
df_q3.count()

128893

In [153]:
df_q4_diff = df \
    .select("tpep_dropoff_datetime", "tpep_pickup_datetime") \
    .withColumn("trip_duration_seconds", unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime")))
# )

In [155]:
df_q4_diff = df_q4_diff.withColumn("trip_duration_hours", round(col("trip_duration_seconds") / 3600, 2)) 

In [157]:
window_spec = Window.orderBy(col("trip_duration_hours").desc())

df_q4_diff = df_q4_diff.withColumn("rank", rank().over(window_spec))

ranked_df_q4_diff = df_q4_diff.orderBy(col("trip_duration_hours").desc())

In [159]:
## Question 4
df_q4_diff.show()

25/03/03 17:04:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:04:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:04:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---------------------+--------------------+---------------------+-------------------+----+
|tpep_dropoff_datetime|tpep_pickup_datetime|trip_duration_seconds|trip_duration_hours|rank|
+---------------------+--------------------+---------------------+-------------------+----+
|  2024-10-23 07:40:53| 2024-10-16 13:03:49|               585424|             162.62|   1|
|  2024-10-09 18:06:55| 2024-10-03 18:47:25|               515970|             143.33|   2|
|  2024-10-28 09:46:33| 2024-10-22 16:00:55|               495938|             137.76|   3|
|  2024-10-23 04:43:37| 2024-10-18 09:53:32|               413405|             114.83|   4|
|  2024-10-24 18:30:18| 2024-10-21 00:36:24|               323634|               89.9|   5|
|  2024-10-24 06:57:38| 2024-10-20 13:30:52|               322006|              89.45|   6|
|  2024-10-25 14:22:49| 2024-10-22 16:04:52|               253077|               70.3|   7|
|  2024-10-15 15:07:15| 2024-10-12 19:32:51|               243264|              

In [161]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-03 17:18:58--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.48.100, 54.230.48.207, 54.230.48.149, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.48.100|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-03 17:18:59 (406 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [163]:
df_zone = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [187]:
df_counts = df.groupBy("PULocationID").agg(count("*").alias("pickup_count"))
window_spec = Window.orderBy(col("pickup_count").asc()) 
df_rank = df_counts.withColumn("rank", rank().over(window_spec))
df_joined = df_rank.join(df_zone, df_rank.PULocationID == df_zone.LocationID, "inner")\
    .select(df_rank["PULocationID"], col("Zone"), col("pickup_count"), col("rank"))

In [189]:
## Question 5
df_joined.head(10)

25/03/03 17:26:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:26:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:26:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:26:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:26:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 17:26:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/03 1

[Row(PULocationID=105, Zone="Governor's Island/Ellis Island/Liberty Island", pickup_count=1, rank=1),
 Row(PULocationID=5, Zone='Arden Heights', pickup_count=2, rank=2),
 Row(PULocationID=199, Zone='Rikers Island', pickup_count=2, rank=2),
 Row(PULocationID=111, Zone='Green-Wood Cemetery', pickup_count=3, rank=4),
 Row(PULocationID=2, Zone='Jamaica Bay', pickup_count=3, rank=4),
 Row(PULocationID=44, Zone='Charleston/Tottenville', pickup_count=4, rank=6),
 Row(PULocationID=84, Zone="Eltingville/Annadale/Prince's Bay", pickup_count=4, rank=6),
 Row(PULocationID=245, Zone='West Brighton', pickup_count=4, rank=6),
 Row(PULocationID=204, Zone='Rossville/Woodrow', pickup_count=4, rank=6),
 Row(PULocationID=187, Zone='Port Richmond', pickup_count=4, rank=6)]