In [1]:
from functools import reduce

from pyspark.sql import functions as F
from pyspark.sql.types import *

print("NYC TAXI Bronze to Silver")

# Read the Bronze Parquet files with vectorized Parquet decoding turned off.
# NOTE(review): presumably a workaround for Bronze files whose physical column
# types differ between files (the script also special-cases some years below)
# — confirm. The setting is switched back to "true" later in this script.
spark.conf.set(
    "spark.sql.parquet.enableVectorizedReader",
    "false",
)

# Target Silver schema in output order: (column name, Spark type to cast to).
# A type of None means the column is passed through exactly as read.
# NOTE(review): the two timestamps are not cast; if some Bronze files store
# them as TIMESTAMP_NTZ, casting to TimestampType() here would give every
# temp table an identical schema — confirm against the Bronze files.
_SILVER_COLUMNS = [
    ('VendorID', IntegerType()),
    ('tpep_pickup_datetime', None),
    ('tpep_dropoff_datetime', None),
    ('passenger_count', IntegerType()),
    ('trip_distance', DoubleType()),
    ('RatecodeID', IntegerType()),
    ('store_and_fwd_flag', None),
    ('PULocationID', IntegerType()),
    ('DOLocationID', IntegerType()),
    ('payment_type', IntegerType()),
    ('fare_amount', DoubleType()),
    ('extra', DoubleType()),
    ('mta_tax', DoubleType()),
    ('tip_amount', DoubleType()),
    ('tolls_amount', DoubleType()),
    ('improvement_surcharge', DoubleType()),
    ('total_amount', DoubleType()),
    ('congestion_surcharge', DoubleType()),
    ('airport_fee', DoubleType()),
]

# Columns that may be absent from some Bronze files; they are filled with
# NULLs of the target type instead of failing the read.
# NOTE(review): congestion_surcharge is included defensively — confirm which
# Bronze files (if any) lack it.
_OPTIONAL_COLUMNS = {'airport_fee', 'congestion_surcharge'}


def standardize_schema(df):
    """Project a raw Bronze taxi DataFrame onto the fixed 19-column Silver schema.

    Source columns are matched to the target names case-insensitively, so
    files that spell a column differently (e.g. ``Airport_fee`` vs
    ``airport_fee``) line up. A column listed in ``_OPTIONAL_COLUMNS`` that is
    absent becomes a NULL of the target type; any other missing column makes
    the ``select`` fail with Spark's analysis error. Extra columns, such as
    ``year``/``month``/``day`` partition columns, are not selected and so do
    not appear in the result.

    Args:
        df: DataFrame read from a Bronze Parquet path.

    Returns:
        A DataFrame with exactly the columns of ``_SILVER_COLUMNS``, in that
        order, cast to the listed types.
    """
    # Lower-cased name -> actual name present in this particular file.
    present = {name.lower(): name for name in df.columns}

    projected = []
    for target, dtype in _SILVER_COLUMNS:
        source = present.get(target.lower())
        if source is not None:
            column = F.col(source)
        elif target in _OPTIONAL_COLUMNS:
            column = F.lit(None)
        else:
            # Reference the expected name so Spark reports the missing column.
            column = F.col(target)
        if dtype is not None:
            column = column.cast(dtype)
        projected.append(column.alias(target))

    return df.select(*projected)

# Inclusive range of Bronze years to process; also used as the valid
# pickup/dropoff year window for the Silver filter.
START_YEAR = 2019
END_YEAR = 2024

# Spark setting that is disabled near the top of this script and re-enabled
# when the run finishes.
VECTORIZED_READER_CONF = "spark.sql.parquet.enableVectorizedReader"

# Years whose Bronze directory is read one month=MM/ folder at a time instead
# of as a single year=YYYY/ path.
problematic_years = [2020, 2023]

# One materialized, standardized DataFrame per successfully processed year.
all_years = []
# The years behind all_years, in processing order (used for the summary).
processed_years = []


def _union_all(frames):
    """Union a non-empty list of DataFrames by column name."""
    return reduce(lambda left, right: left.unionByName(right), frames)


