In [None]:
import os
import psycopg2
from pathlib import Path
import sys
import csv
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv


# Make the parent of the current working directory importable so that the
# project-local `src` package resolves, then load the environment settings.
# NOTE(review): assumes the notebook runs from a folder directly below the
# project root -- confirm.
cwd = Path.cwd()
project_root = cwd.parents[0]
sys.path.append(os.fspath(project_root))

from src.utils.db_loader import load_env_vars

env = load_env_vars()



# Build the Postgres connection URL and engine from the loaded settings.
from sqlalchemy.engine import URL

POSTGRES_USERNAME = env["POSTGRES_USERNAME"]
POSTGRES_PASSWORD = env["POSTGRES_PASSWORD"]
POSTGRES_SERVER   = env["POSTGRES_SERVER"]
POSTGRES_DATABASE = env["POSTGRES_DATABASE"]
POSTGRES_PORT     = env.get("POSTGRES_PORT", "5432")

# URL.create() takes each component separately and escapes it when the URL is
# rendered, so credentials containing URL-reserved characters ("@", ":", "/",
# "#", ...) can no longer corrupt the connection string the way raw f-string
# interpolation did.
_connection_url = URL.create(
    "postgresql",
    username=POSTGRES_USERNAME,
    password=POSTGRES_PASSWORD,
    host=POSTGRES_SERVER,
    # An empty port setting falls back to the driver default instead of crashing.
    port=int(POSTGRES_PORT) if POSTGRES_PORT else None,
    database=POSTGRES_DATABASE,
)

# `db_url` stays a plain string (now with properly escaped credentials) for
# any code that expects the textual form.
db_url = _connection_url.render_as_string(hide_password=False)

# sslmode=require is forwarded to the driver's connect() call.
engine = create_engine(_connection_url, connect_args={"sslmode": "require"})



# -----------------------------
# Directory containing the raw input files, taken from the DATA_PATH entry
# of the loaded environment settings. The loaders below resolve their input
# file names against this directory.
# -----------------------------
data_path = Path(env["DATA_PATH"])


# Logical dataset name -> CSV file name. Every file is named after its
# dataset: one beneficiary file per year (2020-2023) followed by one file
# per claim type.
files_to_load = {
    dataset: f"{dataset}.csv"
    for dataset in (
        *(f"beneficiary_{year}" for year in range(2020, 2024)),
        "inpatient",
        "outpatient",
        "carrier",
    )
}


# -----------------------------
# Create schema
# -----------------------------

# Existing tables are dropped child tables first, "Beneficiary" last.
_tables_to_drop = (
    "Revenue",
    "Provider",
    "ProcedureCode",
    "Diagnosis",
    "Claims",
    "Beneficiary",
)

