# Texas Children's Hospital Patient 360 PoC - Data Generator Notebook

This notebook generates comprehensive, realistic synthetic healthcare data for the Texas Children's Hospital Patient 360 PoC demonstration, including both structured and unstructured data sources.

## Execution Parameters
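- `data_size`: Size of dataset to generate (small/smallplus/medium/large). Used only when `num_patients` is not provided. Note: the size map in the parameter cell below defines only small/medium/large, so any other value (including `smallplus`) falls back to 5,000 patients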
- `num_patients`: Number of patients (overrides data_size if provided)
- `encounters_per_patient`: Average encounters per patient (default: 5)
- `rows_per_file`: Shard size for structured CSVs (default: 25000; overridden by deploy script)
- `parallel`: true|false to control parallel generation path (default: true)
- `compress_files`: Whether to compress output files (default: true)


In [None]:
# Diagnostic: verify stage write from notebook compute
from snowflake.snowpark.context import get_active_session
from pathlib import Path
# Obtain the notebook's active Snowpark session for the diagnostic upload.
session = get_active_session()
# Small marker file written locally, then uploaded to the stage.
path = Path('/tmp/diag_put.txt')
path.write_text('diag_ok')
try:
    # Upload the marker uncompressed, overwriting any copy left by an earlier run.
    put_res = session.file.put(str(path), "@TCH_PATIENT_360_POC.RAW_DATA.PATIENT_DATA_STAGE/diagnostics/", auto_compress=False, overwrite=True)
    print("diag_put_result:", put_res)
except Exception as e:
    # Best-effort check: report the failure and let the notebook continue.
    print("diag_put_error:", e)



In [None]:
!pip install faker

In [None]:
# Import required packages
import os
import sys
import json
import csv
import gzip
import shutil
import random
import uuid
from datetime import datetime, timedelta, date
from typing import Dict, List, Optional, Tuple
from pathlib import Path

import pandas as pd
import numpy as np
from faker import Faker
from snowflake.snowpark.context import get_active_session

# Get Snowflake session
# (rebinds the module-level `session` name used by the other cells)
session = get_active_session()

# Echo the connection context so a run's output shows where data will be written.
print(f"Connected to Snowflake as {session.get_current_user()}", flush=True)
print(f"Database: {session.get_current_database()}", flush=True)
print(f"Schema: {session.get_current_schema()}", flush=True)
print(f"Warehouse: {session.get_current_warehouse()}", flush=True)


In [None]:
import logging
# Named logger shared by the generator cells (looked up again by name in later cells).
logger = logging.getLogger('tch_data_gen')
logger.setLevel(logging.INFO)
# NOTE(review): no handler is attached in this cell, so whether INFO records are
# actually displayed depends on logging configuration outside it - confirm.
# NOTE(review): the message says temp dirs are prepared, but the directories are
# created in a later cell; the wording looks stale - confirm.
logger.info('Cell 4 start: environment and temp dirs prepared')


In [None]:
# Get execution parameters from notebook arguments
# When using EXECUTE NOTEBOOK, arguments are passed as strings and accessible via sys.argv
# Example: EXECUTE NOTEBOOK MY_NOTEBOOK('data_size=small', 'encounters_per_patient=10', 'rows_per_file=15000', 'parallel=true')

# Default values
DATA_SIZE = 'medium'
NUM_PATIENTS = None
ENCOUNTERS_PER_PATIENT = 5
COMPRESS_FILES = True
ROWS_PER_FILE = 25000
PARALLEL = True
DIAGNOSTICS = False  # not settable through notebook arguments in this cell


def _parse_int_arg(key, value):
    """Convert an integer-valued notebook argument, naming the argument on failure.

    Raises:
        ValueError: if ``value`` is not a valid integer literal.
    """
    try:
        return int(value)
    except ValueError:
        raise ValueError(f"Argument '{key}' expects an integer, got {value!r}") from None


# Parse arguments if provided
if hasattr(sys, 'argv') and len(sys.argv) > 0:
    print(f"Arguments received: {sys.argv}")

    # Parse key=value arguments; anything else (script name, flags) is ignored
    for arg in sys.argv:
        if not (isinstance(arg, str) and '=' in arg):
            continue
        key, value = arg.split('=', 1)
        # Tolerate stray whitespace and key casing such as ' Data_Size = small '
        key = key.strip().lower()
        value = value.strip()
        if key == 'data_size':
            DATA_SIZE = value.lower()
        elif key == 'num_patients':
            NUM_PATIENTS = _parse_int_arg(key, value)
        elif key == 'encounters_per_patient':
            ENCOUNTERS_PER_PATIENT = _parse_int_arg(key, value)
        elif key == 'compress_files':
            # Compression stays on unless explicitly disabled
            COMPRESS_FILES = value.lower() != 'false'
        elif key == 'rows_per_file':
            ROWS_PER_FILE = _parse_int_arg(key, value)
        elif key == 'parallel':
            # Parallel generation is only enabled by an explicit 'true'
            PARALLEL = value.lower() == 'true'

# Determine number of patients based on size or explicit count
if NUM_PATIENTS is None:
    size_map = {
        'small': 1000,
        'medium': 5000,
        'large': 25000
    }
    if DATA_SIZE not in size_map:
        # Previously this fell back silently, hiding typos and unsupported sizes
        print(
            f"⚠️ Unrecognized data_size '{DATA_SIZE}' "
            f"(known: {', '.join(size_map)}); falling back to 5,000 patients",
            flush=True,
        )
    NUM_PATIENTS = size_map.get(DATA_SIZE, 5000)

print("\nConfiguration:", flush=True)
print(f"  Data size: {DATA_SIZE}", flush=True)
print(f"  Number of patients: {NUM_PATIENTS:,}", flush=True)
print(f"  Encounters per patient: {ENCOUNTERS_PER_PATIENT}", flush=True)
print(f"  Compress files: {COMPRESS_FILES}", flush=True)
print(f"  Rows per file: {ROWS_PER_FILE:,}", flush=True)
print(f"  Parallel: {PARALLEL}", flush=True)
print(f"  Expected total encounters: ~{NUM_PATIENTS * ENCOUNTERS_PER_PATIENT:,}", flush=True)


In [None]:
# Setup output paths in Snowflake internal stage
# We'll use the existing PATIENT_DATA_STAGE and UNSTRUCTURED_DATA_STAGE

STRUCTURED_STAGE = '@TCH_PATIENT_360_POC.RAW_DATA.PATIENT_DATA_STAGE'
UNSTRUCTURED_STAGE = '@TCH_PATIENT_360_POC.RAW_DATA.UNSTRUCTURED_DATA_STAGE'

# Local temporary directory for file generation
TEMP_DIR = Path('/tmp/tch_data_generation')
STRUCTURED_DIR = TEMP_DIR / 'structured'
UNSTRUCTURED_DIR = TEMP_DIR / 'unstructured'

# Remove leftovers from any previous run first, then create the directories
# once. (Creating them before the cleanup made the existence check always
# true and the first mkdir calls pointless.)
print("🧹 Cleaning up old data files...", flush=True)
if TEMP_DIR.exists():
    shutil.rmtree(TEMP_DIR)

# parents=True also creates TEMP_DIR itself
STRUCTURED_DIR.mkdir(parents=True, exist_ok=True)
UNSTRUCTURED_DIR.mkdir(parents=True, exist_ok=True)

print(f"✅ Temporary directories created at {TEMP_DIR}", flush=True)
# Log the same message for CLI execution visibility
logger.info(f'Temp dirs ready at {TEMP_DIR}')


In [None]:
# Reset Snowflake stages to ensure a clean run
# `logging` is imported up front so the failure handler below never depends on
# an import that sits after the statements that can fail.
import logging

from snowflake.snowpark.context import get_active_session

session = get_active_session()
print("🧹 Clearing Snowflake stages before generation...", flush=True)
try:
    # Remove all files from structured and unstructured stages
    session.sql(f"REMOVE {STRUCTURED_STAGE};").collect()
    session.sql(f"REMOVE {UNSTRUCTURED_STAGE};").collect()
    # Refresh stage directory indexes; ALTER STAGE takes the stage name without
    # the leading '@', so derive it from the same constants used above.
    session.sql(f"ALTER STAGE {STRUCTURED_STAGE.lstrip('@')} REFRESH;").collect()
    session.sql(f"ALTER STAGE {UNSTRUCTURED_STAGE.lstrip('@')} REFRESH;").collect()
except Exception as e:
    # Deliberately best-effort: report the problem and let generation continue.
    print(f"⚠️ Stage clearing encountered an issue: {e}", flush=True)
    logging.getLogger('tch_data_gen').warning(f'Stage clearing issue: {e}')
else:
    print("✅ Stages cleared and refreshed", flush=True)
    logging.getLogger('tch_data_gen').info('Stages cleared and refreshed at start of run')


In [None]:
# PediatricDataGenerator class
class PediatricDataGenerator:
    """Generate realistic pediatric healthcare data for demonstration purposes.

    The constructor seeds the random number generators and builds the lookup
    tables used for generation. The generation methods themselves are defined
    as plain functions in later notebook cells and attached to this class.
    """

    def __init__(self, seed: int = 42) -> None:
        """Initialize generator with consistent seed for reproducible data.

        Args:
            seed: Value passed to ``random.seed``, ``np.random.seed`` and
                ``Faker.seed``. Note that these seed module/class-level
                generators, not state private to this instance.
        """
        random.seed(seed)
        np.random.seed(seed)
        self.fake = Faker()
        Faker.seed(seed)

        # Houston-area zip codes for realistic geographic distribution
        self.houston_zips = [
            '77001', '77002', '77003', '77004', '77005', '77006', '77007', '77008',
            '77009', '77010', '77011', '77012', '77013', '77014', '77015', '77016',
            '77017', '77018', '77019', '77020', '77021', '77022', '77023', '77024',
            '77025', '77026', '77027', '77028', '77029', '77030', '77031', '77032',
            '77033', '77034', '77035', '77036', '77037', '77038', '77039', '77040',
            '77041', '77042', '77043', '77044', '77045', '77046', '77047', '77048',
            '77049', '77050', '77051', '77052', '77053', '77054', '77055', '77056',
            '77057', '77058', '77059', '77060', '77061', '77062', '77063', '77064',
            '77065', '77066', '77067', '77068', '77069', '77070', '77071', '77072',
            '77073', '77074', '77075', '77076', '77077', '77078', '77079', '77080',
            '77081', '77082', '77083', '77084', '77085', '77086', '77087', '77088',
            '77089', '77090', '77091', '77092', '77093', '77094', '77095', '77096',
            '77097', '77098', '77099', '77338', '77339', '77345', '77346', '77347',
            '77354', '77357', '77365', '77373', '77375', '77377', '77379', '77380',
            '77381', '77382', '77383', '77384', '77385', '77386', '77388', '77389',
            '77391', '77393', '77396', '77401', '77402', '77406', '77407', '77429',
            '77433', '77447', '77449', '77450', '77459', '77469', '77477', '77478',
            '77479', '77484', '77489', '77493', '77494', '77498', '77502', '77503',
            '77504', '77505', '77506', '77507', '77508', '77520', '77521', '77530',
            '77532', '77536', '77539', '77546', '77547', '77562', '77571', '77573',
            '77581', '77584', '77586', '77587', '77598'
        ]

        # Common pediatric diagnoses with ICD-10 codes
        # Maps code -> (description, prevalence fraction as annotated per entry)
        self.pediatric_diagnoses = {
            'J45.9': ('Asthma, unspecified', 0.08),  # 8% prevalence
            'F90.9': ('ADHD, unspecified', 0.07),   # 7% prevalence
            'E66.9': ('Obesity, unspecified', 0.18), # 18% prevalence
            'F84.0': ('Autistic disorder', 0.025),   # 2.5% prevalence
            'E10.9': ('Type 1 diabetes mellitus', 0.005), # 0.5% prevalence
            'Q21.0': ('Ventricular septal defect', 0.008), # 0.8% prevalence
            'H52.13': ('Myopia', 0.12),             # 12% prevalence
            'L20.9': ('Atopic dermatitis', 0.15),   # 15% prevalence
            'K59.00': ('Constipation', 0.10),       # 10% prevalence
            'J06.9': ('Upper respiratory infection', 0.30), # 30% prevalence
            'B34.9': ('Viral infection', 0.25),     # 25% prevalence
            'K21.9': ('GERD', 0.08),                # 8% prevalence
            'G40.909': ('Epilepsy', 0.007),         # 0.7% prevalence
            'F32.9': ('Depression', 0.03),          # 3% prevalence (adolescents)
            'F41.9': ('Anxiety disorder', 0.05),    # 5% prevalence
            'M79.3': ('Growing pains', 0.15),       # 15% prevalence
            'Z00.129': ('Well child exam', 0.95),   # 95% have routine checkups
            'S72.001A': ('Fracture of femur', 0.02), # 2% prevalence
            'T78.40XA': ('Allergy, unspecified', 0.20), # 20% prevalence
            'H66.90': ('Otitis media', 0.35)        # 35% prevalence
        }

        # Department mappings for Texas Children's Hospital
        self.departments = [
            'Emergency Department', 'Pediatric ICU', 'NICU', 'Cardiology',
            'Neurology', 'Oncology', 'Orthopedics', 'Pulmonology',
            'Gastroenterology', 'Endocrinology', 'Nephrology', 'Rheumatology',
            'Dermatology', 'Ophthalmology', 'ENT', 'Psychiatry',
            'General Pediatrics', 'Adolescent Medicine', 'Newborn Nursery',
            'Ambulatory Surgery', 'Radiology', 'Laboratory', 'Pharmacy'
        ]

        # Common pediatric medications
        self.pediatric_medications = [
            'Acetaminophen', 'Ibuprofen', 'Amoxicillin', 'Azithromycin',
            'Albuterol', 'Fluticasone', 'Montelukast', 'Methylphenidate',
            'Insulin', 'Prednisone', 'Cetirizine', 'Diphenhydramine',
            'Omeprazole', 'Ranitidine', 'Simethicone', 'Polyethylene glycol',
            'Hydrocortisone', 'Mupirocin', 'Miconazole', 'Nystatin'
        ]

        # Lab test reference ranges by age
        # Maps test name -> {age-band label in years: (low, high)}.
        # Band labels differ per test (e.g. Hemoglobin splits 13-15 / 16-21).
        self.lab_reference_ranges = {
            'Hemoglobin': {
                '0-1': (14.0, 20.0), '1-3': (9.5, 13.0), '4-6': (10.5, 13.5),
                '7-12': (11.0, 14.0), '13-15': (12.0, 15.2), '16-21': (12.6, 16.6)
            },
            'White Blood Cells': {
                '0-1': (9000, 30000), '1-3': (6000, 17500), '4-6': (5500, 15500),
                '7-12': (4500, 13500), '13-21': (4500, 11000)
            },
            'Platelet Count': {
                '0-21': (150000, 450000)
            },
            'Glucose': {
                '0-21': (70, 100)
            },
            'Creatinine': {
                '0-1': (0.2, 0.4), '1-3': (0.3, 0.5), '4-6': (0.4, 0.6),
                '7-12': (0.5, 0.8), '13-21': (0.6, 1.2)
            }
        }


In [None]:
# Add methods to PediatricDataGenerator class (continuing from previous cell)
# This approach is necessary due to notebook cell size limitations

# Helper methods
def _generate_pediatric_age(self) -> int:
    """Draw a patient age in whole years (0-21), weighted toward younger children."""
    # Explicit weights for ages 0..8, then a flat tail covering ages 9..21.
    early_years = [25, 20, 15, 10, 10, 8, 6, 4, 2]
    flat_tail = [1] * 13
    age_weights = early_years + flat_tail
    (age,) = random.choices(range(22), weights=age_weights)
    return age

def _generate_race(self) -> str:
    """Pick a race label using the weighted mix intended to mirror Texas demographics."""
    weighted_races = (
        ('White', 0.40),
        ('Black', 0.15),
        ('Asian', 0.05),
        ('Other', 0.38),
        ('Native American', 0.01),
        ('Pacific Islander', 0.01),
    )
    labels = [label for label, _ in weighted_races]
    shares = [share for _, share in weighted_races]
    return random.choices(labels, weights=shares)[0]

def _generate_ethnicity(self) -> str:
    """Pick 'Hispanic' or 'Non-Hispanic' with a 40/60 weighting."""
    options = ['Hispanic', 'Non-Hispanic']
    shares = [0.40, 0.60]
    (picked,) = random.choices(options, weights=shares)
    return picked

def _generate_insurance_type(self, age: int) -> str:
    """Pick an insurance type; CHIP is only offered to patients aged 18 or younger."""
    if age > 18:
        payers = ['Medicaid', 'Commercial', 'Self-pay']
        shares = [0.25, 0.65, 0.10]
    else:
        payers = ['Medicaid', 'Commercial', 'CHIP', 'Self-pay']
        shares = [0.45, 0.35, 0.15, 0.05]
    (payer,) = random.choices(payers, weights=shares)
    return payer

def _generate_language(self, ethnicity: str) -> str:
    """Pick a primary language; Hispanic patients draw from an English/Spanish mix."""
    if ethnicity != 'Hispanic':
        choices = ['English', 'Spanish', 'Vietnamese', 'Chinese', 'Arabic', 'Other']
        shares = [0.90, 0.03, 0.02, 0.02, 0.01, 0.02]
    else:
        choices = ['English', 'Spanish', 'English/Spanish']
        shares = [0.40, 0.30, 0.30]
    (language,) = random.choices(choices, weights=shares)
    return language

def _determine_encounter_count(self, age: int, base_count: int) -> int:
    """Scale the base encounter count by an age factor and random jitter (minimum 1)."""
    # Younger children are seen more often: infants 2.5x, toddlers 1.8x, preschoolers 1.3x.
    if age > 5:
        age_factor = 1.0
    elif age > 2:
        age_factor = 1.3
    elif age != 0:
        age_factor = 1.8
    else:
        age_factor = 2.5

    scaled = base_count * age_factor
    jitter = random.uniform(0.5, 1.5)
    return max(1, int(scaled * jitter))

def _generate_encounter_date(self, patient_created_date) -> datetime:
    """Pick an encounter timestamp no earlier than record creation and at most 365 days back."""
    elapsed_days = (datetime.now() - patient_created_date).days
    if elapsed_days <= 0:
        # Record created today (or dated in the future): the encounter is "now".
        return datetime.now()
    lookback = random.randint(0, min(elapsed_days, 365))
    return datetime.now() - timedelta(days=lookback)

def _select_department(self, age: int) -> str:
    """Pick a department using an age-band weighting; 'Other' resolves to any department."""
    newborn_mix = {
        'NICU': 0.15, 'Newborn Nursery': 0.20, 'General Pediatrics': 0.40,
        'Emergency Department': 0.15, 'Pediatric ICU': 0.05, 'Other': 0.05
    }
    young_child_mix = {
        'General Pediatrics': 0.50, 'Emergency Department': 0.20,
        'Pediatric ICU': 0.02, 'Pulmonology': 0.05, 'Gastroenterology': 0.05,
        'ENT': 0.08, 'Other': 0.10
    }
    school_age_mix = {
        'General Pediatrics': 0.45, 'Emergency Department': 0.15,
        'Orthopedics': 0.10, 'Dermatology': 0.05, 'Ophthalmology': 0.05,
        'Psychiatry': 0.05, 'Other': 0.15
    }
    adolescent_mix = {
        'General Pediatrics': 0.30, 'Adolescent Medicine': 0.20,
        'Emergency Department': 0.15, 'Psychiatry': 0.10,
        'Dermatology': 0.05, 'Orthopedics': 0.08, 'Other': 0.12
    }

    if age == 0:
        mix = newborn_mix
    elif age <= 5:
        mix = young_child_mix
    elif age <= 12:
        mix = school_age_mix
    else:
        mix = adolescent_mix

    names, shares = zip(*mix.items())
    (pick,) = random.choices(names, weights=shares)

    # 'Other' is a placeholder: fall back to a uniform draw over every department.
    return random.choice(self.departments) if pick == 'Other' else pick

def _determine_encounter_type(self, department: str) -> str:
    """Map a department to an encounter type; most departments are mostly outpatient."""
    fixed_types = {
        'Emergency Department': 'Emergency',
        'Pediatric ICU': 'Inpatient',
        'NICU': 'Inpatient',
        'Ambulatory Surgery': 'Surgical',
    }
    if department in fixed_types:
        return fixed_types[department]

    # The first three entries are all 'Outpatient', so this is effectively
    # 98% outpatient / 2% inpatient; the table is kept exactly as-is so the
    # random draw stays unchanged.
    options = ['Outpatient', 'Outpatient', 'Outpatient', 'Inpatient']
    shares = [0.85, 0.10, 0.03, 0.02]
    (picked,) = random.choices(options, weights=shares)
    return picked

def _generate_physician_name(self) -> str:
    """Return a physician display name: 'Dr.' followed by a Faker-generated surname."""
    surname = self.fake.last_name()
    return f"Dr. {surname}"

def _generate_discharge_date(self, admission_date: datetime, encounter_type: str) -> Optional[datetime]:
    """Return a discharge timestamp for the encounter type, or None for unknown types.

    Outpatient visits are discharged at the admission time. Emergency stays
    last a weighted number of hours; inpatient and surgical stays last a
    weighted number of days.
    """
    if encounter_type == 'Outpatient':
        return admission_date

    # encounter type -> (timedelta unit, candidate durations, weights)
    stay_profiles = {
        'Emergency': ('hours', [4, 8, 12, 24], [0.6, 0.2, 0.15, 0.05]),
        'Inpatient': ('days', [1, 2, 3, 5, 7, 10], [0.3, 0.3, 0.2, 0.1, 0.07, 0.03]),
        'Surgical': ('days', [0, 1, 2], [0.6, 0.3, 0.1]),
    }
    profile = stay_profiles.get(encounter_type)
    if profile is None:
        return None

    unit, durations, shares = profile
    (amount,) = random.choices(durations, weights=shares)
    return admission_date + timedelta(**{unit: amount})

def _generate_chief_complaint(self, age: int) -> str:
    """Pick a chief complaint uniformly from the list for the patient's age band."""
    infant = ['Fever', 'Poor feeding', 'Respiratory distress', 'Jaundice',
              'Vomiting', 'Routine newborn care']
    toddler = ['Fever', 'Cough', 'Vomiting', 'Diarrhea', 'Ear pain',
               'Rash', 'Well child check']
    preschool = ['Fever', 'Cough', 'Abdominal pain', 'Ear pain', 'Sore throat',
                 'Injury', 'Well child check', 'Asthma']
    school_age = ['Abdominal pain', 'Headache', 'Injury', 'Asthma', 'Sore throat',
                  'Fever', 'Well child check', 'ADHD follow-up']
    adolescent = ['Abdominal pain', 'Headache', 'Injury', 'Depression screening',
                  'Anxiety', 'Acne', 'Sports physical', 'Well adolescent check']

    if age == 0:
        pool = infant
    elif age <= 2:
        pool = toddler
    elif age <= 5:
        pool = preschool
    elif age <= 12:
        pool = school_age
    else:
        pool = adolescent

    return random.choice(pool)

def _select_diagnoses_for_encounter(self, department: str, count: int) -> List[Tuple[str, str]]:
    """Return at least one (ICD-10 code, description) pair typical for the department.

    Codes are drawn with replacement, so the same diagnosis may repeat.
    Departments without a dedicated mapping use a general-purpose code list.
    """
    codes_by_department = {
        'Emergency Department': ['J06.9', 'B34.9', 'T78.40XA'],
        'Cardiology': ['Q21.0'],
        'Pulmonology': ['J45.9'],
        'Endocrinology': ['E10.9'],
        'Dermatology': ['L20.9'],
        'Otolaryngology': ['H66.90'],
        'ENT': ['H66.90'],
        'Gastroenterology': ['K21.9', 'K59.00'],
        'Neurology': ['G40.909'],
    }
    general_codes = ['J06.9', 'B34.9', 'L20.9', 'H66.90']
    candidate_codes = codes_by_department.get(department, general_codes)

    picked: List[Tuple[str, str]] = []
    draws_left = max(1, count)
    while draws_left > 0:
        code = random.choice(candidate_codes)
        # Fall back to the code itself when no description is on file.
        description, _ = self.pediatric_diagnoses.get(code, (code, 1.0))
        picked.append((code, description))
        draws_left -= 1
    return picked

# Attach the helper functions defined in this cell to PediatricDataGenerator.
# Each function is bound under its own name, so it becomes a regular method.
for _helper in (
    _generate_pediatric_age,
    _generate_race,
    _generate_ethnicity,
    _generate_insurance_type,
    _generate_language,
    _determine_encounter_count,
    _generate_encounter_date,
    _select_department,
    _determine_encounter_type,
    _generate_physician_name,
    _generate_discharge_date,
    _generate_chief_complaint,
    _select_diagnoses_for_encounter,
):
    setattr(PediatricDataGenerator, _helper.__name__, _helper)
del _helper  # do not leave the loop variable behind in the notebook namespace


In [None]:
# Main data generation methods for PediatricDataGenerator

def generate_patient_demographics(self, count: int) -> List[Dict]:
    """Generate realistic pediatric patient demographics.

    Args:
        count: Number of patient records to create.

    Returns:
        A list of ``count`` dicts with keys ``patient_id``, ``mrn``,
        ``first_name``, ``last_name``, ``date_of_birth`` (date), ``gender``,
        ``race``, ``ethnicity``, ``zip_code``, ``insurance_type``,
        ``language``, ``created_date`` and ``updated_date`` (datetimes).

    Notes:
        * Birth dates are spread across the year. Previously every patient
          of a given age received the exact same birth date, and the age
          re-derived from that date could come out one year lower than the
          generated age.
        * ``created_date`` is never earlier than the birth date, so dates
          derived from it cannot precede birth.
        * MRNs are unique within one call.
    """
    patients = []
    now = datetime.now()
    # Earliest allowed record creation time (roughly the former '-5y' window).
    created_floor = now - timedelta(days=5 * 365)
    used_mrns = set()

    for i in range(count):
        # Generate age with higher concentration in younger years
        age = self._generate_pediatric_age()
        # 1..363 days past the last birthday: keeps the calendar age equal to
        # `age` (with a one-day margin on both sides for leap days) while
        # giving each patient a different birthday.
        days_past_birthday = random.randint(1, 363)
        birth_dt = now - timedelta(days=age * 365.25 + days_past_birthday)

        # Generate basic demographics
        gender = random.choice(['M', 'F'])
        race = self._generate_race()
        ethnicity = self._generate_ethnicity()

        # Redraw on the (rare) collision so MRNs can serve as identifiers.
        mrn = f"MRN{random.randint(10000000, 99999999)}"
        while mrn in used_mrns:
            mrn = f"MRN{random.randint(10000000, 99999999)}"
        used_mrns.add(mrn)

        patient = {
            'patient_id': f"TCH-{i+1:06d}",
            'mrn': mrn,
            'first_name': self.fake.first_name_male() if gender == 'M' else self.fake.first_name_female(),
            'last_name': self.fake.last_name(),
            'date_of_birth': birth_dt.date(),
            'gender': gender,
            'race': race,
            'ethnicity': ethnicity,
            'zip_code': random.choice(self.houston_zips),
            'insurance_type': self._generate_insurance_type(age),
            'language': self._generate_language(ethnicity),
            # A record cannot be created before the patient is born.
            'created_date': self.fake.date_time_between(
                start_date=max(birth_dt, created_floor), end_date='now'
            ),
            'updated_date': datetime.now()
        }
        patients.append(patient)

    return patients

def generate_encounters(self, patients: List[Dict], encounters_per_patient: int = 5) -> List[Dict]:
    """Build encounter records for every patient.

    Args:
        patients: Patient dicts providing ``patient_id``, ``date_of_birth``
            and ``created_date``.
        encounters_per_patient: Base count handed to the per-age encounter
            count helper.

    Returns:
        A flat list of encounter dicts with sequential ``ENC-########`` ids.
    """
    records: List[Dict] = []
    next_number = 1

    for person in patients:
        # Age in completed years as of today, derived from the date of birth.
        dob = person['date_of_birth']
        today = datetime.now().date()
        birthday_pending = (today.month, today.day) < (dob.month, dob.day)
        patient_age = (today.year - dob.year) - birthday_pending

        visit_total = self._determine_encounter_count(patient_age, encounters_per_patient)

        for _ in range(visit_total):
            visit_dt = self._generate_encounter_date(person['created_date'])
            dept = self._select_department(patient_age)
            visit_type = self._determine_encounter_type(dept)
            physician = self._generate_physician_name()
            discharged = self._generate_discharge_date(visit_dt, visit_type)
            complaint = self._generate_chief_complaint(patient_age)
            visit_status = random.choice(['Completed', 'In Progress', 'Scheduled'])

            # Whole days between admission and discharge; None without a discharge.
            stay_days = (discharged - visit_dt).days if discharged else None

            records.append({
                'encounter_id': f"ENC-{next_number:08d}",
                'patient_id': person['patient_id'],
                'encounter_date': visit_dt,
                'encounter_type': visit_type,
                'department': dept,
                'attending_physician': physician,
                'admission_date': visit_dt,
                'discharge_date': discharged,
                'length_of_stay': stay_days,
                'chief_complaint': complaint,
                'status': visit_status,
                'created_date': visit_dt,
                'updated_date': datetime.now()
            })
            next_number += 1

    return records

def generate_diagnoses(self, encounters: List[Dict]) -> List[Dict]:
    """Create diagnosis records for each encounter.

    Each encounter receives one to three diagnoses chosen by the
    department-based selector (matches previous script behavior). The first
    diagnosis is marked 'Primary', the rest 'Secondary'.
    """
    rows: List[Dict] = []
    issued = 0  # number of diagnosis ids handed out so far

    for visit in encounters:
        # Weighted toward a single diagnosis per encounter
        (how_many,) = random.choices([1, 2, 3], weights=[0.6, 0.3, 0.1])

        department = visit.get('department', 'General Pediatrics')
        picks = self._select_diagnoses_for_encounter(department, how_many)

        # Safety net: guarantee at least one diagnosis per encounter
        if not picks:
            fallback_code, (fallback_desc, _) = random.choice(list(self.pediatric_diagnoses.items()))
            picks = [(fallback_code, fallback_desc)]

        for position, (code, desc) in enumerate(picks):
            issued += 1
            is_primary = position == 0
            rows.append({
                'diagnosis_id': f"DX-{issued:08d}",
                'encounter_id': visit['encounter_id'],
                'patient_id': visit['patient_id'],
                # Use canonical keys expected by SQL loader while keeping legacy keys for compatibility
                'diagnosis_code': code,
                'icd10_code': code,
                'diagnosis_description': desc,
                'description': desc,
                'diagnosis_type': 'Primary' if is_primary else 'Secondary',
                'diagnosis_date': visit['encounter_date'],
                'created_date': visit['encounter_date'],
                'updated_date': datetime.now()
            })

    return rows

def generate_lab_results(self, encounters: List[Dict], patients: List[Dict]) -> List[Dict]:
    """Generate lab results linked to encounters.

    Args:
        encounters: Encounter dicts providing ``encounter_id``,
            ``patient_id``, ``encounter_type``, ``department`` and
            ``encounter_date``.
        patients: Patient dicts providing ``patient_id`` and
            ``date_of_birth``; every encounter's patient must be present.

    Returns:
        A list of lab result dicts with sequential ``LAB-########`` ids.

    Raises:
        KeyError: if an encounter references a patient not in ``patients``.

    Notes:
        The ICU/NICU probability (0.95) is checked before the
        Emergency/Inpatient one (0.8). Checked the other way round, the
        ICU/NICU branch was effectively dead, because those departments
        produce 'Inpatient' encounters that the 0.8 rule matched first.
    """
    lab_results = []
    lab_id = 1

    # Create patient lookup
    patient_lookup = {p['patient_id']: p for p in patients}

    # Common lab tests: (code, display name, ordering frequency).
    # The frequency is informational only; tests are sampled uniformly below.
    lab_tests = [
        ('CBC', 'Complete Blood Count', 0.40),
        ('BMP', 'Basic Metabolic Panel', 0.30),
        ('UA', 'Urinalysis', 0.25),
        ('Strep', 'Strep Test', 0.15),
        ('Flu', 'Influenza Test', 0.10),
        ('COVID', 'COVID-19 Test', 0.08),
        ('RSV', 'RSV Test', 0.12),
        ('Blood Culture', 'Blood Culture', 0.05)
    ]

    today = datetime.now().date()

    for encounter in encounters:
        patient = patient_lookup[encounter['patient_id']]
        dob = patient['date_of_birth']
        # Age in completed years as of today
        patient_age = (today.year - dob.year) - ((today.month, today.day) < (dob.month, dob.day))

        # Determine if labs are needed (most specific rule first)
        if encounter['department'] in ('Pediatric ICU', 'NICU'):
            lab_probability = 0.95
        elif encounter['encounter_type'] in ('Emergency', 'Inpatient'):
            lab_probability = 0.8
        else:
            lab_probability = 0.2  # Base probability

        if random.random() >= lab_probability:
            continue

        # Select which labs to order
        num_labs = random.choices([1, 2, 3, 4], weights=[0.4, 0.3, 0.2, 0.1])[0]
        selected_labs = random.sample(lab_tests, min(num_labs, len(lab_tests)))

        for test_code, test_name, _ in selected_labs:
            # Generate appropriate results based on test type
            if test_code == 'CBC':
                results = self._generate_cbc_results(patient_age)
            elif test_code == 'BMP':
                results = self._generate_bmp_results(patient_age)
            else:
                results = self._generate_generic_results(test_code)

            lab_result = {
                'lab_result_id': f"LAB-{lab_id:08d}",
                'encounter_id': encounter['encounter_id'],
                'patient_id': encounter['patient_id'],
                'test_code': test_code,
                'test_name': test_name,
                'result_value': results['value'],
                'result_unit': results['unit'],
                'reference_range': results['reference'],
                'abnormal_flag': results['abnormal'],
                # Collected 0-4h after the encounter, resulted 4-8h after it,
                # so the result is never earlier than the collection.
                'collection_date': encounter['encounter_date'] + timedelta(hours=random.randint(0, 4)),
                'result_date': encounter['encounter_date'] + timedelta(hours=random.randint(4, 8)),
                'status': 'Final',
                'created_date': encounter['encounter_date'],
                'updated_date': datetime.now()
            }
            lab_results.append(lab_result)
            lab_id += 1

    return lab_results

# Attach the generation functions defined in this cell to PediatricDataGenerator.
# Each function is bound under its own name, so it becomes a regular method.
for _method in (
    generate_patient_demographics,
    generate_encounters,
    generate_diagnoses,
    generate_lab_results,
):
    setattr(PediatricDataGenerator, _method.__name__, _method)
del _method  # do not leave the loop variable behind in the notebook namespace


In [None]:
# Additional methods for PediatricDataGenerator - Lab result generators and more

def _generate_cbc_results(self, age: int) -> Dict:
    """Return a hemoglobin result dict (value, unit, reference, abnormal flag) for the age.

    The value is drawn uniformly from the age band's reference range widened
    by 1 g/dL on each side, so some results fall outside the range.
    """
    # Resolve the reference-range band for this age
    if age == 0:
        band = '0-1'
    else:
        band = '16-21'
        for upper_age, label in ((3, '1-3'), (6, '4-6'), (12, '7-12'), (15, '13-15')):
            if age <= upper_age:
                band = label
                break

    low, high = self.lab_reference_ranges['Hemoglobin'][band]
    measured = round(random.uniform(low - 1, high + 1), 1)

    if measured < low:
        flag = 'L'
    elif measured > high:
        flag = 'H'
    else:
        flag = 'N'

    return {
        'value': str(measured),
        'unit': 'g/dL',
        'reference': f"{low}-{high}",
        'abnormal': flag
    }

def _generate_bmp_results(self, age: int) -> Dict:
    """Return a glucose result dict for the metabolic panel (age is not used).

    The value is an integer drawn from 10 mg/dL below to 20 mg/dL above the
    single all-ages reference range.
    """
    low, high = self.lab_reference_ranges['Glucose']['0-21']
    measured = random.randint(low - 10, high + 20)

    if measured < low:
        flag = 'L'
    elif measured > high:
        flag = 'H'
    else:
        flag = 'N'

    return {
        'value': str(measured),
        'unit': 'mg/dL',
        'reference': f"{low}-{high}",
        'abnormal': flag
    }

def _generate_generic_results(self, test_code: str) -> Dict:
    """Return a result dict for tests without numeric values.

    Rapid tests (Strep, Flu, COVID, RSV) come back Positive about 20% of the
    time; every other test code yields a 'See report' placeholder.
    """
    rapid_tests = ('Strep', 'Flu', 'COVID', 'RSV')
    if test_code not in rapid_tests:
        return {
            'value': 'See report',
            'unit': '',
            'reference': '',
            'abnormal': 'N'
        }

    (outcome,) = random.choices(['Positive', 'Negative'], weights=[0.2, 0.8])
    flag = 'A' if outcome == 'Positive' else 'N'
    return {
        'value': outcome,
        'unit': '',
        'reference': 'Negative',
        'abnormal': flag
    }

def generate_medications(self, encounters: List[Dict], diagnoses: List[Dict]) -> List[Dict]:
    """Generate medication records linked to encounters and their diagnoses.

    Medications are always derived from the diagnoses recorded for an
    encounter; Emergency encounters additionally receive one supportive
    antipyretic. Chronic medications have no ``end_date``; all others get a
    7, 10 or 14 day course starting on the encounter date.

    Args:
        encounters: Encounter dicts; reads ``encounter_id``, ``patient_id``,
            ``encounter_type``, ``encounter_date`` and ``attending_physician``.
        diagnoses: Diagnosis dicts; reads ``encounter_id`` and ``icd10_code``.

    Returns:
        List of medication dicts with sequential ``MED-########`` ids.
    """
    medications = []
    med_id = 1

    # Group diagnoses by the encounter they belong to
    encounter_diagnoses = {}
    for dx in diagnoses:
        encounter_diagnoses.setdefault(dx['encounter_id'], []).append(dx)

    # Medication mapping by diagnosis
    diagnosis_medications = {
        'J45.9': ['Albuterol', 'Fluticasone', 'Montelukast'],  # Asthma
        'F90.9': ['Methylphenidate'],  # ADHD
        'E10.9': ['Insulin'],  # Diabetes
        'J06.9': ['Acetaminophen', 'Ibuprofen'],  # URI
        'H66.90': ['Amoxicillin'],  # Otitis media
        'L20.9': ['Hydrocortisone'],  # Dermatitis
        'K21.9': ['Omeprazole'],  # GERD
        'K59.00': ['Polyethylene glycol']  # Constipation
    }

    chronic_meds = {'Albuterol', 'Fluticasone', 'Insulin', 'Methylphenidate', 'Omeprazole', 'Montelukast'}

    for encounter in encounters:
        # A set de-duplicates medications shared by several diagnoses
        prescribed_meds = set()
        for dx in encounter_diagnoses.get(encounter['encounter_id'], []):
            prescribed_meds.update(diagnosis_medications.get(dx['icd10_code'], ()))

        # Add supportive medication for ED encounters
        if encounter['encounter_type'] == 'Emergency':
            prescribed_meds.add(random.choice(['Acetaminophen', 'Ibuprofen']))

        # Iterate in sorted order: raw set order for strings depends on the
        # interpreter's hash seed, which would make id assignment and the
        # course-length draws differ between runs even with a fixed RNG seed.
        for med_name in sorted(prescribed_meds):
            if med_name in chronic_meds:
                end_date_val = None
            else:
                end_date_val = encounter['encounter_date'] + timedelta(days=random.choice([7, 10, 14]))

            medications.append({
                'medication_id': f"MED-{med_id:08d}",
                'encounter_id': encounter['encounter_id'],
                'patient_id': encounter['patient_id'],
                'medication_name': med_name,
                'dosage': self._generate_dosage(med_name),
                'frequency': self._generate_frequency(med_name),
                'route': self._generate_route(med_name),
                'start_date': encounter['encounter_date'],
                'end_date': end_date_val,
                'prescribing_provider': encounter['attending_physician'],
                'created_date': encounter['encounter_date'],
                'updated_date': datetime.now()
            })
            med_id += 1

    return medications

def _generate_dosage(self, medication_name: str) -> str:
    """Look up the dosage string for a medication.

    Args:
        medication_name: Name of the medication.

    Returns:
        The listed dosage text, or '1 dose' for medications not in the table.
    """
    table = dict([
        ('Acetaminophen', '15 mg/kg'),
        ('Ibuprofen', '10 mg/kg'),
        ('Amoxicillin', '45 mg/kg/day'),
        ('Albuterol', '2 puffs'),
        ('Fluticasone', '1 puff'),
        ('Methylphenidate', '5 mg'),
        ('Insulin', '0.5 units/kg/day'),
    ])
    try:
        return table[medication_name]
    except KeyError:
        # Unlisted medications fall back to a generic single dose.
        return '1 dose'

def _generate_frequency(self, medication_name: str) -> str:
    """Look up the dosing schedule for a medication.

    Args:
        medication_name: Name of the medication.

    Returns:
        The listed schedule text, or 'as directed' when the medication has
        no dedicated entry.
    """
    schedule = dict(
        Acetaminophen='every 4-6 hours as needed',
        Ibuprofen='every 6-8 hours as needed',
        Amoxicillin='twice daily',
        Albuterol='every 4 hours as needed',
        Fluticasone='twice daily',
        Methylphenidate='once daily',
        Insulin='as directed',
    )
    if medication_name in schedule:
        return schedule[medication_name]
    return 'as directed'

def _generate_route(self, medication_name: str) -> str:
    """Return the administration route for a medication.

    Args:
        medication_name: Name of the medication.

    Returns:
        'Inhalation', 'Subcutaneous' or 'Topical' for the medications listed
        below, and 'Oral' for everything else.
    """
    non_oral_routes = {
        'Albuterol': 'Inhalation',
        'Fluticasone': 'Inhalation',
        'Insulin': 'Subcutaneous',
        'Hydrocortisone': 'Topical',
    }
    return non_oral_routes.get(medication_name, 'Oral')

def generate_vital_signs(self, encounters: List[Dict], patients: List[Dict]) -> List[Dict]:
    """Produce vital-sign records for the given encounters.

    About 90% of encounters receive vitals. Inpatient encounters get 3-8
    sets, Emergency encounters 2-4, all others a single set; successive sets
    are spaced four hours apart starting at the encounter date.

    Args:
        encounters: Encounter dicts; reads ``encounter_id``, ``patient_id``,
            ``encounter_type`` and ``encounter_date``.
        patients: Patient dicts; reads ``patient_id`` and ``date_of_birth``.

    Returns:
        List of vital-sign dicts with sequential ``VS-########`` ids.
    """
    records: List[Dict] = []

    by_patient_id = {}
    for person in patients:
        by_patient_id[person['patient_id']] = person

    for visit in encounters:
        born = by_patient_id[visit['patient_id']]['date_of_birth']

        # Age in completed years as of today (not as of the encounter).
        today = datetime.now().date()
        years = today.year - born.year
        if (today.month, today.day) < (born.month, born.day):
            years -= 1

        # Roughly one encounter in ten has no vitals recorded.
        if random.random() >= 0.9:
            continue

        setting = visit['encounter_type']
        if setting == 'Inpatient':
            set_count = random.randint(3, 8)
        elif setting == 'Emergency':
            set_count = random.randint(2, 4)
        else:
            set_count = 1

        for slot in range(set_count):
            taken_at = visit['encounter_date'] + timedelta(hours=slot * 4)
            # Key order matters: the helpers draw from the shared RNG in the
            # order the dict literal evaluates them.
            records.append({
                'vital_sign_id': f"VS-{len(records) + 1:08d}",
                'encounter_id': visit['encounter_id'],
                'patient_id': visit['patient_id'],
                'temperature': self._generate_temperature(),
                'heart_rate': self._generate_heart_rate(years),
                'respiratory_rate': self._generate_respiratory_rate(years),
                'blood_pressure_systolic': self._generate_bp_systolic(years),
                'blood_pressure_diastolic': self._generate_bp_diastolic(years),
                'oxygen_saturation': self._generate_oxygen_saturation(),
                'weight_kg': self._generate_weight(years),
                'height_cm': self._generate_height(years),
                'recorded_date': taken_at,
                'recorded_by': f"Nurse {self.fake.last_name()}",
                'created_date': taken_at,
                'updated_date': datetime.now()
            })

    return records

# Vital sign generation helpers
def _generate_temperature(self) -> float:
    """Draw a body temperature in Celsius between 36.0 and 38.5.

    Returns:
        The temperature rounded to one decimal place.
    """
    celsius = random.uniform(36.0, 38.5)
    return round(celsius, 1)

def _generate_heart_rate(self, age: int) -> int:
    """Draw a heart rate from the range assigned to the patient's age.

    Args:
        age: Patient age in whole years.

    Returns:
        An integer rate drawn uniformly from the matching range.
    """
    if age == 0:
        slowest, fastest = 100, 160
    else:
        # Oldest group is the default; younger groups are matched by the
        # first upper age limit that is not exceeded.
        slowest, fastest = 60, 100
        for limit, low, high in ((1, 90, 150), (3, 80, 130), (6, 75, 115), (12, 70, 110)):
            if age <= limit:
                slowest, fastest = low, high
                break
    return random.randint(slowest, fastest)

def _generate_respiratory_rate(self, age: int) -> int:
    """Draw a respiratory rate from the range assigned to the patient's age.

    Args:
        age: Patient age in whole years.

    Returns:
        An integer rate drawn uniformly from the matching range.
    """
    if age == 0:
        return random.randint(30, 60)

    # (upper age limit, low, high), checked youngest first.
    age_groups = [
        (1, 24, 40),
        (3, 22, 34),
        (6, 20, 30),
        (12, 18, 26),
    ]
    for limit, low, high in age_groups:
        if age <= limit:
            return random.randint(low, high)

    # Older than every listed limit.
    return random.randint(12, 20)

def _generate_bp_systolic(self, age: int) -> int:
    """Draw a systolic blood pressure from the range assigned to the age.

    Args:
        age: Patient age in whole years.

    Returns:
        An integer pressure drawn uniformly from the matching range.
    """
    # Default is the oldest group; the first matching age limit overrides it.
    pressure_range = (100, 120)
    for limit, candidate in ((1, (70, 95)), (3, (80, 100)), (6, (85, 105)), (12, (90, 110))):
        if age <= limit:
            pressure_range = candidate
            break
    return random.randint(pressure_range[0], pressure_range[1])

def _generate_bp_diastolic(self, age: int) -> int:
    """Draw a diastolic blood pressure from the range assigned to the age.

    Args:
        age: Patient age in whole years.

    Returns:
        An integer pressure drawn uniformly from the matching range.
    """
    if age > 12:
        low, high = 65, 80
    elif age > 6:
        low, high = 60, 75
    elif age > 3:
        low, high = 55, 70
    elif age > 1:
        low, high = 50, 65
    else:
        low, high = 45, 65
    return random.randint(low, high)

def _generate_oxygen_saturation(self) -> int:
    """Draw an oxygen saturation percentage between 95 and 100.

    Returns:
        One of the listed readings, chosen with the weights paired to it
        (98 and 99 are the most likely, 95 and 96 the least).
    """
    readings = [98, 99, 100, 97, 96, 95]
    likelihoods = [0.3, 0.3, 0.2, 0.1, 0.05, 0.05]
    # random.choices returns a one-element list when k is left at its default.
    (picked,) = random.choices(readings, weights=likelihoods)
    return picked

def _generate_weight(self, age: int) -> float:
    """Draw a weight in kilograms from the range assigned to the age.

    Args:
        age: Patient age in whole years.

    Returns:
        The weight rounded to one decimal place.
    """
    if age == 0:
        kg_range = (2.5, 4.5)
    elif age == 1:
        kg_range = (8, 12)
    elif age <= 3:
        kg_range = (12, 18)
    elif age <= 6:
        kg_range = (16, 25)
    elif age <= 12:
        kg_range = (25, 50)
    else:
        kg_range = (45, 80)

    lightest, heaviest = kg_range
    return round(random.uniform(lightest, heaviest), 1)

def _generate_height(self, age: int) -> float:
    """Draw a height in centimetres from the range assigned to the age.

    Args:
        age: Patient age in whole years.

    Returns:
        The height rounded to one decimal place.
    """
    # Ages 0 and 1 are matched exactly; other ages by upper limit.
    cm_range = {0: (45, 55), 1: (70, 80)}.get(age)
    if cm_range is None:
        cm_range = (150, 180)
        for limit, candidate in ((3, (85, 100)), (6, (100, 120)), (12, (120, 150))):
            if age <= limit:
                cm_range = candidate
                break
    return round(random.uniform(cm_range[0], cm_range[1]), 1)

# Attach each helper defined above to PediatricDataGenerator under the
# function's own name, so instances can call them as methods.
for _method in (
    _generate_cbc_results,
    _generate_bmp_results,
    _generate_generic_results,
    generate_medications,
    _generate_dosage,
    _generate_frequency,
    _generate_route,
    generate_vital_signs,
    _generate_temperature,
    _generate_heart_rate,
    _generate_respiratory_rate,
    _generate_bp_systolic,
    _generate_bp_diastolic,
    _generate_oxygen_saturation,
    _generate_weight,
    _generate_height,
):
    setattr(PediatricDataGenerator, _method.__name__, _method)
del _method  # keep the loop variable out of the notebook namespace


In [None]:
# Override generator behavior to improve diabetes + HbA1c cohorts
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import random

# 1) Expand Endocrinology diagnoses and include Type 2 diabetes

def _select_diagnoses_for_encounter_override(self, department: str, count: int) -> List[Tuple[str, str]]:
    """Pick diagnosis codes for an encounter based on its department.

    Codes are drawn with replacement from the department's pool, so the same
    code may appear more than once. Descriptions come from
    ``self.pediatric_diagnoses``; a code missing there is described by the
    code itself.

    Args:
        department: Department name of the encounter.
        count: Requested number of diagnoses; at least one is always returned.

    Returns:
        List of ``(icd10_code, description)`` tuples.
    """
    otitis = ['H66.90']
    pools = {
        'Emergency Department': ['J06.9', 'B34.9', 'T78.40XA'],
        'Cardiology': ['Q21.0'],
        'Pulmonology': ['J45.9'],
        # Prefer diabetes codes in endocrine visits
        'Endocrinology': ['E10.9', 'E11.9'],
        'Dermatology': ['L20.9'],
        'Otolaryngology': otitis,
        'ENT': otitis,
        'Gastroenterology': ['K21.9', 'K59.00'],
        'Neurology': ['G40.909'],
    }
    # Departments without a dedicated pool share a general pediatric mix.
    candidates = pools.get(department, ['J06.9', 'B34.9', 'L20.9', 'H66.90'])

    picked: List[Tuple[str, str]] = []
    remaining = max(1, count)
    while remaining > 0:
        code = random.choice(candidates)
        entry = self.pediatric_diagnoses.get(code, (code, 1.0))
        picked.append((code, entry[0]))
        remaining -= 1
    return picked

# 2) Force a recent Endocrinology encounter for a slice of patients to ensure recency window

def generate_encounters_override(self, patients: List[Dict], encounters_per_patient: int = 5) -> List[Dict]:
    """Generate encounters, forcing a recent Endocrinology visit for some patients.

    Roughly 2.5% of patients have one of their encounters replaced by an
    Endocrinology encounter dated within the last 85 days; all other
    encounters use the generator's regular date and department helpers.

    Args:
        patients: Patient dicts; reads ``patient_id``, ``date_of_birth`` and
            ``created_date``.
        encounters_per_patient: Average encounter count passed to
            ``self._determine_encounter_count``.

    Returns:
        List of encounter dicts with sequential ``ENC-########`` ids.
        ``length_of_stay`` is only filled in when a discharge date exists.
    """
    encounters: List[Dict] = []
    encounter_id = 1

    for patient in patients:
        # Derive age (completed years as of today) from DOB
        today = datetime.now().date()
        dob = patient['date_of_birth']
        patient_age = (today.year - dob.year) - ((today.month, today.day) < (dob.month, dob.day))

        num_encounters = self._determine_encounter_count(patient_age, encounters_per_patient)

        # Roughly 2.5% of patients will be marked to have a recent endocrine visit
        force_recent_endo = random.random() < 0.025
        # random.randrange() raises ValueError on an empty range, so a marked
        # patient with no encounters simply gets no forced visit.
        if force_recent_endo and num_encounters > 0:
            forced_index = random.randrange(num_encounters)
        else:
            forced_index = -1

        for idx in range(num_encounters):
            if idx == forced_index:
                # Force an endocrinology encounter 0-85 days ago, safely
                # inside a 90-day recency window
                encounter_date = datetime.now() - timedelta(days=random.randint(0, 85))
                department = 'Endocrinology'
            else:
                encounter_date = self._generate_encounter_date(patient['created_date'])
                department = self._select_department(patient_age)

            encounter_type = self._determine_encounter_type(department)

            encounter = {
                'encounter_id': f"ENC-{encounter_id:08d}",
                'patient_id': patient['patient_id'],
                'encounter_date': encounter_date,
                'encounter_type': encounter_type,
                'department': department,
                'attending_physician': self._generate_physician_name(),
                'admission_date': encounter_date,
                'discharge_date': self._generate_discharge_date(encounter_date, encounter_type),
                'length_of_stay': None,
                'chief_complaint': self._generate_chief_complaint(patient_age),
                'status': 'Completed',
                'created_date': encounter_date,
                'updated_date': datetime.now()
            }

            # Length of stay is whole days between admission and discharge
            if encounter['discharge_date']:
                encounter['length_of_stay'] = (encounter['discharge_date'] - encounter_date).days

            encounters.append(encounter)
            encounter_id += 1

    return encounters

# 3) Ensure HbA1c labs are generated for Endocrinology encounters; include a realistic high tail

def _generate_hba1c_results(self, age: int) -> Dict:
    """Generate a Hemoglobin A1c result with a realistic high tail.

    75% of results are drawn from 4.8-6.0, 15% from 6.5-8.9 and 10% from
    9.0-13.5. The abnormal flag is derived from the reported reference range
    and uses the same 'L'/'H'/'N' codes as the other lab generators, so a
    value above 5.6 is always flagged 'H'.

    Args:
        age: Unused; kept so the signature matches the other lab helpers.

    Returns:
        Dict with ``value`` (string), ``unit``, ``reference`` and ``abnormal``.
    """
    ref_low, ref_high = 4.8, 5.6

    roll = random.random()
    if roll < 0.75:
        value = round(random.uniform(4.8, 6.0), 1)
    elif roll < 0.90:
        value = round(random.uniform(6.5, 8.9), 1)
    else:
        value = round(random.uniform(9.0, 13.5), 1)

    # Flag against the reference bounds rather than the sampling bucket, so
    # the flag can never contradict the reference range printed next to it.
    if value < ref_low:
        abnormal = 'L'
    elif value > ref_high:
        abnormal = 'H'
    else:
        abnormal = 'N'

    return {
        'value': str(value),
        'unit': '%',
        'reference': '4.8-5.6',
        'abnormal': abnormal
    }


def generate_lab_results_override(self, encounters: List[Dict], patients: List[Dict]) -> List[Dict]:
    """Generate lab results, guaranteeing HbA1c for Endocrinology encounters.

    The chance that an encounter has labs is 0.2 by default, 0.8 for
    Emergency/Inpatient encounters, 0.95 for Pediatric ICU/NICU departments
    and at least 0.85 for Endocrinology. When labs are drawn, 1-4 distinct
    tests are sampled and Endocrinology encounters always include HbA1c.

    Args:
        encounters: Encounter dicts; reads ``encounter_id``, ``patient_id``,
            ``encounter_type``, ``department`` and ``encounter_date``.
        patients: Patient dicts; reads ``patient_id`` and ``date_of_birth``.

    Returns:
        List of lab result dicts with sequential ``LAB-########`` ids.
    """
    lab_results: List[Dict] = []
    lab_id = 1

    patient_lookup = {p['patient_id']: p for p in patients}

    # NOTE(review): the third element (a frequency weight) is not used by the
    # selection below, which samples tests uniformly.
    base_lab_tests = [
        ('CBC', 'Complete Blood Count', 0.40),
        ('BMP', 'Basic Metabolic Panel', 0.30),
        ('UA', 'Urinalysis', 0.25),
        ('Strep', 'Strep Test', 0.15),
        ('Flu', 'Influenza Test', 0.10),
        ('COVID', 'COVID-19 Test', 0.08),
        ('RSV', 'RSV Test', 0.12),
        ('Blood Culture', 'Blood Culture', 0.05),
        ('HBA1C', 'Hemoglobin A1c', 0.15)
    ]

    for encounter in encounters:
        patient = patient_lookup[encounter['patient_id']]
        today = datetime.now().date()
        dob = patient['date_of_birth']
        patient_age = (today.year - dob.year) - ((today.month, today.day) < (dob.month, dob.day))

        # Higher lab probability for acute/critical settings
        lab_probability = 0.2
        if encounter['encounter_type'] in ('Emergency', 'Inpatient'):
            lab_probability = 0.8
        # Checked independently of the encounter type: as an `elif` this
        # branch was skipped for every ICU stay typed as Inpatient/Emergency.
        if encounter['department'] in ('Pediatric ICU', 'NICU'):
            lab_probability = 0.95
        # Endocrinology visits more likely to include labs
        if encounter['department'] == 'Endocrinology':
            lab_probability = max(lab_probability, 0.85)

        if random.random() >= lab_probability:
            continue

        # Choose labs, but guarantee HbA1c for Endocrinology
        num_labs = random.choices([1, 2, 3, 4], weights=[0.4, 0.3, 0.2, 0.1])[0]
        selected = random.sample(base_lab_tests, min(num_labs, len(base_lab_tests)))

        if encounter['department'] == 'Endocrinology' and not any(c == 'HBA1C' for c, _, _ in selected):
            # Ensure HbA1c is included
            selected.append(('HBA1C', 'Hemoglobin A1c', 1.0))

        for test_code, test_name, _ in selected:
            if test_code == 'CBC':
                results = self._generate_cbc_results(patient_age)
            elif test_code == 'BMP':
                results = self._generate_bmp_results(patient_age)
            elif test_code == 'HBA1C':
                results = self._generate_hba1c_results(patient_age)
            else:
                results = self._generate_generic_results(test_code)

            lab_results.append({
                'lab_result_id': f"LAB-{lab_id:08d}",
                'encounter_id': encounter['encounter_id'],
                'patient_id': encounter['patient_id'],
                'test_code': test_code,
                'test_name': test_name,
                'result_value': results['value'],
                'result_unit': results['unit'],
                'reference_range': results['reference'],
                'abnormal_flag': results['abnormal'],
                # Collected 0-4 h and resulted 4-8 h after the encounter
                # starts, so a result never precedes its collection.
                'collection_date': encounter['encounter_date'] + timedelta(hours=random.randint(0, 4)),
                'result_date': encounter['encounter_date'] + timedelta(hours=random.randint(4, 8)),
                'status': 'Final',
                'created_date': encounter['encounter_date'],
                'updated_date': datetime.now()
            })
            lab_id += 1

    return lab_results

# Apply overrides: rebind each generator attribute to its replacement
# implementation defined in this cell.
for _attribute, _replacement in {
    '_select_diagnoses_for_encounter': _select_diagnoses_for_encounter_override,
    'generate_encounters': generate_encounters_override,
    '_generate_hba1c_results': _generate_hba1c_results,
    'generate_lab_results': generate_lab_results_override,
}.items():
    setattr(PediatricDataGenerator, _attribute, _replacement)
del _attribute, _replacement  # keep loop variables out of the notebook namespace



In [None]:
# ClinicalNotesGenerator class
class ClinicalNotesGenerator:
    """Produce synthetic clinical documentation for pediatric patients.

    The constructor only sets up the random sources and the vocabulary
    (note types, symptoms, exam findings, assessment/plan templates and
    radiology findings) that the note-building methods draw from.
    """

    def __init__(self, seed: int = 42):
        """Seed the random sources and load the note vocabulary.

        Args:
            seed: Value used to seed both the global ``random`` module and
                Faker.
        """
        # Seeding touches the module-level RNG, not a private instance.
        random.seed(seed)
        self.fake = Faker()
        Faker.seed(seed)

        # Kinds of documents the generator knows about
        self.note_types = [
            'Progress Note',
            'Admission Note',
            'Discharge Summary',
            'Consultation Note',
            'Nursing Note',
            'Emergency Department Note',
            'Procedure Note',
            'Follow-up Note',
        ]

        # Symptom vocabulary
        self.pediatric_symptoms = [
            'fever',
            'cough',
            'vomiting',
            'diarrhea',
            'abdominal pain',
            'headache',
            'sore throat',
            'ear pain',
            'rash',
            'congestion',
            'wheezing',
            'fatigue',
            'poor feeding',
            'irritability',
            'difficulty breathing',
            'shortness of breath',
            'chest pain',
            'joint pain',
            'muscle aches',
            'nausea',
            'dizziness',
            'anxiety',
            'panic',
            'sleep disturbance',
            'appetite loss',
        ]

        # Physical exam phrases, keyed by body system
        self.physical_exam_findings = dict(
            general=[
                'alert', 'responsive', 'well-appearing',
                'ill-appearing', 'anxious', 'comfortable',
            ],
            vital_signs=[
                'stable', 'normal for age', 'elevated temperature',
                'tachycardic', 'tachypneic',
            ],
            heent=[
                'normocephalic', 'atraumatic', 'pupils equal and reactive',
                'TMs clear', 'throat erythematous',
            ],
            cardiovascular=[
                'regular rate and rhythm', 'no murmurs',
                'good perfusion', 'normal S1 S2',
            ],
            respiratory=[
                'clear to auscultation', 'good air movement', 'no wheezes',
                'no rales', 'symmetric expansion',
            ],
            abdomen=[
                'soft', 'non-tender', 'non-distended',
                'normal bowel sounds', 'no organomegaly',
            ],
            extremities=[
                'no edema', 'full range of motion',
                'no deformity', 'good strength',
            ],
            neurologic=[
                'alert and oriented', 'no focal deficits',
                'cranial nerves intact', 'reflexes normal',
            ],
            skin=[
                'warm and dry', 'no rash', 'good turgor', 'no lesions',
            ],
        )

        # Assessment text and plan items, keyed by ICD-10 code
        self.assessment_plans = {
            # Asthma
            'J45.9': dict(
                assessment='Asthma exacerbation',
                plan=[
                    'Continue albuterol inhaler 2 puffs every 4-6 hours as needed',
                    'Start/continue inhaled corticosteroid therapy',
                    'Follow up with pulmonology in 2-4 weeks',
                    'Return to ED if worsening symptoms',
                    'Asthma action plan reviewed with family',
                ],
            ),
            # ADHD
            'F90.9': dict(
                assessment='Attention deficit hyperactivity disorder',
                plan=[
                    'Continue current medication regimen',
                    'Behavioral therapy referral',
                    'School accommodations discussed',
                    'Follow up in 3 months',
                    'Monitor growth and development',
                ],
            ),
            # Type 1 Diabetes
            'E10.9': dict(
                assessment='Type 1 diabetes mellitus',
                plan=[
                    'Continue insulin per sliding scale',
                    'Blood glucose monitoring 4x daily',
                    'Endocrinology follow-up in 3 months',
                    'Nutrition counseling',
                    'Annual ophthalmology exam',
                ],
            ),
            # Upper respiratory infection
            'J06.9': dict(
                assessment='Viral upper respiratory infection',
                plan=[
                    'Supportive care with rest and fluids',
                    'Acetaminophen or ibuprofen for fever',
                    'Saline nasal drops for congestion',
                    'Return if symptoms worsen or persist >10 days',
                    'No antibiotics indicated',
                ],
            ),
        }

        # Normal-study phrases, keyed by study type
        self.radiology_findings = dict(
            chest_xray=[
                'lungs are clear bilaterally',
                'no acute cardiopulmonary process',
                'heart size normal for age',
                'no pneumonia or pneumothorax',
                'costophrenic angles are sharp',
            ],
            abdominal_xray=[
                'normal bowel gas pattern',
                'no evidence of obstruction',
                'no free air',
                'normal soft tissue shadows',
                'no abnormal calcifications',
            ],
            brain_mri=[
                'normal brain parenchyma',
                'no evidence of mass effect',
                'ventricles normal in size',
                'no abnormal enhancement',
                'normal gray-white differentiation',
            ],
            brain_ct=[
                'no acute intracranial abnormality',
                'ventricles and sulci normal for age',
                'no midline shift',
                'no hemorrhage or mass effect',
                'normal gray-white differentiation',
            ],
            ultrasound_abdomen=[
                'liver normal in size and echotexture',
                'gallbladder normal without stones',
                'kidneys normal in size and position',
                'no free fluid',
                'normal bowel peristalsis',
            ],
            echo=[
                'normal cardiac structure and function',
                'ejection fraction normal',
                'no valvular abnormalities',
                'no pericardial effusion',
                'normal chamber sizes',
            ],
        )


In [None]:
# Methods for ClinicalNotesGenerator

def generate_progress_note(self, patient: Dict, encounter: Dict, diagnoses: List[Dict]) -> Dict:
    """Assemble a progress note record for one encounter.

    Args:
        patient: Patient dict; reads ``patient_id``, ``first_name``,
            ``last_name``, ``mrn`` and ``date_of_birth`` (a date-like object
            or a 'YYYY-MM-DD' string).
        encounter: Encounter dict; reads ``encounter_id``, ``encounter_date``,
            ``attending_physician`` and optionally ``chief_complaint``.
        diagnoses: Diagnoses passed through to the HPI and assessment helpers.

    Returns:
        Note dict with a random ``PROG-XXXXXXXX`` id and the rendered text.
    """
    note_id = f"PROG-{uuid.uuid4().hex[:8].upper()}"

    # Narrative sections, produced in the order they appear in the note
    history = self._generate_hpi(patient, encounter, diagnoses)
    exam = self._generate_physical_exam()
    plan = self._generate_assessment_plan(diagnoses)

    # DOB is rendered as MM/DD/YYYY whether it arrives as text or as a date
    birth = patient['date_of_birth']
    if isinstance(birth, str):
        birth = datetime.strptime(birth, '%Y-%m-%d')
    dob = birth.strftime('%m/%d/%Y')

    # One entry per output line. The second line is four spaces, not empty,
    # and the final empty entry yields the trailing newline.
    note_lines = (
        "PROGRESS NOTE",
        "    ",
        f"Date of Service: {encounter['encounter_date'].strftime('%Y-%m-%d %H:%M')}",
        f"Patient: {patient['first_name']} {patient['last_name']}",
        f"MRN: {patient['mrn']}",
        f"DOB: {dob}",
        "",
        "CHIEF COMPLAINT:",
        f"{encounter.get('chief_complaint', 'Follow-up visit')}",
        "",
        "HISTORY OF PRESENT ILLNESS:",
        f"{history}",
        "",
        "PHYSICAL EXAMINATION:",
        f"{exam}",
        "",
        "ASSESSMENT AND PLAN:",
        f"{plan}",
        "",
        f"Provider: {encounter['attending_physician']}",
        f"Date: {encounter['encounter_date'].strftime('%Y-%m-%d')}",
        "",
    )
    note_content = "\n".join(note_lines)

    record = {
        'note_id': note_id,
        'encounter_id': encounter['encounter_id'],
        'patient_id': patient['patient_id'],
        'note_type': 'Progress Note',
    }
    record['note_date'] = encounter['encounter_date']
    record['author'] = encounter['attending_physician']
    record['note_content'] = note_content
    record['created_date'] = encounter['encounter_date']
    record['updated_date'] = datetime.now()
    return record

def generate_nursing_note(self, patient: Dict, encounter: Dict) -> Dict:
    """Generate a nursing note for an encounter.

    The nurse's name is generated once and used both in the note's signature
    line and in the ``author`` field, so the two always agree.

    Args:
        patient: Patient dict; reads ``patient_id``, ``first_name``,
            ``last_name``, ``mrn`` and ``date_of_birth`` (a date-like object
            or a 'YYYY-MM-DD' string).
        encounter: Encounter dict; reads ``encounter_id`` and
            ``encounter_date``.

    Returns:
        Note dict with a random ``NURS-XXXXXXXX`` id and the rendered text.
    """
    note_id = f"NURS-{uuid.uuid4().hex[:8].upper()}"

    activities = random.choice([
        'Patient resting comfortably in bed',
        'Patient ambulating in hallway with assistance',
        'Patient playing in playroom',
        'Patient watching TV with parent at bedside'
    ])

    intake_output = f"PO intake: {random.randint(200, 800)}mL"

    parent_teaching = random.choice([
        'Medication administration reviewed with parents',
        'Discharge instructions discussed',
        'Home care instructions provided',
        'Signs and symptoms to watch for reviewed'
    ])

    pain_level = random.choice(['denies', 'reports minimal', 'reports mild'])
    # The emitted line ends with a single space after the period; it is built
    # here so that trailing space stays visible in the source.
    pain_statement = f"Patient {pain_level} pain. "

    # One name for both the signature line and the author field
    author = f"RN {self.fake.last_name()}"

    # Format DOB as MM/DD/YYYY
    if isinstance(patient['date_of_birth'], str):
        dob = datetime.strptime(patient['date_of_birth'], '%Y-%m-%d').strftime('%m/%d/%Y')
    else:
        dob = patient['date_of_birth'].strftime('%m/%d/%Y')

    note_content = f"""NURSING NOTE

Date/Time: {encounter['encounter_date'].strftime('%Y-%m-%d %H:%M')}
Patient: {patient['first_name']} {patient['last_name']}
MRN: {patient['mrn']}
DOB: {dob}

SUBJECTIVE:
{pain_statement}
Parent at bedside, involved in care.

OBJECTIVE:
Vital signs stable and within normal limits for age.
{activities}
{intake_output}

ASSESSMENT:
Patient stable, tolerating treatment well.

PLAN:
Continue current plan of care.
{parent_teaching}

Nurse: {author}
"""

    return {
        'note_id': note_id,
        'encounter_id': encounter['encounter_id'],
        'patient_id': patient['patient_id'],
        'note_type': 'Nursing Note',
        'note_date': encounter['encounter_date'],
        'author': author,
        'note_content': note_content,
        'created_date': encounter['encounter_date'],
        'updated_date': datetime.now()
    }

def generate_discharge_summary(self, patient: Dict, encounter: Dict, 
                             diagnoses: List[Dict], medications: List[Dict]) -> Dict:
    """Generate a discharge summary for an encounter.

    Encounters may carry ``discharge_date`` / ``length_of_stay`` keys whose
    value is ``None``. Those are treated like missing keys: the discharge
    date falls back to the admission date, the note date to the encounter
    date and the length of stay to 1. A length of stay of 0 is kept.

    Args:
        patient: Patient dict; reads ``patient_id``, ``first_name``,
            ``last_name``, ``mrn`` and ``date_of_birth`` (a date-like object
            or a 'YYYY-MM-DD' string).
        encounter: Encounter dict; reads ``encounter_id``, ``encounter_date``,
            ``admission_date``, ``attending_physician`` and optionally
            ``discharge_date`` and ``length_of_stay``.
        diagnoses: Diagnosis dicts; the first is reported as primary and
            reads ``description`` and ``icd10_code``.
        medications: Medication dicts passed to the discharge-medication
            formatter.

    Returns:
        Note dict with a random ``DSUM-XXXXXXXX`` id and the rendered text.
    """
    note_id = f"DSUM-{uuid.uuid4().hex[:8].upper()}"

    # Get primary diagnosis
    primary_dx = diagnoses[0] if diagnoses else {'description': 'Observation', 'icd10_code': 'Z03.89'}

    # Generate hospital course
    hospital_course = self._generate_hospital_course(patient, encounter, diagnoses)

    # Generate discharge medications list
    discharge_meds = self._format_discharge_medications(medications)

    # dict.get() only applies its default when the key is absent, so a
    # present-but-None value has to be handled explicitly.
    recorded_discharge = encounter.get('discharge_date')
    discharge_date = recorded_discharge or encounter['admission_date']
    note_date = recorded_discharge or encounter['encounter_date']
    length_of_stay = encounter.get('length_of_stay')
    if length_of_stay is None:
        length_of_stay = 1

    # Format DOB as MM/DD/YYYY
    if isinstance(patient['date_of_birth'], str):
        dob = datetime.strptime(patient['date_of_birth'], '%Y-%m-%d').strftime('%m/%d/%Y')
    else:
        dob = patient['date_of_birth'].strftime('%m/%d/%Y')

    note_content = f"""DISCHARGE SUMMARY

Admission Date: {encounter['admission_date'].strftime('%Y-%m-%d')}
Discharge Date: {discharge_date.strftime('%Y-%m-%d')}
Length of Stay: {length_of_stay} days

Patient: {patient['first_name']} {patient['last_name']}
MRN: {patient['mrn']}
DOB: {dob}

DISCHARGE DIAGNOSES:
Primary: {primary_dx['description']} ({primary_dx['icd10_code']})
{self._format_secondary_diagnoses(diagnoses[1:])}

HOSPITAL COURSE:
{hospital_course}

DISCHARGE MEDICATIONS:
{discharge_meds}

DISCHARGE INSTRUCTIONS:
1. Follow up with primary care physician in 3-5 days
2. Return to ED if fever >101°F, worsening symptoms, or concerns
3. Continue medications as prescribed
4. Activity as tolerated

DISCHARGE CONDITION: Stable
DISCHARGE DISPOSITION: Home with family

Attending Physician: {encounter['attending_physician']}
"""

    return {
        'note_id': note_id,
        'encounter_id': encounter['encounter_id'],
        'patient_id': patient['patient_id'],
        'note_type': 'Discharge Summary',
        'note_date': note_date,
        'author': encounter['attending_physician'],
        'note_content': note_content,
        'created_date': encounter['encounter_date'],
        'updated_date': datetime.now()
    }

def generate_consultation_note(self, patient: Dict, encounter: Dict, 
                             specialty: str, diagnoses: List[Dict]) -> Dict:
    """Generate a specialty consultation note for an encounter.

    The consultant's name is generated once and used both in the signature
    line and in the ``author`` field, so the two always agree.

    Args:
        patient: Patient dict; reads ``patient_id``, ``first_name``,
            ``last_name``, ``mrn`` and ``date_of_birth`` (a date-like object
            or a 'YYYY-MM-DD' string).
        encounter: Encounter dict; reads ``encounter_id`` and
            ``encounter_date``.
        specialty: Consulting specialty; used in the heading and signature
            and passed to the specialty-specific helpers.
        diagnoses: Diagnoses passed through to the narrative helpers.

    Returns:
        Note dict with a random ``CONS-XXXXXXXX`` id and the rendered text.
    """
    note_id = f"CONS-{uuid.uuid4().hex[:8].upper()}"

    reason_for_consult = self._get_consult_reason(specialty, diagnoses)
    age = self._calculate_age(patient['date_of_birth'])
    recommendations = self._get_specialty_recommendations(specialty, age)

    # Format DOB as MM/DD/YYYY
    if isinstance(patient['date_of_birth'], str):
        dob = datetime.strptime(patient['date_of_birth'], '%Y-%m-%d').strftime('%m/%d/%Y')
    else:
        dob = patient['date_of_birth'].strftime('%m/%d/%Y')

    # Narrative sections are produced in the order they appear in the note
    hpi = self._generate_hpi(patient, encounter, diagnoses)
    past_history = self._generate_pmh(age)
    focused_exam = self._generate_focused_exam(specialty)
    assessment = self._generate_specialty_assessment(specialty, diagnoses)

    # One name for both the signature line and the author field
    author = f"Dr. {self.fake.last_name()}"

    note_content = f"""{specialty.upper()} CONSULTATION NOTE

Date of Consultation: {encounter['encounter_date'].strftime('%Y-%m-%d')}
Patient: {patient['first_name']} {patient['last_name']}
MRN: {patient['mrn']}
DOB: {dob}

REASON FOR CONSULTATION:
{reason_for_consult}

HISTORY OF PRESENT ILLNESS:
{hpi}

PAST MEDICAL HISTORY:
{past_history}

PHYSICAL EXAMINATION:
{focused_exam}

ASSESSMENT:
Based on history and examination, {assessment}

RECOMMENDATIONS:
{recommendations}

Thank you for this interesting consultation.

{specialty} Consultant: {author}
"""

    return {
        'note_id': note_id,
        'encounter_id': encounter['encounter_id'],
        'patient_id': patient['patient_id'],
        'note_type': 'Consultation Note',
        'note_date': encounter['encounter_date'],
        'author': author,
        'note_content': note_content,
        'created_date': encounter['encounter_date'],
        'updated_date': datetime.now()
    }

def generate_radiology_report(self, patient: Dict, encounter: Dict, study_type: str) -> Dict:
    """Generate a radiology report.

    Args:
        patient: Patient record; reads ``date_of_birth`` (a ``YYYY-MM-DD``
            string or an object with ``strftime``), ``first_name``,
            ``last_name``, ``mrn`` and ``patient_id``.
        encounter: Encounter record; reads ``encounter_id``,
            ``encounter_date`` (must support ``strftime``) and, optionally,
            ``chief_complaint``.
        study_type: Study key looked up in ``self.radiology_findings``;
            underscores are replaced with spaces for display.

    Returns:
        A note dict with ``note_id``, ``encounter_id``, ``patient_id``,
        ``note_type``, ``note_date``, ``author``, ``note_content``,
        ``created_date`` and ``updated_date``.
    """
    note_id = f"RAD-{uuid.uuid4().hex[:8].upper()}"

    # Get appropriate findings for study type
    findings = self.radiology_findings.get(study_type, ['No acute findings'])
    selected_findings = random.sample(findings, min(3, len(findings)))

    # Determine if normal or abnormal: 70% of studies are reported as normal.
    if random.random() < 0.7:
        impression = "No acute pathology identified."
    else:
        impression = self._generate_abnormal_impression(
            study_type, self._calculate_age(patient['date_of_birth'])
        )

    # Format DOB as MM/DD/YYYY
    birth_date = patient['date_of_birth']
    if isinstance(birth_date, str):
        birth_date = datetime.strptime(birth_date, '%Y-%m-%d')
    dob = birth_date.strftime('%m/%d/%Y')

    study_label = study_type.replace('_', ' ')
    findings_text = self._format_findings(selected_findings)

    # A single radiologist name is drawn so the signature inside the report
    # and the ``author`` metadata field always refer to the same person.
    radiologist = f"Dr. {self.fake.last_name()}"

    note_content = f"""RADIOLOGY REPORT

Study: {study_label.upper()}
Date: {encounter['encounter_date'].strftime('%Y-%m-%d %H:%M')}

Patient: {patient['first_name']} {patient['last_name']}
MRN: {patient['mrn']}
DOB: {dob}

CLINICAL HISTORY:
{encounter.get('chief_complaint', 'Evaluate for pathology')}

TECHNIQUE:
Standard {study_label} protocol was performed.

FINDINGS:
{findings_text}

IMPRESSION:
{impression}

Radiologist: {radiologist}, MD
Electronically signed on {datetime.now().strftime('%Y-%m-%d %H:%M')}
"""

    return {
        'note_id': note_id,
        'encounter_id': encounter['encounter_id'],
        'patient_id': patient['patient_id'],
        'note_type': 'Radiology Report',
        'note_date': encounter['encounter_date'],
        'author': radiologist,
        'note_content': note_content,
        'created_date': encounter['encounter_date'],
        'updated_date': datetime.now()
    }

# Add all methods to ClinicalNotesGenerator.
# Each right-hand name is a module-level function; assigning it as a class
# attribute makes it available as a method on ClinicalNotesGenerator instances.
ClinicalNotesGenerator.generate_progress_note = generate_progress_note
ClinicalNotesGenerator.generate_nursing_note = generate_nursing_note
ClinicalNotesGenerator.generate_discharge_summary = generate_discharge_summary
ClinicalNotesGenerator.generate_consultation_note = generate_consultation_note
ClinicalNotesGenerator.generate_radiology_report = generate_radiology_report


In [None]:
# Ensure ClinicalNotesGenerator has a DOB-based age calculator available
from datetime import datetime

def _cn_calculate_age(self, birth_date):
    if isinstance(birth_date, str):
        birth_date = datetime.strptime(birth_date, '%Y-%m-%d').date()
    today = datetime.now().date()
    return today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))

# Monkey-patch the helper onto the class as ``_calculate_age``.
# NOTE: the assignment is unconditional, so it replaces any ``_calculate_age``
# the class already defines.
ClinicalNotesGenerator._calculate_age = _cn_calculate_age



In [None]:
# Helper methods for ClinicalNotesGenerator

def _generate_hpi(self, patient: Dict, encounter: Dict, diagnoses: List[Dict]) -> str:
    """Generate history of present illness with richer variety (severity, negation, and context)."""
    age_str = f"{self._calculate_age(patient['date_of_birth'])}-year-old"
    gender_str = "male" if patient['gender'] == 'M' else "female"

    # Duration and symptom set
    duration = random.choice(['1 day', '2 days', '3 days', '4 days', '1 week', '10 days', '2 weeks'])
    symptoms = random.sample(self.pediatric_symptoms, random.randint(2, 4))

    # Add severity and occasional negations
    severity_terms = ['mild', 'moderate', 'severe', 'intermittent', 'persistent', 'worsening', 'improving']
    def describe_symptom(sym: str) -> str:
        if random.random() < 0.2:
            return f"denies {sym}"
        if random.random() < 0.6:
            return f"{random.choice(severity_terms)} {sym}"
        return sym
    symptoms_desc = [describe_symptom(s) for s in symptoms]

    hpi = f"This is a {age_str} {gender_str} who presents with {encounter.get('chief_complaint', 'symptoms')} "
    hpi += f"for the past {duration}. "

    if symptoms_desc:
        if len(symptoms_desc) == 1:
            hpi += f"Associated symptom includes {symptoms_desc[0]}. "
        else:
            hpi += f"Associated symptoms include {', '.join(symptoms_desc[:-1])}, and {symptoms_desc[-1]}. "

    # Tie to known diagnoses when available
    if diagnoses and random.random() < 0.6:
        dx_names = [dx.get('description') or dx.get('diagnosis_description') for dx in diagnoses[:3] if (dx.get('description') or dx.get('diagnosis_description'))]
        if dx_names:
            hpi += f"History notable for {', '.join(n.lower() for n in dx_names)}. "

    # Add an additional relevant negative
    hpi += f"Patient denies {random.choice(['chest pain', 'shortness of breath', 'syncope', 'fever and chills'])}. "

    return hpi

def _generate_physical_exam(self) -> str:
    """Generate a complete physical examination with more variability."""
    exam_sections = []

    for system, findings in self.physical_exam_findings.items():
        # Bias vital signs away from always 'stable'
        if system == 'vital_signs' and random.random() < 0.5:
            finding = random.choice(['elevated temperature', 'tachycardic', 'tachypneic', 'normal for age'])
        else:
            finding = random.choice(findings)
        if system == 'general':
            exam_sections.append(f"GENERAL: Patient is {finding}")
        elif system == 'vital_signs':
            exam_sections.append(f"VITAL SIGNS: {finding}")
        else:
            exam_sections.append(f"{system.upper()}: {finding.capitalize()}")

    return '\n'.join(exam_sections)

def _generate_assessment_plan(self, diagnoses: List[Dict]) -> str:
    """Generate assessment and plan based on diagnoses."""
    if not diagnoses:
        return "1. Well child visit\n   - Continue routine care\n   - Follow up as scheduled"
    
    assessment_plan = ""
    for i, dx in enumerate(diagnoses[:3], 1):  # Limit to top 3 diagnoses
        if dx['icd10_code'] in self.assessment_plans:
            ap = self.assessment_plans[dx['icd10_code']]
            assessment_plan += f"{i}. {ap['assessment']}\n"
            for plan_item in ap['plan']:
                assessment_plan += f"   - {plan_item}\n"
        else:
            assessment_plan += f"{i}. {dx['description']}\n"
            assessment_plan += f"   - Monitor symptoms\n   - Follow up as needed\n"
    
    return assessment_plan

def _generate_hospital_course(self, patient: Dict, encounter: Dict, diagnoses: List[Dict]) -> str:
    """Generate hospital course narrative."""
    age_str = f"{self._calculate_age(patient['date_of_birth'])}-year-old"
    gender_str = "male" if patient['gender'] == 'M' else "female"
    
    primary_dx = diagnoses[0]['description'] if diagnoses else 'observation'
    
    course = f"Patient is a {age_str} {gender_str} admitted for {primary_dx}. "
    course += "Hospital course was uncomplicated. "
    course += "Patient responded well to treatment with improvement in symptoms. "
    course += "Vital signs remained stable throughout admission. "
    course += "Patient tolerated diet and medications without difficulty."
    
    return course

def _format_discharge_medications(self, medications: List[Dict]) -> str:
    """Format medications for discharge summary."""
    if not medications:
        return "No discharge medications"
    
    med_list = []
    for i, med in enumerate(medications[:5], 1):  # Limit to 5 medications
        med_str = f"{i}. {med['medication_name']} {med['dosage']} {med['route']} {med['frequency']}"
        med_list.append(med_str)
    
    return '\n'.join(med_list)

def _format_secondary_diagnoses(self, diagnoses: List[Dict]) -> str:
    """Format secondary diagnoses."""
    if not diagnoses:
        return ""
    
    secondary = []
    for dx in diagnoses[:3]:  # Limit to 3 secondary diagnoses
        secondary.append(f"Secondary: {dx['description']} ({dx['icd10_code']})")
    
    return '\n'.join(secondary)

def _get_consult_reason(self, specialty: str, diagnoses: List[Dict]) -> str:
    """Get reason for consultation based on specialty."""
    consult_reasons = {
        'Cardiology': 'Evaluation of cardiac murmur',
        'Neurology': 'Evaluation of seizure disorder',
        'Pulmonology': 'Management of asthma',
        'Gastroenterology': 'Evaluation of chronic abdominal pain',
        'Endocrinology': 'Management of Type 1 diabetes',
        'Nephrology': 'Evaluation of proteinuria',
        'Rheumatology': 'Evaluation of joint pain',
        'Dermatology': 'Evaluation of chronic rash',
        'Psychiatry': 'Evaluation of mood disorder'
    }
    
    return consult_reasons.get(specialty, f"{specialty} evaluation requested")

def _get_specialty_recommendations(self, specialty: str, age: int) -> str:
    """Get specialty-specific recommendations."""
    base_recs = [
        "1. Will follow in clinic",
        "2. Additional testing as outlined above",
        "3. Please continue current management"
    ]
    
    specialty_recs = {
        'Cardiology': ["Consider echocardiogram", "EKG in AM"],
        'Neurology': ["EEG to be scheduled", "Consider MRI brain"],
        'Pulmonology': ["Pulmonary function tests", "Consider controller therapy"],
        'Endocrinology': ["Check HbA1c", "Nutrition counseling"]
    }
    
    recs = base_recs.copy()
    if specialty in specialty_recs:
        recs.extend([f"{i+4}. {rec}" for i, rec in enumerate(specialty_recs[specialty])])
    
    return '\n'.join(recs)

def _generate_pmh(self, age: int) -> str:
    """Generate past medical history."""
    if age < 2:
        return "Born full term via normal vaginal delivery. No complications."
    elif age < 10:
        return "Previously healthy child with normal development."
    else:
        return "No significant past medical history."

def _generate_focused_exam(self, specialty: str) -> str:
    """Generate specialty-focused examination."""
    if specialty == 'Cardiology':
        return "Heart: Regular rate and rhythm, no murmurs, rubs, or gallops. Good perfusion."
    elif specialty == 'Neurology':
        return "Neuro: Alert and oriented, cranial nerves II-XII intact, normal tone and strength."
    elif specialty == 'Pulmonology':
        return "Lungs: Clear to auscultation bilaterally, no wheezes or rales, good air movement."
    else:
        return self._generate_physical_exam()

def _generate_specialty_assessment(self, specialty: str, diagnoses: List[Dict]) -> str:
    """Generate specialty-specific assessment."""
    if diagnoses and diagnoses[0]['description']:
        return f"findings are consistent with {diagnoses[0]['description']}"
    else:
        return "no acute pathology identified at this time"

def _generate_abnormal_impression(self, study_type: str, age: int) -> str:
    """Generate abnormal radiology impression."""
    abnormal_findings = {
        'chest_xray': [
            'Right lower lobe pneumonia',
            'Mild peribronchial thickening consistent with viral illness',
            'Small pleural effusion'
        ],
        'abdominal_xray': [
            'Moderate stool burden',
            'Nonspecific bowel gas pattern',
            'No evidence of obstruction'
        ],
        'brain_ct': [
            'No acute intracranial abnormality',
            'Mild sinusitis',
            'Small amount of fluid in mastoid air cells'
        ]
    }
    
    findings = abnormal_findings.get(study_type, ['Findings as above'])
    return random.choice(findings)

def _format_findings(self, findings: List[str]) -> str:
    """Format radiology findings into numbered list."""
    formatted = []
    for i, finding in enumerate(findings, 1):
        formatted.append(f"{i}. {finding.capitalize()}")
    return '\n'.join(formatted)

# Add all helper methods to ClinicalNotesGenerator.
# Each helper in this cell is a module-level function taking ``self``;
# assigning it as a class attribute makes it callable as ``self._helper(...)``.
ClinicalNotesGenerator._generate_hpi = _generate_hpi
ClinicalNotesGenerator._generate_physical_exam = _generate_physical_exam
ClinicalNotesGenerator._generate_assessment_plan = _generate_assessment_plan
ClinicalNotesGenerator._generate_hospital_course = _generate_hospital_course
ClinicalNotesGenerator._format_discharge_medications = _format_discharge_medications
ClinicalNotesGenerator._format_secondary_diagnoses = _format_secondary_diagnoses
ClinicalNotesGenerator._get_consult_reason = _get_consult_reason
ClinicalNotesGenerator._get_specialty_recommendations = _get_specialty_recommendations
ClinicalNotesGenerator._generate_pmh = _generate_pmh
ClinicalNotesGenerator._generate_focused_exam = _generate_focused_exam
ClinicalNotesGenerator._generate_specialty_assessment = _generate_specialty_assessment
ClinicalNotesGenerator._generate_abnormal_impression = _generate_abnormal_impression
ClinicalNotesGenerator._format_findings = _format_findings


In [None]:
# TCHDataGenerationOrchestrator class
class TCHDataGenerationOrchestrator:
    """Orchestrates the generation of all TCH PoC data."""

    def __init__(self, output_dir: str = "/tmp/tch_data_generation", seed: int = 42, compress_files: bool = False):
        """Create the output directory tree and the two data generators.

        Args:
            output_dir: Root directory; ``structured`` and ``unstructured``
                subdirectories are created beneath it.
            seed: Seed forwarded to both generators.
            compress_files: When true, saved files are gzip-compressed and
                given a ``.gz`` suffix.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.compress_files = compress_files

        # One subdirectory per data family
        self.structured_dir = self.output_dir / "structured"
        self.unstructured_dir = self.output_dir / "unstructured"
        for data_dir in (self.structured_dir, self.unstructured_dir):
            data_dir.mkdir(exist_ok=True)

        # Both generators share the same seed
        self.pediatric_generator = PediatricDataGenerator(seed=seed)
        self.notes_generator = ClinicalNotesGenerator(seed=seed)

        suffix = " (with gzip compression)" if compress_files else ""
        print(f"Data generation output directory: {self.output_dir.absolute()}{suffix}")

    def _save_to_csv(self, data: List[Dict], filename: str, columns_order: Optional[List[str]] = None):
        """Write ``data`` as a CSV file under the structured directory.

        When ``columns_order`` is given, the output has exactly those columns
        in that order: missing ones are added empty and extras are dropped.
        Object-typed columns holding datetime or date values are rendered as
        strings before writing. Nothing is written when ``data`` is empty.
        """
        if not data:
            print(f"   Warning: No data to save for {filename}")
            return

        # Compressed output always carries a .gz suffix
        if self.compress_files and not filename.endswith('.gz'):
            filename = filename + '.gz'

        filepath = self.structured_dir / filename

        frame = pd.DataFrame(data)

        # Enforce the exact column layout if requested
        if columns_order is not None:
            for name in columns_order:
                if name not in frame.columns:
                    frame[name] = None
            frame = frame.reindex(columns=columns_order)

        # Render datetime/date values held in object columns as strings; the
        # first non-null value decides how the whole column is treated.
        for name in frame.columns:
            if not frame[name].dtype == 'object':
                continue
            non_null = frame[name].dropna()
            probe = None if non_null.empty else non_null.iloc[0]
            if isinstance(probe, (datetime, pd.Timestamp)):
                frame[name] = pd.to_datetime(frame[name]).dt.strftime('%Y-%m-%d %H:%M:%S')
            elif hasattr(probe, 'strftime'):
                frame[name] = frame[name].apply(lambda x: x.strftime('%Y-%m-%d') if pd.notna(x) else None)

        # Save with or without compression
        if self.compress_files:
            with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                frame.to_csv(f, index=False, quoting=csv.QUOTE_NONNUMERIC)
        else:
            frame.to_csv(filepath, index=False, quoting=csv.QUOTE_NONNUMERIC)

        size_mb = filepath.stat().st_size / (1024 * 1024)
        suffix = " (compressed)" if self.compress_files else ""
        print(f"   Saved {len(data):,} records to {filename} ({size_mb:.1f} MB){suffix}")

    def _save_text_file(self, content: str, filename: str, subdir: str = ""):
        """Write ``content`` to a text file under the unstructured directory.

        ``subdir``, when given, is created beneath the unstructured directory
        and used as the target. With compression on, the file is gzip-written
        and given a ``.gz`` suffix.
        """
        target_dir = self.unstructured_dir
        if subdir:
            target_dir = target_dir / subdir
            target_dir.mkdir(exist_ok=True)

        if self.compress_files and not filename.endswith('.gz'):
            filename = filename + '.gz'

        filepath = target_dir / filename

        # Same write call either way; only the opener and mode differ.
        opener, mode = (gzip.open, 'wt') if self.compress_files else (open, 'w')
        with opener(filepath, mode, encoding='utf-8') as f:
            f.write(content)


In [None]:
# Data generation methods for TCHDataGenerationOrchestrator

def _generate_imaging_studies(self, encounters: List[Dict], patients: List[Dict]) -> List[Dict]:
    """Generate imaging study records."""
    imaging_studies = []
    study_id = 1
    
    # Create patient lookup
    patient_lookup = {p['patient_id']: p for p in patients}
    
    # Common pediatric imaging studies
    study_types = [
        ('chest_xray', 'Chest X-ray', 0.15),
        ('abdominal_xray', 'Abdominal X-ray', 0.08),
        ('brain_mri', 'Brain MRI', 0.03),
        ('brain_ct', 'Brain CT', 0.05),
        ('ultrasound_abdomen', 'Abdominal Ultrasound', 0.06),
        ('echo', 'Echocardiogram', 0.04)
    ]
    
    for encounter in encounters:
        patient = patient_lookup[encounter['patient_id']]
        
        # Determine if imaging is needed based on encounter type and department
        imaging_probability = 0.1  # Base 10% chance
        
        if encounter['encounter_type'] == 'Emergency':
            imaging_probability = 0.25
        elif encounter['department'] in ['Pediatric ICU', 'NICU']:
            imaging_probability = 0.4
        elif encounter['department'] in ['Cardiology', 'Pulmonology', 'Neurology']:
            imaging_probability = 0.3
        
        if random.random() < imaging_probability:
            # Select appropriate study type
            study_code, study_name, _ = random.choice(study_types)
            
            imaging_study = {
                'imaging_study_id': f"IMG-{study_id:08d}",
                'encounter_id': encounter['encounter_id'],
                'patient_id': encounter['patient_id'],
                'study_type': study_code,
                'study_name': study_name,
                'study_date': encounter['encounter_date'] + timedelta(hours=random.randint(1, 24)),
                'ordering_provider': encounter['attending_physician'],
                'performing_department': 'Radiology',
                'study_status': random.choice(['Completed', 'Preliminary', 'Final']),
                'modality': self._get_modality_for_study(study_code),
                'body_part': self._get_body_part_for_study(study_code),
                'created_date': encounter['encounter_date'],
                'updated_date': datetime.now()
            }
            imaging_studies.append(imaging_study)
            study_id += 1
    
    return imaging_studies

def _get_modality_for_study(self, study_type: str) -> str:
    """Get imaging modality for study type."""
    modality_map = {
        'chest_xray': 'XR',
        'abdominal_xray': 'XR',
        'brain_mri': 'MR',
        'brain_ct': 'CT',
        'ultrasound_abdomen': 'US',
        'echo': 'US'
    }
    return modality_map.get(study_type, 'XR')

def _get_body_part_for_study(self, study_type: str) -> str:
    """Get body part for study type."""
    body_part_map = {
        'chest_xray': 'Chest',
        'abdominal_xray': 'Abdomen',
        'brain_mri': 'Brain',
        'brain_ct': 'Brain',
        'ultrasound_abdomen': 'Abdomen',
        'echo': 'Heart'
    }
    return body_part_map.get(study_type, 'Chest')

def _generate_providers(self) -> List[Dict]:
    """Generate provider/physician data."""
    providers = []
    
    # Specialties at TCH
    specialties = [
        'General Pediatrics', 'Emergency Medicine', 'Pediatric Critical Care',
        'Neonatology', 'Cardiology', 'Neurology', 'Oncology', 'Orthopedics',
        'Pulmonology', 'Gastroenterology', 'Endocrinology', 'Nephrology',
        'Rheumatology', 'Dermatology', 'Ophthalmology', 'ENT', 'Psychiatry',
        'Adolescent Medicine', 'Radiology', 'Pathology', 'Anesthesiology'
    ]
    
    provider_id = 1
    for specialty in specialties:
        # Generate 10-20 providers per specialty
        num_providers = random.randint(10, 20)
        for _ in range(num_providers):
            provider = {
                'provider_id': f"PROV-{provider_id:06d}",
                'npi': f"{random.randint(1000000000, 9999999999)}",
                'first_name': self.pediatric_generator.fake.first_name(),
                'last_name': self.pediatric_generator.fake.last_name(),
                'specialty': specialty,
                'department': specialty,
                'credentials': random.choice(['MD', 'DO', 'MD, PhD']),
                'status': random.choice(['Active', 'Active', 'Active', 'Inactive']),  # Mostly active
                'hire_date': self.pediatric_generator.fake.date_between(start_date='-20y', end_date='-1y'),
                'created_date': datetime.now() - timedelta(days=random.randint(30, 1000)),
                'updated_date': datetime.now()
            }
            providers.append(provider)
            provider_id += 1
    
    return providers

def _generate_departments(self) -> List[Dict]:
    """Generate department/service line data."""
    departments = []
    
    dept_info = [
        ('Emergency Department', 'ED', 'Emergency Medicine'),
        ('Pediatric ICU', 'PICU', 'Critical Care'),
        ('NICU', 'NICU', 'Neonatology'),
        ('General Pediatrics', 'PEDS', 'Ambulatory'),
        ('Cardiology', 'CARDS', 'Specialty'),
        ('Neurology', 'NEURO', 'Specialty'),
        ('Oncology', 'ONCO', 'Specialty'),
        ('Orthopedics', 'ORTHO', 'Specialty'),
        ('Pulmonology', 'PULM', 'Specialty'),
        ('Gastroenterology', 'GI', 'Specialty'),
        ('Endocrinology', 'ENDO', 'Specialty'),
        ('Nephrology', 'NEPHRO', 'Specialty'),
        ('Radiology', 'RAD', 'Ancillary'),
        ('Laboratory', 'LAB', 'Ancillary'),
        ('Pharmacy', 'PHARM', 'Ancillary')
    ]
    
    for i, (name, code, service_line) in enumerate(dept_info, 1):
        department = {
            'department_id': f"DEPT-{i:03d}",
            'department_name': name,
            'department_code': code,
            'service_line': service_line,
            'location': random.choice(['Main Campus', 'West Campus', 'The Woodlands']),
            'status': 'Active',
            'created_date': datetime.now() - timedelta(days=random.randint(100, 2000)),
            'updated_date': datetime.now()
        }
        departments.append(department)
    
    return departments

def _generate_clinical_notes(self, encounters: List[Dict], patients: List[Dict], 
                           diagnoses: List[Dict], medications: List[Dict]) -> List[Dict]:
    """Generate clinical notes and documentation."""
    clinical_notes = []
    
    # Create lookups for efficient access
    patient_lookup = {p['patient_id']: p for p in patients}
    encounter_diagnoses = {}
    encounter_medications = {}
    
    for dx in diagnoses:
        if dx['encounter_id'] not in encounter_diagnoses:
            encounter_diagnoses[dx['encounter_id']] = []
        encounter_diagnoses[dx['encounter_id']].append(dx)
    
    for med in medications:
        if med['encounter_id'] not in encounter_medications:
            encounter_medications[med['encounter_id']] = []
        encounter_medications[med['encounter_id']].append(med)
    
    # Generate notes for subset of encounters (performance consideration)
    sample_encounters = random.sample(encounters, min(len(encounters), 100000))
    
    for i, encounter in enumerate(sample_encounters):
        if i % 10000 == 0:
            print(f"    Generating notes for encounter {i+1:,} of {len(sample_encounters):,}", flush=True)
        
        patient = patient_lookup[encounter['patient_id']]
        # Ensure 'age' exists to prevent downstream KeyError
        if 'age' not in patient:
            try:
                dob = patient['date_of_birth']
                dob_date = datetime.strptime(dob, '%Y-%m-%d').date() if isinstance(dob, str) else dob
                today = datetime.now().date()
                computed_age = today.year - dob_date.year - ((today.month, today.day) < (dob_date.month, dob_date.day))
            except Exception:
                computed_age = 0
            patient = dict(patient)
            patient['age'] = computed_age
        enc_diagnoses = encounter_diagnoses.get(encounter['encounter_id'], [])
        enc_medications = encounter_medications.get(encounter['encounter_id'], [])
        
        # Generate different types of notes based on encounter
        note_types_to_generate = ['progress']
        
        if encounter['encounter_type'] == 'Inpatient':
            note_types_to_generate.extend(['nursing', 'discharge'])
        
        if encounter['encounter_type'] == 'Emergency':
            note_types_to_generate.append('nursing')
        
        if encounter['department'] in ['Cardiology', 'Neurology', 'Pulmonology']:
            note_types_to_generate.append('consultation')
        
        # Generate each type of note
        for note_type in note_types_to_generate:
            try:
                if note_type == 'progress':
                    note = self.notes_generator.generate_progress_note(patient, encounter, enc_diagnoses)
                elif note_type == 'nursing':
                    note = self.notes_generator.generate_nursing_note(patient, encounter)
                elif note_type == 'discharge':
                    note = self.notes_generator.generate_discharge_summary(patient, encounter, enc_diagnoses, enc_medications)
                elif note_type == 'consultation':
                    note = self.notes_generator.generate_consultation_note(patient, encounter, encounter['department'], enc_diagnoses)
                
                # Ensure metadata fields required by ingest CSV
                note['department'] = encounter.get('department')
                diag_codes = []
                for dx in enc_diagnoses:
                    code_val = dx.get('diagnosis_code') or dx.get('icd10_code')
                    if code_val:
                        diag_codes.append(code_val)
                note['diagnosis_codes'] = json.dumps(diag_codes)
                
                clinical_notes.append(note)
                
                # Save note content to individual text file
                filename = f"note_{note['note_id']}.txt"
                self._save_text_file(note['note_content'], filename, "clinical_notes")
                
            except Exception as e:
                raise RuntimeError(f"Error generating {note_type} note for encounter {encounter['encounter_id']}: {e}")
    
    # Build and save clinical notes metadata (exclude note_content; enforce exact order)
    clinical_notes_meta = [
        {
            'note_id': n['note_id'],
            'patient_id': n['patient_id'],
            'encounter_id': n['encounter_id'],
            'note_type': n['note_type'],
            'note_date': n['note_date'],
            'author': n['author'],
            'department': n.get('department'),
            'diagnosis_codes': n.get('diagnosis_codes') or '[]',
            'created_date': n['created_date'],
            'updated_date': n['updated_date']
        }
        for n in clinical_notes
    ]
    self._save_to_csv(clinical_notes_meta, 'clinical_notes.csv', columns_order=[
        'note_id','patient_id','encounter_id','note_type','note_date','author','department','diagnosis_codes','created_date','updated_date'
    ])
    
    return clinical_notes

def _generate_radiology_reports(self, imaging_studies: List[Dict], patients: List[Dict], 
                              encounters: List[Dict]) -> List[Dict]:
    """Generate radiology reports."""
    radiology_reports = []
    
    # Create lookups
    patient_lookup = {p['patient_id']: p for p in patients}
    encounter_lookup = {e['encounter_id']: e for e in encounters}
    
    for study in imaging_studies:
        if study['study_status'] in ['Completed', 'Final']:
            try:
                patient = patient_lookup[study['patient_id']]
                encounter = encounter_lookup[study['encounter_id']]
                
                # Ensure 'age' exists for downstream generators
                if 'age' not in patient:
                    try:
                        dob = patient['date_of_birth']
                        dob_date = datetime.strptime(dob, '%Y-%m-%d').date() if isinstance(dob, str) else dob
                        today = datetime.now().date()
                        computed_age = today.year - dob_date.year - ((today.month, today.day) < (dob_date.month, dob_date.day))
                    except Exception:
                        computed_age = 0
                    patient = dict(patient)
                    patient['age'] = computed_age
                
                report = self.notes_generator.generate_radiology_report(
                    patient, encounter, study['study_type']
                )
                
                # Add study-specific information and metadata alignment
                report['imaging_study_id'] = study['imaging_study_id']
                report['study_type'] = study['study_type']
                report['department'] = encounter.get('department')
                
                radiology_reports.append(report)
                
                # Save report content to text file
                filename = f"radiology_{report['note_id']}.txt"
                self._save_text_file(report['note_content'], filename, "radiology_reports")
                
            except Exception as e:
                raise RuntimeError(f"Error generating radiology report for study {study['imaging_study_id']}: {e}")
    
    # Build and save radiology reports metadata (exclude note_content; enforce exact order)
    radiology_reports_meta = [
        {
            'note_id': r['note_id'],
            'patient_id': r['patient_id'],
            'encounter_id': r['encounter_id'],
            'imaging_study_id': r.get('imaging_study_id'),
            'note_type': r['note_type'],
            'study_type': r.get('study_type'),
            'note_date': r['note_date'],
            'author': r['author'],
            'department': r.get('department'),
            'created_date': r['created_date'],
            'updated_date': r['updated_date']
        }
        for r in radiology_reports
    ]
    self._save_to_csv(radiology_reports_meta, 'radiology_reports.csv', columns_order=[
        'note_id','patient_id','encounter_id','imaging_study_id','note_type','study_type','note_date','author','department','created_date','updated_date'
    ])
    
    return radiology_reports

def _generate_metadata(self, stats: Dict[str, int], num_patients: int, encounters_per_patient: int):
    """Generate metadata about the dataset."""
    metadata = {
        'generated_date': datetime.now().isoformat(),
        'generator_version': '1.0.0',
        'target_patients': num_patients,
        'target_encounters_per_patient': encounters_per_patient,
        'actual_statistics': stats,
        'data_characteristics': {
            'age_distribution': 'Pediatric (0-21 years) with higher concentration in younger ages',
            'geographic_focus': 'Houston metropolitan area zip codes',
            'insurance_mix': 'Realistic pediatric insurance distribution (Medicaid, Commercial, CHIP)',
            'condition_prevalence': 'Matches real-world pediatric disease prevalence rates',
            'clinical_realism': 'Age-appropriate diagnoses, medications, and vital signs'
        },
        'file_structure': {
            'structured_data': str(self.structured_dir),
            'unstructured_data': str(self.unstructured_dir),
            'csv_files': list(self.structured_dir.glob("*.csv")),
            'text_files': {
                'clinical_notes': len(list((self.unstructured_dir / "clinical_notes").glob("*.txt"))),
                'radiology_reports': len(list((self.unstructured_dir / "radiology_reports").glob("*.txt")))
            }
        }
    }
    
    # Save metadata
    metadata_file = self.output_dir / "dataset_metadata.json"
    with open(metadata_file, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)
    
    print(f"   Generated metadata file: {metadata_file}")

# Bind the module-level generator functions onto TCHDataGenerationOrchestrator
# so they can be called as instance methods. The table lists the attribute name
# explicitly next to the function it is bound to.
for _method_name, _method_impl in {
    '_generate_imaging_studies': _generate_imaging_studies,
    '_get_modality_for_study': _get_modality_for_study,
    '_get_body_part_for_study': _get_body_part_for_study,
    '_generate_providers': _generate_providers,
    '_generate_departments': _generate_departments,
    '_generate_clinical_notes': _generate_clinical_notes,
    '_generate_radiology_reports': _generate_radiology_reports,
    '_generate_metadata': _generate_metadata,
}.items():
    setattr(TCHDataGenerationOrchestrator, _method_name, _method_impl)
# Do not leave the loop variables behind in the notebook namespace.
del _method_name, _method_impl


In [None]:
# Read the 'parallel=true|false' command-line argument; parallel mode is the
# default when the argument is absent.
use_parallel = True
try:
    # Only the first 'parallel=...' entry in sys.argv is honoured.
    _parallel_arg = next(
        (a for a in sys.argv if isinstance(a, str) and a.startswith('parallel=')),
        None,
    )
    if _parallel_arg is not None:
        # Anything other than 'true' (case-insensitive) disables parallel mode.
        use_parallel = _parallel_arg.split('=', 1)[1].lower() == 'true'
except Exception:
    use_parallel = True
logger.info(f'Parallel flag: {use_parallel}')



In [None]:
# Parallelize independent generation steps and shard CSVs for faster ingest
from concurrent.futures import ThreadPoolExecutor
from typing import List

def _save_df_sharded(self, df: pd.DataFrame, base_name: str, rows_per_file: int | None = None) -> List[Path]:
    """Save a DataFrame into multiple shard files to improve COPY parallelism."""
    shard_paths: List[Path] = []
    if df is None or df.empty:
        return shard_paths
    # Determine shard size from arg or instance default
    rows_per_file = rows_per_file or getattr(self, 'rows_per_file', 25000)
    # Ensure base_name has .csv.gz when compression enabled
    compress_ext = '.csv.gz' if self.compress_files else '.csv'
    # Slice into shards
    for i in range(0, len(df), rows_per_file):
        shard_idx = i // rows_per_file
        shard_name = f"{base_name}_part_{shard_idx:03d}{compress_ext}"
        shard_path = self.structured_dir / shard_name
        if self.compress_files:
            with gzip.open(shard_path, 'wt', encoding='utf-8') as f:
                df.iloc[i:i+rows_per_file].to_csv(f, index=False, quoting=csv.QUOTE_NONNUMERIC)
        else:
            df.iloc[i:i+rows_per_file].to_csv(shard_path, index=False, quoting=csv.QUOTE_NONNUMERIC)
        shard_paths.append(shard_path)
    print(f"   Saved {len(shard_paths)} shards for {base_name}")
    return shard_paths

# Monkey-patch onto class for use below: binding the module-level function as a
# class attribute makes it callable as `self._save_df_sharded(...)` on instances.
TCHDataGenerationOrchestrator._save_df_sharded = _save_df_sharded



In [None]:
# Orchestrate parallel generation and shard+upload in parallel

def _generate_parallel_and_upload(self, num_patients: int, encounters_per_patient: int = 5):
    """Generate the dataset, write it as sharded CSVs, and upload it to the stages.

    Patients and encounters are generated first because every other table is
    built from them. Diagnoses, vital signs, lab results and imaging studies
    are then generated concurrently on a four-worker thread pool. Medications,
    providers, departments and the unstructured documents follow sequentially.
    Each table is sharded with ``_save_df_sharded`` and everything is uploaded
    with glob PUTs at the end.

    Args:
        num_patients: Number of patients to generate.
        encounters_per_patient: Forwarded to the encounter generator.

    Returns:
        Dict of row counts keyed by ``patients``, ``encounters``,
        ``diagnoses``, ``lab_results``, ``vital_signs`` and ``imaging_studies``.
    """
    generator = self.pediatric_generator

    def shard(frame, table):
        # Every table is sharded with the notebook-wide ROWS_PER_FILE setting.
        self._save_df_sharded(frame, table, rows_per_file=ROWS_PER_FILE)
        return frame

    # --- Prerequisites: patients, then encounters -------------------------
    print("\n1. Generating patient demographics...")
    logger.info('Step 1 start: generating patients')
    patients = generator.generate_patient_demographics(num_patients)
    df_patients = shard(pd.DataFrame(patients), 'patients')
    logger.info(f"Step 1 end: {len(df_patients)} patients")

    print("\n2. Generating encounters...")
    logger.info('Step 2 start: generating encounters')
    encounters = generator.generate_encounters(patients, encounters_per_patient)
    df_encounters = shard(pd.DataFrame(encounters), 'encounters')
    logger.info(f"Step 2 end: {len(df_encounters)} encounters")

    # --- Independent tables, generated concurrently -----------------------
    # Insertion order fixes both the submission order and the order in which
    # results are awaited.
    producers = {
        'diagnoses': lambda: generator.generate_diagnoses(encounters),
        'vital_signs': lambda: generator.generate_vital_signs(encounters, patients),
        'lab_results': lambda: generator.generate_lab_results(encounters, patients),
        'imaging_studies': lambda: self._generate_imaging_studies(encounters, patients),
    }

    def build_table(table):
        return shard(pd.DataFrame(producers[table]()), table)

    print("\n3-6. Generating diagnoses, vitals, labs, imaging in parallel...")
    logger.info('Steps 3-6 start: parallel gen (diagnoses,vitals,labs,imaging)')
    with ThreadPoolExecutor(max_workers=4) as pool:
        pending = {table: pool.submit(build_table, table) for table in producers}
        frames = {table: future.result() for table, future in pending.items()}
    diagnoses_df = frames['diagnoses']
    vitals_df = frames['vital_signs']
    labs_df = frames['lab_results']
    imaging_df = frames['imaging_studies']
    logger.info(f"Steps 3-6 end: diags={len(diagnoses_df)}, vitals={len(vitals_df)}, labs={len(labs_df)}, imaging={len(imaging_df)}")

    # --- Dependent tables --------------------------------------------------
    print("\n7. Generating medications...")
    logger.info('Step 7 start: medications')
    medications = generator.generate_medications(encounters, diagnoses_df.to_dict('records'))
    shard(pd.DataFrame(medications), 'medications')
    logger.info(f"Step 7 end: meds={len(medications)}")

    print("\n8. Generating providers and departments...")
    logger.info('Step 8 start: providers/departments')
    providers = self._generate_providers()
    departments = self._generate_departments()
    shard(pd.DataFrame(providers), 'providers')
    shard(pd.DataFrame(departments), 'departments')
    logger.info(f"Step 8 end: providers={len(providers)}, departments={len(departments)}")

    # --- Unstructured documents -------------------------------------------
    logger.info('Step 9 start: generating unstructured documents')
    print("\n9. Generating clinical notes and radiology reports (content files still per-doc)...")
    clinical_notes = self._generate_clinical_notes(encounters, patients, diagnoses_df.to_dict('records'), medications)
    radiology_reports = self._generate_radiology_reports(imaging_df.to_dict('records'), patients, encounters)
    logger.info('Step 9 end: unstructured documents generated')

    def note_meta(note):
        # Metadata-only row (no note_content); key order sets the CSV column order.
        return {
            'note_id': note['note_id'],
            'patient_id': note['patient_id'],
            'encounter_id': note['encounter_id'],
            'note_type': note['note_type'],
            'note_date': note['note_date'],
            'author': note['author'],
            'department': note.get('department'),
            'diagnosis_codes': note.get('diagnosis_codes') or '[]',
            'created_date': note['created_date'],
            'updated_date': note['updated_date']
        }

    def report_meta(report):
        # Metadata-only row (no note_content); key order sets the CSV column order.
        return {
            'note_id': report['note_id'],
            'patient_id': report['patient_id'],
            'encounter_id': report['encounter_id'],
            'imaging_study_id': report.get('imaging_study_id'),
            'note_type': report['note_type'],
            'study_type': report.get('study_type'),
            'note_date': report['note_date'],
            'author': report['author'],
            'department': report.get('department'),
            'created_date': report['created_date'],
            'updated_date': report['updated_date']
        }

    clinical_notes_meta = [note_meta(note) for note in clinical_notes]
    radiology_reports_meta = [report_meta(report) for report in radiology_reports]
    shard(pd.DataFrame(clinical_notes_meta), 'clinical_notes')
    shard(pd.DataFrame(radiology_reports_meta), 'radiology_reports')

    # --- Upload structured shards -----------------------------------------
    shard_file_count = sum(
        1
        for pattern in ('*.csv', '*.csv.gz')
        for _ in self.structured_dir.glob(pattern)
    )
    print(f"Found {shard_file_count} structured data shard files to upload")

    # One glob PUT for all structured shards keeps per-file overhead down.
    structured_glob = '*.csv.gz' if self.compress_files else '*.csv'
    session.file.put(
        str(self.structured_dir / structured_glob),
        STRUCTURED_STAGE,
        auto_compress=False,
        overwrite=True,
        parallel=64,
    )

    # --- Upload unstructured text files (best effort) ---------------------
    # Failures here are reported but do not abort the run.
    try:
        note_glob = '*.txt.gz' if self.compress_files else '*.txt'
        doc_dirs = {
            subdir: self.unstructured_dir / subdir
            for subdir in ('clinical_notes', 'radiology_reports')
        }
        doc_counts = {
            subdir: (len(list(folder.glob(note_glob))) if folder.exists() else 0)
            for subdir, folder in doc_dirs.items()
        }
        print(f"Found {doc_counts['clinical_notes']} clinical notes and {doc_counts['radiology_reports']} radiology reports to upload", flush=True)
        for subdir, folder in doc_dirs.items():
            if doc_counts[subdir]:
                session.file.put(str(folder / note_glob), f"{UNSTRUCTURED_STAGE}/{subdir}/", auto_compress=False, overwrite=True, parallel=64)
    except Exception as ue:
        print(f"Unstructured upload error: {ue}")

    return {
        'patients': len(df_patients),
        'encounters': len(df_encounters),
        'diagnoses': len(diagnoses_df),
        'lab_results': len(labs_df),
        'vital_signs': len(vitals_df),
        'imaging_studies': len(imaging_df)
    }

# Attach method: bind the module-level function onto the orchestrator class so
# it can be invoked as `orchestrator._generate_parallel_and_upload(...)`.
TCHDataGenerationOrchestrator._generate_parallel_and_upload = _generate_parallel_and_upload



In [None]:
# Pick the execution mode from the PARALLEL setting (presumably derived from the
# 'parallel=true|false' argument; confirm where PARALLEL is defined).
use_parallel = PARALLEL
logger.info(f'Parallel flag: {use_parallel}')

# Reuse an orchestrator created by an earlier cell; build one only if the name
# is not bound yet.
try:
    orchestrator
except NameError:
    orchestrator = TCHDataGenerationOrchestrator(output_dir=str(TEMP_DIR), seed=42, compress_files=COMPRESS_FILES)

if not use_parallel:
    print('Using sequential generation (compat mode)...', flush=True)
    logger.info('Mode: sequential')
    # The sequential path is intentionally disabled in this notebook: fail fast
    # rather than run an ambiguous configuration.
    raise RuntimeError('Sequential generation path is disabled. Pass parallel=true.')

print('Using parallel generation with sharded uploads...', flush=True)
logger.info('Mode: parallel')
_ = orchestrator._generate_parallel_and_upload(NUM_PATIENTS, ENCOUNTERS_PER_PATIENT)



In [None]:
# List both stages to confirm the upload, then print a run summary.
print("\n📋 Verifying uploaded files...", flush=True)

# Structured stage: each row of the LIST result is counted as one file.
print("\nStructured data files:", flush=True)
list_query = f"LIST {STRUCTURED_STAGE};"
result = session.sql(list_query).collect()
structured_count = len(result)
print(f"  Found {structured_count} files in structured stage", flush=True)

# Unstructured stage, counted the same way.
print("\nUnstructured data files:", flush=True)
list_query = f"LIST {UNSTRUCTURED_STAGE};"
result = session.sql(list_query).collect()
unstructured_count = len(result)
print(f"  Found {unstructured_count} files in unstructured stage")

# Final status and summary, emitted as one block of lines.
print("\n".join([
    "\n✅ Data generation and upload completed successfully!",
    "\n📊 Summary:",
    f"  - Generated {NUM_PATIENTS:,} patients",
    f"  - Created {structured_count} structured data files",
    f"  - Created {unstructured_count} unstructured data files",
]))

# Remaining manual steps of the PoC deployment.
print("\n".join([
    "\n🎯 Next Steps:",
    "  1. Run the data ingestion SQL scripts to load data into raw tables",
    "  2. Deploy Dynamic Tables to create the conformed layer",
    "  3. Deploy presentation layer views",
    "  4. Setup Cortex Search services",
    "  5. Deploy the Streamlit application",
]))

# Optional: Clean up temporary files
# Uncomment the following lines if you want to clean up local files after upload
# print("\n🧹 Cleaning up temporary files...")
# shutil.rmtree(TEMP_DIR)
# print("✅ Cleanup complete")
