# Data Access

In [0]:
# Authenticate to ADLS Gen2 as a service principal (OAuth 2.0 client credentials).
# The client secret is fetched from a Databricks secret scope at run time so that it
# never appears in the notebook source, its outputs, or version control.
# NOTE(review): any client secret that was ever stored in this notebook in plain text
# must be treated as compromised and rotated.
# TODO: replace {secret-scope} and {client-secret-key} with the names used in your workspace.
service_credential = dbutils.secrets.get(scope="{secret-scope}", key="{client-secret-key}")

spark.conf.set("fs.azure.account.auth.type.{storage-account}.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.{storage-account}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.{storage-account}.dfs.core.windows.net", "87a72210-8873-47a8-9851-8aaeb44782aa")
spark.conf.set("fs.azure.account.oauth2.client.secret.{storage-account}.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.{storage-account}.dfs.core.windows.net", "https://login.microsoftonline.com/7867077d-8cbc-47e2-bc35-88b07ce2955d/oauth2/token")

# Importing Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Reading Source Data (CSV and Parquet)

## 1. Trip Type

In [0]:
# Load the trip-type lookup from CSV: the first row supplies the column names
# and Spark infers each column's data type.
df_trip_type = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip_type/")
)

In [0]:
# Render the trip-type lookup to check what was loaded.
display(df_trip_type)

trip_type,description
1,Street-hail
2,Dispatch


## 2. Trip Zone

In [0]:
# Load the zone lookup from CSV: the first row supplies the column names
# and Spark infers each column's data type.
df_trip_zone = (
    spark.read
    .format('csv')
    .options(header=True, inferSchema=True)
    .load("abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip_zone/")
)

In [0]:
# Preview at most 30 rows of the zone lookup.
display(df_trip_zone.limit(30))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


## 3. Trip 2025 Data

In [0]:
# Explicit column-name/type list for the 2025 trip data, written as a DDL-style string.
# It is handed to the Parquet reader through .schema(myschema) in the next cell.
myschema = """
                VendorID BIGINT,
                lpep_pickup_datetime TIMESTAMP,
                lpep_dropoff_datetime TIMESTAMP,
                store_and_fwd_flag STRING,
                RatecodeID BIGINT,
                PULocationID BIGINT,
                DOLocationID BIGINT,
                passenger_count BIGINT,
                trip_distance DOUBLE,
                fare_amount DOUBLE,
                extra DOUBLE,
                mta_tax DOUBLE,
                tip_amount DOUBLE,
                tolls_amount DOUBLE,
                ehail_fee DOUBLE,
                improvement_surcharge DOUBLE,
                total_amount DOUBLE,
                payment_type BIGINT,
                trip_type BIGINT,
                congestion_surcharge DOUBLE
"""


In [0]:
# Read the 2025 trip Parquet files using the explicit schema declared in `myschema`.
# recursiveFileLookup makes Spark also pick up files in nested sub-folders of the path.
# No 'header' option is set: it is a CSV reader option and has no effect on Parquet,
# whose files carry their own column names.
df_trip2025_data = (
    spark.read
    .format('parquet')
    .schema(myschema)
    .option('recursiveFileLookup', True)
    .load("abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip2025data")
)

In [0]:
# Preview at most 30 rows of the raw 2025 trip data.
display(df_trip2025_data.limit(30))

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
2,2025-05-01T00:17:04Z,2025-05-01T00:56:06Z,N,1,25,216,1,9.34,44.3,1.0,0.5,0.0,0.0,,1.0,46.8,1,1,0.0
2,2025-05-01T00:56:16Z,2025-05-01T01:10:26Z,N,1,160,129,1,2.95,16.3,1.0,0.5,0.0,0.0,,1.0,18.8,2,1,0.0
1,2025-05-01T00:24:49Z,2025-05-01T00:42:29Z,N,1,260,179,1,3.0,18.4,1.0,1.5,0.0,0.0,,1.0,20.9,2,1,0.0
2,2025-05-01T00:27:11Z,2025-05-01T00:33:21Z,N,1,130,216,1,1.61,9.3,1.0,0.5,0.0,0.0,,1.0,11.8,2,1,0.0
2,2025-05-01T00:32:59Z,2025-05-01T00:41:34Z,N,1,244,151,2,3.44,15.6,1.0,0.5,4.52,0.0,,1.0,22.62,1,1,0.0
2,2025-04-30T23:58:57Z,2025-05-01T00:02:31Z,N,1,42,41,1,0.66,6.5,1.0,0.5,2.0,0.0,,1.0,11.0,1,1,0.0
2,2025-05-01T00:38:03Z,2025-05-01T00:43:28Z,N,1,240,265,1,1.63,9.3,1.0,0.5,0.0,0.0,,1.0,11.8,1,1,0.0
2,2025-05-01T00:13:48Z,2025-05-01T00:26:19Z,N,1,129,70,1,2.15,13.5,1.0,0.5,0.0,0.0,,1.0,16.0,2,1,0.0
2,2025-05-01T00:08:00Z,2025-05-01T00:22:00Z,N,1,244,42,1,2.87,15.6,1.0,0.5,0.0,0.0,,1.0,18.1,2,1,0.0
2,2025-05-01T00:48:03Z,2025-05-01T00:57:01Z,N,1,75,262,1,1.52,10.7,1.0,0.5,2.39,0.0,,1.0,18.34,1,1,2.75


# Data Transformation

## 1. Trip Type

In [0]:
# Rename the generic 'description' column to 'trip_description'.
df_trip_type_data = df_trip_type.withColumnRenamed(
    'description',
    'trip_description',
)

In [0]:
# Render the renamed trip-type lookup to confirm the new column name.
display(df_trip_type_data)