# CREATE statements in execution order: "Beneficiary" first, then "Claims"
# (which references it), then the tables that reference "Claims"."CLM_ID".
_create_statements = (
    # Beneficiary: one row per ("BENE_ID", "YEAR") pair.
    """
    CREATE TABLE "Beneficiary" (
        "BeneficiaryID" SERIAL PRIMARY KEY,
        "BENE_ID" VARCHAR(15) NOT NULL,
        "AGE_AT_END_REF_YR" INT,
        "BENE_RACE_CD" CHAR(1),
        "SEX_IDENT_CD" CHAR(1),
        "STATE_CODE" CHAR(2),
        "YEAR" INT NOT NULL,
        UNIQUE ("BENE_ID","YEAR")
    );
    """,
    # Claims: unique "CLM_ID", tied to a beneficiary-year by a composite FK.
    """
    CREATE TABLE "Claims" (
        "BENE_ID" TEXT NOT NULL,             
        "ClaimID" SERIAL PRIMARY KEY,
        "CLM_ID" TEXT UNIQUE NOT NULL,
        "CLM_LINE_NUM" TEXT,  
        "CLM_TYPE" TEXT,                 
        "YEAR" INT NOT NULL,
        "CLM_FROM_DT" DATE,                  
        "CLM_THRU_DT" DATE,
        "ORG_NPI_NUM" TEXT,   
        "HCPCS_CD" TEXT,    
        "ICD_PRCDR_CD1" TEXT,                               
        "PRNCPAL_DGNS_CD" TEXT,
        "REV_CNTR" TEXT,   
        "CLM_PMT_AMT" NUMERIC(12,2),
        FOREIGN KEY ("BENE_ID","YEAR") REFERENCES "Beneficiary"("BENE_ID","YEAR")
    );
    """,
    # Diagnosis: diagnosis codes keyed by claim.
    """
    CREATE TABLE "Diagnosis" (
        "DiagnosisID" SERIAL PRIMARY KEY,
        "CLM_ID" TEXT REFERENCES "Claims"("CLM_ID"),
        "ICD_DGNS_CD" TEXT
    );
    """,
    # ProcedureCode: ICD and HCPCS codes share one table, told apart by "CODE_TYPE".
    """
    CREATE TABLE "ProcedureCode" (
        "ProcedureCodeID" SERIAL PRIMARY KEY,
        "CLM_ID" TEXT REFERENCES "Claims"("CLM_ID"),
        "CODE" TEXT,
        "CODE_TYPE" TEXT   -- 'ICD' or 'HCPCS'
    );
    """,
    # Revenue: revenue-center codes keyed by claim.
    """
    CREATE TABLE "Revenue" (
        "RevenueID" SERIAL PRIMARY KEY,
        "CLM_ID" TEXT REFERENCES "Claims"("CLM_ID"),
        "REV_CNTR" TEXT
    );
    """,
    # Provider: organisation NPI numbers keyed by claim.
    """
    CREATE TABLE "Provider" (
        "ProviderID" SERIAL PRIMARY KEY,
        "ORG_NPI_NUM" TEXT,
        "CLM_ID" TEXT REFERENCES "Claims"("CLM_ID")
    );
    """,
)

with engine.connect() as conn:
    for _table in _tables_to_drop:
        conn.execute(text(f'DROP TABLE IF EXISTS "{_table}" CASCADE;'))

    for _ddl in _create_statements:
        conn.execute(text(_ddl))

    # Nothing is persisted until this commit.
    conn.commit()
    print("âœ… Normalized tables created with `Claims` as the central fact table")

# -----------------------------
# Insert Beneficiary data (2020-2023)
# -----------------------------
years_to_load = [2020, 2021, 2022, 2023]

# Columns copied into "Beneficiary"; every other column of the raw file is
# discarded.
keep_cols = [
    "BENE_ID",
    "AGE_AT_END_REF_YR",
    "BENE_RACE_CD",
    "SEX_IDENT_CD",
    "STATE_CODE",
    "YEAR",
]

for yr in years_to_load:
    bene_file = data_path / files_to_load[f"beneficiary_{yr}"]
    print(f"ðŸ“¥ Loading {bene_file} ...")

    # Stamp every row with the year of the file it came from, then keep only
    # the columns the table defines.
    df = pd.read_csv(bene_file, sep="|").assign(YEAR=yr)[keep_cols]

    df.to_sql(
        name="Beneficiary",
        con=engine,
        if_exists="append",
        index=False,
        method="multi",
        chunksize=5000,
    )
    print(f"âœ… Beneficiary {yr} data loaded ({len(df)} rows)")

# -----------------------------
# Validation: per year, distinct BENE_ID count next to the total row count
# -----------------------------
query = """
SELECT "YEAR", COUNT(DISTINCT "BENE_ID") AS distinct_beneficiaries,
       COUNT(*) AS total_rows
FROM "Beneficiary"
GROUP BY "YEAR"
ORDER BY "YEAR";
"""

df_counts = pd.read_sql(query, engine)
print("ðŸ“Š Distinct Beneficiary Counts by Year:")
print(df_counts)

