In [0]:
# 🚕 Bronze to Silver - NYC Yellow Taxi (Azure Databricks)

This notebook performs the **Bronze → Silver** step:

- Reads raw Yellow Taxi Parquet data from the **bronze** container
- Renames key columns and drops rows with a missing pickup or dropoff timestamp
- Writes the cleaned data as a Delta table to the **silver** container
- Registers the table in the metastore as `silver.nyc_yellow_taxi`

## Import Libraries

In [0]:
import logging

from pyspark.sql import SparkSession

# `spark` is predefined in Databricks; getOrCreate() reuses that session.
spark = SparkSession.builder.getOrCreate()

## Configurations

#### Logger

In [0]:
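# Set up logger
logger = logging.getLogger("BronzeToSilver")
# Set the level on this logger explicitly: basicConfig() below does nothing
# when the root logger already has handlers, which can leave INFO suppressed.
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger.info("Logger initialized.")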
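
If log lines do not appear, or appear twice after re-running the cell, one option is to give the logger its own handler instead of relying on the root logger. The following is a minimal sketch of that idea; `get_notebook_logger` and the `BronzeToSilverDemo` name are hypothetical and not part of this notebook.

```python
import logging
import sys


def get_notebook_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a logger that owns exactly one stdout handler (hypothetical helper)."""
    log = logging.getLogger(name)
    log.setLevel(level)
    # Re-running the cell must not stack handlers, so add one only when absent.
    if not log.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )
        log.addHandler(handler)
    # Stop propagation so root-logger handlers do not print the same line again.
    log.propagate = False
    return log


demo_logger = get_notebook_logger("BronzeToSilverDemo")
demo_logger.info("Logger initialized.")
```

Calling the helper again with the same name returns the same logger object without adding a second handler.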

## Constants and Parameters

In [0]:
# Define source and target paths (ADLS Gen2 paths with abfss)
bronze_path = "abfss://bronze@assportfolio.dfs.core.windows.net/nyc/taxi/yellow/raw/"
silver_path = "abfss://silver@assportfolio.dfs.core.windows.net/nyc/taxi/yellow/"
table_name = "silver.nyc_yellow_taxi"  # <schema>.<table>
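
Both paths share the layout `abfss://<container>@<account>.dfs.core.windows.net/<path>`. As a sketch only, a small helper could assemble them from their parts so the storage account is written once; `abfss_uri`, `STORAGE_ACCOUNT`, and the `example_*` names are hypothetical and introduced purely for illustration.

```python
def abfss_uri(container: str, account: str, path: str = "") -> str:
    """Build an ADLS Gen2 URI of the form abfss://<container>@<account>.dfs.core.windows.net/<path>/."""
    if not container or not account:
        raise ValueError("container and account must be non-empty")
    # Normalise slashes so callers can pass "nyc/taxi" or "/nyc/taxi/".
    clean_path = path.strip("/")
    suffix = f"{clean_path}/" if clean_path else ""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{suffix}"


STORAGE_ACCOUNT = "assportfolio"  # account name taken from the paths in this notebook
example_bronze = abfss_uri("bronze", STORAGE_ACCOUNT, "nyc/taxi/yellow/raw")
example_silver = abfss_uri("silver", STORAGE_ACCOUNT, "nyc/taxi/yellow")
```

With these inputs the helper reproduces the two literal paths used in this notebook.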

## Main Process

#### Read Bronze Data

In [0]:
# Read from Bronze (Parquet)
df_bronze = spark.read.parquet(bronze_path)

logger.info(f"Read {df_bronze.count()} records from Bronze")
df_bronze.printSchema()
df_bronze.show(5)
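
`withColumnRenamed` is a no-op when the source column does not exist, so a misnamed or missing column would pass through the next step silently. A possible guard, sketched below, compares the columns that were read against the ones the notebook depends on. The comparison ignores case because Spark resolves column names case-insensitively by default. `missing_columns`, `REQUIRED_BRONZE_COLUMNS`, and `sample_columns` are hypothetical; in the notebook, `df_bronze.columns` would take the place of the sample list.

```python
def missing_columns(actual, required):
    """Return the required column names absent from `actual`, ignoring case."""
    present = {name.lower() for name in actual}
    return [name for name in required if name.lower() not in present]


# Hypothetical constant: the columns the transformation step renames or filters on.
REQUIRED_BRONZE_COLUMNS = ["vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Illustrative stand-in for df_bronze.columns so the sketch runs without a cluster.
sample_columns = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "fare_amount"]

missing = missing_columns(sample_columns, REQUIRED_BRONZE_COLUMNS)
if missing:
    raise ValueError(f"Bronze data is missing columns: {missing}")
```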

#### Minor Transformations

In [0]:
# Sample transformation: rename columns, then drop rows with a null pickup or dropoff timestamp
df_silver = (
    df_bronze
    .withColumnRenamed("vendorid", "vendor_id")
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .dropna(subset=["pickup_datetime", "dropoff_datetime"])
)

logger.info(f"Transformed record count: {df_silver.count()}")
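
The two logged counts can be combined into a single line that shows how many rows the null filter removed. The helper below is a sketch under that assumption; `drop_summary` is a hypothetical name. Each `count()` call runs a Spark job, so in the notebook it is cheaper to store both counts in variables and pass them in than to call `count()` again.

```python
def drop_summary(before: int, after: int) -> str:
    """Describe how many rows a filtering step removed."""
    if before < 0 or after < 0 or after > before:
        raise ValueError("expected 0 <= after <= before")
    dropped = before - after
    pct = (dropped / before * 100) if before else 0.0
    return f"dropped {dropped} of {before} rows ({pct:.2f}%)"


# In the notebook the inputs would be the bronze and silver record counts;
# literal numbers are used here so the sketch runs without Spark.
print(drop_summary(1000, 990))
```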

#### Write to Silver Layer

In [0]:
# Write to Silver as a Delta table (overwrite mode)
df_silver.write.format("delta").mode("overwrite").save(silver_path)

logger.info(f"Delta table written to: {silver_path}")

#### Register Delta Table in Metastore

In [0]:
# 1. Create schema if it doesn't exist
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS silver
""")
logger.info("Schema `silver` created or already exists.")

# 2. Register the Delta table in the silver schema (no-op if it is already registered)
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_name}
    USING DELTA
    LOCATION '{silver_path}'
""")
logger.info(f"Delta table registered: {table_name}")
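
Because the statement is assembled with string formatting, it can help to validate the pieces before they reach `spark.sql`. The sketch below builds the same `CREATE TABLE IF NOT EXISTS ... USING DELTA LOCATION ...` statement from its parts and rejects anything that does not look like a plain identifier. `register_table_sql` and its validation rules are assumptions made for illustration, not a Databricks API.

```python
import re

# Plain identifiers only: letters, digits, underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def register_table_sql(schema: str, table: str, location: str) -> str:
    """Build a CREATE TABLE statement for an existing Delta location (hypothetical helper)."""
    for part in (schema, table):
        if not _IDENTIFIER.fullmatch(part):
            raise ValueError(f"unsafe identifier: {part!r}")
    if "'" in location:
        raise ValueError("location must not contain single quotes")
    return (
        f"CREATE TABLE IF NOT EXISTS {schema}.{table} "
        f"USING DELTA LOCATION '{location}'"
    )


ddl = register_table_sql(
    "silver",
    "nyc_yellow_taxi",
    "abfss://silver@assportfolio.dfs.core.windows.net/nyc/taxi/yellow/",
)
print(ddl)  # in the notebook, this string would be passed to spark.sql(...)
```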
