In [0]:
spark.sql("SELECT COUNT(*) FROM bronze_nyc_evictions").show()

+--------+
|COUNT(*)|
+--------+
|  121046|
+--------+



In [0]:
spark.table("bronze_nyc_evictions").printSchema()

root
 |-- court_index_number: string (nullable = true)
 |-- docket_number: long (nullable = true)
 |-- eviction_address: string (nullable = true)
 |-- eviction_apt_num: string (nullable = true)
 |-- executed_date: timestamp (nullable = true)
 |-- marshal_first_name: string (nullable = true)
 |-- marshal_last_name: string (nullable = true)
 |-- residential_commercial_ind: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- eviction_zip: long (nullable = true)
 |-- ejectment: string (nullable = true)
 |-- eviction_possession: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- community_board: long (nullable = true)
 |-- council_district: long (nullable = true)
 |-- census_tract: long (nullable = true)
 |-- bin: long (nullable = true)
 |-- bbl: long (nullable = true)
 |-- nta: string (nullable = true)



In [0]:
%sql
SELECT
  eviction_zip,
  evictions
FROM gold_evictions_by_zip
ORDER BY evictions DESC
LIMIT 15

eviction_zip,evictions
10467,3089
10458,2902
10453,2704
10456,2698
10457,2516
11226,2442
10468,2339
11212,2303
11207,2252
10452,2177


Databricks visualization. Run in Databricks to view.

In [0]:
#Silver layer to apply standardization, deduplication, and data quality rules so downstream analytics are consistent and reusable
from pyspark.sql.functions import col, to_date, trim

silver_df = (
    spark.table("bronze_nyc_evictions")
    #standardize strings
    .withColumn("court_index_number", trim(col("court_index_number")))
    .withColumn("borough", trim(col("borough")))
    
    #standardize dates
    .withColumn("executed_date", to_date(col("executed_date")))
    
    #basic quality rules
    .filter(col("executed_date").isNotNull())
    .filter(col("borough").isNotNull())
    
    #de-duplication (conservative)
    .dropDuplicates(["court_index_number"])
)

silver_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("silver_nyc_evictions")

silver_df.select(
    "court_index_number",
    "borough",
    "executed_date"
).show(10, truncate=False)

+------------------+-------------+-------------+
|court_index_number|borough      |executed_date|
+------------------+-------------+-------------+
|57274/18-1        |QUEENS       |2018-09-06   |
|B60300/18         |BRONX        |2019-04-11   |
|10047/23          |STATEN ISLAND|2024-06-07   |
|B327948/23        |BRONX        |2025-10-22   |
|K066137/17        |BROOKLYN     |2017-07-14   |
|312237/24         |QUEENS       |2025-01-21   |
|51700/18          |BRONX        |2019-05-06   |
|305830/21         |MANHATTAN    |2023-04-25   |
|30072/19          |BRONX        |2019-11-22   |
|B15943/19         |BRONX        |2019-09-09   |
+------------------+-------------+-------------+
only showing top 10 rows


In [0]:
#After Silver transformations, I run validation checks to confirm record counts, deduplication behavior, and date ranges before promoting data to analytics-ready tables
spark.sql("""
SELECT
  COUNT(*) AS row_count,
  COUNT(DISTINCT court_index_number) AS distinct_cases,
  MIN(executed_date) AS min_date,
  MAX(executed_date) AS max_date
FROM silver_nyc_evictions
""").show()

+---------+--------------+----------+----------+
|row_count|distinct_cases|  min_date|  max_date|
+---------+--------------+----------+----------+
|   111307|        111307|2017-01-03|2025-12-30|
+---------+--------------+----------+----------+