# -----------------------------
# Loader for Claims + Normalized Tables
# -----------------------------
def load_claims(file_path, claim_type):
    """Load one pipe-delimited claims file into "Claims" and its child tables.

    Only rows whose CLM_THRU_DT year falls in 2020-2023 are kept. "Claims"
    receives one row per distinct CLM_ID (the first row seen for that id),
    tagged with ``claim_type``. "Diagnosis", "ProcedureCode", "Revenue" and
    "Provider" then each receive one row per kept source row that has both a
    CLM_ID and a value in the matching source column; a source column that is
    absent from the file is skipped.

    All inserts for the file run in a single transaction. A failure in any
    table therefore rolls the whole file back instead of leaving "Claims"
    populated, which would make a retry fail on duplicate CLM_ID values.

    Args:
        file_path: Path of the pipe-delimited claims file.
        claim_type: Label written to "CLM_TYPE". The value "Carrier"
            additionally renames a LINE_NUM column to CLM_LINE_NUM.

    Raises:
        KeyError: If the file has no CLM_ID or no CLM_THRU_DT column.
    """
    df = pd.read_csv(file_path, delimiter="|")
    # NOTE(review): dtypes are inferred here, so an all-numeric code column
    # (e.g. REV_CNTR) may lose leading zeros before it reaches its TEXT
    # column -- confirm against the raw files before relying on exact codes.

    # Fail early with a readable message; without CLM_THRU_DT the year
    # derivation below would otherwise die with an opaque `.dt` error.
    missing = [col for col in ("CLM_ID", "CLM_THRU_DT") if col not in df.columns]
    if missing:
        raise KeyError(f"{file_path} is missing required column(s): {missing}")

    # Rename LINE_NUM -> CLM_LINE_NUM for Carrier
    if claim_type == "Carrier" and "LINE_NUM" in df.columns:
        df.rename(columns={"LINE_NUM": "CLM_LINE_NUM"}, inplace=True)

    # Unparseable dates become NaT (errors="coerce"); rows whose through-date
    # is NaT have no year and are removed by the year filter below.
    df["CLM_THRU_DT"] = pd.to_datetime(df["CLM_THRU_DT"], errors="coerce")
    df["CLM_FROM_DT"] = pd.to_datetime(df.get("CLM_FROM_DT"), errors="coerce")
    df["YEAR"] = df["CLM_THRU_DT"].dt.year

    # Keep only claims in the years loaded into "Beneficiary" (2020-2023).
    df = df[df["YEAR"].between(2020, 2023)]

    # Claims header: whichever of the known columns this file provides.
    claims_cols = [
        "BENE_ID", "CLM_ID", "CLM_LINE_NUM", "YEAR", "CLM_FROM_DT",
        "CLM_THRU_DT", "ORG_NPI_NUM", "HCPCS_CD", "ICD_PRCDR_CD1",
        "PRNCPAL_DGNS_CD", "REV_CNTR", "CLM_PMT_AMT",
    ]
    df_claims = df[[col for col in claims_cols if col in df.columns]].copy()
    df_claims["CLM_TYPE"] = claim_type
    df_claims = df_claims.drop_duplicates(subset=["CLM_ID"])

    # (target table, source column, column name in the table, constant columns)
    child_tables = [
        ("Diagnosis", "PRNCPAL_DGNS_CD", "ICD_DGNS_CD", {}),
        ("ProcedureCode", "ICD_PRCDR_CD1", "CODE", {"CODE_TYPE": "ICD"}),
        ("ProcedureCode", "HCPCS_CD", "CODE", {"CODE_TYPE": "HCPCS"}),
        ("Revenue", "REV_CNTR", "REV_CNTR", {}),
        ("Provider", "ORG_NPI_NUM", "ORG_NPI_NUM", {}),
    ]
    to_sql_options = {
        "if_exists": "append",
        "index": False,
        "method": "multi",
        "chunksize": 5000,
    }

    # engine.begin() commits on success and rolls back on any exception.
    with engine.begin() as conn:
        # Parent rows first: every child table references "Claims"."CLM_ID".
        df_claims.to_sql("Claims", conn, **to_sql_options)

        for table, source_col, target_col, constants in child_tables:
            if source_col not in df.columns:
                continue
            rows = (
                df[["CLM_ID", source_col]]
                .dropna()
                .rename(columns={source_col: target_col})
                .assign(**constants)
            )
            rows.to_sql(table, conn, **to_sql_options)

    print(f"âœ… {claim_type} claims loaded ({len(df)} rows)")