trip_type,trip_description
1,Street-hail
2,Dispatch


In [0]:
# Persist the transformed trip-type lookup as Parquet.
# 'overwrite' replaces the previous output: this notebook reloads the complete
# source on every run, so appending would duplicate every row on each re-run.
(
    df_trip_type_data.write
    .format('parquet')
    .mode('overwrite')
    .save('abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip_type_data')
)

## 2. Trip Zone

In [0]:
# Split 'Zone' on '/' and expose the first two pieces as their own columns.
# Zones without a '/' have no second piece, so zone2 shows up empty for them.
df_trip_zone_data = (
    df_trip_zone
    .withColumn('zone1', split(col('Zone'), '/').getItem(0))
    .withColumn('zone2', split(col('Zone'), '/').getItem(1))
)

In [0]:
# Preview at most 30 rows of the zone data with the new zone1/zone2 columns.
display(df_trip_zone_data.limit(30))

LocationID,Borough,Zone,service_zone,zone1,zone2
1,EWR,Newark Airport,EWR,Newark Airport,
2,Queens,Jamaica Bay,Boro Zone,Jamaica Bay,
3,Bronx,Allerton/Pelham Gardens,Boro Zone,Allerton,Pelham Gardens
4,Manhattan,Alphabet City,Yellow Zone,Alphabet City,
5,Staten Island,Arden Heights,Boro Zone,Arden Heights,
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone,Arrochar,Fort Wadsworth
7,Queens,Astoria,Boro Zone,Astoria,
8,Queens,Astoria Park,Boro Zone,Astoria Park,
9,Queens,Auburndale,Boro Zone,Auburndale,
10,Queens,Baisley Park,Boro Zone,Baisley Park,


In [0]:
# Persist the transformed zone lookup as Parquet.
# 'overwrite' replaces the previous output: this notebook reloads the complete
# source on every run, so appending would duplicate every row on each re-run.
(
    df_trip_zone_data.write
    .format('parquet')
    .mode('overwrite')
    .save('abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip_zone_data')
)

## 3. Trip 2025 Data

In [0]:
# Add 'trip_date': the calendar date taken from the pickup timestamp.
df_trip_2025_trip_data = df_trip2025_data.withColumn(
    'trip_date',
    to_date(col('lpep_pickup_datetime')),
)

In [0]:
# Remove the RatecodeID column; the result is reassigned to the same variable.
df_trip_2025_trip_data = df_trip_2025_trip_data.drop("RatecodeID")

In [0]:
from pyspark.sql.functions import date_format

# Format the pickup/drop-off timestamps as "HH:mm:ss" time-of-day strings,
# then drop the original timestamp columns.
# NOTE: this cell reassigns its own input, so running it a second time without
# re-running the cells above fails because the timestamp columns are already gone.
df_trip_2025_trip_data = (
    df_trip_2025_trip_data
    .withColumn("pick-up_time", date_format(col("lpep_pickup_datetime"), "HH:mm:ss"))
    .withColumn("drop-off_time", date_format(col("lpep_dropoff_datetime"), "HH:mm:ss"))
    .drop('lpep_pickup_datetime', 'lpep_dropoff_datetime')
)

In [0]:
# Preview at most 30 rows of the transformed 2025 trip data.
display(df_trip_2025_trip_data.limit(30))

VendorID,store_and_fwd_flag,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,trip_date,pick-up_time,drop-off_time
2,N,25,216,1,9.34,44.3,1.0,0.5,0.0,0.0,,1.0,46.8,1,1,0.0,2025-05-01,00:17:04,00:56:06
2,N,160,129,1,2.95,16.3,1.0,0.5,0.0,0.0,,1.0,18.8,2,1,0.0,2025-05-01,00:56:16,01:10:26
1,N,260,179,1,3.0,18.4,1.0,1.5,0.0,0.0,,1.0,20.9,2,1,0.0,2025-05-01,00:24:49,00:42:29
2,N,130,216,1,1.61,9.3,1.0,0.5,0.0,0.0,,1.0,11.8,2,1,0.0,2025-05-01,00:27:11,00:33:21
2,N,244,151,2,3.44,15.6,1.0,0.5,4.52,0.0,,1.0,22.62,1,1,0.0,2025-05-01,00:32:59,00:41:34
2,N,42,41,1,0.66,6.5,1.0,0.5,2.0,0.0,,1.0,11.0,1,1,0.0,2025-04-30,23:58:57,00:02:31
2,N,240,265,1,1.63,9.3,1.0,0.5,0.0,0.0,,1.0,11.8,1,1,0.0,2025-05-01,00:38:03,00:43:28
2,N,129,70,1,2.15,13.5,1.0,0.5,0.0,0.0,,1.0,16.0,2,1,0.0,2025-05-01,00:13:48,00:26:19
2,N,244,42,1,2.87,15.6,1.0,0.5,0.0,0.0,,1.0,18.1,2,1,0.0,2025-05-01,00:08:00,00:22:00
2,N,75,262,1,1.52,10.7,1.0,0.5,2.39,0.0,,1.0,18.34,1,1,2.75,2025-05-01,00:48:03,00:57:01


In [0]:
# Persist the transformed 2025 trip data as Parquet.
# 'overwrite' replaces the previous output: the read above loads every file under
# the source folder on each run, so appending would duplicate all trips on a re-run.
# NOTE(review): if this target is meant to be loaded incrementally, restrict the
# source read to new files instead of switching back to 'append'.
(
    df_trip_2025_trip_data.write
    .format('parquet')
    .mode('overwrite')
    .save('abfss://{container-name}@{storage-account}.dfs.core.windows.net/trip_2025_trip_data')
)