In [0]:
# Inspect a single raw HUD FMR JSON payload to confirm expected keys exist
# before writing Silver parsing logic.
# Expected structure:
# - Top-level: data
# - Geography fields: metro_name, area_name, counties_msa
# - Rent fields: data.basicdata with bedroom categories (Efficiency through Four-Bedroom) and year

row = spark.sql("""
SELECT raw_payload
FROM bronze_hud_fmr_raw
WHERE http_status = 200 AND raw_payload IS NOT NULL
LIMIT 1
""").collect()[0]

print(row["raw_payload"][:1200])

{"data": {"county_name": "", "counties_msa": "Callahan County, TX; Jones County, TX; and Taylor County, TX", "town_name": "", "metro_status": "1", "metro_name": "Abilene, TX", "area_name": "Abilene, TX MSA", "basicdata": {"Efficiency": 944, "One-Bedroom": 950, "Two-Bedroom": 1207, "Three-Bedroom": 1571, "Four-Bedroom": 1899, "year": "2025"}}}


In [0]:

# All sampled records parsed successfully and contained the expected "data" object.
# This confirms payload_schema is compatible and we can proceed to Silver extraction.
from pyspark.sql.functions import col, from_json

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

payload_schema = StructType([
    StructField(
        "data",
        StructType([
            StructField("county_name", StringType(), True),
            StructField("counties_msa", StringType(), True),
            StructField("town_name", StringType(), True),
            StructField("metro_status", StringType(), True),
            StructField("metro_name", StringType(), True),
            StructField("area_name", StringType(), True),
            StructField(
                "basicdata",
                StructType([
                    StructField("Efficiency", IntegerType(), True),
                    StructField("One-Bedroom", IntegerType(), True),
                    StructField("Two-Bedroom", IntegerType(), True),
                    StructField("Three-Bedroom", IntegerType(), True),
                    StructField("Four-Bedroom", IntegerType(), True),
                    StructField("year", StringType(), True)
                ]),
                True
            )
        ]),
        True
    )
])

test = spark.table("bronze_hud_fmr_raw").where(col("http_status")==200).limit(5)

t = test.select(
    col("entity_id"),
    from_json(col("raw_payload"), payload_schema).alias("j")
)

t.select(
    "entity_id",
    col("j").isNull().alias("j_is_null"),
    col("j.data").isNull().alias("data_is_null")
).show(truncate=False)

+----------------+---------+------------+
|entity_id       |j_is_null|data_is_null|
+----------------+---------+------------+
|METRO10180M10180|false    |false       |
|METRO29180N22001|false    |false       |
|METRO10380M10380|false    |false       |
|METRO10420M10420|false    |false       |
|METRO10500M10500|false    |false       |
+----------------+---------+------------+



In [0]:
base = spark.table("silver_hud_fmr_clean")

In [0]:
# BRONZE PAYLOAD INSPECTION
# Confirm the HUD API response shape and verify that rent benchmark values appear under `data.basicdata`.
# Note: The raw HUD payload does not include the literal string "fmr".
# FMR values are encoded as bedroom-category keys (e.g., "Two-Bedroom") inside `basicdata`.
import json

s = spark.sql("""
SELECT raw_payload
FROM bronze_hud_fmr_raw
WHERE http_status=200 AND raw_payload IS NOT NULL
LIMIT 1
""").collect()[0]["raw_payload"]

print(s[:2000])  # first chunk
print("\n--- contains 'fmr'? ---\n", "fmr" in s.lower())

{"data": {"county_name": "", "counties_msa": "Callahan County, TX; Jones County, TX; and Taylor County, TX", "town_name": "", "metro_status": "1", "metro_name": "Abilene, TX", "area_name": "Abilene, TX MSA", "basicdata": {"Efficiency": 944, "One-Bedroom": 950, "Two-Bedroom": 1207, "Three-Bedroom": 1571, "Four-Bedroom": 1899, "year": "2025"}}}

--- contains 'fmr'? ---
 False


In [0]:
# SILVER TRANSFORM INPUT
# Filter Bronze HUD FMR records to successful API responses with non-null payloads.
# ensures Silver is built only from complete, valid raw inputs.
from pyspark.sql.functions import col, expr

bronze = (
    spark.table("bronze_hud_fmr_raw")
    .where(col("http_status") == 200)
    .where(col("raw_payload").isNotNull())
)

