In [46]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

25/02/27 19:09:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [47]:
!wget -O yellow_tripdata_2024-10.parquet "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet"

--2025-02-27 19:09:08--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.33.163.101, 13.33.163.188, 13.33.163.13, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.33.163.101|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 64346071 (61M) [binary/octet-stream]
Saving to: ‘yellow_tripdata_2024-10.parquet’


2025-02-27 19:09:48 (1.58 MB/s) - ‘yellow_tripdata_2024-10.parquet’ saved [64346071/64346071]



In [48]:
spark = SparkSession.builder \
    .appName("Read Parquet") \
    .getOrCreate()

df = spark.read.parquet("yellow_tripdata_2024-10.parquet")
df.show(5)

25/02/27 19:10:09 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [32]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [33]:
import pyspark.sql.functions as f


In [34]:
df=df.withColumn("pickup_date",f.to_date(df.tpep_pickup_datetime))\
.withColumn("dropoff_date",f.to_date(df.tpep_dropoff_datetime))

In [35]:
df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- dropoff_date: date (nullable = true)



In [36]:
df=df.repartition(4)

In [37]:
df.write.mode("overwrite").parquet('yellow_tripdata/2024/10/')

                                                                                

In [38]:
from pyspark.sql.functions import col

df.filter(col("pickup_date") == "2024-10-15").count()


128893

In [45]:

df_with_diff = df.withColumn(
    "hours_diff", 
    (f.unix_timestamp("tpep_dropoff_datetime") - f.unix_timestamp("tpep_pickup_datetime")) / 3600
)

df_with_diff.agg(f.max("hours_diff").alias("max_duration_hours")).show()






+------------------+
|max_duration_hours|
+------------------+
|162.61777777777777|
+------------------+



                                                                                

In [57]:
!wget -O zone_lookup.csv "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

--2025-02-27 19:28:11--  https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.33.163.188, 13.33.163.13, 13.33.163.58, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.33.163.188|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12331 (12K) [text/csv]
Saving to: ‘zone_lookup.csv’


2025-02-27 19:28:11 (50.7 MB/s) - ‘zone_lookup.csv’ saved [12331/12331]



In [58]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('zone_lookup.csv')

In [59]:
df_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [67]:
df_joined = df.join(df_zones, df.PULocationID == df_zones.LocationID, "left")
df_joined.groupby("Zone").count().orderBy("count").show(1)

+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|Governor's Island...|    1|
+--------------------+-----+
only showing top 1 row

