### Data Transformation - Silver Layer
1. Read data from external table 
1. Rename columns for clarity
1. Add new columns, e.g. `service_type`, etc.
1. Update proper data types of columns in table
1. Remove rows with null values and duplicates, and filter rows by pickup datetime according to the `year_month` for which data is loaded.
1. Add Unique Identifier 
1. Keep only the columns common to both the yellow and green taxi datasets.
1. Join with the `taxi_zone_lookup` table
1. Merge/upsert data into a managed Delta table


In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
%run "../include/common_functions"

In [0]:
# Declare the notebook parameter `p_file_date` (default "2025-07-01") and read
# its current value; later cells derive the month to process from it.
dbutils.widgets.text("p_file_date","2025-07-01")
v_file_date = dbutils.widgets.get("p_file_date")

In [0]:
# Echo the parameter value so it shows up in the cell output.
v_file_date

In [0]:
# The first seven characters of the file date, e.g. "2025-07" for "2025-07-01".
# NOTE(review): assumes p_file_date is given as YYYY-MM-DD; there is no validation here.
year_month = v_file_date[:7]
print(year_month)

#### 1. Data loading

In [0]:
# Read the bronze yellow-taxi rows that belong to the month being processed.
# The DataFrame API is used instead of interpolating the widget-derived value
# into SQL text, so a malformed parameter cannot change the query itself.
yellow_taxi_df = (
    spark.table("taxi_trips_2025.bronze.yellow_taxi")
    .filter(col("file_year_month") == year_month)
)

In [0]:
# Read the bronze green-taxi rows that belong to the month being processed.
# The DataFrame API is used instead of interpolating the widget-derived value
# into SQL text, so a malformed parameter cannot change the query itself.
green_taxi_df = (
    spark.table("taxi_trips_2025.bronze.green_taxi")
    .filter(col("file_year_month") == year_month)
)

#### 2. Column Rename

In [0]:
# Source column name -> snake_case name used from here on (yellow taxi).
yellow_renames = {
    "VendorID": "vendor_id",
    "tpep_pickup_datetime": "pickup_datetime",
    "tpep_dropoff_datetime": "dropoff_datetime",
    "RatecodeID": "rate_code_id",
    "PULocationID": "pickup_location_id",
    "DOLocationID": "dropoff_location_id",
    "Airport_fee": "airport_fee",
}
for old_name, new_name in yellow_renames.items():
    yellow_taxi_df = yellow_taxi_df.withColumnRenamed(old_name, new_name)

In [0]:
# Source column name -> snake_case name used from here on (green taxi).
green_renames = {
    "VendorID": "vendor_id",
    "lpep_pickup_datetime": "pickup_datetime",
    "lpep_dropoff_datetime": "dropoff_datetime",
    "RatecodeID": "rate_code_id",
    "PULocationID": "pickup_location_id",
    "DOLocationID": "dropoff_location_id",
}
for old_name, new_name in green_renames.items():
    green_taxi_df = green_taxi_df.withColumnRenamed(old_name, new_name)

#### 3. Add new columns

In [0]:
# Yellow taxi: add human-readable descriptions for the coded columns plus a
# constant service_type. Each entry is
#   new description column -> (coded source column, {code: description}).
# Codes not listed in a mapping are labelled "Invalid".
yellow_code_maps = {
    "vendor_desc": ("vendor_id", {
        1: "Creative Mobile Technologies, LLC",
        2: "Curb Mobility, LLC",
        6: "Myle Technologies Inc",
        7: "Helix",
    }),
    "rate_code_desc": ("rate_code_id", {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride",
        99: "Null/Unknown",
    }),
    "payment_desc": ("payment_type", {
        0: "Flex Fare trip",
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip",
    }),
}

for desc_col, (code_col, labels) in yellow_code_maps.items():
    # Build the when(...).when(...) chain in mapping order.
    label_expr = None
    for code, label in labels.items():
        matches_code = col(code_col) == code
        if label_expr is None:
            label_expr = when(matches_code, label)
        else:
            label_expr = label_expr.when(matches_code, label)
    yellow_taxi_df = yellow_taxi_df.withColumn(desc_col, label_expr.otherwise("Invalid"))

yellow_taxi_df = yellow_taxi_df.withColumn('service_type', lit("yellow"))