# Extract core identifiers and fields from the raw JSON:
# - metro_id: HUD CBSA-style metro identifier from Bronze
# - program_year: FMR vintage year
# - metro_name: human-readable metro name from the payload
# - bd: JSON fragment containing bedroom-level rent benchmarks (basicdata)
base = bronze.select(
    col("entity_id").alias("metro_id"),
    col("year").cast("int").alias("program_year"),
    expr("trim(get_json_object(raw_payload, '$.data.metro_name'))").alias("metro_name"),
    expr("get_json_object(raw_payload, '$.data.basicdata')").alias("bd")
)

# Pull values from the JSON text using regex (robust + fast enough for demo size)
base = base.select(
    "metro_id",
    "program_year",
    "metro_name",
    expr("regexp_extract(metro_name, ',\\s*([A-Z]{2})\\s*$', 1)").alias("state"),

    expr("cast(regexp_extract(bd, '\"Efficiency\"\\s*:\\s*([0-9]+)', 1) as double)").alias("fmr_eff"),
    expr("cast(regexp_extract(bd, '\"One-Bedroom\"\\s*:\\s*([0-9]+)', 1) as double)").alias("fmr_1br"),
    expr("cast(regexp_extract(bd, '\"Two-Bedroom\"\\s*:\\s*([0-9]+)', 1) as double)").alias("fmr_2br"),
    expr("cast(regexp_extract(bd, '\"Three-Bedroom\"\\s*:\\s*([0-9]+)', 1) as double)").alias("fmr_3br"),
    expr("cast(regexp_extract(bd, '\"Four-Bedroom\"\\s*:\\s*([0-9]+)', 1) as double)").alias("fmr_4br")
)
# Normalize the wide bedroom columns into a long, analytics-friendly format:
# one row per (metro_id, year, bedroom_count) with a single FMR value.
# This format supports aggregation and visualization (e.g., average FMR by bedroom type).
silver = base.selectExpr(
    "metro_id",
    "program_year",
    "metro_name",
    "state",
    """
    stack(
      5,
      0, fmr_eff,
      1, fmr_1br,
      2, fmr_2br,
      3, fmr_3br,
      4, fmr_4br
    ) as (bedrooms, fmr)
    """
)

silver_valid = silver.where(col("fmr").isNotNull() & (col("fmr") > 0))
silver_bad = silver.where(col("fmr").isNull() | (col("fmr") <= 0))

silver_valid.write.format("delta").mode("overwrite").saveAsTable("silver_hud_fmr")
silver_bad.write.format("delta").mode("overwrite").saveAsTable("silver_hud_fmr_quarantine")

In [0]:
# SILVER DATA QUALITY VALIDATION
# Purpose:
# Confirm Silver normalization produced the expected number of rows and bedroom coverage.
#
# Expectations for the current run:
# - METRO_LIMIT = 50 metros
# - 5 bedroom categories per metro (0..4)
# => expected valid rows = 50 * 5 = 250
#
# Also verify quarantine is empty (no null or non-positive FMR values).
# These checks validate Silver integrity before Gold aggregation and downstream joins.

spark.sql("SELECT COUNT(*) AS valid_rows FROM silver_hud_fmr").show()
spark.sql("SELECT COUNT(*) AS bad_rows FROM silver_hud_fmr_quarantine").show()

spark.sql("""
SELECT bedrooms, COUNT(*) AS rows
FROM silver_hud_fmr
GROUP BY bedrooms
ORDER BY bedrooms
""").show()

+----------+
|valid_rows|
+----------+
|       250|
+----------+

+--------+
|bad_rows|
+--------+
|       0|
+--------+

+--------+----+
|bedrooms|rows|
+--------+----+
|       0|  50|
|       1|  50|
|       2|  50|
|       3|  50|
|       4|  50|
+--------+----+



In [0]:
# GOLD LAYER: DIMENSION TABLES
# Purpose:
# Create reusable dimensions for geography, time, and bedroom categories.
# These dimensions standardize labels and support consistent dashboarding and analysis.

# Dimension: Geography (metro-level)
# Provides stable metro identifiers and human-readable labels for reporting.
spark.sql("""
CREATE OR REPLACE TABLE gold_dim_geography AS
SELECT DISTINCT
  metro_id AS geography_id,
  metro_name,
  state
FROM silver_hud_fmr
""")

# Dimension: Time (program year)
# Defines the FMR vintage year used in analysis.
spark.sql("""
CREATE OR REPLACE TABLE gold_dim_time AS
SELECT DISTINCT
  program_year AS year
FROM silver_hud_fmr
""")

