In [2]:
import pandas as pd
import mysql.connector
from datetime import datetime
from dateutil import parser


#  Database Configuration


SOURCE_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'sher123'
}

DW_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'sher123',
    'database': 'DataWareHouse'
}


# 📤 Extract Phase


def extract_from_sources():
    print(" Extracting data from source systems...")

    patient_conn = mysql.connector.connect(**SOURCE_CONFIG, database='patient')
    billing_conn = mysql.connector.connect(**SOURCE_CONFIG, database='hospital_billing_system')
    pharmacy_conn = mysql.connector.connect(**SOURCE_CONFIG, database='pharmacy_inventory_system')

    df_patients = pd.read_sql("SELECT * FROM patient_records", patient_conn)
    df_billing = pd.read_sql("SELECT * FROM hospital_billing", billing_conn)
    df_pharmacy = pd.read_sql("SELECT * FROM pharmacy_inventory", pharmacy_conn)
    df_public = pd.read_csv("External_Public_Health_Data.csv")

    patient_conn.close()
    billing_conn.close()
    pharmacy_conn.close()

    print(" Extraction complete.")
    return df_patients, df_billing, df_pharmacy, df_public


#  Transform Phase


def transform_data(df_patients, df_billing, df_pharmacy, df_public):
    print(" Transforming data...")

    df_billing['billing_date'] = pd.to_datetime(df_billing['billing_date'], errors='coerce')
    df_pharmacy['dispensing_date'] = pd.to_datetime(df_pharmacy['dispensing_date'], errors='coerce')

    for col in ['procedure_cost', 'medication_cost', 'insurance_claim']:
        df_billing[col] = df_billing[col].fillna(0)

    df_patients['birth_date'] = pd.to_datetime('1990-01-01')
    df_patients['address'] = 'Unknown'

    df_public['cases'] = df_public['cases'].fillna(0).astype(int)
    df_public['month'] = pd.to_datetime(df_public['month'], format='%Y-%m', errors='coerce')

    print(" Transformation complete.")
    return df_patients, df_billing, df_pharmacy, df_public


#  Load Phase


