# ICU Mortality Model - Feature Engineering

This notebook loads the ICU cohort and creates hourly wide dataset for the first 24 hours of ICU stay.

## Objective
- Load ICU cohort from 01_cohort.ipynb
- Use pyCLIF to extract features from CLIF tables
- Create hourly wide dataset for the first 24 hours
- Filter to encounters with complete 24-hour data
- Save features for modeling

## Feature Sources
- **Vitals**: All vital_category values
- **Labs**: All lab_category values
- **Patient Assessments**: GCS_total, RASS
- **Respiratory Support**: Mode, FiO2, PEEP, ventilator settings (with one-hot encoding)
- **Medications**: All vasoactives and sedatives

## Setup and Configuration

**Memory Management Notes:**
- This notebook processes a large dataset (54K+ hospitalizations) 
- If you encounter kernel crashes or memory errors:
  1. Set `USE_SAMPLE_DATA=True` in the configuration cell below
  2. Increase `memory_limit` parameter in the hourly aggregation function
  3. Reduce `batch_size` parameters if needed
- The hourly aggregation function uses DuckDB for optimal performance and automatically handles batching for large datasets

In [1]:
import sys
import os
sys.path.append(os.path.join('..', 'src'))

import pandas as pd
import numpy as np
from pyclif import CLIF
from pyclif.utils.wide_dataset import convert_wide_to_hourly
import json
import warnings
warnings.filterwarnings('ignore')
# Ensure the directory exists
output_dir = os.path.join('..', 'output', 'preprocessing')
os.makedirs(output_dir, exist_ok=True)
print("=== ICU Mortality Model - Feature Engineering ===")
print("Setting up environment...")

=== ICU Mortality Model - Feature Engineering ===
Setting up environment...


In [2]:
def load_config():
    """Load configuration from config.json or config_demo.json"""
    # Try config.json first, then config_demo.json
    config_path = os.path.join("config.json")
    if not os.path.exists(config_path):
        config_path = os.path.join("config_demo.json")
    
    if os.path.exists(config_path):
        with open(config_path, 'r') as file:
            config = json.load(file)
        print(f"✅ Loaded configuration from {os.path.basename(config_path)}")
    else:
        raise FileNotFoundError("Configuration file not found. Please create config.json or config_demo.json based on the config_template.")
    
    return config

# Load configuration
config = load_config()
print(f"Site: {config['site']}")
print(f"Data path: {config['clif2_path']}")
print(f"File type: {config['filetype']}")

✅ Loaded configuration from config_demo.json
Site: MIMIC
Data path: /Users/sudo_sage/Documents/WORK/clif_mimic
File type: parquet


In [3]:
# Initialize pyCLIF
clif = CLIF(
    data_dir=config['clif2_path'],
    filetype=config['filetype'],
    timezone="US/Eastern"
)

print("✅ pyCLIF initialized successfully")

CLIF Object Initialized.
✅ pyCLIF initialized successfully


## Load ICU Cohort

In [4]:
# Load ICU cohort from 01_cohort.ipynb
cohort_path = os.path.join('..', 'output', 'preprocessing', 'icu_cohort.parquet')

if os.path.exists(cohort_path):
    cohort_df = pd.read_parquet(cohort_path)
    
    # Convert datetime columns
    datetime_cols = ['start_dttm', 'hour_24_start_dttm', 'hour_24_end_dttm']
    for col in datetime_cols:
        cohort_df[col] = pd.to_datetime(cohort_df[col])
    
    print(f"✅ Loaded ICU cohort: {len(cohort_df)} hospitalizations")
    print(f"Mortality rate: {cohort_df['disposition'].mean():.3f}")
    print(f"Time range: {cohort_df['start_dttm'].min()} to {cohort_df['start_dttm'].max()}")
    
else:
    raise FileNotFoundError(f"Cohort file not found at {cohort_path}. Please run 01_cohort.ipynb first.")

# Display sample
print("\nSample cohort records:")
cohort_df.head()

✅ Loaded ICU cohort: 54509 hospitalizations
Mortality rate: 0.110
Time range: 2105-10-04 22:27:12+00:00 to 2214-05-03 22:09:18+00:00

Sample cohort records:


Unnamed: 0,hospitalization_id,start_dttm,hour_24_start_dttm,hour_24_end_dttm,disposition
0,25860671,2150-11-03 00:37:00+00:00,2150-11-03 00:37:00+00:00,2150-11-04 00:37:00+00:00,0
1,24597018,2157-11-21 00:18:02+00:00,2157-11-21 00:18:02+00:00,2157-11-22 00:18:02+00:00,0
2,25563031,2110-04-11 20:52:22+00:00,2110-04-11 20:52:22+00:00,2110-04-12 20:52:22+00:00,0
3,23581541,2160-05-18 15:00:53+00:00,2160-05-18 15:00:53+00:00,2160-05-19 15:00:53+00:00,0
4,27793700,2162-02-18 04:30:00+00:00,2162-02-18 04:30:00+00:00,2162-02-19 04:30:00+00:00,0


## Feature Extraction Configuration

In [5]:
# Define feature extraction configuration
print("Configuring feature extraction...")

# OPTION: Set to True for development/testing with smaller dataset
USE_SAMPLE_DATA = False  # Set to True to use sample for faster processing
SAMPLE_SIZE = 1000  # Number of hospitalizations to sample

# Get hospitalization IDs from cohort
if USE_SAMPLE_DATA:
    print(f"⚠️ Using sample data with {SAMPLE_SIZE} hospitalizations for testing")
    cohort_sample = cohort_df.sample(n=min(SAMPLE_SIZE, len(cohort_df)), random_state=42)
    cohort_ids = cohort_sample['hospitalization_id'].astype(str).unique().tolist()
    print(f"Sampled {len(cohort_ids)} hospitalizations from {len(cohort_df)} total")
else:
    cohort_ids = cohort_df['hospitalization_id'].astype(str).unique().tolist()
    print(f"Using full dataset: {len(cohort_ids)} hospitalizations")

# Define category filters for each table
category_filters = {
    'vitals': [  # Common vital signs
        'heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c',
        'weight_kg', 'height_cm'
    ],
    'labs': [  # Common lab values
        "albumin", "alkaline_phosphatase", "alt", "ast", "basophils_percent", "basophils_absolute", 
        "bicarbonate", "bilirubin_total", "bilirubin_conjugated", "bilirubin_unconjugated",
        "bun", "calcium_total", "calcium_ionized", "chloride", "creatinine", "crp", 
        "eosinophils_percent", "eosinophils_absolute", "esr", "ferritin", "glucose_f≠ingerstick", 
        "glucose_serum", "hemoglobin", "phosphate", "inr", "lactate", "ldh",
        "lymphocytes_percent", "lymphocytes_absolute", "magnesium", "monocytes_percent", 
        "monocytes_absolute", "neutrophils_percent", "neutrophils_absolute",
        "pco2_arterial", "po2_arterial", "pco2_venous", "ph_arterial", "ph_venous", 
        "platelet_count", "potassium", "procalcitonin", "pt", "ptt", 
        "so2_arterial", "so2_mixed_venous", "so2_central_venous", "sodium",
        "total_protein", "troponin_i", "troponin_t", "wbc"
    ],
    'medication_admin_continuous': [  # Vasoactives and sedatives
        "norepinephrine", "epinephrine", "phenylephrine", "angiotensin", "vasopressin",
        "dopamine", "dobutamine", "milrinone", "isoproterenol",
        "propofol", "dexmedetomidine", "ketamine", "midazolam", "fentanyl",
        "hydromorphone", "morphine", "remifentanil", "pentobarbital", "lorazepam"
    ],
    'respiratory_support': [  # All respiratory support categories
        'mode_category', 'device_category', 'fio2'
    ]
}

print("\nFeature extraction configuration:")
for table, categories in category_filters.items():
    print(f"  {table}: {len(categories)} categories")

print(f"\nExtracting features for {len(cohort_ids)} hospitalizations")

Configuring feature extraction...
Using full dataset: 54509 hospitalizations

Feature extraction configuration:
  vitals: 7 categories
  labs: 52 categories
  medication_admin_continuous: 19 categories
  respiratory_support: 3 categories

Extracting features for 54509 hospitalizations


## Create Wide Dataset Using pyCLIF

### Performance Optimization with Cohort Time Filtering

The `create_wide_dataset` function now supports an optional `cohort_df` parameter that allows filtering data to specific time windows **before** creating the wide dataset. This significantly improves performance and reduces memory usage when you only need data from specific time periods.

**Benefits:**
- Reduces data volume before pivoting operations
- Significantly lower memory usage
- Faster processing time
- Particularly useful for ICU mortality models where we only need the first 24 hours

**Required columns in cohort_df:**
- `hospitalization_id`: Unique identifier for each hospitalization
- `start_time`: Start of the time window (datetime)
- `end_time`: End of the time window (datetime)

In [6]:
# Create wide dataset for cohort hospitalizations
print("Creating wide dataset using pyCLIF...")

