## Plan
1. Normalize the 100-Patient Database
   1. Instead of using Python, we will create a staging database and load from that database.
   2. Mermaid JS -- how to display the database schema
2. indices
3. Streamlit (frontend) + ChatGPT
4. Tuesday -- Docker
5. Add more data validations (meaning, check that gender values are correct. Use initial load to get allowed values)
 


In [None]:
PatientID	AdmissionID	AdmissionStartDate	AdmissionEndDate
PatientID	AdmissionID	PrimaryDiagnosisCode	PrimaryDiagnosisDescription
PatientID	AdmissionID	LabName	LabValue	LabUnits	LabDateTime
PatientID	PatientGender	PatientDateOfBirth	PatientRace	PatientMaritalStatus	PatientLanguage	PatientPopulationPercentageBelowPoverty

In [None]:
PatientID	
PatientGender	
PatientDateOfBirth	
PatientRace	
PatientMaritalStatus	
PatientLanguage	
PatientPopulationPercentageBelowPoverty

- PatienID
- Gender
- Race
- Marital Status
- Language

In [None]:
Gender
- Male
- MALE
- MalE
- male
- mael
- Male--  
- M
- m

Race
- White
- W
- WiTE
- WHITE
---
GenderTable
- Male
- Female

-- 

In [None]:
PatientID	
AdmissionID	
PrimaryDiagnosisCode	
PrimaryDiagnosisDescription

# PrimaryDiagnosisCode	
PrimaryDiagnosisDescription

In [62]:
STAGING_CREATE_SQL = """
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS stage_patients (
    PatientID                              TEXT, 
    PatientGender                          TEXT, 
    PatientDateOfBirth	                   TEXT, 
    PatientRace	                       TEXT, 
    PatientMaritalStatus	          TEXT, 
    PatientLanguage	                        TEXT, 
    PatientPopulationPercentageBelowPoverty TEXT
);
CREATE TABLE IF NOT EXISTS stage_admissions (
    PatientID                              TEXT, 
    AdmissionID                          TEXT, 
    AdmissionStartDate	                   TEXT, 
    AdmissionEndDate	                       TEXT
);

CREATE TABLE IF NOT EXISTS stage_diagnoses (
    PatientID                              TEXT, 
    AdmissionID                          TEXT, 
    PrimaryDiagnosisCode	                   TEXT, 
    PrimaryDiagnosisDescription	                       TEXT
);

CREATE TABLE IF NOT EXISTS stage_labs (
    PatientID                              TEXT, 
    AdmissionID                          TEXT, 
    LabName	                   TEXT, 
    LabValue	                       TEXT,
    LabUnits TEXT,
    LabDateTime TEXT
);
-- Lookup tables
CREATE TABLE IF NOT EXISTS genders (
    gender_id   INTEGER PRIMARY KEY,
    gender_desc TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS races (
    race_id     INTEGER PRIMARY KEY,
    race_desc   TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS marital_statuses (
    marital_status_id   INTEGER PRIMARY KEY,
    marital_status_desc TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS languages (
    language_id INTEGER PRIMARY KEY,
    language_desc TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS lab_units (
    unit_id     INTEGER PRIMARY KEY,
    unit_string TEXT NOT NULL UNIQUE
);
CREATE TABLE IF NOT EXISTS lab_tests (
    lab_test_id INTEGER PRIMARY KEY,
    lab_name    TEXT NOT NULL UNIQUE,
    unit_id     INTEGER NOT NULL,
    FOREIGN KEY (unit_id) REFERENCES lab_units(unit_id)
);
CREATE TABLE IF NOT EXISTS diagnosis_codes (
    diagnosis_code        TEXT PRIMARY KEY,
    diagnosis_description TEXT NOT NULL
);

-- Core tables
CREATE TABLE IF NOT EXISTS patients (
    patient_id     TEXT PRIMARY KEY,
    patient_gender INTEGER,
    patient_dob    TEXT NOT NULL,
    patient_race   INTEGER,
    patient_marital_status INTEGER,
    patient_language INTEGER,
    patient_population_pct_below_poverty REAL,
    FOREIGN KEY (patient_gender) REFERENCES genders(gender_id),
    FOREIGN KEY (patient_race) REFERENCES races(race_id),
    FOREIGN KEY (patient_marital_status) REFERENCES marital_statuses(marital_status_id),
    FOREIGN KEY (patient_language) REFERENCES languages(language_id)
);

CREATE TABLE IF NOT EXISTS admissions (
    patient_id      TEXT NOT NULL,
    admission_id    INTEGER NOT NULL,
    admission_start TEXT NOT NULL,
    admission_end   TEXT,
    PRIMARY KEY (patient_id, admission_id),
    FOREIGN KEY (patient_id) REFERENCES patients(patient_id)
);

CREATE TABLE IF NOT EXISTS admission_primary_diagnoses (
    patient_id     TEXT NOT NULL,
    admission_id   INTEGER NOT NULL,
    diagnosis_code TEXT NOT NULL,
    PRIMARY KEY (patient_id, admission_id),
    FOREIGN KEY (patient_id, admission_id) REFERENCES admissions(patient_id, admission_id),
    FOREIGN KEY (diagnosis_code) REFERENCES diagnosis_codes(diagnosis_code)
);

CREATE TABLE IF NOT EXISTS admission_lab_results (
    patient_id    TEXT NOT NULL,
    admission_id  INTEGER NOT NULL,
    lab_test_id   INTEGER NOT NULL,
    lab_value     REAL,
    lab_datetime  TEXT NOT NULL,
    FOREIGN KEY (patient_id, admission_id) REFERENCES admissions(patient_id, admission_id),
    FOREIGN KEY (lab_test_id) REFERENCES lab_tests(lab_test_id),
    UNIQUE (patient_id, admission_id, lab_test_id, lab_datetime)
);


"""