def _read_year_by_month(year_path):
    """Read and standardize each month=MM/ folder under *year_path*.

    A month that fails is reported and skipped. Returns the union of the
    months that succeeded, or None when none did.
    """
    month_dfs = []
    for month in range(1, 13):
        month_path = f"{year_path}month={month:02d}/"
        try:
            month_dfs.append(standardize_schema(spark.read.parquet(month_path)))
            print(f"    Month {month:02d}")
        except Exception as e:
            # Best-effort per month: report and keep going.
            print(f"    Month {month:02d}: {str(e)[:80]}")
    return _union_all(month_dfs) if month_dfs else None


def _materialize_year(df_year, year, show_column_names=False):
    """Write *df_year* to Delta table ``temp_year_<year>`` and return it re-read.

    Later stages therefore read the Delta copy rather than the Bronze Parquet
    files. Prints column counts before and after, and the row count.
    """
    print(f"  Columns before save: {len(df_year.columns)}")

    temp_table = f"temp_year_{year}"
    (df_year.write
        .mode("overwrite")
        .format("delta")
        .option("overwriteSchema", "true")
        .saveAsTable(temp_table))

    df_materialized = spark.table(temp_table)
    print(f"  Columns after load: {len(df_materialized.columns)}")
    if show_column_names:
        print(f"  Column names: {df_materialized.columns[:10]}...")

    row_count = df_materialized.count()
    print(f"  {year}: {row_count:,} records processed")
    return df_materialized


try:
    for year in range(START_YEAR, END_YEAR + 1):
        print(f"\nProcessing {year}...")
        year_path = f"Files/bronze/nyc_taxi/year={year}/"

        try:
            if year in problematic_years:
                print(f"  Using month-by-month processing for {year}")
                df_year = _read_year_by_month(year_path)
                if df_year is None:
                    print(f"  {year}: No months successfully processed")
                    continue
                df_materialized = _materialize_year(df_year, year)
            else:
                df_year = standardize_schema(spark.read.parquet(year_path))
                df_materialized = _materialize_year(
                    df_year, year, show_column_names=True
                )
        except Exception as e:
            # Best-effort per year: report and move on to the next year.
            print(f"  {year}: Error - {str(e)[:150]}")
            continue

        all_years.append(df_materialized)
        processed_years.append(year)

    print("\nCombining all years...\n")

    if all_years:
        df_combined = _union_all(all_years)

        total_records = df_combined.count()
        print(f"Total records: {total_records:,}")

        print("\nApplying filters and transformations...")

        df_silver = df_combined.filter(
            (F.year('tpep_pickup_datetime').between(START_YEAR, END_YEAR)) &
            (F.year('tpep_dropoff_datetime').between(START_YEAR, END_YEAR)) &
            (F.col('tpep_dropoff_datetime') > F.col('tpep_pickup_datetime')) &

            (F.col('fare_amount') >= 0) & (F.col('fare_amount') <= 500) &
            (F.col('trip_distance') > 0) & (F.col('trip_distance') <= 100) &
            (F.col('passenger_count') >= 1) & (F.col('passenger_count') <= 6)
        )

        df_silver = (
            df_silver
            .withColumn('trip_duration_minutes',
                        F.round((F.unix_timestamp('tpep_dropoff_datetime') -
                                 F.unix_timestamp('tpep_pickup_datetime')) / 60, 2))
            .withColumn('pickup_date', F.to_date('tpep_pickup_datetime'))
            .withColumn('pickup_year', F.year('tpep_pickup_datetime'))
            .withColumn('pickup_month', F.month('tpep_pickup_datetime'))
            .withColumn('pickup_day', F.dayofmonth('tpep_pickup_datetime'))
            .withColumn('pickup_hour', F.hour('tpep_pickup_datetime'))
            .withColumn('pickup_dayofweek', F.dayofweek('tpep_pickup_datetime'))
            .withColumn('pickup_dayname', F.date_format('tpep_pickup_datetime', 'EEEE'))
            # Spark's dayofweek numbers Sunday as 1 and Saturday as 7.
            .withColumn('is_weekend',
                        F.when(F.col('pickup_dayofweek').isin([1, 7]), True).otherwise(False))
            .withColumn('speed_mph',
                        F.round(F.when(F.col('trip_duration_minutes') > 0,
                                       F.col('trip_distance') / (F.col('trip_duration_minutes') / 60))
                                .otherwise(0), 2))
        )

        df_silver = df_silver.filter(F.col('speed_mph') <= 100)

        final_count = df_silver.count()
        removed = total_records - final_count
        # Guard against an all-empty input so the percentage cannot divide by zero.
        removed_pct = (removed / total_records * 100) if total_records else 0.0
        print(f"After filters: {final_count:,} records")
        print(f"Records removed: {removed:,} ({removed_pct:.2f}%)")

        print("\nSaving to Silver...")

        # NOTE(review): dropping first means a failed write below leaves no
        # Silver table at all; a Delta overwrite with overwriteSchema alone
        # would replace it atomically — confirm whether the DROP is needed.
        spark.sql("DROP TABLE IF EXISTS silver_nyc_taxi")

        (df_silver.write
            .mode("overwrite")
            .partitionBy("pickup_year", "pickup_month")
            .format("delta")
            .option("overwriteSchema", "true")
            .saveAsTable("silver_nyc_taxi"))

        print("YEAR-BY-YEAR SUMMARY")

        spark.sql("""
            SELECT 
                pickup_year,
                COUNT(*) as total_trips,
                COUNT(DISTINCT pickup_month) as months,
                ROUND(SUM(total_amount)/1000000, 2) as revenue_millions,
                ROUND(AVG(total_amount), 2) as avg_fare,
                ROUND(AVG(trip_distance), 2) as avg_distance_mi,
                ROUND(AVG(trip_duration_minutes), 1) as avg_duration_min,
                ROUND(AVG(speed_mph), 1) as avg_speed_mph
            FROM silver_nyc_taxi
            GROUP BY pickup_year
            ORDER BY pickup_year
        """).show()

        print("VALIDATION CHECKS")

        print("\nAirport fee coverage by year:")
        spark.sql("""
            SELECT 
                pickup_year,
                COUNT(*) as total_trips,
                SUM(CASE WHEN airport_fee IS NOT NULL AND airport_fee > 0 THEN 1 ELSE 0 END) as trips_with_airport_fee,
                ROUND(SUM(CASE WHEN airport_fee IS NOT NULL AND airport_fee > 0 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as pct_with_fee
            FROM silver_nyc_taxi
            GROUP BY pickup_year
            ORDER BY pickup_year
        """).show()

        print("TRANSFORMATION COMPLETE!")
        print("\nTable: silver_nyc_taxi")
        print(f"Total records: {final_count:,}")
        # Report the years that actually made it in, not the requested range.
        print(f"Years covered: {processed_years[0]}-{processed_years[-1]}")
        skipped_years = [
            y for y in range(START_YEAR, END_YEAR + 1) if y not in processed_years
        ]
        if skipped_years:
            print(f"Years skipped: {skipped_years}")
        print("Partitioned by: pickup_year, pickup_month")
    else:
        print("\nNo data was successfully processed!")
