# Bronze Ingestion — Gulf to Bay Databricks

This notebook performs the Bronze‑layer ingestion for the Gulf to Bay modernization pipeline using real CSV assets stored in a Unity Catalog managed volume. The goal is to land raw operational data into Delta format with minimal transformation, preserving source fidelity while adding ingestion metadata for downstream Silver refinement.

The notebook reads the raw CSV files from the `raw_files` volume using explicit, predefined schemas, routes rows that fail to parse into companion `exceptions_*` tables, and writes each dataset as a managed Delta table inside the `gulf_to_bay_databricks.default` schema. Each run truncates and reloads the tables (kill and fill). These Bronze tables serve as the auditable foundation for the medallion architecture and will be consumed by the Silver transformation notebook.

In [None]:
# ============================================================
#  BRONZE INGESTION — REAL FABRIC CSVs
#  Gulf to Bay Databricks Modernization Pipeline
#  Kill and fill using TRUNCATE + exception logging
# ============================================================

from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, FloatType
)
from pyspark.sql.functions import col

# ============================================================
# 1. Schemas
# ============================================================

# Every field is nullable. Column order matters: it must line up with the
# column order of the target tables created further down.

# Customers: all columns are plain strings.
customers_schema = StructType([
    StructField(name, StringType(), True)
    for name in (
        "customer_id",
        "first_name",
        "last_name",
        "address1",
        "address2",
        "city",
        "state_province",
        "country",
        "postal_code",
    )
])

# Products: strings plus float cost/price/weight columns.
products_schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in (
        ("product_id", StringType()),
        ("product_name", StringType()),
        ("product_number", StringType()),
        ("color", StringType()),
        ("standard_cost", FloatType()),
        ("list_price", FloatType()),
        ("size", StringType()),
        ("weight", FloatType()),
        ("category", StringType()),
        ("subcategory", StringType()),
    )
])

# Sales: order_date is kept as a raw string at this layer.
sales_schema = StructType([
    StructField(name, dtype, True)
    for name, dtype in (
        ("order_id", StringType()),
        ("order_date", StringType()),
        ("customer_id", StringType()),
        ("product_id", StringType()),
        ("quantity", IntegerType()),
        ("unit_price", FloatType()),
        ("discount", FloatType()),
        ("line_total", FloatType()),
    )
])

# ============================================================
# 2. Create Bronze + exception tables IF NOT EXISTS
# ============================================================

def _create_delta_table(table_name, schema, extra_columns=()):
    """Create a Delta table in gulf_to_bay_databricks.default if it is missing.

    The column list is generated from `schema` so the table definition cannot
    drift away from the StructType used to read the CSV.

    Args:
        table_name: Unqualified table name, e.g. "bronze_customers".
        schema: StructType whose fields become the table columns, in order.
        extra_columns: Optional "name type" DDL fragments appended after the
            schema columns.
    """
    column_defs = [
        f"{field.name} {field.dataType.simpleString()}"
        for field in schema.fields
    ]
    column_defs.extend(extra_columns)
    columns_sql = "\n        , ".join(column_defs)
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS gulf_to_bay_databricks.default.{table_name} (\n"
        f"          {columns_sql}\n"
        f") USING DELTA"
    )


_entity_schemas = (
    ("customers", customers_schema),
    ("products", products_schema),
    ("sales", sales_schema),
)

# Bronze tables: exactly the columns of the read schema.
for _entity, _schema in _entity_schemas:
    _create_delta_table(f"bronze_{_entity}", _schema)

# Exception tables (include _corrupt_record): same columns plus the raw
# text of the malformed line.
for _entity, _schema in _entity_schemas:
    _create_delta_table(
        f"exceptions_{_entity}", _schema, ("_corrupt_record string",)
    )

# ============================================================
# 3. TRUNCATE all Bronze + exception tables
# ============================================================

# Kill and fill: empty every target table before this run reloads it.
# Order is bronze/exceptions per entity: customers, products, sales.
_tables_to_reset = [
    f"{layer}_{entity}"
    for entity in ("customers", "products", "sales")
    for layer in ("bronze", "exceptions")
]