In [63]:
import sqlite3
import csv
from pathlib import Path
import os
import time

DB_PATH = "patient.db"

if os.path.exists(DB_PATH):
    os.remove(DB_PATH)
    
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.executescript(STAGING_CREATE_SQL)
conn.close()

In [64]:
FILES = {
    "patients": {
        "filename": "PatientCorePopulatedTable.txt",
     },
    "admissions": {
        "filename": "AdmissionsCorePopulatedTable.txt",
     },
    "diagnoses": {
        "filename": "AdmissionsDiagnosesCorePopulatedTable.txt",
     },
    "labs": {
        "filename": "LabsCorePopulatedTable.txt",
        "batch_size": 100_000,
     }
}

EXPECTED_COLUMNS = {
    "patients": [
        "PatientID",
        "PatientGender" ,
        "PatientDateOfBirth",
        "PatientRace" ,
        "PatientMaritalStatus",
        "PatientLanguage"	  ,
        "PatientPopulationPercentageBelowPoverty",
    ],
    "admissions": [
        "PatientID",
        "AdmissionID",
        "AdmissionStartDate",
        "AdmissionEndDate",
    ],
    "diagnoses": [
        "PatientID", 
        "AdmissionID", 
        "PrimaryDiagnosisCode", 
        "PrimaryDiagnosisDescription",               
    ],
    "labs": [
        "PatientID",
        "AdmissionID",
        "LabName",
        "LabValue",
        "LabUnits",
        "LabDateTime",
    ]
}


def load_tsv_to_stage(conn, filepath, stage_table, expected_columns, batch_size=5_000):
    path = Path(filepath)
    if not path.exists():
        raise FileNotFoundError("Missing file: {filepath}")

    with path.open("r", encoding="utf-8-sig") as csvfile:
        csv_reader = csv.DictReader(csvfile, delimiter='\t')
        # validate columns
        missing =  sorted(set(expected_columns) - set(csv_reader.fieldnames))
        if missing:
            raise ValueError(f"{filepath} missing expected columns: {missing}")

        placeholders = ", ".join(["?"] * len(expected_columns))
        sql = f"INSERT INTO {stage_table} ({', '.join(expected_columns)}) VALUES ({placeholders})"
        rows = []
        row_count = 0 
        total_count = 0
        cursor = conn.cursor()
        with conn:
            cursor.execute(f"DELETE FROM {stage_table}") # delete all rows in table, but not the table itself
            print(f"Cleaned up rows from {stage_table}")
        log_template = "Inserted another batch of {:,} rows; total: {:,}"
        for row in csv_reader:
            rows.append([row.get(c, None) for c in expected_columns])
            row_count += 1

            if row_count == batch_size:
                with conn:
                    cursor.executemany(sql, rows)
                total_count += len(rows)
                row_count = 0 
                rows = []  
                print(log_template.format(batch_size, total_count))

        if rows:
            with conn:
                cursor.executemany(sql, rows)
                total_count += len(rows)  
                print(log_template.format(batch_size, total_count))

        cursor.close()
        print(f"Finished loading data into {stage_table}")
        

start_time = time.monotonic()
conn = sqlite3.connect(DB_PATH)
for name in FILES:
    load_tsv_to_stage(conn, FILES[name]["filename"], f"stage_{name}", EXPECTED_COLUMNS[name], FILES[name].get("batch_size", 5_000))

conn.close()
end_time = time.monotonic()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")


Cleaned up rows from stage_patients
Inserted another batch of 5,000 rows; total: 100
Finished loading data into stage_patients
Cleaned up rows from stage_admissions
Inserted another batch of 5,000 rows; total: 372
Finished loading data into stage_admissions
Cleaned up rows from stage_diagnoses
Inserted another batch of 5,000 rows; total: 372
Finished loading data into stage_diagnoses
Cleaned up rows from stage_labs
Inserted another batch of 100,000 rows; total: 100,000
Inserted another batch of 100,000 rows; total: 111,483
Finished loading data into stage_labs
Elapsed time: 3.8154355459992075 seconds