# -----------------------------
# Load Inpatient, Outpatient, Carrier (in that order)
# -----------------------------
for _file_key, _claim_type in (
    ("inpatient", "Inpatient"),
    ("outpatient", "Outpatient"),
    ("carrier", "Carrier"),
):
    load_claims(data_path / files_to_load[_file_key], _claim_type)



In [22]:
# -----------------------------
# Load NYU Mapping File
# -----------------------------

# Path to Excel file
ed_file = data_path / "NYU_ED_Algorithm_ICD10.xlsx"

# Load the first sheet (or specify sheet name if needed)
df_ed = pd.read_excel(ed_file, sheet_name=0)

# Column names are assigned by position, so the sheet must have exactly these
# eleven columns in this order; pandas raises if the count differs.
df_ed.columns = [
    "ICD10",
    "ICD10_Description",
    "Non_Emergent",
    "Emergent_PC_Treatable",
    "ED_Care_Needed_Preventable_Avoidable",
    "ED_Care_Needed_Not_Preventable",
    "Alcohol",
    "Drug",
    "Injury",
    "Psych",
    "Unclassified"
]

# Drop any previous copy first, as the other reference-table loaders do.
# Appending to an existing table would duplicate every row each time this
# cell is re-run.
with engine.connect() as conn:
    conn.execute(text('DROP TABLE IF EXISTS "ED_Algorithm_ICD10" CASCADE;'))
    conn.commit()

df_ed.to_sql(
    "ED_Algorithm_ICD10",
    engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=5000
)

print("âœ… NYU ED Algorithm ICD-10 codes loaded from Excel")

# Left as the cell's last expression so the notebook displays the row count.
pd.read_sql('SELECT COUNT(*) FROM "ED_Algorithm_ICD10";', engine)

âœ… NYU ED Algorithm ICD-10 codes loaded from Excel


Unnamed: 0,count
0,75242


In [28]:
# -----------------------------
# Load ICD-10 Chronic Indicator file (correct CSV parsing)
# -----------------------------
chronic_file = data_path / "icd10diag_chronic_indicator.csv"
print(f"ðŸ“¥ Loading {chronic_file} ...")

# Read CSV, skip first 2 rows, comma-delimited; everything is read as text
df_chronic = pd.read_csv(
    chronic_file,
    skiprows=2,
    delimiter=",",
    dtype=str,
    engine="python"
)

# Clean column names: remove single quotes and whitespace
df_chronic.columns = [col.replace("'", "").strip() for col in df_chronic.columns]

print("âœ… Cleaned column names:", df_chronic.columns.tolist())

# Clean values: remove single quotes and whitespace; non-string cells
# (missing values) pass through unchanged.
# DataFrame.applymap is deprecated in favour of DataFrame.map (pandas >= 2.1),
# so use map when it exists and fall back to applymap on older versions.
_elementwise = df_chronic.map if hasattr(pd.DataFrame, "map") else df_chronic.applymap
df_chronic = _elementwise(lambda x: x.replace("'", "").strip() if isinstance(x, str) else x)

# Rename columns to the names used by the SQL table
df_chronic.rename(columns={
    "ICD-10-CM CODE": "ICD10",
    "ICD-10-CM CODE DESCRIPTION": "DESCRIPTION",
    "CHRONIC INDICATOR": "CHRONIC_FLAG"
}, inplace=True)

