In [0]:
# Source schema implementation:
# copy the raw food-inspections CSV from the raw data store into the
# Source table in Databricks.

# Load the raw CSV; the first row is the header, fields are comma-separated,
# and column types are inferred by Spark from the data.
df = (
    spark.read
    .options(inferSchema="true", header="true", sep=",")
    .csv("/Volumes/workspace/fi_dc_schema/rawdatastore/food_inspections.csv")
)

# Persist the DataFrame as a table, replacing any existing contents.
(
    df.write
    .mode("overwrite")
    .saveAsTable("workspace.fi_dc_schema.source_food_inspections_data")
)

In [0]:
%sql
-- Show the columns and extended table details of the Source table.
DESCRIBE EXTENDED workspace.fi_dc_schema.source_food_inspections_data

In [0]:
%sql
-- Set the delta.enableChangeDataFeed table property on the Source table.
ALTER TABLE workspace.fi_dc_schema.source_food_inspections_data
SET TBLPROPERTIES (
    delta.enableChangeDataFeed = true
);

In [0]:
%sql
-- Show the Source table's columns and extended details again
-- (this cell follows the ALTER TABLE ... SET TBLPROPERTIES cell).
DESCRIBE EXTENDED workspace.fi_dc_schema.source_food_inspections_data

In [0]:
%sql
-- Row count per critical_violation value in the Source table,
-- most frequent value first.
SELECT
    critical_violation,
    COUNT(*)
FROM
    workspace.fi_dc_schema.source_food_inspections_data
GROUP BY
    critical_violation
ORDER BY
    COUNT(*) DESC;

In [0]:
%sql
-- Row count per urgent_violation value in the Source table,
-- most frequent value first.
SELECT
    urgent_violation,
    COUNT(*)
FROM
    workspace.fi_dc_schema.source_food_inspections_data
GROUP BY
    urgent_violation
ORDER BY
    COUNT(*) DESC;

In [0]:
%sql
-- Rows whose violation_score is at least 90 and whose violation_count
-- exceeds 3; both columns are cast to INT before comparing.
-- NOTE(review): this cell reads silver_food_inspections, while later cells
-- read silver_food_inspections_transformed - confirm the intended table.
SELECT
    *
FROM
    workspace.fi_dc_schema.silver_food_inspections
WHERE
    CAST(violation_score AS INT) >= 90
    AND CAST(violation_count AS INT) > 3;

In [0]:
%sql
-- Row count per city in the Source table, largest group first.
SELECT
    COUNT(*),
    city
FROM
    workspace.fi_dc_schema.source_food_inspections_data
GROUP BY
    city
ORDER BY
    COUNT(*) DESC;

In [0]:
%sql
-- Row count per source_city in the Source table, largest group first.
SELECT
    COUNT(*),
    source_city
FROM
    workspace.fi_dc_schema.source_food_inspections_data
GROUP BY
    source_city
ORDER BY
    COUNT(*) DESC;

In [0]:
%sql
-- Check dim_restaurant current state.
-- Lists every row of up to five restaurants that have more than one row in
-- dim_restaurant, ordered by restaurant and row_version.
SELECT 
    restaurant_key,
    restaurant_id,
    dba_name,
    facility_type,
    risk_category,
    effective_date,
    expiration_date,
    is_current,
    row_version
FROM workspace.fi_dc_schema.dim_restaurant
WHERE restaurant_id IN (
    SELECT restaurant_id 
    FROM workspace.fi_dc_schema.dim_restaurant
    GROUP BY restaurant_id
    HAVING COUNT(*) > 1
    -- Order before LIMIT so the same five restaurants are sampled on every
    -- run; LIMIT without ORDER BY may return an arbitrary subset.
    ORDER BY restaurant_id
    LIMIT 5
)
ORDER BY restaurant_id, row_version;

In [0]:
%sql
-- Pick a restaurant to test SCD2.
-- The row-count check on this same table reports COUNT(DISTINCT inspection_id)
-- separately from COUNT(*) (labelled as violation records), so inspections
-- are counted here as distinct inspection_id values rather than raw rows.
SELECT 
    license_number,
    dba_name,
    facility_type,
    risk_category,
    city,
    COUNT(DISTINCT inspection_id) AS inspection_count
FROM workspace.fi_dc_schema.silver_food_inspections_transformed
GROUP BY license_number, dba_name, facility_type, risk_category, city
HAVING COUNT(DISTINCT inspection_id) > 5  -- Pick one with multiple inspections
-- Deterministic pick: most inspections first, grouping columns break ties.
-- Without ORDER BY, LIMIT 1 may return a different restaurant on each run.
ORDER BY
    inspection_count DESC,
    license_number,
    dba_name,
    facility_type,
    risk_category,
    city
LIMIT 1;

In [0]:
%sql
-- List the dim_restaurant rows for a single restaurant_id,
-- ordered by row_version.
SELECT
    dr.restaurant_key,
    dr.restaurant_id,
    dr.dba_name,
    dr.facility_type,
    dr.risk_category,
    dr.effective_date,
    dr.expiration_date,
    dr.is_current,
    dr.row_version
FROM
    workspace.fi_dc_schema.dim_restaurant AS dr
WHERE
    dr.restaurant_id = '2205833-chicago'
ORDER BY
    dr.row_version;

In [0]:
%sql
-- Silver table totals: number of distinct inspection_id values
-- alongside the total number of rows.
SELECT
    COUNT(DISTINCT inspection_id) AS unique_inspections_silver,
    COUNT(*)                      AS total_violation_records_silver
FROM
    workspace.fi_dc_schema.silver_food_inspections_transformed;

In [0]:
%sql
-- Total number of rows in the fact table.
SELECT
    COUNT(*) AS fact_table_inspections
FROM
    workspace.fi_dc_schema.fact_food_inspections;

In [0]:
%sql
-- Show the column names and types of the fact table.
DESCRIBE workspace.fi_dc_schema.fact_food_inspections;

In [0]:
%sql
-- Check fact table: show ten rows together with the table's total row count.
SELECT 
    inspection_key,
    inspection_business_key,
    inspection_id,
    city,
    -- A scalar subquery yields the same total as COUNT(*) OVER () without an
    -- unpartitioned window, which would move every fact row into a single
    -- partition just to display ten of them.
    (
        SELECT COUNT(*)
        FROM workspace.fi_dc_schema.fact_food_inspections
    ) AS total_rows
FROM workspace.fi_dc_schema.fact_food_inspections
-- Order before LIMIT so the same ten rows are shown on every run.
ORDER BY inspection_key
LIMIT 10;