In [9]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [10]:
# Start (or reuse) a Spark session running locally, named "test".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

In [11]:
# Display the version of Spark backing this session.
spark.version

'3.3.2'

In [14]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-02-28 18:24:19--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.84.160.73, 52.84.160.213, 52.84.160.116, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.84.160.73|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-02-28 18:24:20 (81.4 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [15]:
# Load the downloaded yellow-taxi Parquet file into a Spark DataFrame.
df = spark.read.parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [17]:
# Print each column's name, type and nullability.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [18]:
# Redistribute the rows into 4 partitions. This rebinds `df`, so every later
# cell sees the repartitioned DataFrame.
df = df.repartition(4)

In [20]:
# Write the repartitioned data as Parquet files under output_yellow_taxi_q2/.
# mode="overwrite" replaces any earlier output; with the default save mode
# the write raises once the directory exists, so this cell failed on re-run.
df.write.parquet("output_yellow_taxi_q2", mode="overwrite")

                                                                                

In [30]:
# Count the trips whose pickup happened on 15 October 2024.
# The pickup timestamp is truncated to a calendar date in a separate
# `pickup_date` column rather than overwriting `tpep_pickup_datetime`, so the
# column name keeps matching its content. `F` comes from the notebook's top
# import cell.
(
    df
    .withColumn("pickup_date", F.to_date(F.col("tpep_pickup_datetime")))
    .filter("pickup_date = '2024-10-15'")
    .count()
)

                                                                                

128893

In [38]:
# NOTE(review): no earlier cell in this notebook defines `df_yellow_oct_24`,
# so this cell raised NameError on a fresh kernel. It is bound here to the
# yellow-taxi DataFrame loaded above; confirm this matches the definition
# from the cell that was removed.
df_yellow_oct_24 = df

# Longest trip, in hours, for each pickup date; show the five largest values.
(
    df_yellow_oct_24
    .withColumn(
        "duration_hours",
        # unix_timestamp yields seconds, so dividing by 3600 gives hours.
        (
            F.unix_timestamp(F.col("tpep_dropoff_datetime"))
            - F.unix_timestamp(F.col("tpep_pickup_datetime"))
        ) / 3600,
    )
    .withColumn("pickup_date", F.to_date(F.col("tpep_pickup_datetime")))
    .groupBy("pickup_date")
    .agg(F.max("duration_hours").alias("max_duration_hours"))
    .orderBy("max_duration_hours", ascending=False)
    .limit(5)
    .show()
)

[Stage 20:>                                                         (0 + 4) / 4]

+-----------+------------------+
|pickup_date|max_duration_hours|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
+-----------+------------------+



                                                                                

In [46]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-02-28 20:27:14--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 52.84.160.213, 52.84.160.73, 52.84.160.116, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|52.84.160.213|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv.3’


2025-02-28 20:27:14 (201 MB/s) - ‘taxi_zone_lookup.csv.3’ saved [12331/12331]



In [47]:
# Read the zone lookup CSV, taking column names from the header row and
# letting Spark infer the column types.
df_location = spark.read.csv(
    "taxi_zone_lookup.csv",
    header=True,
    inferSchema=True,
)

# Register both DataFrames as temporary views under the names below.
df_location.createOrReplaceTempView("zone_lookup")
df_yellow_oct_24.createOrReplaceTempView("yellow_trip_data")

In [50]:
# Attach the lookup row to every trip by matching its pickup location ID.
joined_df = df_yellow_oct_24.join(
    df_location,
    on=df_yellow_oct_24["PULocationID"] == df_location["LocationID"],
    how="inner",
)

# Number of pickups per zone name.
pickup_counts = (
    joined_df
    .groupBy("Zone")
    .agg(F.count("PULocationID").alias("pickup_count"))
)

# The five zones with the fewest pickups (ascending sort, first five rows).
result = pickup_counts.orderBy("pickup_count").limit(5)

result.show()

[Stage 37:>                                                         (0 + 4) / 4]

+--------------------+------------+
|                Zone|pickup_count|
+--------------------+------------+
|Governor's Island...|           1|
|       Rikers Island|           2|
|       Arden Heights|           2|
|         Jamaica Bay|           3|
| Green-Wood Cemetery|           3|
+--------------------+------------+



                                                                                