In [1]:
import sys
sys.path.append('/usr/local/bin')  # Add PySpark's location

import findspark  # Helps Jupyter find Spark
findspark.init()

import pyspark



In [2]:
pyspark.__file__

'/usr/local/opt/apache-spark/libexec/python/pyspark/__init__.py'

In [3]:
from pyspark.sql import SparkSession

In [22]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("YellowTaxi2024")\
        .getOrCreate()

In [10]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-05 00:09:44--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
2600:9000:25a2:ce00:b:20a5:b140:21, 2600:9000:25a2:7a00:b:20a5:b140:21, 2600:9000:25a2:dc00:b:20a5:b140:21, ...
Verbindungsaufbau zu d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:25a2:ce00:b:20a5:b140:21|:443 … verbunden.
200 OKnforderung gesendet, auf Antwort wird gewartet … 
Länge: 64346071 (61M) [binary/octet-stream]
Wird in »yellow_tripdata_2024-10.parquet« gespeichert.


2025-03-05 00:09:45 (65,6 MB/s) - »yellow_tripdata_2024-10.parquet« gespeichert [64346071/64346071]



In [26]:
df = spark.read\
   .parquet("yellow_taxi")

In [27]:
df.printSchema()
df.show(5)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------

## Question 1

In [14]:
df_repartitioned = df.repartition(4)

In [17]:
df_repartitioned.write.mode("overwrite").parquet(".")

                                                                                

## Question 2

In [28]:
from pyspark.sql.functions import to_date, col

# Convert pickup datetime to date and filter for October 15, 2024
df_oct_15 = df.filter(to_date(col("tpep_pickup_datetime")) == "2024-10-15")

# Count the number of trips
trip_count = df_oct_15.count()

print(f"Number of taxi trips on October 15th, 2024: {trip_count}")

Number of taxi trips on October 15th, 2024: 128893


## Question 3

In [49]:
from pyspark.sql.functions import unix_timestamp,max

df_q3 = df.withColumn("trip_duration_hours",
                   (unix_timestamp(col("tpep_dropoff_datetime")) - unix_timestamp(col("tpep_pickup_datetime"))) / 3600)

# Find the longest trip
longest_trip = df_q3.agg(max("trip_duration_hours")).collect()[0][0]

print(f"The longest trip duration in the dataset is: {longest_trip:.2f} hours")

The longest trip duration in the dataset is: 162.62 hours


In [50]:
spark.stop()

In [52]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-05 15:27:57--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Auflösen des Hostnamens d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)… 2600:9000:25a2:1400:b:20a5:b140:21, 2600:9000:25a2:6800:b:20a5:b140:21, 2600:9000:25a2:6400:b:20a5:b140:21, ...
Verbindungsaufbau zu d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|2600:9000:25a2:1400:b:20a5:b140:21|:443 … verbunden.
HTTP-Anforderung gesendet, auf Antwort wird gewartet … 200 OK
Länge: 12331 (12K) [text/csv]
taxi_zone_lookup.csv: No such file or directory

Schreiben nach »taxi_zone_lookup.csv« nicht möglich (No such file or directory).


In [53]:
from pyspark.sql.functions import col, count, asc

# Initialize Spark session
spark = SparkSession.builder.appName("TaxiZoneAnalysis").getOrCreate()

# Load Yellow Taxi October 2024 data (Parquet)
yellow_tripdata_path = "yellow_taxi"  # Replace with actual path
df_trips = spark.read.parquet(yellow_tripdata_path)

# Load Taxi Zone Lookup data (CSV)
zone_lookup_path = "taxi_zone_lookup.csv"  # Replace with actual path
df_zones = spark.read.option("header", True).csv(zone_lookup_path)

# Show schemas to verify
df_trips.printSchema()
df_zones.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-

In [54]:
df_trips.createOrReplaceTempView("yellow_trips")
df_zones.createOrReplaceTempView("zone_lookup")

In [55]:
query = """
SELECT z.Zone, COUNT(t.PULocationID) as trip_count
FROM yellow_trips t
JOIN zone_lookup z
ON t.PULocationID = z.LocationID
GROUP BY z.Zone
ORDER BY trip_count ASC
LIMIT 1
"""

least_frequent_zone = spark.sql(query)
least_frequent_zone.show()



+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
+--------------------+----------+



                                                                                