In [1]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql.functions import to_date, to_timestamp
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

25/03/02 15:26:51 WARN Utils: Your hostname, Sangs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.118 instead (on interface en0)
25/03/02 15:26:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/02 15:26:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.5.4'

In [4]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

--2025-03-02 15:26:52--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.126, 54.230.209.200, 54.230.209.72, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.126|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-03-02 15:26:54 (59.1 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [5]:
yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [6]:
df = spark.read.parquet("yellow_tripdata_2024-10.parquet", header=True, schema=yellow_schema)

In [7]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [8]:
# df.repartition(4).write.parquet("data", mode="overwrite")

In [None]:
# !ls -lh data/*.parquet

In [10]:
df \
    .withColumn('pickup_date', to_date(df.tpep_pickup_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

128893

In [11]:
df.createOrReplaceTempView("data")

spark.sql("SELECT COUNT(1) FROM data WHERE to_date(tpep_pickup_datetime) = '2024-10-15'").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [12]:
df \
    .withColumn("duration", 
                    (to_timestamp(df.tpep_dropoff_datetime).cast("long") - 
                     to_timestamp(df.tpep_pickup_datetime).cast("long"))/3600) \
    .withColumn("pickup_date", to_date("tpep_pickup_datetime")) \
    .groupBy('pickup_date') \
    .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+------------------+
|pickup_date|     max(duration)|
+-----------+------------------+
| 2024-10-16|162.61777777777777|
| 2024-10-03|           143.325|
| 2024-10-22|137.76055555555556|
| 2024-10-18|114.83472222222223|
| 2024-10-21| 89.89833333333333|
+-----------+------------------+



In [13]:
df.createOrReplaceTempView("data")

spark.sql("""
SELECT 
    to_date(tpep_pickup_datetime) AS pickup_date,
    ROUND(MAX(CAST(to_timestamp(tpep_dropoff_datetime) AS LONG) - CAST(to_timestamp(tpep_pickup_datetime) AS LONG))/3600,2) AS duration
FROM data
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10;
""").show()

+-----------+--------+
|pickup_date|duration|
+-----------+--------+
| 2024-10-16|  162.62|
| 2024-10-03|  143.33|
| 2024-10-22|  137.76|
| 2024-10-18|  114.83|
| 2024-10-21|    89.9|
| 2024-10-20|   89.45|
| 2024-10-12|   67.57|
| 2024-10-17|   66.07|
| 2024-10-24|   38.47|
| 2024-10-23|   33.95|
+-----------+--------+



In [14]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

--2025-03-02 15:26:59--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.209.126, 54.230.209.200, 54.230.209.72, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.209.126|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘taxi_zone_lookup.csv’


2025-03-02 15:26:59 (406 MB/s) - ‘taxi_zone_lookup.csv’ saved [12331/12331]



In [15]:
df_zone = spark.read.csv("taxi_zone_lookup.csv", header=True, inferSchema=True)

In [16]:
df_zone.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [17]:
df_result = df \
    .groupBy("PULocationID") \
    .count() \
    .orderBy('count', ascending=True) \
    .limit(1)

In [18]:
df_result = df_result.join(df_zone, df_result.PULocationID == df_zone.LocationID)

In [19]:
df_result.select("Zone").show(truncate=False)

+---------------------------------------------+
|Zone                                         |
+---------------------------------------------+
|Governor's Island/Ellis Island/Liberty Island|
+---------------------------------------------+