# Prepare cohort_df with required columns for time filtering
# This will significantly reduce memory usage by filtering data to only the 24-hour windows
cohort_time_filter = cohort_df[['hospitalization_id', 'hour_24_start_dttm', 'hour_24_end_dttm']].copy()
cohort_time_filter.columns = ['hospitalization_id', 'start_time', 'end_time']  # Rename to match expected columns

print(f"Using cohort_df time filtering for {len(cohort_time_filter)} hospitalizations")
print(f"This will filter data to 24-hour windows before creating the wide dataset")

wide_df = clif.create_wide_dataset(
    hospitalization_ids=cohort_ids,
    cohort_df=cohort_time_filter,  # Pass cohort_df for time window filtering
    category_filters=category_filters,  
    save_to_data_location=False,
    batch_size=10000,
    memory_limit='6GB',
    threads=4,
    show_progress=True
)

print(f"✅ Wide dataset created successfully")
print(f"Shape: {wide_df.shape}")
print(f"Hospitalizations: {wide_df['hospitalization_id'].nunique()}")
print(f"Date range: {wide_df['event_time'].min()} to {wide_df['event_time'].max()}")

Creating wide dataset using pyCLIF...
Using cohort_df time filtering for 54509 hospitalizations
This will filter data to 24-hour windows before creating the wide dataset
Auto-loading required base table: patient
Loading clif_patient.parquet
Data loaded successfully from clif_patient.parquet
Validation completed with 8 error(s). See `errors` attribute.
Auto-loading required base table: hospitalization
Loading clif_hospitalization.parquet
Data loaded successfully from clif_hospitalization.parquet
Validation completed with 1 error(s). See `errors` attribute.
Auto-loading required base table: adt
Loading clif_adt.parquet
Data loaded successfully from clif_adt.parquet
Validation completed with 3 error(s). See `errors` attribute.
Auto-loading table: vitals
Loading clif_vitals.parquet


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Data loaded successfully from clif_vitals.parquet
Validation completed with 9 error(s).
  - 9 range validation error(s)
See `errors` and `range_validation_errors` attributes for details.
Auto-loading table: labs
Loading clif_labs.parquet


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Data loaded successfully from clif_labs.parquet
Validation completed with 25 error(s).
  - 8 schema validation error(s)
  - 17 reference unit error(s)
See `errors` and `unit_validation_errors` attributes for details.
Auto-loading table: respiratory_support
Loading clif_respiratory_support.parquet
Data loaded successfully from clif_respiratory_support.parquet
Validation completed with 2 error(s). See `errors` attribute.
Auto-loading table: medication_admin_continuous
Loading clif_medication_admin_continuous.parquet
Data loaded successfully from clif_medication_admin_continuous.parquet
Validation completed with 4 error(s). See `errors` attribute.
Starting wide dataset creation...
Using cohort_df with time windows for 54509 hospitalizations
Filtering to specific hospitalization IDs: 54509 encounters

Loading and filtering base tables...
Base tables filtered - Hospitalization: 54509, Patient: 364627, ADT: 217179
Processing 54509 hospitalizations in batches of 10000


Processing batches:   0%|          | 0/6 [00:00<?, ?it/s]


Processing batch 1/6 (10000 hospitalizations)

=== Processing Tables ===
Base cohort created with 10000 records

Processing vitals...
Loaded 7657754 records from vitals
  Time filtering: 7657754 → 1746779 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 400969 combo_ids with 7 category columns

Processing labs...
Loaded 2634760 records from labs
  Time filtering: 2634760 → 550776 records
