In [1]:
import os
import sys
import pandas as pd
from sqlalchemy import create_engine, text, inspect
import pymysql
from dotenv import load_dotenv  # For securely managing credentials
import hashlib

In [2]:
# --- 1. CONFIGURATION: Load credentials from a .env file ---
# Create a file named ".env" in the same directory with:
# DB_USER="your_user"
# DB_PASS="your_password"
# DB_HOST="localhost"
# DB_NAME="HealthcareADT_DW"
#load_dotenv()

DB_USER = 'root'
DB_PASS = 'root@1234'
DB_HOST = 'localhost'
DB_NAME = 'HealthcareADT_DW'

if not all([DB_USER, DB_PASS, DB_HOST, DB_NAME]):
    print("Error: Database credentials are not set in .env file.")
    sys.exit(1)

In [3]:
# Encoding password
import urllib.parse

encoded_password = urllib.parse.quote(DB_PASS)

In [5]:
# Create a database connection engine
try:
    connection_string = f"mysql+pymysql://{DB_USER}:{encoded_password}@{DB_HOST}/{DB_NAME}"
    engine = create_engine(connection_string)
    print("Database connection engine created successfully.")
except Exception as e:
    print(f"Error creating database engine: {e}")
    sys.exit(1)

Database connection engine created successfully.


In [6]:
inspect(engine).get_table_names()

[]

In [7]:
# This is the User Defined Function responsible for extracting data from the main csv file.
def extract_and_validate_csv(file_path):
    """
    Extracts data from CSV, validates, cleans, transforms, and
    creates a unique business key.
    """
    print(f"Starting extraction and validation for: {file_path}")
    
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"Error reading CSV: {e}")
        return None
    # #######################
    # --- Amending the original dataset column names to fit the script names
    df.columns = [col.replace(" ", "_") for col in df.columns]
    # #######################
    # --- 1. Data Cleaning ---
    # Trim all string columns
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].str.strip()

    # Convert numeric and date columns, handling errors
    df['Billing_Amount'] = pd.to_numeric(
        df['Billing_Amount'].astype(str).str.replace(r'[$,]', '', regex=True),
        errors='coerce'  # Bad values become NaN (NULL)
    )
    df['Age'] = pd.to_numeric(df['Age'], errors='coerce')
    
    # Enforce date format
    df['Date_of_Admission'] = pd.to_datetime(
        df['Date_of_Admission'], 
        format='%m/%d/%Y',
        errors='coerce'  # Bad dates become NaT (NULL)
    )
    df['Discharge_Date'] = pd.to_datetime(
        df['Discharge_Date'], 
        format='%m/%d/%Y', 
        errors='coerce'
    )

    # --- 2. Validation: Drop rows with critical missing data ---
    initial_rows = len(df)
    df.dropna(subset=['Name', 'Date_of_Admission'], inplace=True)
    if initial_rows > len(df):
        print(f"Dropped {initial_rows - len(df)} rows due to missing Name or Admission Date.")

    # --- 3. Deduplication (within the batch) ---
    # Remove fully duplicated rows
    initial_rows = len(df)
    df.drop_duplicates(inplace=True)
    if initial_rows > len(df):
        print(f"Dropped {initial_rows - len(df)} fully duplicate rows.")

    # Keep min age for rows duplicated on all other fields
    initial_rows = len(df)
    group_cols = [col for col in df.columns if col not in ['Age']]
    
    if group_cols:
        # We must re-aggregate all non-group columns
        agg_dict = {col: 'first' for col in df.columns if col not in group_cols}
        agg_dict['Age'] = 'min'
        
        df = df.groupby(group_cols, as_index=False).agg(agg_dict)

    if initial_rows > len(df):
        print(f"Consolidated {initial_rows - len(df)} rows based on minimum age.")

    # --- 4. Create the Unique Business Key (SourceAdmissionID) ---
    # This key uniquely identifies an admission event for idempotency.
    key_cols = ['Name', 'Date_of_Admission', 'Doctor', 'Hospital', 'Medical_Condition']
    
    # Create a stable composite key string
    df['composite_key'] = df[key_cols].fillna('').astype(str).apply(lambda x: '|'.join(x), axis=1)
    
    # Create a SHA-256 hash (64 chars)
    df['SourceAdmissionID'] = df['composite_key'].apply(
        lambda x: hashlib.sha256(x.encode()).hexdigest()
    )
    
    # Drop the temporary helper column
    df.drop(columns=['composite_key'], inplace=True)

    print(f"Validation complete. {len(df)} clean rows ready for staging.")
    return df

