In [1]:
import os

from sqlalchemy import create_engine, text
# Connection URL: read from the HEALTHCARE_DB_URL environment variable when it
# is set, so credentials can be supplied outside the notebook. The literal
# below is only the fallback used when the variable is absent.
DB_URL = os.environ.get(
    "HEALTHCARE_DB_URL",
    "postgresql+psycopg2://dquser:dqpass@localhost:5432/healthcare",
)
engine = create_engine(DB_URL)

# create_engine() does not open a connection by itself, so run a trivial query
# to confirm the database is reachable before reporting readiness.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
print("DB engine ready.")


DB engine ready.


In [2]:
## DDL
# Warehouse star schema: four dimension tables (patient, hospital,
# condition/treatment, age band) and fact_encounter, whose foreign keys
# reference the dimensions' surrogate keys. Every statement carries
# IF NOT EXISTS.

ddl = """
CREATE SCHEMA IF NOT EXISTS warehouse;

CREATE TABLE IF NOT EXISTS warehouse.dim_patient (
    patient_key       BIGSERIAL PRIMARY KEY,
    patient_id        INT UNIQUE,
    gender            TEXT,
    insurance_type    TEXT,
    smoking_status    TEXT,
    income_band       TEXT
);

CREATE TABLE IF NOT EXISTS warehouse.dim_hospital (
    hospital_key   BIGSERIAL PRIMARY KEY,
    hospital_id    INT UNIQUE,
    region         TEXT
);

CREATE TABLE IF NOT EXISTS warehouse.dim_condition_treatment (
    ct_key            BIGSERIAL PRIMARY KEY,
    medical_condition TEXT,
    treatment         TEXT,
    UNIQUE (medical_condition, treatment)
);

CREATE TABLE IF NOT EXISTS warehouse.dim_ageband (
    ageband_key  BIGSERIAL PRIMARY KEY,
    age_band     TEXT UNIQUE
);

CREATE TABLE IF NOT EXISTS warehouse.fact_encounter (
    encounter_key   BIGSERIAL PRIMARY KEY,
    patient_key     BIGINT REFERENCES warehouse.dim_patient(patient_key),
    hospital_key    BIGINT REFERENCES warehouse.dim_hospital(hospital_key),
    ct_key          BIGINT REFERENCES warehouse.dim_condition_treatment(ct_key),
    ageband_key     BIGINT REFERENCES warehouse.dim_ageband(ageband_key),
    length_of_stay  INT,
    outcome         TEXT
);
"""

# Send the whole DDL batch through a single engine.begin() block.
ddl_statement = text(ddl)
with engine.begin() as connection:
    connection.execute(ddl_statement)

print("Warehouse tables created (or already present).")

Warehouse tables created (or already present).


In [4]:
## Seed Dimensions from staging.patients_staging
# Populates the four dimension tables. Each INSERT uses ON CONFLICT ... DO
# NOTHING on the dimension's unique key, so rows already present are skipped.
#
# income_band: a NULL income fails every "<" comparison, so without an explicit
# branch it would fall through to ELSE and be labelled '100k+'. The first WHEN
# keeps the band NULL (unknown) in that case.
#
# NOTE(review): SELECT DISTINCT can still yield several rows for one patient_id
# or hospital_id if their attributes differ in staging; DO NOTHING then keeps
# whichever row is inserted first. Confirm staging has one row per key.

seed_dims_lower = """
INSERT INTO warehouse.dim_patient (patient_id, gender, insurance_type, smoking_status, income_band)
SELECT DISTINCT
    s.patient_id,
    s.gender,
    s.insurance_type,
    s.smoking_status,
    CASE
        WHEN s.income IS NULL THEN NULL
        WHEN s.income < 20000 THEN '<20k'
        WHEN s.income < 40000 THEN '20–40k'
        WHEN s.income < 60000 THEN '40–60k'
        WHEN s.income < 80000 THEN '60–80k'
        WHEN s.income < 100000 THEN '80–100k'
        ELSE '100k+'
    END AS income_band
FROM staging.patients_staging s
ON CONFLICT (patient_id) DO NOTHING;

INSERT INTO warehouse.dim_hospital (hospital_id, region)
SELECT DISTINCT s.hospital_id, s.region
FROM staging.patients_staging s
ON CONFLICT (hospital_id) DO NOTHING;

INSERT INTO warehouse.dim_condition_treatment (medical_condition, treatment)
SELECT DISTINCT s.medical_condition, s.treatment
FROM staging.patients_staging s
ON CONFLICT (medical_condition, treatment) DO NOTHING;

WITH bands AS (
  SELECT '0–17' AS band UNION ALL
  SELECT '18–34' UNION ALL
  SELECT '35–49' UNION ALL
  SELECT '50–64' UNION ALL
  SELECT '65–79' UNION ALL
  SELECT '80+'
)
INSERT INTO warehouse.dim_ageband (age_band)
SELECT band FROM bands
ON CONFLICT (age_band) DO NOTHING;
"""

with engine.begin() as conn:
    conn.execute(text(seed_dims_lower))

print("Dimensions seeded.")

Dimensions seeded.