In [69]:
def build_dimensions(conn):
    # contain lookup values or reference 
    cur = conn.cursor()
    with conn:
        # Genders
        cur.execute("""
            INSERT OR IGNORE INTO genders(gender_desc)
            SELECT DISTINCT PatientGender FROM stage_patients WHERE PatientGender IS NOT NULL AND PatientGender <> '';
        """)
        # Races
        cur.execute("""
            INSERT OR IGNORE INTO races(race_desc)
            SELECT DISTINCT PatientRace FROM stage_patients WHERE PatientRace IS NOT NULL AND PatientRace <> '';
        """)
        # Marital statuses
        cur.execute("""
            INSERT OR IGNORE INTO marital_statuses(marital_status_desc)
            SELECT DISTINCT PatientMaritalStatus FROM stage_patients WHERE PatientMaritalStatus IS NOT NULL AND PatientMaritalStatus <> '';
        """)
        # Languages
        cur.execute("""
            INSERT OR IGNORE INTO languages(language_desc)
            SELECT DISTINCT PatientLanguage FROM stage_patients WHERE PatientLanguage IS NOT NULL AND PatientLanguage <> '';
        """)
        # Lab units
        cur.execute("""
            INSERT OR IGNORE INTO lab_units(unit_string)
            SELECT DISTINCT LabUnits FROM stage_labs WHERE LabUnits IS NOT NULL AND LabUnits <> '';
        """)
        # Lab tests (LabName -> Unit)
        cur.execute("""
            INSERT OR IGNORE INTO lab_tests(lab_name, unit_id)
            SELECT DISTINCT s.LabName, u.unit_id
            FROM stage_labs s
            JOIN lab_units u ON u.unit_string = s.LabUnits
            WHERE s.LabName IS NOT NULL AND s.LabName <> '';
        """)
        # Diagnosis codes
        cur.execute("""
            INSERT OR IGNORE INTO diagnosis_codes(diagnosis_code, diagnosis_description)
            SELECT DISTINCT PrimaryDiagnosisCode, PrimaryDiagnosisDescription
            FROM stage_diagnoses
            WHERE PrimaryDiagnosisCode IS NOT NULL AND PrimaryDiagnosisCode <> '';
        """)
conn = sqlite3.connect(DB_PATH)
build_dimensions(conn)
conn.close()

In [71]:
def load_entities(conn):
    # Entity table
    # Core enties -- patients and admissions
    cur = conn.cursor()
    with conn:
        # Patients
        cur.execute("""
            INSERT OR IGNORE INTO patients (
                patient_id, patient_gender, patient_dob, patient_race,
                patient_marital_status, patient_language, patient_population_pct_below_poverty
            )
            SELECT
                s.PatientID,
                g.gender_id,
                s.PatientDateOfBirth,
                r.race_id,
                m.marital_status_id,
                l.language_id,
                NULLIF(s.PatientPopulationPercentageBelowPoverty, '')  -- keep null if blank
            FROM stage_patients s
            LEFT JOIN genders g ON g.gender_desc = s.PatientGender
            LEFT JOIN races r ON r.race_desc = s.PatientRace
            LEFT JOIN marital_statuses m ON m.marital_status_desc = s.PatientMaritalStatus
            LEFT JOIN languages l ON l.language_desc = s.PatientLanguage;
        """)
        # Admissions
        cur.execute("""
            INSERT OR IGNORE INTO admissions (patient_id, admission_id, admission_start, admission_end)
            SELECT
                s.PatientID,
                CAST(s.AdmissionID AS INTEGER),
                s.AdmissionStartDate,
                s.AdmissionEndDate
            FROM stage_admissions s;
        """)

conn = sqlite3.connect(DB_PATH)
load_entities(conn)
conn.close()

In [74]:
# fact tables
## contains metrics/measures -- measurable events
## admission_lab_results
## addmision_primary_diagnoses

def build_facts(conn):
    cur = conn.cursor()
    with conn:
        cur.execute("""
            INSERT OR IGNORE INTO admission_primary_diagnoses (patient_id, admission_id, diagnosis_code)
            SELECT
                s.PatientID,
                CAST(s.AdmissionID AS INTEGER),
                s.PrimaryDiagnosisCode
            FROM stage_diagnoses s
            JOIN diagnosis_codes d ON d.diagnosis_code = s.PrimaryDiagnosisCode
        """)
        # Lab results
        cur.execute("""
            INSERT OR IGNORE INTO admission_lab_results (
                patient_id, admission_id, lab_test_id, lab_value, lab_datetime
            )
            SELECT
                s.PatientID,
                CAST(s.AdmissionID AS INTEGER),
                lt.lab_test_id,
                NULLIF(s.LabValue, ''),
                s.LabDateTime
            FROM stage_labs s
            JOIN lab_tests lt ON lt.lab_name = s.LabName
        """)

conn = sqlite3.connect(DB_PATH)
build_facts(conn)
conn.close()