for tbl in _tables_to_reset:
    spark.sql(f"TRUNCATE TABLE gulf_to_bay_databricks.default.{tbl}")

# ============================================================
# 4. Read CSVs with permissive mode and split clean vs exceptions
# ============================================================

def read_with_exceptions(path, schema):
    """Read a CSV in PERMISSIVE mode and split it into clean and malformed rows.

    Spark only fills the corrupt-record column when that column is part of the
    schema given to the reader; otherwise the information is dropped and
    malformed rows are indistinguishable from clean ones. The reader therefore
    gets `schema` plus a nullable string `_corrupt_record` field.

    The caller's `schema` is never modified: `StructType.add` appends in place,
    so a fresh StructType is built instead.

    Args:
        path: Location of the CSV file; its first line is read as a header.
        schema: StructType describing the data columns.

    Returns:
        A `(clean, exceptions)` tuple of DataFrames. `clean` holds the rows
        with no corrupt-record text and has only the data columns.
        `exceptions` holds the rows Spark flagged as malformed and has the
        data columns followed by `_corrupt_record`.
    """
    corrupt_col = "_corrupt_record"

    # Tolerate a schema that already carries the corrupt-record field so the
    # column is never declared twice.
    data_fields = [f for f in schema.fields if f.name != corrupt_col]
    read_schema = StructType(
        data_fields + [StructField(corrupt_col, StringType(), True)]
    )

    df = (
        spark.read
             .schema(read_schema)
             .option("header", True)
             .option("mode", "PERMISSIVE")
             .option("columnNameOfCorruptRecord", corrupt_col)
             .csv(path)
    )

    # Both branches keep every data column; exceptions also keeps the raw
    # malformed text for auditing.
    clean = df.filter(col(corrupt_col).isNull()).drop(corrupt_col)
    exceptions = df.filter(col(corrupt_col).isNotNull())

    return clean, exceptions

# Root of the volume holding the raw CSV files.
VOLUME_BASE = "/Volumes/gulf_to_bay_databricks/default/raw_files"

# Each call yields a (clean, exceptions) DataFrame pair for one source file.
customers_df, exceptions_customers_df = read_with_exceptions(
    f"{VOLUME_BASE}/customers.csv",
    customers_schema,
)
products_df, exceptions_products_df = read_with_exceptions(
    f"{VOLUME_BASE}/products.csv",
    products_schema,
)
sales_df, exceptions_sales_df = read_with_exceptions(
    f"{VOLUME_BASE}/sales.csv",
    sales_schema,
)

# ============================================================
# 5. Append clean + exception rows
# ============================================================

# Bronze tables are written first, then their exception counterparts.
_write_plan = (
    (customers_df, "gulf_to_bay_databricks.default.bronze_customers"),
    (products_df, "gulf_to_bay_databricks.default.bronze_products"),
    (sales_df, "gulf_to_bay_databricks.default.bronze_sales"),
    (exceptions_customers_df, "gulf_to_bay_databricks.default.exceptions_customers"),
    (exceptions_products_df, "gulf_to_bay_databricks.default.exceptions_products"),
    (exceptions_sales_df, "gulf_to_bay_databricks.default.exceptions_sales"),
)

for _frame, _target in _write_plan:
    _frame.write.insertInto(_target)

# ============================================================
# 6. Validate counts
# ============================================================

# Print one "<table>: <row count>" line per loaded table.
_loaded_tables = (
    "bronze_customers",
    "exceptions_customers",
    "bronze_products",
    "exceptions_products",
    "bronze_sales",
    "exceptions_sales",
)

for tbl in _loaded_tables:
    _qualified = f"gulf_to_bay_databricks.default.{tbl}"
    count = spark.table(_qualified).count()
    print(f"{tbl}: {count}")

print("\n✅ Bronze ingestion complete!")

# Preview the first ten sales rows in the notebook output.
_sales_preview = spark.table("gulf_to_bay_databricks.default.bronze_sales").limit(10)
display(_sales_preview)