# Medical Model Monitor | M3
##### Capstone 4: Applied Model Application
---

## Use Case & Scenario
- **Problem Statement:**
Hospitals struggle to continuously monitor patients' vital signs and promptly identify those at risk of adverse events. Complex data and environmental factors can delay intervention, increase risk, and incur higher expense.

- **Solution:**
A mobile and desktop application that notifies staff of patients needing assistance. From a technical perspective, this involves training a range of models to establish a baseline, predicting values from historical data, and processing real-time inputs for validation.


---

## Approach
Considering the scope, data analysis will be logically separate from model development. This will help reduce overhead and visual complexity. As such, please make sure you're reviewing the correct notebook:

- Data Collection & Analysis `<- you are here`
- Model Development


## Notebook Structure
- Prerequisites: imports and generic functions
- Data import and extraction
- Analysis of raw data
- Cleaning and generalizations
- Analysis of transformed data
---

# Data Authorization and Access
Research-grade medical data is considered public but retains certain safeguards to deter improper use and mitigate risk. Completing the required steps, and waiting for third-party teams and systems to process requests, introduced some delay. These prerequisites were anticipated, however, as outlined in the project proposal.

To summarize, the following steps were completed to gain access to the `MIMIC-IV` dataset:
1. Registration on PhysioNet website
2. PhysioNet application review and approval: use case and reference evaluation
3. Training completion: CITI 'Data or Specimens Only Research Training'
4. Code of Conduct agreement
5. Credentialed Health Data Use Agreement (per dataset)

After all steps were completed, access was granted to `credentialed datasets`, including the `MIMIC-IV` dataset.

# Data Extraction and Loading

Post-authorization, I elected to store the ~120 GB dataset locally in `PostgreSQL` rather than use cloud-based access. The download was rate-limited (500 kb/s) and took ~19 hours for all zipped tables to complete. The end result is two datasets comprising over half a million records for over a quarter-million individuals.


| Records (Qty) | Scope              |
|---------------|--------------------|
| 364,627       | unique individuals |
| 546,028       | hospitalizations   |
| 94,458        | unique ICU stays   |


> `hosp` contains `546,028 hospitalizations` for `223,452 unique individuals`  
> `icu` contains `94,458 ICU stays` for `65,366 unique individuals`

# Data Visual Inspection
With records unzipped and imported into `PostgreSQL`, I could begin inspecting table columns.

## .High frequency of `null` values
Although `null` values are widespread, they should remain in place in most cases.  
Larger tables combine various events that are inherently unique, and their quality could be degraded by rounding, interpolation, or similar data manipulation. Handling varies case by case, but generally speaking, high-level analysis may be inclined to `drop` such columns, whereas fine-grained analysis may `filter` records on specific data types and values.

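The two approaches can be contrasted with a minimal sketch. It uses plain Python and invented rows (the column names only loosely resemble an events table, and the 0.5 threshold is an arbitrary assumption); in the notebook itself the same idea would more likely be expressed through pandas operations such as `DataFrame.dropna`.

```python
# Hypothetical rows shaped loosely like a wide events table; None marks a null.
rows = [
    {"subject_id": 1, "label": "Heart Rate", "valuenum": 82.0, "warning": None},
    {"subject_id": 1, "label": "Heart Rate", "valuenum": None, "warning": None},
    {"subject_id": 2, "label": "Glucose", "valuenum": 110.0, "warning": None},
    {"subject_id": 2, "label": "Heart Rate", "valuenum": 95.0, "warning": 1},
]


def null_fraction(records, column):
    """Share of records whose value in `column` is None."""
    return sum(r[column] is None for r in records) / len(records)


def drop_sparse_columns(records, threshold=0.5):
    """High-level view: drop columns whose null fraction exceeds `threshold`."""
    keep = [c for c in records[0] if null_fraction(records, c) <= threshold]
    return [{c: r[c] for c in keep} for r in records]


def filter_measurements(records, label):
    """Fine-grained view: keep rows for one label that carry a numeric value."""
    return [r for r in records if r["label"] == label and r["valuenum"] is not None]


broad = drop_sparse_columns(rows)                    # sparse column removed
heart_rates = filter_measurements(rows, "Heart Rate")  # nulls filtered per query
```

Neither function modifies the original rows, so the `null` values stay available for later, more specific queries.
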
## .Depersonalization; Modification of Date and Age values
As outlined in the official documentation, all personally identifiable information has been scrubbed from `MIMIC-IV`, and date/age values have been shifted at random while retaining their relation to one another. These transformed values map to the `anchor` columns explained below:   

The `anchor_year` column is a deidentified year occurring sometime between 2100 and 2200.  
The `anchor_year_group` column is one of the following values: "2008 - 2010", "2011 - 2013", "2014 - 2016", "2017 - 2019", and "2020 - 2022".  
> Example: if a patient's `anchor_year` is 2158, and their `anchor_year_group` is 2011 - 2013, then any hospitalizations for the patient occurring in the year 2158 actually occurred sometime between 2011 - 2013.


The `anchor_age` provides the patient age in the given `anchor_year`.
> Example: If the patient was over 89 in the `anchor_year`, this `anchor_age` has been set to 91 (i.e. all patients over 89 have been grouped together into a single group with value 91, regardless of what their real age was).

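Because shifted dates keep their relation to each other, the anchor columns can be combined to estimate a patient's age in any shifted year and to recover the real-world window for the anchor year. The sketch below is illustrative only: the function names and the sample age are assumptions, not part of the dataset or its documentation.

```python
from typing import Tuple


def age_at_year(anchor_age: int, anchor_year: int, shifted_year: int) -> int:
    """Approximate age in a shifted year, measured relative to the anchor year."""
    return anchor_age + (shifted_year - anchor_year)


def real_year_range(anchor_year_group: str) -> Tuple[int, int]:
    """Parse an anchor_year_group such as '2011 - 2013' into (2011, 2013)."""
    start, end = (int(part) for part in anchor_year_group.split("-"))
    return start, end


# Hypothetical patient: aged 52 in anchor_year 2158, admitted in shifted year 2160.
age_on_admission = age_at_year(anchor_age=52, anchor_year=2158, shifted_year=2160)
window = real_year_range("2011 - 2013")
```

For patients whose `anchor_age` was set to 91, the derived age is not meaningful, since their real age was grouped away.
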
## .Summary
To reiterate, whether to retain `null` values will vary by context. From basic analysis, it appears they can be ignored or dropped when performing broad-spectrum queries, where such detailed values would be irrelevant. Conversely, specific analysis, such as queries based on a particular condition, medication, or person, may benefit from retaining them, since they provide detailed insight into both condition and treatment.

This review affirms the machine learning algorithms and models chosen in the project proposal. There is a substantial amount of variability, with high-dimensional tables spanning a very broad range of topics and events. Random Forest, Gradient Descent and Neural Networks are far more applicable here than their linear counterparts, which would struggle with overfitting and context development.

# Imports, Data & Helper Functions

In [11]:
# Common imports
import os
import json
import math
from dotenv import load_dotenv
from datetime import datetime
import logging
import time

# For db connectivity
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

# The rest; data, visualizations etc...
from functools import wraps
import pandas as pd
import numpy as np
import scipy
import matplotlib.pyplot as plt
%matplotlib inline

load_dotenv(verbose=True, encoding='utf-8')

True

## - Helpers

In [12]:
# ANSI color codes for different log levels
COLOR_CODES = {
    'DEBUG': '\033[90m',  # Gray (subtle)
    'INFO': '\033[37m',  # White (base)
    'SUCCESS': '\033[92m',  # Green (base)
    'WARNING': '\033[93m',  # Yellow (base)
    'ERROR': '\033[91m',  # Red (strong)
    'CRITICAL': '\033[95m',  # Magenta (strong)
    'VERBOSE': '\033[94m',  # Blue
}
TEXT_SUBTLE = '\033[90m'  # Subtle for debug logs and timestamps
TEXT_BASE = '\033[37m'  # Base color for info logs
TEXT_STRONG = '\033[97m'  # Strong color for success, warning, and verbose logs
RESET = '\033[0m'  # Reset to default


class CustomColorFormatter(logging.Formatter):
    """
    Custom logging formatter to colorize level indicator and timestamp.
    """

    def format(self, record):
        # Get the message; getMessage() already interpolates record.args
        message = record.getMessage()

        # Determine level-specific colors
        level_color = COLOR_CODES.get(record.levelname, RESET)
        message_color = (
            TEXT_SUBTLE if record.levelname == 'DEBUG' else
            TEXT_BASE if record.levelname in ['INFO'] else
            TEXT_STRONG if record.levelname in ['SUCCESS', 'WARNING', 'VERBOSE'] else
            level_color
        )

        # Format the level indicator
        colored_level = f"{level_color}[{record.levelname}]{RESET}"

        # Format the timestamp in gray
        timestamp = self.formatTime(record)
        colored_timestamp = f"{TEXT_SUBTLE}{timestamp}{RESET}"

        # Format the message        
        colored_message = f"{message_color}{message}{RESET}"

        # Construct the final log message
        formatted_message = f"{colored_level} {colored_timestamp} {colored_message}"
        return formatted_message

    def formatTime(self, record, datefmt=None):
        """
        Format the time in local time, as HH:mm:ss.
        """
        local_time = time.localtime(record.created)
        return time.strftime("%H:%M:%S", local_time)  # HH:mm:ss


# Logger setup
# Add support for SUCCESS and VERBOSE log levels, between INFO (20) and WARNING (30)
SUCCESS_LEVEL = 24
logging.addLevelName(SUCCESS_LEVEL, "SUCCESS")


def success(self, message, *args, **kwargs):
    if self.isEnabledFor(SUCCESS_LEVEL):
        self._log(SUCCESS_LEVEL, message, args, **kwargs)


logging.Logger.success = success

VERBOSE_LEVEL = 26
logging.addLevelName(VERBOSE_LEVEL, "VERBOSE")


def verbose(self, message, *args, **kwargs):
    if self.isEnabledFor(VERBOSE_LEVEL):
        self._log(VERBOSE_LEVEL, message, args, **kwargs)


logging.Logger.verbose = verbose


def setup_logger():
    # Create a logger
    logger = logging.getLogger("ColorLogger")
    logger.setLevel(logging.DEBUG)

    # Re-running this cell would otherwise stack handlers and duplicate output
    if not logger.handlers:
        # Create a console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)

        # Set the custom formatter and attach the handler
        console_handler.setFormatter(CustomColorFormatter())
        logger.addHandler(console_handler)

    return logger


logger = setup_logger()
logger.debug('Logger initialized!')

[DEBUG] 23:35:31 Logger initialized!


In [14]:
def log(label: str = "ok", level: int = logging.INFO):
    """
    Decorator to log the result of a function at the specified log level.
    Exceptions are logged as errors (with traceback) and then re-raised.

    Args:
        label (str): A label to include in the log message.
        level (int): The log level for successful execution (default: INFO).

    Returns:
        The decorated function.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                # Execute the decorated function
                result = func(*args, **kwargs)

                # Log the result at the specified log level
                logger.log(level, f"[{label}] {result}")
                return result
            except Exception as e:
                # Log the exception as an error, including the traceback
                logger.error(f"[{label}] Exception occurred: {e}", exc_info=True)
                raise  # Re-raise the exception after logging

        return wrapper

    return decorator

In [15]:
# Establish local postgresql db connection
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT")
db_name = os.getenv("DB_NAME")

db_conn_string = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
db_conn_query = """
SELECT implementation_info_name AS key, character_value AS value
FROM mimiciv.information_schema.sql_implementation_info
WHERE implementation_info_name LIKE 'DBMS %'
"""


@log('DB', SUCCESS_LEVEL)
def setup_db_connection(conn_string: str, conn_query: str):
    try:
        base_engine = create_engine(conn_string)
        # Run a lightweight query to confirm the connection actually works
        with base_engine.connect() as conn:
            conn.execute(text(conn_query))
        return base_engine
    except Exception as e:
        # Catch any unexpected exceptions and escalate as critical
        logger.critical(f"[DB] Connection failed: {e}")
        raise


engine = setup_db_connection(conn_string=db_conn_string, conn_query=db_conn_query)

[SUCCESS] 23:35:49 [DB] Engine(postgresql://postgres:***@localhost:5432/mimiciv)


In [16]:
def calc_runtime(func):
    """
      Decorator to track runtime of a given function.
      Args:
          func (function): A function to wrap for timing.
      Returns:
          The decorated function.
    """

    @wraps(func)  # Preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()  # Monotonic clock suited to timing
        result = func(*args, **kwargs)
        runtime = time.perf_counter() - start_time
        logger.debug(f"[ok] '{func.__name__}' in {runtime:.4f} seconds.")
        return result

    return wrapper


In [17]:
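def run_sql(func):
    """
    Decorator that executes the SQL string returned by `func` and returns
    the result as a DataFrame, or None if the query fails.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            sql_query = func(*args, **kwargs)
            return pd.read_sql_query(sql_query, engine)
        except Exception as e:
            logger.error(f"Error executing query: {e}")
            return None

    return wrapper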


@run_sql
def get_hospital_admissions():
    return "SELECT * FROM mimiciv_hosp.admissions LIMIT 10;"


# Function call to get the data
admissions_data = get_hospital_admissions()


## - Plotting

In [18]:
# generic plots
# plot_histogram(data, 'age', bins=10, title="Age Distribution", xlabel="Age (years)")
def plot_histogram(data, column, bins=20, title=None, xlabel=None, ylabel="Frequency"):
    """
    Creates a histogram for a specific column in the dataset.
    
    Parameters:
    - data: pandas DataFrame
    - column: str, column name for which the histogram is created
    - bins: int, number of bins for the histogram (default: 20)
    - title: str, title of the plot (default: None)
    - xlabel: str, label for the x-axis (default: None)
    - ylabel: str, label for the y-axis (default: "Frequency")
    """
    plt.figure(figsize=(8, 6))
    plt.hist(data[column].dropna(), bins=bins, alpha=0.7, color='blue', edgecolor='black')
    plt.title(title if title else f"Histogram of {column}")
    plt.xlabel(xlabel if xlabel else column)
    plt.ylabel(ylabel)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()


# plot_scatter(data, 'age', 'blood_pressure', title="Blood Pressure vs Age", xlabel="Age", ylabel="Blood Pressure")
def plot_scatter(data, x_col, y_col, title=None, xlabel=None, ylabel=None, alpha=0.7):
    """
    Creates a scatter plot for two columns in the dataset.
    
    Parameters:
    - data: pandas DataFrame
    - x_col: str, column name for the x-axis
    - y_col: str, column name for the y-axis
    - title: str, title of the plot (default: None)
    - xlabel: str, label for the x-axis (default: None)
    - ylabel: str, label for the y-axis (default: None)
    - alpha: float, transparency of the points (default: 0.7)
    """
    plt.figure(figsize=(8, 6))
    plt.scatter(data[x_col], data[y_col], alpha=alpha, color='blue')
    plt.title(title if title else f"Scatter Plot of {y_col} vs {x_col}")
    plt.xlabel(xlabel if xlabel else x_col)
    plt.ylabel(ylabel if ylabel else y_col)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()


# plot_line(data, 'timestamp', 'heart_rate', title="Heart Rate Over Time", xlabel="Timestamp", ylabel="Heart Rate (bpm)")
def plot_line(data, x_col, y_col, title=None, xlabel=None, ylabel=None):
    """
    Creates a line plot for two columns in the dataset.
    
    Parameters:
    - data: pandas DataFrame
    - x_col: str, column name for the x-axis
    - y_col: str, column name for the y-axis
    - title: str, title of the plot (default: None)
    - xlabel: str, label for the x-axis (default: None)
    - ylabel: str, label for the y-axis (default: None)
    """
    plt.figure(figsize=(8, 6))
    plt.plot(data[x_col], data[y_col], color='blue', marker='o', linestyle='-')
    plt.title(title if title else f"Line Plot of {y_col} over {x_col}")
    plt.xlabel(xlabel if xlabel else x_col)
    plt.ylabel(ylabel if ylabel else y_col)
    plt.grid(True, linestyle='--', alpha=0.6)
    plt.show()

## - SQL Queries (param-driven)

In [19]:
def query_demographics(pt_subject_id: int) -> str:
    return f"""
    SELECT subject_id, gender, anchor_age AS age, anchor_year_group AS cohort
    FROM mimiciv_hosp.patients
    WHERE subject_id = {pt_subject_id};
    """

In [20]:
def query_chart_events(pt_subject_id: int) -> str:
    return f"""
    SELECT 
        ce.subject_id,
        di.label,
        AVG(ce.valuenum) AS mean_value,
        STDDEV(ce.valuenum) AS std_dev_value,
        MIN(ce.valuenum) AS min_value,
        MAX(ce.valuenum) AS max_value
    FROM mimiciv_icu.chartevents ce
    JOIN mimiciv_icu.d_items di ON ce.itemid = di.itemid
    WHERE ce.subject_id = {pt_subject_id} AND di.label IN ('Heart Rate', 'Systolic Blood Pressure', 'Respiratory Rate')
    GROUP BY ce.subject_id, di.label;
    """

In [21]:
def query_labs(pt_subject_id: int) -> str:
    return f"""
    SELECT 
        le.subject_id,
        di.label,
        AVG(le.valuenum) AS mean_value,
        STDDEV(le.valuenum) AS std_dev_value,
        MIN(le.valuenum) AS min_value,
        MAX(le.valuenum) AS max_value
    FROM mimiciv_hosp.labevents le
    JOIN mimiciv_hosp.d_labitems di ON le.itemid = di.itemid
    WHERE le.subject_id = {pt_subject_id} AND di.label IN ('Creatinine', 'Glucose', 'White Blood Cell Count', 'Hemoglobin')
    GROUP BY le.subject_id, di.label;
    """

In [22]:
def query_icu_stay(pt_subject_id: int) -> str:
    return f"""
    SELECT 
        icu.subject_id,
        AVG(icu.los) AS avg_los,
        MAX(icu.los) AS max_los
    FROM mimiciv_icu.icustays icu
    WHERE icu.subject_id = {pt_subject_id}
    GROUP BY icu.subject_id;
    """

In [23]:
def query_diagnoses(pt_subject_id: int) -> str:
    return f"""
    SELECT 
        d.subject_id,
        d_icd.icd_code,
        d_icd.long_title,
        COUNT(d.seq_num) AS diagnosis_count
    FROM mimiciv_hosp.diagnoses_icd d
    JOIN mimiciv_hosp.d_icd_diagnoses d_icd ON d.icd_code = d_icd.icd_code
    WHERE d.subject_id = {pt_subject_id}
    GROUP BY d.subject_id, d_icd.icd_code, d_icd.long_title;
    """

In [24]:
def query_procedures(pt_subject_id: int) -> str:
    return f"""
    SELECT 
        p.subject_id,
        p_icd.icd_code,
        p_icd.long_title,
        COUNT(p.seq_num) AS procedure_count
    FROM mimiciv_hosp.procedures_icd p
    JOIN mimiciv_hosp.d_icd_procedures p_icd ON p.icd_code = p_icd.icd_code
    WHERE p.subject_id = {pt_subject_id}
    GROUP BY p.subject_id, p_icd.icd_code, p_icd.long_title;
    """

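The query builders above interpolate `pt_subject_id` directly into the SQL text. That works for trusted integer ids, but bound parameters are generally the safer pattern. The following is a minimal sketch of the idea using the standard-library `sqlite3` module and an invented in-memory table, purely so it runs without a database server; the table contents and the `fetch_demographics` name are assumptions. With the notebook's stack, the analogous approach would presumably be a SQLAlchemy `text()` query with a named `:subject_id` placeholder and the `params` argument of `pd.read_sql_query`.

```python
import sqlite3

# In-memory stand-in for a patients table; the notebook itself uses PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE patients (subject_id INTEGER, gender TEXT, anchor_age INTEGER)"
)
conn.executemany(
    "INSERT INTO patients VALUES (?, ?, ?)",
    [(1, "F", 52), (2, "M", 67)],  # hypothetical rows, not MIMIC-IV data
)


def fetch_demographics(connection, pt_subject_id):
    """Look up one patient, passing the id as a bound parameter."""
    query = (
        "SELECT subject_id, gender, anchor_age AS age "
        "FROM patients WHERE subject_id = :subject_id"
    )
    # The driver sends the value separately from the SQL text.
    return connection.execute(query, {"subject_id": pt_subject_id}).fetchall()


rows = fetch_demographics(conn, 1)
# A malicious-looking string is compared as a plain value, never run as SQL.
injected = fetch_demographics(conn, "1 OR 1=1")
```

Because the value never becomes part of the SQL string, the second call matches no rows instead of returning the whole table.
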
In [25]:
# Function to get patient data for a specific subject_id
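def get_patient_data(pt_subject_id: int):
    def _inner_runtime_logs(component: str, runtime: float):
        # `runtime` is cumulative: seconds elapsed since `start_time` below
        print(f'[{component}] - completed in {runtime:.4f} seconds')

    # Single start time shared by all steps, so logged values accumulate
    start_time = time.time()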

    demographics_df = pd.read_sql_query(query_demographics(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('Demographics', time.time() - start_time)
    chart_df = pd.read_sql_query(query_chart_events(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('Charts', time.time() - start_time)
    lab_df = pd.read_sql_query(query_labs(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('Labs', time.time() - start_time)
    icu_df = pd.read_sql_query(query_icu_stay(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('ICU Stay', time.time() - start_time)
    diagnosis_df = pd.read_sql_query(query_diagnoses(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('Diagnoses', time.time() - start_time)
    procedures_df = pd.read_sql_query(query_procedures(pt_subject_id=pt_subject_id), engine)
    _inner_runtime_logs('Procedures', time.time() - start_time)

    return {
        "demographics": demographics_df,
        "charts": chart_df,
        "labs": lab_df,
        "icu": icu_df,
        "diagnoses": diagnosis_df,
        "procedures": procedures_df,
    }

In [26]:
# With known-good subject_id, test the upstream sql queries and view resulting dataframes
pdf = get_patient_data(pt_subject_id=10000032)
pdf['demographics'].head()
pdf['charts'].head()
pdf['labs'].head()
pdf['diagnoses'].head()
pdf['procedures'].head()

[Demographics] - completed in 0.0021 seconds
[Charts] - completed in 20.9938 seconds
[Labs] - completed in 31.6106 seconds
[ICU Stay] - completed in 31.6197 seconds
[Diagnoses] - completed in 31.9899 seconds
[Procedures] - completed in 32.0918 seconds

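Since `start_time` is set once and never reset, the timings printed above are cumulative rather than per query. A small sketch, using only the figures from that output, shows how per-step durations can be recovered; the helper name `per_step_durations` is hypothetical.

```python
# Cumulative timings (seconds) as printed by get_patient_data above.
cumulative = [
    ("Demographics", 0.0021),
    ("Charts", 20.9938),
    ("Labs", 31.6106),
    ("ICU Stay", 31.6197),
    ("Diagnoses", 31.9899),
    ("Procedures", 32.0918),
]


def per_step_durations(entries):
    """Convert cumulative elapsed times into per-step durations."""
    durations = []
    previous = 0.0
    for name, elapsed in entries:
        durations.append((name, round(elapsed - previous, 4)))
        previous = elapsed
    return durations


for name, seconds in per_step_durations(cumulative):
    print(f"[{name}] - {seconds:.4f} seconds")
```

Read this way, the chart-events and lab-events queries account for nearly all of the roughly 32 seconds, while the remaining four queries together take well under a second.
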

|   | subject_id | icd_code | long_title                      | procedure_count |
|---|------------|----------|---------------------------------|-----------------|
| 0 | 10000032   | 5491     | Percutaneous abdominal drainage | 3               |
