# Silver Layer: Clean & Transform
                     


In [None]:
from pyspark.sql.functions import *

# Silver-layer transform: read the raw trip records, keep only the columns
# listed below under their standardized names, normalize the two datetime
# columns to timestamps, drop rows missing any required field, and overwrite
# the Delta table `nyc_taxi_silver`.

# Read Parquet
# NOTE(review): the path ends in "nyc_tax" while the output table is named
# "nyc_taxi_silver" -- confirm the folder name is not a truncated "nyc_taxi".
df = spark.read.parquet("Files/nyc_tax")

# Source (camelCase) column -> silver column name, listed in the final output
# order. A single mapping replaces the former rename chain plus the separate
# final select, so the column list is maintained in exactly one place.
silver_columns = {
    "vendorID": "VendorID",
    "lpepPickupDatetime": "pickup_datetime",
    "lpepDropoffDatetime": "dropoff_datetime",
    "passengerCount": "passenger_count",
    "tripDistance": "trip_distance",
    "rateCodeID": "RatecodeID",
    "storeAndFwdFlag": "store_and_fwd_flag",
    "puLocationId": "PULocationID",
    "doLocationId": "DOLocationID",
    "paymentType": "payment_type",
    "fareAmount": "fare_amount",
    "extra": "extra",
    "mtaTax": "mta_tax",
    "tipAmount": "tip_amount",
    "tollsAmount": "tolls_amount",
    "improvementSurcharge": "improvement_surcharge",
    "totalAmount": "total_amount",
}

# Rename and project in one pass.
df = df.select(
    [col(source).alias(target) for source, target in silver_columns.items()]
)

# Convert the datetime columns to timestamps.
df = df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"))) \
       .withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime")))

# Drop rows that are missing any required field. This runs AFTER the timestamp
# conversion on purpose: a value that to_timestamp cannot parse comes back as
# NULL (with ANSI mode off), and filtering earlier would let such rows through
# with NULL timestamps.
required_columns = [
    "passenger_count",
    "trip_distance",
    "total_amount",
    "pickup_datetime",
    "dropoff_datetime",
]
for required in required_columns:
    df = df.filter(col(required).isNotNull())

# Write to Delta table
df.write.format("delta").mode("overwrite").saveAsTable("nyc_taxi_silver")