print("âœ… Final columns:", df_chronic.columns.tolist())

# Convert CHRONIC_FLAG to a number; values that do not parse become NaN
df_chronic["CHRONIC_FLAG"] = pd.to_numeric(df_chronic["CHRONIC_FLAG"], errors="coerce")

# Recreate the table with an explicit schema, then load into PostgreSQL.
# "ICD10" is the primary key, so the insert fails on duplicate or missing codes.
with engine.connect() as conn:
    conn.execute(text('DROP TABLE IF EXISTS "ICD10_ChronicIndicator" CASCADE;'))
    conn.execute(text("""
        CREATE TABLE "ICD10_ChronicIndicator" (
            "ICD10" TEXT PRIMARY KEY,
            "DESCRIPTION" TEXT,
            "CHRONIC_FLAG" INT
        );
    """))
    conn.commit()

df_chronic.to_sql(
    "ICD10_ChronicIndicator",
    engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=5000
)

print(f"âœ… Chronic indicator file loaded ({len(df_chronic)} rows)")

ðŸ“¥ Loading C:\Users\bruck\EAS503\Final_Project\Data\Raw_CSV\icd10diag_chronic_indicator.csv ...
âœ… Cleaned column names: ['ICD-10-CM CODE', 'ICD-10-CM CODE DESCRIPTION', 'CHRONIC INDICATOR']
âœ… Final columns: ['ICD10', 'DESCRIPTION', 'CHRONIC_FLAG']


  df_chronic = df_chronic.applymap(lambda x: x.replace("'", "").strip() if isinstance(x, str) else x)


âœ… Chronic indicator file loaded (75725 rows)


In [39]:
# -----------------------------
# Load ICD-10 body system mapping
# -----------------------------

# Define path
ccsr_file = data_path / "DXCCSR_v2026-1.csv"
print(f"ðŸ”„ Loading {ccsr_file} ...")

# Load the CCSR file as text and strip single quotes / surrounding whitespace
# from both the header and every string cell.
ccsr_raw = pd.read_csv(ccsr_file, dtype=str, engine="python")
ccsr_raw.columns = [col.replace("'", "").strip() for col in ccsr_raw.columns]

# DataFrame.applymap is deprecated in favour of DataFrame.map (pandas >= 2.1),
# so use map when it exists and fall back to applymap on older versions.
_elementwise = ccsr_raw.map if hasattr(pd.DataFrame, "map") else ccsr_raw.applymap
ccsr_raw = _elementwise(lambda x: x.replace("'", "").strip() if isinstance(x, str) else x)

# Select relevant columns
ccsr = ccsr_raw[["ICD-10-CM CODE", "CCSR CATEGORY 1"]].copy()

# Map the first three characters of the CCSR category to a body system
body_system_map = {
    "BLD": "Blood/Immune",
    "CIR": "Circulatory",
    "DEN": "Dental",
    "DIG": "Digestive",
    "EAR": "Ear",
    "END": "Endocrine/Metabolic",
    "EXT": "External Causes",
    "EYE": "Eye",
    "FAC": "Health Status/Contact",
    "GEN": "Genitourinary",
    "INF": "Infectious",
    "INJ": "Injury/Poisoning",
    "MAL": "Congenital",
    "MBD": "Mental/Behavioral",
    "MUS": "Musculoskeletal",
    "NEO": "Neoplasms",
    "NVS": "Nervous System",
    "PNL": "Perinatal",
    "PRG": "Pregnancy/Childbirth",
    "RSP": "Respiratory",
    "SKN": "Skin/Subcutaneous",
    "SYM": "Symptoms/Signs",
    "XXX": "Unacceptable Diagnosis"
}

# A prefix that is not in the map leaves Body_System missing (NaN).
ccsr["CCSR_PREFIX"] = ccsr["CCSR CATEGORY 1"].str[:3]
ccsr["Body_System"] = ccsr["CCSR_PREFIX"].map(body_system_map)