finally:
    # Runs on success, on "no data", and on any uncaught error, so the temp
    # tables never outlive the run and the reader setting is always restored.
    print("\nCleaning up temporary tables...")
    try:
        for temp_year in range(START_YEAR, END_YEAR + 1):
            spark.sql(f"DROP TABLE IF EXISTS temp_year_{temp_year}")
        print("Cleanup complete!")
    finally:
        # NOTE(review): assumes "true" (Spark's default) was the value before
        # this script disabled it — confirm.
        spark.conf.set(VECTORIZED_READER_CONF, "true")

StatementMeta(, 14a22c48-e129-414d-959c-ab8343fac341, 3, Finished, Available, Finished)

NYC TAXI Bronze to Silver

Processing 2019...
  Columns before save: 19
  Columns after load: 19
  Column names: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type']...
  2019: 84,598,444 records processed

Processing 2020...
  Using month-by-month processing for 2020
    Month 01
    Month 02
    Month 03
    Month 04
    Month 05
    Month 06
    Month 07
    Month 08
    Month 09
    Month 10
    Month 11
    Month 12
  Columns before save: 19
  Columns after load: 19
  2020: 24,649,092 records processed

Processing 2021...
  Columns before save: 19
  Columns after load: 19
  Column names: ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type']...
  2021: 30,904,308 records processed

Processing 2022...
  Columns before save: 19
  C