In [7]:
## Fact Table
# Loads one fact_encounter row per staging row, resolving each natural key to
# the matching dimension surrogate key.
#
# Age band: a NULL age fails every "<" comparison, so without an explicit
# branch it would fall through to ELSE and be recorded as '80+'. The CASE now
# yields NULL for a NULL age, and the age-band join is a LEFT JOIN so such rows
# are still loaded, with ageband_key left NULL instead of a fabricated band.
#
# NOTE(review): this INSERT has no conflict guard and fact_encounter has no
# unique constraint besides its surrogate key, so running this cell again
# appends a second copy of every staging row.

load_fact = """
INSERT INTO warehouse.fact_encounter
    (patient_key, hospital_key, ct_key, ageband_key, length_of_stay, outcome)
SELECT
    dp.patient_key,
    dh.hospital_key,
    dct.ct_key,
    da.ageband_key,
    s."length_of_stay",
    s."outcome"
FROM staging.patients_staging s
JOIN warehouse.dim_patient dp
  ON dp.patient_id = s."patient_id"
JOIN warehouse.dim_hospital dh
  ON dh.hospital_id = s."hospital_id"
JOIN warehouse.dim_condition_treatment dct
  ON dct.medical_condition = s."medical_condition"
 AND dct.treatment         = s."treatment"
LEFT JOIN warehouse.dim_ageband da
  ON da.age_band = CASE
                     WHEN s."age" IS NULL THEN NULL
                     WHEN s."age" < 18 THEN '0–17'
                     WHEN s."age" < 35 THEN '18–34'
                     WHEN s."age" < 50 THEN '35–49'
                     WHEN s."age" < 65 THEN '50–64'
                     WHEN s."age" < 80 THEN '65–79'
                     ELSE '80+'
                   END;
"""
with engine.begin() as conn:
    conn.execute(text(load_fact))
print("Fact loaded.")

Fact loaded.


In [10]:
## QA checks

from sqlalchemy import text

# Row counts: staging, the fact table, and each of the four dimensions
with engine.connect() as connection:
    staging_result = connection.execute(text("select count(*) from staging.patients_staging"))
    rows_staging = staging_result.scalar()

    fact_result = connection.execute(text("select count(*) from warehouse.fact_encounter"))
    rows_fact = fact_result.scalar()

    # One single-row query returning all dimension counts as named columns.
    dims_result = connection.execute(text("""
        select
          (select count(*) from warehouse.dim_patient)              as dim_patient,
          (select count(*) from warehouse.dim_hospital)             as dim_hospital,
          (select count(*) from warehouse.dim_condition_treatment)  as dim_ct,
          (select count(*) from warehouse.dim_ageband)              as dim_ageband
    """))
    dims = dims_result.mappings().one()

for label, value in (
    ("Rows in staging:", rows_staging),
    ("Rows in warehouse.fact_encounter:", rows_fact),
    ("Dim counts:", dict(dims)),
):
    print(label, value)

# foreign key
# For every staging row, look up each dimension with the same join rules the
# fact load uses and count the rows whose lookup finds no dimension row.
# count(*) filter (...) is used rather than sum(case ...) so that an empty
# staging table reports 0 for each column instead of NULL/None.
fk_checks_sql = """
select
  count(*) filter (where dp.patient_key  is null) as missing_patient,
  count(*) filter (where dh.hospital_key is null) as missing_hospital,
  count(*) filter (where dct.ct_key      is null) as missing_ct,
  count(*) filter (where dab.ageband_key is null) as missing_ageband
from staging.patients_staging s
left join warehouse.dim_patient dp
  on dp.patient_id = s.patient_id
left join warehouse.dim_hospital dh
  on dh.hospital_id = s.hospital_id
left join warehouse.dim_condition_treatment dct
  on dct.medical_condition = s.medical_condition
 and dct.treatment = s.treatment
left join warehouse.dim_ageband dab
  on dab.age_band = case
        when s.age < 18 then '0–17'
        when s.age < 35 then '18–34'
        when s.age < 50 then '35–49'
        when s.age < 65 then '50–64'
        when s.age < 80 then '65–79'
        else '80+'
     end;
"""
with engine.connect() as conn:
    fk_missing = conn.execute(text(fk_checks_sql)).mappings().one()

print("FK coverage (0 is good):", dict(fk_missing))

# indexes
# Only the fact table's foreign-key columns get explicit indexes here.
# The dimension natural keys (patient_id, hospital_id, and the
# (medical_condition, treatment) pair) are declared UNIQUE in this notebook's
# DDL, and PostgreSQL backs every UNIQUE constraint with its own index, so
# separate indexes on those columns would only duplicate the existing ones.
index_sql = """
create index if not exists ix_fact_patient_key   on warehouse.fact_encounter (patient_key);
create index if not exists ix_fact_hospital_key  on warehouse.fact_encounter (hospital_key);
create index if not exists ix_fact_ct_key        on warehouse.fact_encounter (ct_key);
create index if not exists ix_fact_ageband_key   on warehouse.fact_encounter (ageband_key);
"""
with engine.begin() as conn:
    conn.execute(text(index_sql))

print("Indexes ensured.")


Rows in staging: 1000
Rows in warehouse.fact_encounter: 1000
Dim counts: {'dim_patient': 1000, 'dim_hospital': 993, 'dim_ct': 195, 'dim_ageband': 6}
FK coverage (0 is good): {'missing_patient': 0, 'missing_hospital': 0, 'missing_ct': 0, 'missing_ageband': 0}
Indexes ensured.