# Rename columns for SQL compatibility
ccsr = ccsr.rename(columns={
    "ICD-10-CM CODE": "ICD10",
    "CCSR CATEGORY 1": "CCSR_CATEGORY_1"
})

# Recreate the table with an explicit schema, then load into PostgreSQL.
# "ICD10" is the primary key, so the insert fails on duplicate or missing codes.
with engine.connect() as conn:
    conn.execute(text('DROP TABLE IF EXISTS "CCSR_ICD10" CASCADE;'))
    conn.execute(text("""
        CREATE TABLE "CCSR_ICD10" (
            "ICD10" TEXT PRIMARY KEY,
            "CCSR_CATEGORY_1" TEXT,
            "CCSR_PREFIX" TEXT,
            "Body_System" TEXT
        );
    """))
    conn.commit()

ccsr.to_sql(
    "CCSR_ICD10",
    engine,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=5000
)

print("âœ… CCSR_ICD10 table created and loaded into SQL")



# Number of ICD-10 codes per (CCSR category, body system) pair, largest
# groups first; only the top 20 are printed.
_codes_per_group = ccsr.groupby(["CCSR_CATEGORY_1", "Body_System"]).size()
ccsr_counts = _codes_per_group.reset_index(name="ICD10_Code_Count")
ccsr_counts = ccsr_counts.sort_values(by="ICD10_Code_Count", ascending=False)

print("âœ… ICD-10 code counts by CCSR category and body system:")
print(ccsr_counts.head(20))


ðŸ”„ Loading C:\Users\bruck\EAS503\Final_Project\Data\Raw_CSV\DXCCSR_v2026-1.csv ...


  ccsr_raw = ccsr_raw.applymap(lambda x: x.replace("'", "").strip() if isinstance(x, str) else x)


âœ… CCSR_ICD10 table created and loaded into SQL
âœ… ICD-10 code counts by CCSR category and body system:
    CCSR_CATEGORY_1       Body_System  ICD10_Code_Count
266          INJ073  Injury/Poisoning              7572
235          INJ042  Injury/Poisoning              4702
234          INJ041  Injury/Poisoning              4348
116          EXT029   External Causes              2415
117          EXT030   External Causes              2415
268          INJ075  Injury/Poisoning              1621
200          INJ004  Injury/Poisoning              1555
201          INJ005  Injury/Poisoning              1435
215          INJ019  Injury/Poisoning               936
236          INJ043  Injury/Poisoning               832
249          INJ056  Injury/Poisoning               824
118          EYE001               Eye               810
247          INJ054  Injury/Poisoning               765
213          INJ017  Injury/Poisoning               765
310          MUS007   Musculoskeletal               72

In [None]:
# ---------------------------------------------------------
# Load demographic mapping dictionaries into PostgreSQL
# ---------------------------------------------------------

import pandas as pd
from sqlalchemy import text

# ---------------------------------------------------------
# 1. Reusable function to load ANY mapping dict into SQL
# ---------------------------------------------------------
def load_mapping_to_sql(mapping_dict, table_name, engine):
    """Replace a SQL lookup table with the contents of a key/value dict.

    The table is dropped if it exists, recreated as
    ``"CODE" TEXT PRIMARY KEY, "LABEL" TEXT`` and filled with one row per
    dictionary entry. Keys are converted with ``str()`` before insertion.

    Drop, create and insert share one transaction, so a failed insert (for
    example two keys that collide once converted to ``str``) rolls everything
    back and leaves the previously existing table intact rather than dropped
    or empty.

    Args:
        mapping_dict: Mapping of code -> label.
        table_name: Name of the table to (re)create. It is interpolated into
            the SQL as a quoted identifier, so it must come from trusted code.
        engine: SQLAlchemy engine for the target database.
    """
    # Convert dict -> DataFrame
    df_map = pd.DataFrame(
        [(str(k), v) for k, v in mapping_dict.items()],
        columns=["CODE", "LABEL"],
    )

    # engine.begin() commits on success and rolls back on any exception.
    with engine.begin() as conn:
        # Drop + recreate table
        conn.execute(text(f'DROP TABLE IF EXISTS "{table_name}" CASCADE;'))
        conn.execute(text(f"""
            CREATE TABLE "{table_name}" (
                "CODE" TEXT PRIMARY KEY,
                "LABEL" TEXT
            );
        """))

        # Bulk insert on the same connection so it joins the transaction.
        df_map.to_sql(
            table_name,
            conn,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=5000,
        )

    print(f"âœ… {table_name} table created and loaded into SQL")