In [0]:
# Green taxi: add human-readable descriptions for the coded columns plus a
# constant service_type. Each entry is
#   new description column -> (coded source column, {code: description}).
# Codes not listed in a mapping are labelled "Invalid".
green_code_maps = {
    "vendor_desc": ("vendor_id", {
        1: "Creative Mobile Technologies, LLC",
        2: "Curb Mobility, LLC",
        6: "Myle Technologies Inc",
    }),
    "rate_code_desc": ("rate_code_id", {
        1: "Standard rate",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride",
        99: "Null/Unknown",
    }),
    "payment_desc": ("payment_type", {
        0: "Flex Fare trip",
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip",
    }),
}

for desc_col, (code_col, labels) in green_code_maps.items():
    # Build the when(...).when(...) chain in mapping order.
    label_expr = None
    for code, label in labels.items():
        matches_code = col(code_col) == code
        if label_expr is None:
            label_expr = when(matches_code, label)
        else:
            label_expr = label_expr.when(matches_code, label)
    green_taxi_df = green_taxi_df.withColumn(desc_col, label_expr.otherwise("Invalid"))

green_taxi_df = green_taxi_df.withColumn('service_type', lit("green"))


In [0]:
# Preview two yellow-taxi rows to eyeball the newly added columns.
yellow_taxi_df.show(2)

#### 4. Update data types

In [0]:
# Yellow taxi: cast count/code columns to integers and monetary columns to
# DECIMAL(10,2). withColumn on an existing name replaces it in place, so the
# column order is unchanged.
yellow_int_cols = ["passenger_count", "rate_code_id", "payment_type"]
yellow_money_cols = [
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee",
    "cbd_congestion_fee",
    "tolls_amount",
]

for name in yellow_int_cols:
    yellow_taxi_df = yellow_taxi_df.withColumn(name, col(name).cast(IntegerType()))
for name in yellow_money_cols:
    yellow_taxi_df = yellow_taxi_df.withColumn(name, col(name).cast(DecimalType(10, 2)))


In [0]:

# Green taxi: cast count/code columns to integers and monetary columns to
# DECIMAL(10,2). Each column is listed exactly once (tip_amount used to be
# cast twice). withColumn on an existing name replaces it in place, so the
# column order is unchanged.
green_int_cols = ["passenger_count", "rate_code_id", "payment_type", "trip_type"]
green_money_cols = [
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "ehail_fee",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
    "tolls_amount",
]

for name in green_int_cols:
    green_taxi_df = green_taxi_df.withColumn(name, col(name).cast(IntegerType()))
for name in green_money_cols:
    green_taxi_df = green_taxi_df.withColumn(name, col(name).cast(DecimalType(10, 2)))

#### 5. Remove null values and filter pickup_datetime

In [0]:
# Keep only trips whose pickup falls inside the month being loaded.
# A lower bound on v_file_date alone let trips stamped in later months through
# and, if p_file_date was not the first day of the month, dropped valid trips
# from earlier in that month. Comparing the pickup's "yyyy-MM" to year_month
# bounds the month on both sides.
in_load_month = date_format(col("pickup_datetime"), "yyyy-MM") == year_month
yellow_taxi_df = yellow_taxi_df.filter(in_load_month)
green_taxi_df = green_taxi_df.filter(in_load_month)

In [0]:
# Remove rows that are exact duplicates across every column.
yellow_taxi_df = yellow_taxi_df.distinct()
green_taxi_df = green_taxi_df.distinct()

In [0]:
# Yellow taxi: keep rows where every listed measure is >= 0. A row with a
# null in any of these columns is dropped as well, because the comparison is
# then not true.
yellow_non_negative_cols = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
]

# AND the per-column checks together, left to right.
yellow_all_non_negative = col(yellow_non_negative_cols[0]) >= 0
for name in yellow_non_negative_cols[1:]:
    yellow_all_non_negative = yellow_all_non_negative & (col(name) >= 0)

yellow_taxi_df = yellow_taxi_df.filter(yellow_all_non_negative)


In [0]:
# Green taxi: keep rows where every listed measure is >= 0. A row with a
# null in any of these columns is dropped as well, because the comparison is
# then not true.
green_non_negative_cols = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
]