In [0]:
spark.sql("""
CREATE OR REPLACE TABLE gold_evictions_by_zip AS
SELECT
  eviction_zip,
  COUNT(*) AS evictions
FROM silver_nyc_evictions
WHERE eviction_zip IS NOT NULL
GROUP BY eviction_zip
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("""
SELECT *
FROM gold_evictions_by_zip
ORDER BY evictions DESC
LIMIT 20
""").show(truncate=False)

+------------+---------+
|eviction_zip|evictions|
+------------+---------+
|10467       |3089     |
|10458       |2902     |
|10453       |2704     |
|10456       |2698     |
|10457       |2516     |
|11226       |2442     |
|10468       |2339     |
|11212       |2303     |
|11207       |2252     |
|10452       |2177     |
|10460       |2134     |
|11208       |1949     |
|11233       |1821     |
|10462       |1747     |
|10466       |1663     |
|11203       |1663     |
|10459       |1634     |
|10472       |1525     |
|10451       |1502     |
|11213       |1501     |
+------------+---------+



In [0]:
spark.sql("""
CREATE OR REPLACE TABLE gold_evictions_by_borough_month AS
SELECT
  borough,
  date_trunc('month', executed_date) AS month,
  COUNT(*) AS evictions
FROM silver_nyc_evictions
GROUP BY borough, date_trunc('month', executed_date)
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# Data Quality results table
spark.sql("""
CREATE OR REPLACE TABLE gold_dq_nyc_evictions AS
SELECT
  current_timestamp() AS run_ts,

  -- overall volume
  COUNT(*) AS total_rows,

  -- key completeness checks
  SUM(CASE WHEN court_index_number IS NULL THEN 1 ELSE 0 END) AS null_court_index_number,
  SUM(CASE WHEN docket_number IS NULL THEN 1 ELSE 0 END) AS null_docket_number,
  SUM(CASE WHEN eviction_address IS NULL THEN 1 ELSE 0 END) AS null_eviction_address,
  SUM(CASE WHEN executed_date IS NULL THEN 1 ELSE 0 END) AS null_executed_date,
  SUM(CASE WHEN eviction_zip IS NULL THEN 1 ELSE 0 END) AS null_eviction_zip,
  SUM(CASE WHEN borough IS NULL THEN 1 ELSE 0 END) AS null_borough,

  -- date coverage
  MIN(executed_date) AS min_executed_date,
  MAX(executed_date) AS max_executed_date

FROM silver_nyc_evictions
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
#“Silver contains fewer records than Bronze because I enforce minimum analytical viability and deduplicate by business key.”
# 	Records without an executed_date were filtered out
#	Records without a borough were filtered out
#	Duplicates by court_index_number were removed

spark.sql("SELECT * FROM gold_dq_nyc_evictions").show(truncate=False)

+--------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|run_ts                    |total_rows|null_court_index_number|null_docket_number|null_eviction_address|null_executed_date|null_eviction_zip|null_borough|min_executed_date|max_executed_date|
+--------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|2026-01-03 06:28:42.669972|111307    |0                      |0                 |0                    |0                 |0                |0           |2017-01-03       |2025-12-30       |
+--------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+



In [0]:
# retain data quality history per run for monitoring and audit.
spark.sql("""
INSERT INTO gold_dq_nyc_evictions
SELECT
  current_timestamp() AS run_ts,
  COUNT(*) AS total_rows,
  SUM(CASE WHEN court_index_number IS NULL THEN 1 ELSE 0 END),
  SUM(CASE WHEN docket_number IS NULL THEN 1 ELSE 0 END),
  SUM(CASE WHEN eviction_address IS NULL THEN 1 ELSE 0 END),
  SUM(CASE WHEN executed_date IS NULL THEN 1 ELSE 0 END),
  SUM(CASE WHEN eviction_zip IS NULL THEN 1 ELSE 0 END),
  SUM(CASE WHEN borough IS NULL THEN 1 ELSE 0 END),
  MIN(executed_date),
  MAX(executed_date)
FROM silver_nyc_evictions
""")

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# View all DQ runs
spark.sql("""
SELECT *
FROM gold_dq_nyc_evictions
ORDER BY run_ts DESC
""").show(truncate=False)

+--------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|run_ts                    |total_rows|null_court_index_number|null_docket_number|null_eviction_address|null_executed_date|null_eviction_zip|null_borough|min_executed_date|max_executed_date|
+--------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|2026-01-03 06:46:41.14065 |111307    |0                      |0                 |0                    |0                 |0                |0           |2017-01-03       |2025-12-30       |
|2026-01-03 06:28:42.669972|111307    |0                      |0                 |0                    |0                 |0                |0           |2017-01-03       |2025-12-30       |
+--------------------------+----------+------

In [0]:
spark.sql("""
SELECT *
FROM gold_dq_nyc_evictions
ORDER BY run_ts DESC
LIMIT 1
""").show(truncate=False)

+-------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|run_ts                   |total_rows|null_court_index_number|null_docket_number|null_eviction_address|null_executed_date|null_eviction_zip|null_borough|min_executed_date|max_executed_date|
+-------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+
|2026-01-03 06:46:41.14065|111307    |0                      |0                 |0                    |0                 |0                |0           |2017-01-03       |2025-12-30       |
+-------------------------+----------+-----------------------+------------------+---------------------+------------------+-----------------+------------+-----------------+-----------------+