# ---------------------------------------------------------
# 2. Define demographic mapping dictionaries
# ---------------------------------------------------------
# Each dictionary maps an integer code to its display label.
# NOTE(review): presumably these decode the beneficiary SEX_IDENT_CD,
# BENE_RACE_CD and STATE_CODE columns -- confirm against the data dictionary.

# Sex (Gender) mapping: codes 0-2, in order
sex_map = dict(enumerate(("Unknown", "Male", "Female")))

# Race mapping: codes 0-6, in order
race_map = dict(
    enumerate(
        (
            "Unknown",
            "White",
            "Black",
            "Other",
            "Asian",
            "Hispanic",
            "North American Native",
        )
    )
)

# State mapping (full list): codes 1-66 are contiguous, so they are listed
# positionally; the remaining codes are added explicitly afterwards.
state_map = dict(
    enumerate(
        (
            # 1-10
            "Alabama", "Alaska", "Arizona", "Arkansas", "California",
            "Colorado", "Connecticut", "Delaware", "District of Columbia",
            "Florida",
            # 11-20
            "Georgia", "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa",
            "Kansas", "Kentucky", "Louisiana", "Maine",
            # 21-30
            "Maryland", "Massachusetts", "Michigan", "Minnesota",
            "Mississippi", "Missouri", "Montana", "Nebraska", "Nevada",
            "New Hampshire",
            # 31-40
            "New Jersey", "New Mexico", "New York", "North Carolina",
            "North Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania",
            "Puerto Rico",
            # 41-50
            "Rhode Island", "South Carolina", "South Dakota", "Tennessee",
            "Texas", "Utah", "Vermont", "Virgin Islands", "Virginia",
            "Washington",
            # 51-60
            "West Virginia", "Wisconsin", "Wyoming", "Africa", "Asia",
            "Canada and Islands", "Central America and West Indies",
            "Europe", "Mexico", "Oceania",
            # 61-66
            "Philippines", "South America", "U.S. Possessions",
            "American Samoa", "Guam", "Northern Marianas Islands",
        ),
        start=1,
    )
)
state_map.update(
    {
        67: "Texas",
        68: "Florida",
        69: "Florida",
        70: "Kansas",
        71: "Louisiana",
        72: "Ohio",
        73: "Pennsylvania",
        74: "Texas",
        80: "Maryland",
        97: "Northern Marianas",
        98: "Guam",
        99: "Unknown or American Samoa",
    }
)

print("âœ… Mapping dictionaries defined (sex_map, race_map, state_map)")


# ---------------------------------------------------------
# 3. Load all mapping tables into PostgreSQL
# ---------------------------------------------------------
# Each dictionary goes to its own table, loaded in this order.
for _mapping, _table in (
    (sex_map, "SEX_MAP"),
    (race_map, "RACE_MAP"),
    (state_map, "STATE_MAP"),
):
    load_mapping_to_sql(_mapping, _table, engine)

print("âœ… All demographic mapping tables loaded successfully")

# load_mapping_to_sql(mapping_dict, table_name, engine)

âœ… Mapping dictionaries defined (sex_map, race_map, state_map)
âœ… SEX_MAP table created and loaded into SQL
âœ… RACE_MAP table created and loaded into SQL
âœ… STATE_MAP table created and loaded into SQL
âœ… All demographic mapping tables loaded successfully