# Dimension: Bedroom category
# Maps numeric bedroom counts to readable labels (0 = Efficiency/studio).
spark.sql("""
CREATE OR REPLACE TABLE gold_dim_bedroom AS
SELECT DISTINCT
  bedrooms,
  CASE bedrooms
    WHEN 0 THEN 'Efficiency'
    WHEN 1 THEN '1 Bedroom'
    WHEN 2 THEN '2 Bedroom'
    WHEN 3 THEN '3 Bedroom'
    WHEN 4 THEN '4 Bedroom'
  END AS bedroom_label
FROM silver_hud_fmr
""")

# Gold fact: FMR benchmarks
# Grain: (geography_id, year, bedrooms)
spark.sql("""
CREATE OR REPLACE TABLE gold_fact_fmr AS
SELECT
  metro_id AS geography_id,
  program_year AS year,
  bedrooms,
  fmr
FROM silver_hud_fmr
""")

# Serving table: dashboard-ready FMR dataset
# Denormalizes fact + dimensions to simplify BI tools and notebook visualizations.
spark.sql("""
CREATE OR REPLACE TABLE gold_fmr_dashboard AS
SELECT
  f.year,
  g.state,
  g.metro_name,
  f.bedrooms,
  b.bedroom_label,
  f.fmr
FROM gold_fact_fmr f
JOIN gold_dim_geography g
  ON f.geography_id = g.geography_id
JOIN gold_dim_bedroom b
  ON f.bedrooms = b.bedrooms
""")

# --- Metadata: table + column comments (executed as SQL) ---

spark.sql("""
COMMENT ON TABLE gold_dim_geography IS
'Gold dimension: metro-level geography labels and attributes derived from silver_hud_fmr.'
""")

spark.sql("""
COMMENT ON TABLE gold_dim_time IS
'Gold dimension: program year (FMR vintage) derived from silver_hud_fmr.'
""")

spark.sql("""
COMMENT ON TABLE gold_dim_bedroom IS
'Gold dimension: bedroom category mapping where 0=Efficiency (studio) and 1-4 represent bedroom unit sizes.'
""")

spark.sql("""
COMMENT ON TABLE gold_fact_fmr IS
'Gold fact table: HUD Fair Market Rent (FMR) benchmarks at grain (geography_id, year, bedrooms). Source: silver_hud_fmr.'
""")

spark.sql("""
COMMENT ON TABLE gold_fmr_dashboard IS
'Gold serving table: denormalized rent benchmark dataset for dashboards and exploratory analysis. Joins FMR fact with geography and bedroom labels.'
""")

spark.sql("ALTER TABLE gold_fact_fmr ALTER COLUMN geography_id COMMENT 'Metro identifier (CBSA-style) used as the geography join key.'")
spark.sql("ALTER TABLE gold_fact_fmr ALTER COLUMN year COMMENT 'FMR program year (vintage).'")
spark.sql("ALTER TABLE gold_fact_fmr ALTER COLUMN bedrooms COMMENT 'Bedroom count category: 0=Efficiency (studio), 1-4=bedrooms.'")
spark.sql("ALTER TABLE gold_fact_fmr ALTER COLUMN fmr COMMENT 'HUD Fair Market Rent benchmark (USD/month).'")

spark.sql("ALTER TABLE gold_fmr_dashboard ALTER COLUMN bedroom_label COMMENT 'Readable bedroom category label used in reporting.'")
spark.sql("ALTER TABLE gold_fmr_dashboard ALTER COLUMN fmr COMMENT 'HUD Fair Market Rent benchmark (USD/month) for the given metro/year/bedroom.'")

DataFrame[]

In [0]:
%sql
SELECT *
FROM gold_fmr_dashboard
LIMIT 30;

year,state,metro_name,bedrooms,bedroom_label,fmr
2025,,"Beaumont-Port Arthur, TX",4,4 Bedroom,1676.0
2025,,"Kansas City, MO-KS",4,4 Bedroom,1496.0
2025,,"Indianapolis-Carmel-Anderson, IN",4,4 Bedroom,1397.0
2025,,"Abilene, TX",4,4 Bedroom,1899.0
2025,,"Pittsburgh, PA",4,4 Bedroom,1493.0
2025,,"Houston-The Woodlands-Sugar Land, TX",4,4 Bedroom,1635.0
2025,,"Anchorage, AK",4,4 Bedroom,2625.0
2025,,"Barnstable Town, MA",4,4 Bedroom,3308.0
2025,,"Albany-Schenectady-Troy, NY",4,4 Bedroom,1971.0
2025,,"Anniston-Oxford, AL",4,4 Bedroom,1394.0