# AND the per-column checks together, left to right.
green_all_non_negative = col(green_non_negative_cols[0]) >= 0
for name in green_non_negative_cols[1:]:
    green_all_non_negative = green_all_non_negative & (col(name) >= 0)

green_taxi_df = green_taxi_df.filter(green_all_non_negative)

In [0]:
# Print the number of nulls in each yellow-taxi column.
# All counts come from a single aggregation (one Spark job) instead of one
# filter().count() job per column; the printed lines are unchanged.
yellow_null_counts = yellow_taxi_df.select(
    [
        # count() ignores nulls, so only rows where the column IS null are counted.
        count(when(col(column).isNull(), lit(1))).alias(column)
        for column in yellow_taxi_df.columns
    ]
).first()

for column in yellow_taxi_df.columns:
    print(f"{column} - {yellow_null_counts[column]}")

In [0]:
# Drop yellow rows with no passenger_count.
# NOTE(review): the earlier `passenger_count >= 0` filter should already have
# excluded nulls, so this looks like a safeguard — confirm before removing.
yellow_taxi_df = yellow_taxi_df.filter(col("passenger_count").isNotNull())

In [0]:
# Preview two yellow-taxi rows after the cleaning filters.
yellow_taxi_df.show(2)

In [0]:
# Print the number of nulls in each green-taxi column.
# All counts come from a single aggregation (one Spark job) instead of one
# filter().count() job per column; the printed lines are unchanged.
green_null_counts = green_taxi_df.select(
    [
        # count() ignores nulls, so only rows where the column IS null are counted.
        count(when(col(column).isNull(), lit(1))).alias(column)
        for column in green_taxi_df.columns
    ]
).first()

for column in green_taxi_df.columns:
    print(f"{column} - {green_null_counts[column]}")

In [0]:
# Drop green rows with no passenger_count.
# NOTE(review): the earlier `passenger_count >= 0` filter should already have
# excluded nulls, so this looks like a safeguard — confirm before removing.
green_taxi_df = green_taxi_df.filter(col("passenger_count").isNotNull())

In [0]:
# Preview the green-taxi rows after the cleaning filters. This cell follows
# the green null filter; it previously re-displayed the yellow DataFrame.
green_taxi_df.show(5)

In [0]:
# Yellow taxi: derive trip_id as the SHA-256 of the identifying columns
# joined with "_".
yellow_trip_key_cols = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "pickup_location_id",
    "dropoff_location_id",
    "service_type",
]
yellow_trip_key = concat_ws("_", *yellow_trip_key_cols)
yellow_taxi_df = yellow_taxi_df.withColumn("trip_id", sha2(yellow_trip_key, 256))

In [0]:
# Green taxi: derive trip_id as the SHA-256 of the identifying columns
# joined with "_".
green_trip_key_cols = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "pickup_location_id",
    "dropoff_location_id",
    "service_type",
]
green_trip_key = concat_ws("_", *green_trip_key_cols)
green_taxi_df = green_taxi_df.withColumn("trip_id", sha2(green_trip_key, 256))

#### 6. Keeping common columns in both df's

In [0]:
# Sanity check: list any trip_id that occurs more than once in the yellow data.
yellow_trip_id_counts = yellow_taxi_df.groupBy("trip_id").agg(count("*").alias("cnt"))
yellow_trip_id_counts.filter(col("cnt") > 1).show()

In [0]:
# Column names present in both DataFrames (set intersection, so the order is
# not meaningful).
yellow_df_col, green_df_col = yellow_taxi_df.columns, green_taxi_df.columns
common_cols = list(set(yellow_df_col) & set(green_df_col))

In [0]:
# Display the shared column names.
common_cols

In [0]:
# Project both DataFrames onto the same columns, in the same order.
ordered_trip_cols = [
    'trip_id',
    'vendor_id',
    'vendor_desc',
    'pickup_datetime',
    'dropoff_datetime',
    'file_year_month',
    'service_type',
    'passenger_count',
    'trip_distance',
    'rate_code_id',
    'rate_code_desc',
    'store_and_fwd_flag',
    'pickup_location_id',
    'dropoff_location_id',
    'payment_type',
    'payment_desc',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'cbd_congestion_fee',
]
yellow_taxi_df = yellow_taxi_df.select(ordered_trip_cols)
green_taxi_df = green_taxi_df.select(ordered_trip_cols)