def load_to_warehouse(df_patients, df_billing, df_pharmacy, df_public):
    print(" Loading data into Data Warehouse...")

    dw_conn = mysql.connector.connect(**DW_CONFIG)
    cursor = dw_conn.cursor()

    # Load Dim_Date
    date_range = pd.date_range(start='2025-01-01', end='2025-12-31')
    for idx, date in enumerate(date_range, start=1):
        cursor.execute("""
            INSERT IGNORE INTO Dim_Date (date_sk, full_date, day, month, year, weekday)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (idx, date.date(), date.day, date.month, date.year, date.day_name()))

    # Load Dim_Patient
    for _, row in df_patients.iterrows():
        cursor.execute("""
            INSERT INTO Dim_Patient (patient_id, name, gender, birth_date, address, start_date, end_date, is_current)
            VALUES (%s, %s, %s, %s, %s, CURDATE(), NULL, 1)
        """, (row['patient_id'], row['name'], row['gender'], row['birth_date'], row['address']))

    # Load Fact_Visits (based on billing table with diagnosis_code)
    for _, row in df_billing.iterrows():
        patient_id = row['patient_id']
        diagnosis_code = row['diagnosis_code'] if pd.notnull(row['diagnosis_code']) else 'D001'
        treatment = "Paracetamol + Rest"
        hospital_id = "HSP-001"
        visit_date = row['billing_date'].date() if not pd.isnull(row['billing_date']) else datetime(2025, 1, 1).date()

        cursor.execute("""
            INSERT INTO Fact_Visits (patient_sk, date_sk, diagnosis_sk, treatment, hospital_id)
            VALUES (
                (SELECT patient_sk FROM Dim_Patient WHERE patient_id = %s LIMIT 1),
                (SELECT date_sk FROM Dim_Date WHERE full_date = %s LIMIT 1),
                (SELECT diagnosis_sk FROM Dim_Diagnosis WHERE diagnosis_code = %s LIMIT 1),
                %s, %s
            )
        """, (patient_id, visit_date, diagnosis_code, treatment, hospital_id))

    # Load Fact_Billing
    for _, row in df_billing.iterrows():
        billing_date = row['billing_date']
        if pd.isnull(billing_date) or not hasattr(billing_date, 'date'):
            billing_date = datetime(2025, 1, 1).date()
        else:
            billing_date = billing_date.date()

        cursor.execute("""
            INSERT INTO Fact_Billing (patient_sk, date_sk, procedure_cost, medication_cost, insurance_claim)
            VALUES (
                (SELECT patient_sk FROM Dim_Patient WHERE patient_id = %s LIMIT 1),
                (SELECT date_sk FROM Dim_Date WHERE full_date = %s LIMIT 1),
                %s, %s, %s
            )
        """, (row['patient_id'], billing_date, row['procedure_cost'], row['medication_cost'], row['insurance_claim']))

    # Load Dim_Medication and Fact_Prescriptions
    for _, row in df_pharmacy.iterrows():
        dispensing_date = row['dispensing_date']
        if pd.isnull(dispensing_date) or not hasattr(dispensing_date, 'date'):
            dispensing_date = datetime(2025, 1, 1).date()
        else:
            dispensing_date = dispensing_date.date()

        cursor.execute("""
            INSERT IGNORE INTO Dim_Medication (medication_code, name, supplier, category)
            VALUES (%s, %s, %s, %s)
        """, (row['medication_code'], row['medication_name'], row['supplier'], 'General'))

        cursor.execute("""
            INSERT INTO Fact_Prescriptions (patient_sk, medication_sk, date_sk, quantity)
            VALUES (
                (SELECT patient_sk FROM Dim_Patient WHERE patient_id = %s LIMIT 1),
                (SELECT medication_sk FROM Dim_Medication WHERE medication_code = %s LIMIT 1),
                (SELECT date_sk FROM Dim_Date WHERE full_date = %s LIMIT 1),
                %s
            )
        """, (row['dispensing_visit_id'], row['medication_code'], dispensing_date, row['stock_level']))

    # Load Dim_Location and Fact_Public_Health
    for _, row in df_public.iterrows():
        cursor.execute("""
            INSERT IGNORE INTO Dim_Location (zip_code, region_name, city_name)
            VALUES (%s, %s, %s)
        """, (row['zip_code'], 'Default Region', 'Default City'))

        public_date = row['month'].date() if not pd.isnull(row['month']) else datetime(2025, 1, 1).date()

        cursor.execute("""
            INSERT INTO Fact_Public_Health (date_sk, location_sk, disease_name, case_count)
            VALUES (
                (SELECT date_sk FROM Dim_Date WHERE full_date = %s LIMIT 1),
                (SELECT location_sk FROM Dim_Location WHERE zip_code = %s LIMIT 1),
                %s, %s
            )
        """, (public_date, row['zip_code'], row['disease'], row['cases']))

    dw_conn.commit()
    cursor.close()
    dw_conn.close()
    print("Load complete. ETL finished successfully.")


#  Run ETL Pipeline


if __name__ == "__main__":
    try:
        print("🚀 Starting ETL process...\n")
        patients, billing, pharmacy, public_health = extract_from_sources()
        patients, billing, pharmacy, public_health = transform_data(patients, billing, pharmacy, public_health)
        load_to_warehouse(patients, billing, pharmacy, public_health)
        print("\n🎉 ETL process complete.")

    except mysql.connector.Error as err:
        print(f" MySQL Error: {err}")

    except Exception as e:
        print(f" General Error: {e}")


🚀 Starting ETL process...

 Extracting data from source systems...


  df_patients = pd.read_sql("SELECT * FROM patient_records", patient_conn)
  df_billing = pd.read_sql("SELECT * FROM hospital_billing", billing_conn)
  df_pharmacy = pd.read_sql("SELECT * FROM pharmacy_inventory", pharmacy_conn)


 Extraction complete.
 Transforming data...
 Transformation complete.
 Loading data into Data Warehouse...
Load complete. ETL finished successfully.

🎉 ETL process complete.