Filtering labs categories to: ['albumin', 'alkaline_phosphatase', 'alt', 'ast', 'basophils_percent', 'basophils_absolute', 'bicarbonate', 'bilirubin_total', 'bilirubin_conjugated', 'bilirubin_unconjugated', 'bun', 'calcium_total', 'calcium_ionized', 'chloride', 'creatinine', 'crp', 'eosinophils_percent', 'eosinophils_absolute', 'esr', 'ferritin', 'glucose_f≠ingerstick', 'glucose_serum', 'hemoglobin', 'phosphate', 'inr', 'lactate', 'ldh', 'lymphocytes_percent', 'lymphocytes_absolute', 'magnesium', 'monocytes_pe

Processing batches:  17%|█▋        | 1/6 [00:47<03:55, 47.02s/it]


Processing batch 2/6 (10000 hospitalizations)

=== Processing Tables ===
Base cohort created with 10000 records

Processing vitals...
Loaded 7504295 records from vitals
  Time filtering: 7504295 → 1737265 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 401585 combo_ids with 7 category columns

Processing labs...
Loaded 2625159 records from labs
  Time filtering: 2625159 → 546137 records
Filtering labs categories to: ['albumin', 'alkaline_phosphatase', 'alt', 'ast', 'basophils_percent', 'basophils_absolute', 'bicarbonate', 'bilirubin_total', 'bilirubin_conjugated', 'bilirubin_unconjugated', 'bun', 'calcium_total', 'calcium_ionized', 'chloride', 'creatinine', 'crp', 'eosinophils_percent', 'eosinophils_absolute', 'esr', 'ferritin', 'glucose_f≠ingerstick', 'glucose_serum', 'hemoglobin', 'phosphate', 'inr', 'lactate', 'ldh', 'lymphocytes_percent', 'lymphocytes_absolute', 'magnesium', 'monocytes_pe

Processing batches:  33%|███▎      | 2/6 [01:08<02:07, 31.94s/it]

Added missing column: eosinophils_absolute
Added missing column: glucose_f≠ingerstick
Added missing column: lymphocytes_absolute
Added missing column: monocytes_absolute
Added missing column: neutrophils_absolute
Added missing column: procalcitonin
Added missing column: troponin_i
Added missing column: isoproterenol
Added missing column: remifentanil
Added missing column: fio2
Wide dataset created: 679346 records with 91 columns
Batch 2 completed: 679346 records

Processing batch 3/6 (10000 hospitalizations)

=== Processing Tables ===
Base cohort created with 10000 records

Processing vitals...
Loaded 7515249 records from vitals
  Time filtering: 7515249 → 1739151 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 399835 combo_ids with 7 category columns

Processing labs...
Loaded 2599708 records from labs
  Time filtering: 2599708 → 549518 records
Filtering labs categories to: ['albumin', 'alkal

Processing batches:  50%|█████     | 3/6 [01:28<01:20, 26.68s/it]

Added missing column: eosinophils_absolute
Added missing column: glucose_f≠ingerstick
Added missing column: lymphocytes_absolute
Added missing column: monocytes_absolute
Added missing column: neutrophils_absolute
Added missing column: procalcitonin
Added missing column: troponin_i
Added missing column: angiotensin
Added missing column: isoproterenol
Added missing column: remifentanil
Added missing column: fio2
Wide dataset created: 678341 records with 91 columns
Batch 3 completed: 678341 records

Processing batch 4/6 (10000 hospitalizations)

=== Processing Tables ===
Base cohort created with 10000 records

Processing vitals...
Loaded 7707424 records from vitals
  Time filtering: 7707424 → 1732914 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 397802 combo_ids with 7 category columns

Processing labs...
Loaded 2625007 records from labs
  Time filtering: 2625007 → 543365 records
Filtering labs

Processing batches:  67%|██████▋   | 4/6 [01:50<00:49, 24.74s/it]

Added missing column: eosinophils_absolute
Added missing column: glucose_f≠ingerstick
Added missing column: lymphocytes_absolute
Added missing column: monocytes_absolute
Added missing column: neutrophils_absolute
Added missing column: procalcitonin
Added missing column: troponin_i
Added missing column: isoproterenol
Added missing column: remifentanil
Added missing column: lorazepam
Added missing column: fio2
Wide dataset created: 677298 records with 91 columns
Batch 4 completed: 677298 records

Processing batch 5/6 (10000 hospitalizations)

=== Processing Tables ===
Base cohort created with 10000 records

Processing vitals...
Loaded 7382649 records from vitals
  Time filtering: 7382649 → 1734866 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 402462 combo_ids with 7 category columns

Processing labs...
Loaded 2559777 records from labs
  Time filtering: 2559777 → 537632 records
Filtering labs c

Processing batches:  83%|████████▎ | 5/6 [02:12<00:23, 23.57s/it]

Added missing column: eosinophils_absolute
Added missing column: glucose_f≠ingerstick
Added missing column: lymphocytes_absolute
Added missing column: monocytes_absolute
Added missing column: neutrophils_absolute
Added missing column: procalcitonin
Added missing column: troponin_i
Added missing column: isoproterenol
Added missing column: remifentanil
Added missing column: lorazepam
Added missing column: fio2
Wide dataset created: 675040 records with 91 columns
Batch 5 completed: 675040 records

Processing batch 6/6 (4509 hospitalizations)

=== Processing Tables ===
Base cohort created with 4509 records

Processing vitals...
Loaded 3508848 records from vitals
  Time filtering: 3508848 → 779939 records
Filtering vitals categories to: ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c', 'weight_kg', 'height_cm']
Pivoted vitals: 179605 combo_ids with 7 category columns

Processing labs...
Loaded 1198700 records from labs
  Time filtering: 1198700 → 244208 records
Filtering labs cate

Processing batches: 100%|██████████| 6/6 [02:23<00:00, 23.96s/it]

Added missing column: eosinophils_absolute
Added missing column: glucose_f≠ingerstick
Added missing column: lymphocytes_absolute
Added missing column: monocytes_absolute
Added missing column: neutrophils_absolute
Added missing column: procalcitonin
Added missing column: troponin_i
Added missing column: isoproterenol
Added missing column: remifentanil
Added missing column: fio2
Wide dataset created: 303809 records with 91 columns
Batch 6 completed: 303809 records

Combining 6 batch results...





Final dataset: 3694576 records with 91 columns
✅ Wide dataset created successfully
Shape: (3694576, 91)
Hospitalizations: 54509
Date range: 2105-10-04 16:27:12-06:00 to 2214-05-07 12:50:17-06:00


In [7]:
wide_df.columns

Index(['hospitalization_id', 'patient_id', 'age_at_admission', 'event_time',
       'mode_category', 'device_category', 'angiotensin', 'dexmedetomidine',
       'dobutamine', 'dopamine', 'epinephrine', 'fentanyl', 'hydromorphone',
       'ketamine', 'lorazepam', 'midazolam', 'milrinone', 'morphine',
       'norepinephrine', 'phenylephrine', 'propofol', 'vasopressin', 'albumin',
       'alkaline_phosphatase', 'alt', 'ast', 'basophils_absolute',
       'basophils_percent', 'bicarbonate', 'bilirubin_conjugated',
       'bilirubin_total', 'bilirubin_unconjugated', 'bun', 'calcium_ionized',
       'calcium_total', 'chloride', 'creatinine', 'crp', 'eosinophils_percent',
       'esr', 'ferritin', 'glucose_serum', 'hemoglobin', 'inr', 'lactate',
       'ldh', 'lymphocytes_percent', 'magnesium', 'monocytes_percent',
       'neutrophils_percent', 'pco2_arterial', 'pco2_venous', 'ph_arterial',
       'ph_venous', 'phosphate', 'platelet_count', 'po2_arterial', 'potassium',
       'pt', 'ptt', 'so2

In [8]:
wide_df[[ 'angiotensin', 'dexmedetomidine',
       'dobutamine', 'dopamine', 'epinephrine', 'fentanyl', 'hydromorphone',
       'ketamine', 'lorazepam', 'midazolam', 'milrinone', 'morphine',
       'norepinephrine', 'pentobarbital', 'phenylephrine', 'propofol',
       'vasopressin']].describe()

Unnamed: 0,angiotensin,dexmedetomidine,dobutamine,dopamine,epinephrine,fentanyl,hydromorphone,ketamine,lorazepam,midazolam,milrinone,morphine,norepinephrine,pentobarbital,phenylephrine,propofol,vasopressin
count,55.0,16110.0,1959.0,7394.0,10732.0,36743.0,605.0,954.0,32.0,17197.0,1286.0,561.0,98877.0,58.0,88924.0,125945.0,6486.0
mean,15.205185,0.593659,4.568852,7.20413,0.106587,121.776761,2.290408,1.483141,1.88719,3.718082,0.439542,6.327015,0.150532,40.639446,0.998636,37.925861,2.07293
std,18.681054,0.599438,3.916653,49.01995,0.510442,1495.878735,2.645807,5.50927,1.420547,38.330868,3.12141,7.344227,0.395288,207.083008,4.179108,512.61675,7.858685
min,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.657216,0.0,0.0
25%,0.027509,0.300595,2.499743,2.506073,0.015016,25.000002,0.5,0.152991,0.875,1.0,0.249975,2.0,0.049994,0.339147,0.30003,19.82017,1.2
50%,10.024252,0.592681,4.004278,5.003784,0.030084,50.064121,1.500154,0.300311,2.0,2.0,0.251238,4.001383,0.100018,2.004924,0.599865,30.155683,2.4
75%,20.021649,0.802063,5.021847,9.97161,0.09548,100.056763,3.090909,0.500701,2.629917,4.000432,0.394384,8.002672,0.200139,3.676606,1.198607,50.020006,2.400814
max,83.689354,38.726226,30.218054,4000.0,41.142862,75000.03125,23.999998,92.526512,4.032787,3370.979736,99.206001,82.946678,61.439026,1430.0,833.333254,135793.85376,428.571411


In [9]:
# Safely inspect a subset of the data to avoid memory issues
print("Inspecting data sample...")

# Check dataset size first
print(f"Wide dataset shape: {wide_df.shape}")
print(f"Memory usage: {wide_df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# Sample a specific hospitalization safely
sample_hosp_id = wide_df['hospitalization_id'].iloc[0]
print(f"Examining data for hospitalization: {sample_hosp_id}")

try:
    # Use query method which is more memory efficient for large datasets
    temp = wide_df.query(f"hospitalization_id == '{sample_hosp_id}'")
    print(f"Records for {sample_hosp_id}: {len(temp)}")
    
    if len(temp) > 0:
        print("Time range:", temp['event_time'].min(), "to", temp['event_time'].max())
        print("Columns with data:", (temp.notna().sum() > 0).sum())
    
except Exception as e:
    print(f"Error inspecting data: {str(e)}")
    print("Dataset might be too large for this operation")

Inspecting data sample...
Wide dataset shape: (3694576, 91)
Memory usage: 3350.6 MB
Examining data for hospitalization: 20001770
Records for 20001770: 63
Time range: 2117-01-25 13:23:00-06:00 to 2117-01-28 16:35:58-06:00
Columns with data: 37


In [10]:
# Define aggregation configuration - FIXED to match available columns
print("Defining aggregation configuration...")

# Build aggregation config based on what we actually have
aggregation_config = {
    # Apply multiple aggregations to vital signs and labs that are actually present
    "max": [col for col in category_filters['vitals'] + category_filters['labs'] if col in wide_df.columns],
    "min": [col for col in category_filters['vitals'] + category_filters['labs'] if col in wide_df.columns],
   # "mean": [col for col in category_filters['vitals'] + category_filters['labs'] if col in wide_df.columns],
    "median": [col for col in category_filters['vitals'] + category_filters['labs'] if col in wide_df.columns],
    # Boolean aggregation for medications (1 if present in hour, 0 otherwise)
    "boolean": [col for col in category_filters['medication_admin_continuous'] if col in wide_df.columns],
    # One-hot encode categorical respiratory support columns
    "one_hot_encode": [col for col in ["mode_category", "device_category"] if col in wide_df.columns]
}

# Print what will actually be aggregated
print("Aggregation configuration:")
for method, cols in aggregation_config.items():
    print(f"  {method}: {len(cols)} columns")
    if len(cols) <= 10:
        print(f"    {cols}")
    else:
        print(f"    {cols[:5]}...{cols[-2:]} (showing first 5 and last 2)")

# Convert to hourly using optimized DuckDB function
print(f"\nProcessing {len(wide_df):,} records to hourly aggregation...")

hourly_df = convert_wide_to_hourly(
    wide_df, 
    aggregation_config, 
    memory_limit='6GB',      # Set memory limit for DuckDB
    batch_size=10000          # Process in batches for large datasets
)

print("✅ Hourly aggregation completed!")

Defining aggregation configuration...
Aggregation configuration:
  max: 59 columns
    ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c']...['troponin_t', 'wbc'] (showing first 5 and last 2)
  min: 59 columns
    ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c']...['troponin_t', 'wbc'] (showing first 5 and last 2)
  median: 59 columns
    ['heart_rate', 'map', 'respiratory_rate', 'spo2', 'temp_c']...['troponin_t', 'wbc'] (showing first 5 and last 2)
  boolean: 19 columns
    ['norepinephrine', 'epinephrine', 'phenylephrine', 'angiotensin', 'vasopressin']...['pentobarbital', 'lorazepam'] (showing first 5 and last 2)
  one_hot_encode: 2 columns
    ['mode_category', 'device_category']

Processing 3,694,576 records to hourly aggregation...
Starting optimized hourly aggregation using DuckDB...
Input dataset shape: (3694576, 91)
Memory limit: 6GB
Processing in batches of 10000 hospitalizations...


Processing batch 1/6:   0%|          | 0/6 [00:00<?, ?batch/s]


--- Batch 1/6 (10000 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (16 columns)

Merging aggregation results...


Processing batch 2/6:  17%|█▋        | 1/6 [00:03<00:15,  3.16s/batch]


Hourly aggregation complete: 265906 hourly records
Columns in hourly dataset: 225
Batch 1 completed: 265906 records

--- Batch 2/6 (10000 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (16 columns)

Merging aggregation results...


Processing batch 3/6:  33%|███▎      | 2/6 [00:06<00:12,  3.24s/batch]


Hourly aggregation complete: 265793 hourly records
Columns in hourly dataset: 225
Batch 2 completed: 265793 records

--- Batch 3/6 (10000 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (16 columns)

Merging aggregation results...


Processing batch 4/6:  50%|█████     | 3/6 [00:09<00:09,  3.24s/batch]


Hourly aggregation complete: 265269 hourly records
Columns in hourly dataset: 225
Batch 3 completed: 265269 records

--- Batch 4/6 (10000 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (16 columns)

Merging aggregation results...


Processing batch 5/6:  67%|██████▋   | 4/6 [00:12<00:06,  3.21s/batch]


Hourly aggregation complete: 265172 hourly records
Columns in hourly dataset: 225
Batch 4 completed: 265172 records

--- Batch 5/6 (10000 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (15 columns)

Merging aggregation results...


Processing batch 6/6:  83%|████████▎ | 5/6 [00:15<00:03,  3.18s/batch]


Hourly aggregation complete: 265954 hourly records
Columns in hourly dataset: 224
Batch 5 completed: 265954 records

--- Batch 6/6 (4509 hospitalizations) ---
Creating hourly buckets...
Calculating nth_hour...
Columns not in aggregation_config, defaulting to 'first' with '_c' postfix:
  - age_at_admission
  - hospital_id
  - in_dttm
  - out_dttm
  - location_category
  - hosp_id_day_key
  - fio2

Processing aggregations by type:
- Extracting base columns...
- Processing max aggregation...
  ✓ max complete (59 columns)
- Processing min aggregation...
  ✓ min complete (59 columns)
- Processing median aggregation...
  ✓ median complete (59 columns)
- Processing first aggregation...
  ✓ first complete (7 columns)
- Processing boolean aggregation...
  ✓ boolean complete (19 columns)
- Processing one_hot_encode aggregation...
  ✓ one_hot_encode complete (15 columns)

Merging aggregation results...


Processing batch 6/6: 100%|██████████| 6/6 [00:17<00:00,  2.93s/batch]


Hourly aggregation complete: 119783 hourly records
Columns in hourly dataset: 224
Batch 6 completed: 119783 records

Combining 6 batch results...





Final hourly dataset: 1447877 records from 6 batches
✅ Hourly aggregation completed!


In [11]:
# Performance and Results Summary
print("=== Hourly Aggregation Results ===")
print(f"✅ Processing complete!")
print(f"Input wide dataset: {wide_df.shape[0]:,} records")
print(f"Output hourly dataset: {hourly_df.shape[0]:,} records") 
print(f"Columns in hourly dataset: {hourly_df.shape[1]}")
print(f"Compression ratio: {wide_df.shape[0] / hourly_df.shape[0]:.1f}x fewer records")

# Show hourly distribution
hourly_stats = hourly_df.groupby('nth_hour').size()
print(f"\nHourly record distribution:")
print(f"  Hours covered: 0 to {hourly_stats.index.max()}")
print(f"  Average records per hour: {hourly_stats.mean():.0f}")
print(f"  Records in first 24 hours: {hourly_stats[hourly_stats.index < 24].sum():,}")

# Show sample of output columns
print(f"\nSample of aggregated columns:")
agg_columns = [col for col in hourly_df.columns if any(col.endswith(suffix) for suffix in ['_max', '_min', '_mean', '_boolean'])]
for col in agg_columns[:10]:
    non_null_count = hourly_df[col].notna().sum()
    print(f"  {col}: {non_null_count:,} non-null values")

=== Hourly Aggregation Results ===
✅ Processing complete!
Input wide dataset: 3,694,576 records
Output hourly dataset: 1,447,877 records
Columns in hourly dataset: 225
Compression ratio: 2.6x fewer records

Hourly record distribution:
  Hours covered: 0 to 9244
  Average records per hour: 857
  Records in first 24 hours: 915,722

Sample of aggregated columns:
  heart_rate_max: 1,231,954 non-null values
  map_max: 1,211,859 non-null values
  respiratory_rate_max: 1,225,913 non-null values
  spo2_max: 1,217,815 non-null values
  temp_c_max: 387,162 non-null values
  weight_kg_max: 74,576 non-null values
  height_cm_max: 935 non-null values
  albumin_max: 20,848 non-null values
  alkaline_phosphatase_max: 36,736 non-null values
  alt_max: 37,372 non-null values


In [12]:
# Note: This filtering step is now redundant if cohort_df was used in create_wide_dataset
# The data is already filtered to the 24-hour windows during the wide dataset creation
# However, we'll keep this for backward compatibility and verification

# Filter wide dataset to 24-hour windows
print("Filtering to 24-hour windows for event wide data...: Shape:", wide_df.shape)
cohort_df['hospitalization_id'] = cohort_df['hospitalization_id'].astype(str)
# Merge with cohort to get time windows
wide_df_filtered = pd.merge(
    wide_df,
    cohort_df[['hospitalization_id', 'hour_24_start_dttm', 'hour_24_end_dttm', 'disposition']],
    on='hospitalization_id',
    how='inner'
)

print(f"After merge with cohort: {len(wide_df_filtered)} records")

print(f"✅ Filtered to 24-hour windows: {len(wide_df_filtered)} records")
print(f"Hospitalizations with data: {wide_df_filtered['hospitalization_id'].nunique()}")

# Show time window validation
print("\nTime window validation:")
print(f"All events within window: {((wide_df_filtered['event_time'] >= wide_df_filtered['hour_24_start_dttm']) & (wide_df_filtered['event_time'] <= wide_df_filtered['hour_24_end_dttm'])).all()}")
print(f"Average records per hospitalization: {len(wide_df_filtered) / wide_df_filtered['hospitalization_id'].nunique():.1f}")
print('Shape: after filtering:', wide_df_filtered.shape)

wide_df_filtered.to_parquet(os.path.join(output_dir, 'by_event_wide_df.parquet'), index=False)

Filtering to 24-hour windows for event wide data...: Shape: (3694576, 91)
After merge with cohort: 3694576 records
✅ Filtered to 24-hour windows: 3694576 records
Hospitalizations with data: 54509

Time window validation:
All events within window: False
Average records per hospitalization: 67.8
Shape: after filtering: (3694576, 94)


In [13]:
# Filter hourly dataset to 24-hour windows
print("\nFiltering hourly dataset to 24-hour windows...| Shape:",hourly_df.shape)
# Merge with cohort to get time windows
hourly_df_filtered = pd.merge(
    hourly_df,
    cohort_df[['hospitalization_id', 'hour_24_start_dttm', 'hour_24_end_dttm', 'disposition']],
    on='hospitalization_id',
    how='inner'
)

print(f"After merge with cohort: {len(hourly_df_filtered)} records")

print(f"✅ Filtered hourly dataset to 24-hour windows: {len(hourly_df_filtered)} records")
print(f"Hospitalizations with data in hourly dataset: {hourly_df_filtered['hospitalization_id'].nunique()}")

# Show time window validation for hourly dataset
print("\nTime window validation for hourly dataset:")
print(f"All events within window: {((hourly_df_filtered['event_time_hour'] >= hourly_df_filtered['hour_24_start_dttm']) & (hourly_df_filtered['event_time_hour'] <= hourly_df_filtered['hour_24_end_dttm'])).all()}")
print(f"Average records per hospitalization: {len(hourly_df_filtered) / hourly_df_filtered['hospitalization_id'].nunique():.1f}")

print('Shape:', hourly_df_filtered.shape)
hourly_df_filtered.to_parquet(os.path.join(output_dir, 'by_hourly_wide_df.parquet'), index=False)


Filtering hourly dataset to 24-hour windows...| Shape: (1447877, 225)
After merge with cohort: 1447877 records
✅ Filtered hourly dataset to 24-hour windows: 1447877 records
Hospitalizations with data in hourly dataset: 54509

Time window validation for hourly dataset:
All events within window: False
Average records per hospitalization: 26.6
Shape: (1447877, 228)


In [14]:
hourly_df_filtered.columns.tolist()


['hospitalization_id',
 'event_time_hour',
 'nth_hour',
 'hour_bucket',
 'patient_id',
 'day_number',
 'heart_rate_max',
 'map_max',
 'respiratory_rate_max',
 'spo2_max',
 'temp_c_max',
 'weight_kg_max',
 'height_cm_max',
 'albumin_max',
 'alkaline_phosphatase_max',
 'alt_max',
 'ast_max',
 'basophils_percent_max',
 'basophils_absolute_max',
 'bicarbonate_max',
 'bilirubin_total_max',
 'bilirubin_conjugated_max',
 'bilirubin_unconjugated_max',
 'bun_max',
 'calcium_total_max',
 'calcium_ionized_max',
 'chloride_max',
 'creatinine_max',
 'crp_max',
 'eosinophils_percent_max',
 'eosinophils_absolute_max',
 'esr_max',
 'ferritin_max',
 'glucose_f≠ingerstick_max',
 'glucose_serum_max',
 'hemoglobin_max',
 'phosphate_max',
 'inr_max',
 'lactate_max',
 'ldh_max',
 'lymphocytes_percent_max',
 'lymphocytes_absolute_max',
 'magnesium_max',
 'monocytes_percent_max',
 'monocytes_absolute_max',
 'neutrophils_percent_max',
 'neutrophils_absolute_max',
 'pco2_arterial_max',
 'po2_arterial_max',
 'pc