In [0]:
# Sanity check: list yellow trips that share the same identifying columns.
yellow_dup_key_cols = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "pickup_location_id",
    "dropoff_location_id",
]
yellow_key_counts = yellow_taxi_df.groupBy(*yellow_dup_key_cols).agg(count("*").alias("cnt"))
yellow_key_counts.filter(col("cnt") > 1).show()

In [0]:
# Sanity check: list green trips that share the same identifying columns.
green_dup_key_cols = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "pickup_location_id",
    "dropoff_location_id",
]
green_key_counts = green_taxi_df.groupBy(*green_dup_key_cols).agg(count("*").alias("cnt"))
green_key_counts.filter(col("cnt") > 1).show()

#### 7. Joining data with taxi zone lookup data

In [0]:
# Load the full taxi zone lookup table from the bronze layer.
taxi_zone_df = spark.table("taxi_trips_2025.bronze.taxi_zone_lookup")

In [0]:
# Preview five rows of the zone lookup.
taxi_zone_df.show(5)

In [0]:
# Copy of the zone lookup with pickup_-prefixed attribute columns, used to
# describe the pickup location.
pickup_zone_renames = {
    'LocationID': "location_id",
    'Borough': 'pickup_borough',
    'service_zone': 'pickup_service_zone',
    "Zone": "pickup_zone",
}
pickup_taxi_zone_lookup_df = taxi_zone_df
for old_name, new_name in pickup_zone_renames.items():
    pickup_taxi_zone_lookup_df = pickup_taxi_zone_lookup_df.withColumnRenamed(old_name, new_name)

In [0]:
# Copy of the zone lookup with dropoff_-prefixed attribute columns, used to
# describe the dropoff location.
dropoff_zone_renames = {
    'LocationID': "location_id",
    'Borough': 'dropoff_borough',
    'service_zone': 'dropoff_service_zone',
    "Zone": "dropoff_zone",
}
dropoff_taxi_zone_lookup_df = taxi_zone_df
for old_name, new_name in dropoff_zone_renames.items():
    dropoff_taxi_zone_lookup_df = dropoff_taxi_zone_lookup_df.withColumnRenamed(old_name, new_name)

In [0]:
# Attach pickup zone attributes; the left join keeps trips with no matching zone.
yellow_pickup_match = (
    yellow_taxi_df["pickup_location_id"] == pickup_taxi_zone_lookup_df["location_id"]
)
yellow_taxi_df = yellow_taxi_df.join(
    pickup_taxi_zone_lookup_df,
    on=yellow_pickup_match,
    how="left",
)

In [0]:
# Drop the lookup's join key; pickup_location_id already carries the value.
yellow_taxi_df = yellow_taxi_df.drop("location_id")

In [0]:
# Attach dropoff zone attributes; the left join keeps trips with no matching zone.
yellow_dropoff_match = (
    yellow_taxi_df["dropoff_location_id"] == dropoff_taxi_zone_lookup_df["location_id"]
)
yellow_taxi_df = yellow_taxi_df.join(
    dropoff_taxi_zone_lookup_df,
    on=yellow_dropoff_match,
    how="left",
)

In [0]:
# Drop the lookup's join key; dropoff_location_id already carries the value.
yellow_taxi_df = yellow_taxi_df.drop("location_id")

In [0]:
# Row count of the yellow data after both zone joins.
yellow_taxi_df.count()

In [0]:
# Final yellow column set and order, with the zone attributes placed next to
# their location ids.
yellow_silver_cols = [
    'trip_id',
    'vendor_id',
    'vendor_desc',
    'pickup_datetime',
    'dropoff_datetime',
    'file_year_month',
    'service_type',
    'passenger_count',
    'trip_distance',
    'rate_code_id',
    'rate_code_desc',
    'store_and_fwd_flag',
    'pickup_location_id',
    'pickup_borough',
    'pickup_zone',
    'pickup_service_zone',
    'dropoff_location_id',
    'dropoff_borough',
    'dropoff_zone',
    'dropoff_service_zone',
    'payment_type',
    'payment_desc',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'cbd_congestion_fee',
]
yellow_taxi_df = yellow_taxi_df.select(yellow_silver_cols)