In [0]:
spark.sql("SELECT COUNT(*) AS rows FROM gold_fmr_dashboard").show()
spark.sql("SELECT * FROM gold_fmr_dashboard LIMIT 100").show(truncate=False)

+----+
|rows|
+----+
| 250|
+----+

+----+-----+------------------------------------+--------+-------------+------+
|year|state|metro_name                          |bedrooms|bedroom_label|fmr   |
+----+-----+------------------------------------+--------+-------------+------+
|2025|     |Beaumont-Port Arthur, TX            |4       |4 Bedroom    |1676.0|
|2025|     |Kansas City, MO-KS                  |4       |4 Bedroom    |1496.0|
|2025|     |Indianapolis-Carmel-Anderson, IN    |4       |4 Bedroom    |1397.0|
|2025|     |Abilene, TX                         |4       |4 Bedroom    |1899.0|
|2025|     |Pittsburgh, PA                      |4       |4 Bedroom    |1493.0|
|2025|     |Houston-The Woodlands-Sugar Land, TX|4       |4 Bedroom    |1635.0|
|2025|     |Anchorage, AK                       |4       |4 Bedroom    |2625.0|
|2025|     |Barnstable Town, MA                 |4       |4 Bedroom    |3308.0|
|2025|     |Albany-Schenectady-Troy, NY         |4       |4 Bedroom    |1971.0|
|202

In [0]:
import pandas as pd

pdf = spark.table("gold_fmr_dashboard").toPandas()

out_path = "/tmp/gold_fmr_dashboard.csv"
pdf.to_csv(out_path, index=False)

out_path

'/tmp/gold_fmr_dashboard.csv'

In [0]:
displayHTML('<a href="files/gold_fmr_dashboard.csv" target="_blank">Download gold_fmr_dashboard.csv</a>')

In [0]:

%sql
SELECT
  metro_name,
  state,
  fmr
FROM gold_fmr_dashboard
WHERE bedrooms = 2
ORDER BY fmr DESC
LIMIT 10;

metro_name,state,fmr
"Barnstable Town, MA",,2346.0
"New York-Newark-Jersey City, NY-NJ-PA",,2072.0
"Baltimore-Columbia-Towson, MD",,1965.0
"Austin-Round Rock-Georgetown, TX",,1949.0
"Atlanta-Sandy Springs-Alpharetta, GA",,1830.0
"Hilton Head Island-Bluffton, SC",,1822.0
"Atlantic City-Hammonton, NJ",,1719.0
"Pittsfield, MA",,1674.0
"Bend, OR",,1667.0
"Bellingham, WA",,1642.0


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
WITH stats AS (
  SELECT
    bedrooms,
    percentile_approx(fmr, 0.75) AS p75
  FROM gold_fmr_dashboard
  GROUP BY bedrooms
)
SELECT
  d.metro_name,
  d.state,
  d.bedrooms,
  d.fmr
FROM gold_fmr_dashboard d
JOIN stats s
  ON d.bedrooms = s.bedrooms
WHERE d.fmr > s.p75
ORDER BY d.fmr DESC;

metro_name,state,bedrooms,fmr
"Barnstable Town, MA",,4,3308.0
"New York-Newark-Jersey City, NY-NJ-PA",,4,3225.0
"Austin-Round Rock-Georgetown, TX",,4,2882.0
"Barnstable Town, MA",,3,2879.0
"Baltimore-Columbia-Towson, MD",,4,2826.0
"Pittsfield, MA",,4,2811.0
"Hilton Head Island-Bluffton, SC",,4,2803.0
"Bend, OR",,4,2799.0
"Bellingham, WA",,4,2757.0
"Atlantic City-Hammonton, NJ",,4,2654.0


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT
  bedroom_label,
  fmr
FROM gold_fmr_dashboard
WHERE year = 2025
ORDER BY bedroom_label, fmr;

bedroom_label,fmr
1 Bedroom,428.0
1 Bedroom,465.0
1 Bedroom,490.0
1 Bedroom,668.0
1 Bedroom,679.0
1 Bedroom,688.0
1 Bedroom,730.0
1 Bedroom,752.0
1 Bedroom,762.0
1 Bedroom,769.0


Databricks visualization. Run in Databricks to view.