# MIMIC-III Data Acquisition Pipeline

**Purpose:** This notebook acquires raw clinical data from the MIMIC-III database hosted on Google BigQuery (the `physionet-data` project).

**What it does:**
- Connects to BigQuery and verifies access
- Executes a comprehensive SQL query to extract:
  - 5,000 discharge summaries (clinical notes)
  - Patient demographics (age, gender, ethnicity, insurance)
  - Lab results and abnormal flags
  - Diagnosis codes (ICD-9)
- Saves everything to a single CSV file: `mimic_discharge_labs.csv`

**What it does NOT do:**
- Data cleaning (handled by preprocessing script)
- Data validation (handled by validation script)
- Bias detection (handled by bias detection script)

**Output:** `data/raw/mimic_discharge_labs.csv` (~95 MB, ~9,700 records, 18 columns)


## 1. Import Required Libraries

We need:
- `bigquery`: To query Google's MIMIC-III database
- `InstalledAppFlow`: For interactive Google authentication
- `pandas`: To handle the resulting data
- `os` & `json`: For file paths and configuration management

In [12]:
# Cell 1: Import libraries
from google.cloud import bigquery
from google_auth_oauthlib.flow import InstalledAppFlow
import pandas as pd
import os
import json

## 2. Load Configuration & Setup Paths

**Why this matters:**
- We use a `pipeline_config.json` file to specify where data should be saved
- This makes the notebook **portable**: it works on any machine without hard-coded absolute paths
- The script automatically creates the `data/raw/` directory if it doesn't exist

**What happens here:**
1. Find the project root (two directories up from the current working directory, which must be `data-pipeline/notebooks/`)
2. Load `pipeline_config.json`
3. Convert the relative path from config into an absolute path
4. Create the directory if needed

**Result:** The variable `RAW_DATA_DIR` now points to the correct location to save our data.

In [13]:
# Cell 2: Setup - Load config and convert relative paths to absolute
# Find project root (2 levels up from notebooks/)
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), '../..'))

# Load config
config_path = os.path.join(PROJECT_ROOT, 'data-pipeline/configs/pipeline_config.json')
with open(config_path, 'r') as f:
    config = json.load(f)

# Convert relative config path to absolute path
RAW_DATA_DIR = os.path.join(PROJECT_ROOT, config['pipeline_config']['input_path'])
os.makedirs(RAW_DATA_DIR, exist_ok=True)

print(f"✓ Project root: {PROJECT_ROOT}")
print(f"✓ Data directory: {RAW_DATA_DIR}")

✓ Project root: /Users/Admin/Desktop/lab-lens
✓ Data directory: /Users/Admin/Desktop/lab-lens/data-pipeline/data/raw
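
Cell 2 only works when the working directory is exactly two levels below the project root; started from anywhere else, it silently resolves the wrong paths. A minimal sketch of a sturdier alternative walks upward until it finds a marker directory. Treating `data-pipeline/` as that marker is an assumption about this repository's layout, and `find_project_root` / `resolve_raw_dir` are hypothetical helper names, not part of the pipeline.

```python
from pathlib import Path
import json


def find_project_root(start: Path, marker: str = "data-pipeline") -> Path:
    """Walk up from `start` until a directory containing `marker` is found.

    Assumption: the project root is the first ancestor holding a
    `data-pipeline/` folder (the layout implied by Cell 2's paths).
    """
    start = Path(start).resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"No ancestor of {start} contains {marker!r}")


def resolve_raw_dir(project_root: Path) -> Path:
    """Read pipeline_config.json and turn its relative input_path into an existing absolute directory."""
    config_path = project_root / "data-pipeline" / "configs" / "pipeline_config.json"
    with open(config_path, "r") as f:
        config = json.load(f)
    raw_dir = project_root / config["pipeline_config"]["input_path"]
    raw_dir.mkdir(parents=True, exist_ok=True)  # same effect as os.makedirs(..., exist_ok=True)
    return raw_dir
```

In the notebook this would be `PROJECT_ROOT = find_project_root(Path.cwd())` followed by `RAW_DATA_DIR = resolve_raw_dir(PROJECT_ROOT)`; the config keys (`pipeline_config` → `input_path`) are the ones Cell 2 already reads.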


## 3. Authenticate with Google BigQuery

**What happens:**
1. A browser window opens asking you to log in to Google
2. You grant permission for this script to access BigQuery
3. The script receives credentials and creates a `client` object

**Security note:** 
- The `client_id` and `client_secret` shown here are for the Google OAuth app, not your personal credentials
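- Your own OAuth tokens are held in memory (in the `credentials` object) for this session only; this cell does not write them to disk, so you log in again each session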

**Project:** the GCP project ID passed to `bigquery.Client(project=...)` below (our project with access to MIMIC-III; substitute your own)

In [14]:
# Cell 3: Authenticate with BigQuery
flow = InstalledAppFlow.from_client_config(
    {
        "installed": {
            "client_id": "764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com",
            "client_secret": "d-FL95Q19q7MQmFpd7hHD0Ty",
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["http://localhost"]
        }
    },
    scopes=["https://www.googleapis.com/auth/bigquery"]
)

credentials = flow.run_local_server(port=8081, prompt='consent')
client = bigquery.Client(project='regal-bonito-455919-u3', credentials=credentials)
print("✓ Connected to BigQuery!")

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8081%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&state=NUikgHvo8PSxpT5ETsunoIjsBe1ukI&prompt=consent&access_type=offline
✓ Connected to BigQuery!
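
Cell 3 embeds the OAuth client ID, client secret, and project ID directly in the notebook. If you would rather keep them out of version control, one option is to build the same client-config dictionary from environment variables. This is a sketch only: the variable names `BQ_OAUTH_CLIENT_ID` and `BQ_OAUTH_CLIENT_SECRET` and the helper `client_config_from_env` are hypothetical.

```python
import os


def client_config_from_env(id_var: str = "BQ_OAUTH_CLIENT_ID",
                           secret_var: str = "BQ_OAUTH_CLIENT_SECRET") -> dict:
    """Build the "installed app" dict used by InstalledAppFlow.from_client_config.

    The environment variable names are hypothetical; pick your own.
    """
    missing = [v for v in (id_var, secret_var) if not os.environ.get(v)]
    if missing:
        raise KeyError(f"Set these environment variables first: {', '.join(missing)}")
    return {
        "installed": {
            "client_id": os.environ[id_var],
            "client_secret": os.environ[secret_var],
            # Same endpoints and redirect URI as Cell 3
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["http://localhost"],
        }
    }
```

Usage would be `flow = InstalledAppFlow.from_client_config(client_config_from_env(), scopes=["https://www.googleapis.com/auth/bigquery"])`. If you have the client secrets as a downloaded JSON file instead, `InstalledAppFlow.from_client_secrets_file(path, scopes=...)` reads it directly.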


## 4. Verify Connection to MIMIC-III Database

**Purpose:** Before running expensive queries, we verify the connection works.

**What we're checking:**
- Can we connect to BigQuery?
- Do we have access to the MIMIC-III database?
- Is the `admissions` table accessible?

**Expected result:** 58,976 total admissions in the database

**Why this matters:** 
- BigQuery's on-demand pricing is based on the bytes each query processes, so a small check like this catches problems cheaply, before the large extraction query runs
- If this fails, we know there's an authentication or permissions problem before wasting time

In [15]:
# Cell 4: Test connection with simple query
test_query = """
SELECT COUNT(*) as count
FROM `physionet-data.mimiciii_clinical.admissions`
"""

try:
    df_test = client.query(test_query).to_dataframe()
    print(f"✓ Connection verified: {df_test['count'][0]:,} total admissions in database")
except Exception as e:
    print(f"✗ Connection failed: {e}")
    raise

✓ Connection verified: 58,976 total admissions in database
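
Because on-demand BigQuery pricing depends on bytes processed, it can be worth estimating the main query's scan size before running it. A dry run reports the byte count without executing anything: `client.query(query, job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)).total_bytes_processed`. The hypothetical helpers below only turn that number into a readable size and a rough dollar figure; the price per TiB is deliberately a parameter, because rates vary by region and change over time.

```python
def estimate_on_demand_cost(total_bytes_processed: int, usd_per_tib: float) -> float:
    """Rough on-demand cost for a query that scans `total_bytes_processed` bytes.

    `usd_per_tib` should come from the current BigQuery pricing page for your region;
    minimum-billing rules and free-tier allowances are ignored in this sketch.
    """
    if total_bytes_processed < 0:
        raise ValueError("total_bytes_processed cannot be negative")
    tib = total_bytes_processed / 2**40  # 1 TiB = 2**40 bytes
    return tib * usd_per_tib


def describe_scan(total_bytes_processed: int) -> str:
    """Human-readable scan size in GiB."""
    return f"{total_bytes_processed / 2**30:.2f} GiB"
```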


## 5. Document Available Tables

**Purpose:** List all tables in the `mimiciii_clinical` dataset for reference.

**Key tables we'll use:**
- `admissions` - Hospital admission records
- `patients` - Patient demographics (gender, date of birth)
- `labevents` - Laboratory test results
- `diagnoses_icd` - ICD-9 diagnosis codes
- `d_labitems` - Lab test definitions (what each test means)

**Why list them:** 
- Documentation for future queries
- Confirm at a glance that the required tables are visible (this cell lists them but does not check for them programmatically)
- Quick reference without checking BigQuery console

In [16]:
# Cell 5: List available tables in mimiciii_clinical
print("Available tables in mimiciii_clinical:")
tables = client.list_tables('physionet-data.mimiciii_clinical')
for table in tables:
    print(f"  - {table.table_id}")

Available tables in mimiciii_clinical:
  - admissions
  - callout
  - caregivers
  - chartevents
  - cptevents
  - d_cpt
  - d_icd_diagnoses
  - d_icd_procedures
  - d_items
  - d_labitems
  - datetimeevents
  - diagnoses_icd
  - drgcodes
  - icustays
  - inputevents_cv
  - inputevents_mv
  - labevents
  - microbiologyevents
  - outputevents
  - patients
  - prescriptions
  - procedureevents_mv
  - procedures_icd
  - services
  - transfers
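
Cell 5 prints the table list but does not test it, and the notes table (`noteevents`) lives in `mimiciii_notes`, not `mimiciii_clinical`. A small sketch that turns the listing into a fail-fast check, assuming the required tables are exactly the ones the Section 7 query reads (`REQUIRED_TABLES` and `missing_tables` are hypothetical names):

```python
# Tables read by the acquisition query, grouped by BigQuery dataset (assumed list)
REQUIRED_TABLES = {
    "mimiciii_clinical": {"admissions", "patients", "labevents", "diagnoses_icd", "d_labitems"},
    "mimiciii_notes": {"noteevents"},
}


def missing_tables(available: dict[str, set[str]],
                   required: dict[str, set[str]] = REQUIRED_TABLES) -> dict[str, list[str]]:
    """Return {dataset: sorted missing tables} for every dataset with at least one gap."""
    gaps = {}
    for dataset, tables in required.items():
        absent = sorted(tables - available.get(dataset, set()))
        if absent:
            gaps[dataset] = absent
    return gaps
```

Fed with live data, this could look like `available = {ds: {t.table_id for t in client.list_tables(f"physionet-data.{ds}")} for ds in REQUIRED_TABLES}`, raising an error if `missing_tables(available)` is non-empty.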


## 6. Verify Access to All Required Datasets

**MIMIC-III is split into 4 datasets:**
1. `mimiciii_clinical` - Structured clinical data (labs, admissions, diagnoses)
2. `mimiciii_notes` - Clinical notes (discharge summaries, radiology reports)
3. `mimiciii_derived` - Pre-calculated metrics
4. `mimiciii_notes_derived` - Processed note data

**What we're checking:**
- ✓ Do we have access to `mimiciii_clinical`? (for demographics, labs, diagnoses)
- ✓ Do we have access to `mimiciii_notes`? (for discharge summaries)

**Why this matters:**
- Our query needs BOTH datasets
- If either is missing, the main query will fail
- This "fail-fast" approach saves time by catching issues before the expensive query runs

In [17]:
# Cell 6: Verify access to all MIMIC-III datasets
datasets = list(client.list_datasets('physionet-data'))
mimic_datasets = [d for d in datasets if 'mimic' in d.dataset_id.lower()]

print("Available MIMIC-III datasets:")
for dataset in mimic_datasets:
    print(f"  ✓ {dataset.dataset_id}")

# Verify required datasets exist
required = ['mimiciii_clinical', 'mimiciii_notes']
available = [d.dataset_id for d in mimic_datasets]
for req in required:
    if req not in available:
        raise ValueError(f"Required dataset '{req}' not found!")

Available MIMIC-III datasets:
  ✓ mimiciii_clinical
  ✓ mimiciii_derived
  ✓ mimiciii_notes
  ✓ mimiciii_notes_derived


## 7. Execute Comprehensive Data Acquisition Query

**This is the main data extraction.** One complex SQL query joins multiple tables to create our complete dataset.

### The Query Has 4 Parts (CTEs):

#### Part 1: `discharge` CTE
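- Extracts 5,000 discharge summaries from `noteevents`; `LIMIT 5000` has no `ORDER BY`, so which notes are returned is arbitrary and can differ between runs
- Removes de-identification placeholders such as `[**Name**]` (replaced with an empty string)
- Records `text_length`, the length of the original note text (measured before placeholder removal)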
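
To see what the placeholder removal does, here is the same regular expression applied with Python's `re` module to a made-up sentence. BigQuery evaluates `REGEXP_REPLACE` with the RE2 engine; this pattern uses only features both engines share, so the behavior should match. Deleting a placeholder leaves the surrounding spaces behind, and the difference between the two lengths is what `text_length` (raw text) includes but `cleaned_text` does not.

```python
import re

# Same pattern as the REGEXP_REPLACE call in the `discharge` CTE
PHI_PLACEHOLDER = re.compile(r"\[\*\*[^\]]*\*\*\]")


def strip_phi_placeholders(text: str) -> str:
    """Delete de-identification placeholders like [**Known lastname 123**]."""
    return PHI_PLACEHOLDER.sub("", text)


raw = "Pt [**Known lastname 123**] admitted [**2101-10-20**] with CHF."  # made-up example
cleaned = strip_phi_placeholders(raw)
print(repr(cleaned))  # 'Pt  admitted  with CHF.'
print(len(raw), len(cleaned))  # text_length in the query corresponds to len(raw)
```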

#### Part 2: `demographics` CTE
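- Joins `patients` and `admissions` tables, giving one row per **admission** (a patient with several admissions appears several times)
- Extracts: gender (from `patients`); ethnicity, insurance, language, marital status, and admission type (from `admissions`). Marital status is computed here but not selected in the final output
- Calculates: age at admission and length of stay with `DATETIME_DIFF`. Both count calendar boundaries crossed (years or days) rather than completed intervals, so they can overstate by one; and because MIMIC-III shifts the date of birth of patients older than 89, their computed age comes out at around 300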
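
The gap between counting year boundaries and counting completed years is easy to miss, so here is a small illustration with made-up dates (MIMIC-III dates are shifted into the future, hence years like 2150). `year_boundaries` mimics the boundary-counting behavior we understand `DATETIME_DIFF(..., YEAR)` to have; `completed_years` is the conventional age. Both helper names are hypothetical.

```python
from datetime import datetime


def year_boundaries(later: datetime, earlier: datetime) -> int:
    """Calendar-year boundaries crossed, as DATETIME_DIFF(later, earlier, YEAR) counts them."""
    return later.year - earlier.year


def completed_years(later: datetime, earlier: datetime) -> int:
    """Conventional age: subtract one if this year's anniversary has not been reached yet."""
    not_yet = (later.month, later.day, later.time()) < (earlier.month, earlier.day, earlier.time())
    return later.year - earlier.year - int(not_yet)


# Made-up, MIMIC-style shifted dates
dob = datetime(2100, 12, 31)
admit = datetime(2150, 1, 2)
print(year_boundaries(admit, dob), completed_years(admit, dob))  # 50 49
```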

#### Part 3: `labs` CTE
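- Aggregates lab results from `labevents` per admission (`hadm_id`)
- Creates a summary string of up to 20 lab values per admission; `STRING_AGG` has no `ORDER BY`, so which 20 are kept is arbitrary
- Flags abnormal results (`flag = 'abnormal'`) with `(!)`
- Counts total labs and abnormal labs. Because `CONCAT` returns NULL when any argument is NULL and `STRING_AGG` skips NULLs, labs with a missing value or unit (`valueuom`) are omitted from `lab_summary` but still counted in `total_labs`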
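
The NULL behavior of `CONCAT` decides which labs appear in `lab_summary`. The sketch below emulates the aggregation in plain Python, with `None` standing in for SQL NULL and made-up lab rows; `coalesce_unit=True` shows the effect of writing `IFNULL(le.valueuom, '')` in the query instead. The function names, and the assumption that NULL pieces are discarded before `LIMIT` applies, belong to this sketch rather than to BigQuery's documentation.

```python
from typing import Iterable, Optional, Tuple

# (label, value, unit, flag); None plays the role of SQL NULL
LabRow = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]


def sql_concat(*parts: Optional[str]) -> Optional[str]:
    """Emulate BigQuery CONCAT: a single NULL argument makes the whole result NULL."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)


def lab_summary(rows: Iterable[LabRow], limit: int = 20,
                coalesce_unit: bool = False) -> Optional[str]:
    """Emulate STRING_AGG(CONCAT(label, ': ', value, ' ', unit, suffix), '; ' LIMIT limit)."""
    pieces = []
    for label, value, unit, flag in rows:
        if coalesce_unit and unit is None:
            unit = ""  # what IFNULL(le.valueuom, '') would do
        suffix = " (!)" if flag == "abnormal" else ""
        piece = sql_concat(label, ": ", value, " ", unit, suffix)
        if piece is not None:  # STRING_AGG ignores NULL inputs
            pieces.append(piece)
    return "; ".join(pieces[:limit]) or None  # all-NULL input aggregates to NULL


rows = [
    ("Sodium", "140", "mEq/L", None),
    ("pH", "7.31", None, "abnormal"),  # missing unit: dropped by plain CONCAT
    ("Potassium", "5.9", "mEq/L", "abnormal"),
]
```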

#### Part 4: `diagnoses` CTE
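- Aggregates ICD-9 diagnosis codes per admission
- Counts total diagnoses
- Lists up to 10 diagnosis codes per admission; the aggregation has no `ORDER BY seq_num`, so despite the column name `top_diagnoses`, these are not necessarily the highest-priority codes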
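
If `top_diagnoses` is meant to hold the highest-priority codes, the aggregation needs an explicit order, for example `STRING_AGG(icd9_code, ', ' ORDER BY seq_num LIMIT 10)`, since `seq_num` records each diagnosis's priority within the admission. The Python sketch below shows the intended grouping on made-up rows; `top_diagnoses` here is a hypothetical helper, not pipeline code.

```python
from collections import defaultdict


def top_diagnoses(rows, limit=10):
    """Per admission, join up to `limit` ICD-9 codes in seq_num (priority) order.

    rows: iterable of (hadm_id, seq_num, icd9_code). NULL codes are skipped, as
    STRING_AGG skips NULLs; rows with a missing seq_num sort last (a choice of
    this sketch).
    """
    by_admission = defaultdict(list)
    for hadm_id, seq_num, code in rows:
        if code is not None:
            by_admission[hadm_id].append((seq_num, code))
    result = {}
    for hadm_id, entries in by_admission.items():
        entries.sort(key=lambda e: float("inf") if e[0] is None else e[0])
        result[hadm_id] = ", ".join(code for _, code in entries[:limit])
    return result


rows = [(100, 2, "4280"), (100, 1, "41401"), (100, 3, "25000"),
        (200, 1, "0389"), (200, None, "V5861")]
```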

### Final Join:
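- Joins `labs` and `diagnoses` to the discharge notes on `hadm_id` (hospital admission ID), but joins `demographics` on `subject_id` (patient ID)
- Uses LEFT JOIN to keep all discharge summaries, even if labs/diagnoses are missing

### Expected Result:
- ~9,700 records from 5,000 notes. Because `demographics` has one row per admission and is joined on `subject_id`, each note is repeated once for every admission of that patient, and on the repeated rows the admission-level columns (`admittime`, `dischtime`, `length_of_stay`, `admission_type`, `age_at_admission`, and so on) describe a different admission than the note. Adding `a.hadm_id` to the `demographics` CTE and joining on `d.hadm_id = dem.hadm_id` would give one row per note
- 18 columns covering demographics, clinical notes, labs, and diagnoses
- Average note length: ~10,400 characters (raw text, averaged over all rows including the repeats)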
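
The row growth from the `subject_id` join can be reproduced on a toy example: one note for admission 11 of a patient who has three admissions. `left_join` is a hypothetical, minimal stand-in for SQL's LEFT JOIN on a single key, not the pipeline's code.

```python
# Made-up rows: patient 1 has three admissions, but only admission 11 has a note
notes = [{"hadm_id": 11, "subject_id": 1}]
admissions = [
    {"hadm_id": 11, "subject_id": 1, "admission_type": "EMERGENCY"},
    {"hadm_id": 12, "subject_id": 1, "admission_type": "ELECTIVE"},
    {"hadm_id": 13, "subject_id": 1, "admission_type": "URGENT"},
]


def left_join(left, right, key):
    """Minimal LEFT JOIN on one key: each left row repeats once per matching right row."""
    out = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        if not matches:
            out.append(dict(l))  # keep unmatched left rows, as LEFT JOIN does
        for r in matches:
            out.append({**r, **l})  # left row's values win on shared columns
    return out


by_subject = left_join(notes, admissions, "subject_id")  # what the query does
by_admission = left_join(notes, admissions, "hadm_id")   # one row per note
print(len(by_subject), len(by_admission))  # 3 1
```

Every row in `by_subject` still carries `hadm_id` 11 from the note, but two of them carry another admission's `admission_type`, which is exactly the mismatch described above.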

**Why one big query?**
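- The joins and aggregations run inside BigQuery, so only the final result is downloaded rather than whole tables such as `labevents`
- BigQuery bills on-demand queries by bytes processed, not per query, so the saving comes from avoiding large downloads and local joins rather than from the query count itself
- Ensures data consistency (every table is read by the same query job)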

In [18]:
# Cell 7: Execute comprehensive acquisition query
query = r"""
WITH discharge AS (
    SELECT 
        n.hadm_id,
        n.subject_id,
        REGEXP_REPLACE(n.text, r'\[\*\*[^\]]*\*\*\]', '') AS cleaned_text,
        LENGTH(n.text) as text_length  -- measured on the raw text, before placeholder removal
    FROM `physionet-data.mimiciii_notes.noteevents` n
    WHERE n.category = 'Discharge summary'
    LIMIT 5000  -- no ORDER BY: which 5,000 notes are returned is arbitrary
),
demographics AS (
    SELECT 
        p.subject_id,
        p.gender,
        a.ethnicity,
        a.insurance,
        a.language,
        a.marital_status,
        a.admission_type,
        a.admittime,
        a.dischtime,
        DATETIME_DIFF(a.admittime, p.dob, YEAR) as age_at_admission,
        DATETIME_DIFF(a.dischtime, a.admittime, DAY) as length_of_stay
    FROM `physionet-data.mimiciii_clinical.patients` p
    JOIN `physionet-data.mimiciii_clinical.admissions` a 
        ON p.subject_id = a.subject_id
),
labs AS (
    SELECT 
        le.hadm_id,
        STRING_AGG(
            CONCAT(d.label, ': ', CAST(le.value AS STRING), ' ', le.valueuom,
            CASE WHEN le.flag = 'abnormal' THEN ' (!)' ELSE '' END), '; '
            LIMIT 20
        ) AS lab_summary,
        COUNT(*) as total_labs,
        SUM(CASE WHEN le.flag = 'abnormal' THEN 1 ELSE 0 END) AS abnormal_count
    FROM `physionet-data.mimiciii_clinical.labevents` le
    JOIN `physionet-data.mimiciii_clinical.d_labitems` d ON le.itemid = d.itemid
    GROUP BY le.hadm_id
),
diagnoses AS (
    SELECT 
        hadm_id,
        COUNT(*) as diagnosis_count,
        STRING_AGG(icd9_code, ', ' LIMIT 10) as top_diagnoses
    FROM `physionet-data.mimiciii_clinical.diagnoses_icd`
    GROUP BY hadm_id
)
SELECT 
    d.*,
    dem.gender,
    dem.ethnicity,
    dem.age_at_admission,
    dem.insurance,
    dem.language,
    dem.admission_type,
    dem.admittime,
    dem.dischtime,
    dem.length_of_stay,
    l.lab_summary,
    l.total_labs,
    l.abnormal_count,
    diag.diagnosis_count,
    diag.top_diagnoses
FROM discharge d
LEFT JOIN demographics dem ON d.subject_id = dem.subject_id  -- patient-level key: repeats each note once per admission of that patient
LEFT JOIN labs l ON d.hadm_id = l.hadm_id
LEFT JOIN diagnoses diag ON d.hadm_id = diag.hadm_id
"""

print("Running comprehensive acquisition query...")
print("This may take several minutes...\n")

try:
    df_complete = client.query(query).to_dataframe()
    print("✓ Query completed successfully")
    print(f"✓ Loaded {len(df_complete):,} records")
    print(f"✓ Average text length: {df_complete['text_length'].mean():.0f} characters")
except Exception as e:
    print(f"✗ Query failed: {e}")
    raise

Running comprehensive acquisition query...
This may take several minutes...

✓ Query completed successfully
✓ Loaded 9,651 records
✓ Average text length: 10429 characters


## 8. Save Raw Data to CSV

**What happens:**
1. DataFrame is written to: `data/raw/mimic_discharge_labs.csv`
2. File size is calculated and displayed
3. Confirmation message shows the exact file location

**Expected output:**
- File size: ~95 MB
- Records: ~9,700
- Columns: 18

**Error handling:** If save fails (disk full, permission denied), the script will stop and show the error.

In [19]:
# Cell 8: Save raw data
output_path = os.path.join(RAW_DATA_DIR, 'mimic_discharge_labs.csv')

try:
    df_complete.to_csv(output_path, index=False)
    file_size_mb = os.path.getsize(output_path) / 1024 / 1024
    print("✓ Data saved successfully")
    print(f"✓ Location: {output_path}")
    print(f"✓ File size: {file_size_mb:.2f} MB")
except Exception as e:
    print(f"✗ Save failed: {e}")
    raise

✓ Data saved successfully
✓ Location: /Users/Admin/Desktop/lab-lens/data-pipeline/data/raw/mimic_discharge_labs.csv
✓ File size: 93.33 MB
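
Cell 8 writes straight to the final path, so an interrupted run can leave a truncated `mimic_discharge_labs.csv` for the next stage to pick up. A common safeguard, sketched here with the standard `csv` module and a hypothetical `write_csv_atomically` helper, is to write to a temporary file in the same directory and rename it into place only after the write succeeds.

```python
import csv
import os
import tempfile


def write_csv_atomically(rows, fieldnames, output_path):
    """Write dict rows to output_path so readers never see a half-written file."""
    directory = os.path.dirname(os.path.abspath(output_path))
    # Temp file in the same directory, so the final rename stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(suffix=".tmp", dir=directory)
    try:
        with os.fdopen(fd, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)
        os.replace(tmp_path, output_path)  # swap the finished file into place
    except BaseException:
        os.remove(tmp_path)  # discard the partial file; any previous output is untouched
        raise
```

With pandas the same pattern is `df_complete.to_csv(tmp_path, index=False)` followed by `os.replace(tmp_path, output_path)`. On POSIX systems `os.replace` is an atomic rename when both paths are on the same filesystem, which is why the temporary file is created in the target directory.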


In [20]:
print(df_complete.columns.tolist())

['hadm_id', 'subject_id', 'cleaned_text', 'text_length', 'gender', 'ethnicity', 'age_at_admission', 'insurance', 'language', 'admission_type', 'admittime', 'dischtime', 'length_of_stay', 'lab_summary', 'total_labs', 'abnormal_count', 'diagnosis_count', 'top_diagnoses']


## 9. Acquisition Verification Report

**Purpose:** Confirm the data was acquired correctly before moving to the next pipeline stage.

### What We're Verifying:

#### ✓ Basic Metrics
- Total records acquired
- Number of columns
- File size on disk

#### ✓ Schema Validation
We check for all 18 expected columns:

**Identifiers:**
- `hadm_id`, `subject_id`

**Clinical Text:**
- `cleaned_text`, `text_length`

**Demographics:**
- `gender`, `ethnicity`, `age_at_admission`, `insurance`, `language`, `admission_type`

**Temporal Data:**
- `admittime`, `dischtime`, `length_of_stay`

**Lab Data:**
- `lab_summary`, `total_labs`, `abnormal_count`

**Diagnosis Data:**
- `diagnosis_count`, `top_diagnoses`

#### ✓ Column Validation
- ✗ Missing columns → Indicates query problem
- ⚠ Extra columns → Indicates schema drift

### If Everything Passes:
✓ **Acquisition complete! Ready for preprocessing.**

The data is now ready for the next pipeline stage.

In [21]:
# Cell 9: Verify acquisition completed successfully
print("\n" + "="*60)
print("ACQUISITION VERIFICATION REPORT")
print("="*60)

# Basic metrics
print(f"\n✓ Records acquired: {len(df_complete):,}")
print(f"✓ Columns: {len(df_complete.columns)}")
print(f"✓ File size: {os.path.getsize(output_path)/1024/1024:.2f} MB")

# Schema check
print("\n✓ Schema:")
expected_columns = [
    'hadm_id', 'subject_id', 'cleaned_text', 'text_length',
    'gender', 'ethnicity', 'age_at_admission', 'insurance', 'language',
    'admission_type', 'admittime', 'dischtime', 'length_of_stay',
    'lab_summary', 'total_labs', 'abnormal_count',
    'diagnosis_count', 'top_diagnoses'
]

for col in expected_columns:
    status = "✓" if col in df_complete.columns else "✗"
    print(f"  {status} {col}")

# Missing columns mean the query changed: stop before printing the success message
missing = set(expected_columns) - set(df_complete.columns)
if missing:
    raise ValueError(f"✗ Missing columns: {sorted(missing)}")

# Extra columns check
extra = set(df_complete.columns) - set(expected_columns)
if extra:
    print(f"\n⚠ Warning: Unexpected columns: {extra}")

print("\n" + "="*60)
print("✓ Acquisition complete! Ready for preprocessing.")
print("="*60)


ACQUISITION VERIFICATION REPORT

✓ Records acquired: 9,651
✓ Columns: 18
✓ File size: 93.33 MB

✓ Schema:
  ✓ hadm_id
  ✓ subject_id
  ✓ cleaned_text
  ✓ text_length
  ✓ gender
  ✓ ethnicity
  ✓ age_at_admission
  ✓ insurance
  ✓ language
  ✓ admission_type
  ✓ admittime
  ✓ dischtime
  ✓ length_of_stay
  ✓ lab_summary
  ✓ total_labs
  ✓ abnormal_count
  ✓ diagnosis_count
  ✓ top_diagnoses

✓ Acquisition complete! Ready for preprocessing.
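
Cell 9 inspects the in-memory `df_complete`, not the file written in Cell 8. A complementary check is to re-read the CSV and compare its header and row count with the DataFrame. Counting lines would not work, because discharge notes contain newlines inside quoted fields; the sketch below (with a hypothetical `csv_shape` helper) uses the `csv` module, which parses those correctly.

```python
import csv


def csv_shape(path):
    """Return (n_rows, header) for a CSV on disk, parsing quoted multi-line fields correctly."""
    # Long notes: raise the per-field limit above the 131,072-character default.
    # Note that csv.field_size_limit is a process-wide setting.
    csv.field_size_limit(max(csv.field_size_limit(), 10_000_000))
    with open(path, newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    return n_rows, header
```

After Cell 8, `n_rows, header = csv_shape(output_path)` should give `n_rows == len(df_complete)` and `header == list(df_complete.columns)`.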