In [0]:
# Green taxi: attach pickup zone attributes; the left join keeps trips with
# no matching zone.
green_pickup_match = (
    green_taxi_df["pickup_location_id"] == pickup_taxi_zone_lookup_df["location_id"]
)
green_taxi_df = green_taxi_df.join(
    pickup_taxi_zone_lookup_df,
    on=green_pickup_match,
    how="left",
)

In [0]:
# Drop the lookup's join key; pickup_location_id already carries the value.
green_taxi_df =green_taxi_df.drop("location_id")

In [0]:
# Green taxi: attach dropoff zone attributes; the left join keeps trips with
# no matching zone.
green_dropoff_match = (
    green_taxi_df["dropoff_location_id"] == dropoff_taxi_zone_lookup_df["location_id"]
)
green_taxi_df = green_taxi_df.join(
    dropoff_taxi_zone_lookup_df,
    on=green_dropoff_match,
    how="left",
)

In [0]:
# Drop the lookup's join key; dropoff_location_id already carries the value.
green_taxi_df =green_taxi_df.drop("location_id")

In [0]:
# Row count of the green data after both zone joins.
green_taxi_df.count()

In [0]:
# Final green column set and order, with the zone attributes placed next to
# their location ids.
green_silver_cols = [
    'trip_id',
    'vendor_id',
    'vendor_desc',
    'pickup_datetime',
    'dropoff_datetime',
    'file_year_month',
    'service_type',
    'passenger_count',
    'trip_distance',
    'rate_code_id',
    'rate_code_desc',
    'store_and_fwd_flag',
    'pickup_location_id',
    'pickup_borough',
    'pickup_zone',
    'pickup_service_zone',
    'dropoff_location_id',
    'dropoff_borough',
    'dropoff_zone',
    'dropoff_service_zone',
    'payment_type',
    'payment_desc',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'congestion_surcharge',
    'cbd_congestion_fee',
]
green_taxi_df = green_taxi_df.select(green_silver_cols)

#### 8. Merge upsert data in silver layer

In [0]:
# ADLS Gen2 (abfss) location of the silver container; passed to merge_delta_data below.
silver_folder_path = "abfss://silver@nyctaxitrips2025.dfs.core.windows.net/"

In [0]:
# Match rows on trip_id within the same file_year_month; `tgt`/`src` are the
# aliases this condition expects the merge helper to use.
merge_condition = "tgt.trip_id = src.trip_id AND tgt.file_year_month = src.file_year_month"
# Upsert the yellow data into taxi_trips_2025.silver.yellow_taxi_trips.
# NOTE(review): merge_delta_data is not defined in this notebook — presumably it
# comes from ../include/common_functions, taking (df, catalog, schema, table,
# folder path, merge condition, partition column) — TODO confirm.
merge_delta_data(
    yellow_taxi_df, 
    'taxi_trips_2025', 
    'silver', 
    'yellow_taxi_trips', 
    silver_folder_path, 
    merge_condition, 
    "file_year_month")

In [0]:
%sql
-- Ad-hoc check: row count of the silver green table for one month.
-- NOTE(review): this runs right after the yellow merge but queries the green
-- table, and the month is hard-coded to 2025-08 rather than the widget value —
-- confirm this is intended.
select count(1) 
from taxi_trips_2025.silver.green_taxi_trips
where file_year_month = "2025-08"

In [0]:
# Upsert the green data into taxi_trips_2025.silver.green_taxi_trips, reusing
# merge_condition from the yellow merge cell.
# NOTE(review): merge_delta_data is not defined in this notebook — presumably it
# comes from ../include/common_functions — TODO confirm its argument order.
merge_delta_data(
    green_taxi_df, 
    'taxi_trips_2025', 
    'silver', 
    'green_taxi_trips', 
    silver_folder_path, 
    merge_condition, 
    "file_year_month")

In [0]:
%sql
-- Ad-hoc check: sample ten rows of the silver green table for one month.
-- NOTE(review): the month is hard-coded to 2025-08 rather than taken from the
-- p_file_date widget — confirm this is intended.
select * 
from taxi_trips_2025.silver.green_taxi_trips
where file_year_month = "2025-08"
limit 10;

In [0]:
# End the notebook run and return "Success" as its exit value.
dbutils.notebook.exit("Success")