In [8]:
# This is the User Defined Function responsible for loading the cleaned data into the staging
# table in the database
def load_to_staging(df, db_engine):
    """
    Loads the clean DataFrame into the Staging_Admissions table.
    This process is a full TRUNCATE and load of the *batch*.
    """
    print("Loading data to staging table...")
    try:
        with db_engine.connect() as conn:
            # We use a transaction for the staging load
            with conn.begin():
                conn.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
                conn.execute(text("TRUNCATE TABLE Staging_Admissions;"))
                conn.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
                
                # Load the *cleaned* dataframe
                df.to_sql('Staging_Admissions', con=conn, if_exists='append', index=False)
        
        print(f"Successfully loaded {len(df)} rows to Staging_Admissions.")
    except Exception as e:
        print(f"Error loading to staging: {e}")
        raise

In [9]:
# Start loading the data warehouse from the staging admission table into the fact and dimensions
def run_data_warehouse_load(db_engine):
    """
    Executes the incremental stored procedure in MySQL.
    """
    print("Calling stored procedure sp_LoadDataWarehouse_Incremental...")
    try:
        with db_engine.connect() as conn:
            with conn.begin():
                # The stored procedure handles its own transaction
                conn.execute(text("CALL sp_LoadDataWarehouse_Incremental();"))
        
        print("Data warehouse load procedure executed successfully.")
    except Exception as e:
        print(f"Error running stored procedure: {e}")
        raise

In [10]:
# --- Main execution block ---
if __name__ == "__main__":
    
    # You would pass this in as an argument in a real script
    csv_file_path = '/Users/waleedmouhammed/Library/CloudStorage/OneDrive-Personal/Desktop/تكامل/Health Care Analytics/Raw Data/Raw Data CSV.csv' 
    
    try:
        # 1. Extract, Transform, Validate
        clean_df = extract_and_validate_csv(csv_file_path)
        
        if clean_df is not None and not clean_df.empty:
            # 2. Load to Staging
            load_to_staging(clean_df, engine)
            
            # 3. Load to Data Warehouse
            run_data_warehouse_load(engine)
            
            print("\nETL process completed successfully!")
        
        else:
            print("ETL process halted: No valid data to load.")

    except Exception as e:
        print(f"\nETL process FAILED: {e}")
        sys.exit(1)

Starting extraction and validation for: /Users/waleedmouhammed/Library/CloudStorage/OneDrive-Personal/Desktop/تكامل/Health Care Analytics/Raw Data/Raw Data CSV.csv
Dropped 534 fully duplicate rows.
Consolidated 4966 rows based on minimum age.
Validation complete. 50000 clean rows ready for staging.
Loading data to staging table...
Error loading to staging: (pymysql.err.ProgrammingError) (1146, "Table 'healthcareadt_dw.staging_admissions' doesn't exist")
[SQL: TRUNCATE TABLE Staging_Admissions;]
(Background on this error at: https://sqlalche.me/e/20/f405)

ETL process FAILED: (pymysql.err.ProgrammingError) (1146, "Table 'healthcareadt_dw.staging_admissions' doesn't exist")
[SQL: TRUNCATE TABLE Staging_Admissions;]
(Background on this error at: https://sqlalche.me/e/20/f405)


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
