In [1]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/11 05:51:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/11 05:51:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Read data

In [3]:
df_yellow = spark.read.parquet('../data/raw/yellow/2024/10/yellow_tripdata_2024-10.parquet')

                                                                                

In [4]:
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



### Partition data and write it to storage

In [6]:
year = 2024
month = 10
print(f'processing data for {year}/{month}')

input_path = f'../data/raw/yellow/{year}/{month:02d}/yellow_tripdata_{year}-{month:02d}.parquet'
output_path = f'../data/pq/yellow/{year}/{month:02d}/'

df_yellow = spark.read.parquet(input_path)
df_yellow.printSchema()

df_yellow \
    .repartition(4) \
    .write.parquet(output_path, mode='overwrite')

processing data for 2024/10
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



                                                                                

# Use SQL with data

In [7]:
df_yellow.createOrReplaceTempView('yellow_trips_data')

In [8]:
spark.sql("""
SELECT
    count(1)
FROM
    yellow_trips_data
WHERE
    tpep_pickup_datetime >= '2024-10-15' AND
    tpep_pickup_datetime < '2024-10-16'    
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



# Longest distance

In [31]:
spark.sql("""
SELECT
    trip_distance,
    DATEDIFF(hour, tpep_pickup_datetime, tpep_dropoff_datetime) as trip_duration_h
FROM
    yellow_trips_data
SORT BY trip_duration_h DESC
""").show()

+-------------+---------------+
|trip_distance|trip_duration_h|
+-------------+---------------+
|          0.0|            143|
|         0.96|             26|
|         1.91|             23|
|         1.93|             23|
|         2.52|             23|
|         5.32|             23|
|         0.73|             23|
|        18.58|             23|
|         1.49|             23|
|          0.7|             23|
|         2.21|             23|
|         3.37|             23|
|         1.95|             23|
|         1.06|             23|
|         3.29|             23|
|         4.57|             23|
|         1.55|             23|
|         0.82|             23|
|        14.24|             23|
|         1.98|             23|
+-------------+---------------+
only showing top 20 rows



# Longest Duration in hours

In [29]:
spark.sql("""
SELECT
    MAX(DATEDIFF(hour, tpep_pickup_datetime, tpep_dropoff_datetime)) AS max_date_diff
FROM
    yellow_trips_data
""").show()



+-------------+
|max_date_diff|
+-------------+
|          162|
+-------------+



                                                                                