In [1]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests
import json
import os
from tqdm import tqdm
from dotenv import load_dotenv
import os 

# Load environment variables via python-dotenv so os.getenv() can see them.
# NOTE(review): the recorded output of this cell is False - presumably no .env
# file was found/loaded; confirm DEEPSEEK_API_KEY is provided some other way.
load_dotenv()

False

In [None]:
# Configuration
# The API key is read from the environment only. A literal key must never be
# used as a fallback: notebooks get shared and committed, and any key that has
# appeared in source should be treated as leaked and rotated.
API_KEY = os.getenv('DEEPSEEK_API_KEY')  # None when the variable is unset
if not API_KEY:
    print("Warning: DEEPSEEK_API_KEY is not set; API requests will not be authenticated")
API_URL = "https://api.deepseek.com/v1/chat/completions"
CACHE_FILE = "clinical_notes_cache.json"
MAX_WORKERS = 5  # For parallel API calls
RATE_LIMIT_DELAY = 0.2  # Seconds slept before each API call

# Normal ranges for reference: (low, high) per tracked vital.
NORMAL_RANGES = dict(
    glucose=(70, 120),
    bp_systolic=(110, 130),
    bp_diastolic=(70, 85),
    spo2=(95, 100),
    temperature=(36.0, 37.2),
    heart_rate=(60, 100),
)

# Disease profiles: every profile starts from the normal ranges and overrides
# only the vitals that the condition shifts. Overriding an existing key keeps
# its position, so each profile iterates in the same order as NORMAL_RANGES.
DISEASE_PROFILES = {
    "Diabetes": {
        **NORMAL_RANGES,
        "glucose": (150, 400),
    },
    "Pneumonia": {
        **NORMAL_RANGES,
        "spo2": (85, 94),
        "temperature": (37.6, 39.5),
        "heart_rate": (80, 120),
    },
    "Hypertension": {
        **NORMAL_RANGES,
        "bp_systolic": (140, 180),
        "bp_diastolic": (90, 110),
    },
}

def generate_patient_data(num_patients=50):
    """Generate simulated patient data for April 2025.

    Loads the clinical-notes cache, generates the patient records in parallel
    and writes the cache back to disk.

    Args:
        num_patients: Number of patients to simulate.

    Returns:
        The DataFrame produced by ``generate_patients_parallel``.
    """
    clinical_notes_cache = load_cache()
    try:
        return generate_patients_parallel(num_patients, clinical_notes_cache)
    finally:
        # Persist whatever notes were fetched even when generation fails or is
        # interrupted, so a re-run does not have to repeat those API calls.
        save_cache(clinical_notes_cache)

def generate_patients_parallel(num_patients, clinical_notes_cache):
    """Simulate ``num_patients`` patients on a thread pool and collect their rows.

    Patient ids are ``P001``, ``P002``, ...; each patient gets a random
    admission timestamp and a randomly chosen disease, and is handed to
    ``generate_single_patient_wrapper`` on a worker thread.

    Args:
        num_patients: Number of patients to simulate.
        clinical_notes_cache: Dict shared by all workers and passed through to
            the per-patient generator.

    Returns:
        A DataFrame with one row per generated record, appended in the order
        the workers finish. The three date columns are converted with
        ``pd.to_datetime`` when the frame is not empty.
    """
    # Candidate admission days: every third day counted from 2025-04-01
    day_slots = [datetime(2025, 4, 1) + timedelta(days=offset) for offset in range(0, 30, 3)]

    ids = [f"P{str(n + 1).zfill(3)}" for n in range(num_patients)]

    admitted_at = []
    for _ in range(num_patients):
        slot = random.choice(day_slots)
        admitted_at.append(slot + timedelta(hours=random.randint(0, 23)))

    assigned = random.choices(list(DISEASE_PROFILES.keys()), weights=[0.4, 0.3, 0.3], k=num_patients)

    rows = []
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        # Map each future back to the patient it belongs to for error reporting
        pending = {}
        for job_args in zip(ids, admitted_at, assigned):
            job = pool.submit(generate_single_patient_wrapper, (*job_args, clinical_notes_cache))
            pending[job] = job_args[0]

        for finished in tqdm(as_completed(pending), total=num_patients, desc="Processing patients"):
            patient_id = pending[finished]
            try:
                records = finished.result()
                if not records:
                    print(f"Warning: No data generated for patient {patient_id}")
                rows.extend(records)
            except Exception as e:
                print(f"Unexpected error processing patient {patient_id}: {str(e)}")

    frame = pd.DataFrame(rows)
    if frame.empty:
        return frame

    for column in ('date', 'admission_date', 'discharge_date'):
        frame[column] = pd.to_datetime(frame[column])
    return frame

def generate_single_patient_wrapper(args):
    """Call ``generate_single_patient(*args)`` and absorb any exception it raises.

    On failure the error is printed, appended to ``patient_errors.log`` with a
    timestamp, and an empty list is returned so the caller can carry on with
    the remaining patients.

    Args:
        args: Argument tuple for ``generate_single_patient``; its first element
            is the patient id used in the error messages.

    Returns:
        The generated records, or ``[]`` when generation failed.
    """
    try:
        records = generate_single_patient(*args)
    except Exception as e:
        patient_id = args[0]
        print(f"Error generating patient {patient_id}: {str(e)}")
        with open("patient_errors.log", "a") as log_file:
            log_file.write(f"Error with patient {patient_id} at {datetime.now()}: {str(e)}\n")
        return []
    return records

def generate_single_patient(patient_id, admission_date, disease, clinical_notes_cache):
    """Generate one record per day of stay for a single patient.

    Args:
        patient_id: Identifier stored on every record and used for the note lookup.
        admission_date: ``datetime`` of admission; also the date of the first record.
        disease: Key into ``DISEASE_PROFILES`` (consumed by ``generate_daily_vitals``).
        clinical_notes_cache: Dict passed through to ``get_clinical_note``.

    Returns:
        A list of dicts, one per day from admission through the discharge
        date inclusive, or ``[]`` when ``admission_date`` is later than
        2025-04-30 00:00.
    """
    april_end = datetime(2025, 4, 30)
    if admission_date > april_end:
        return []

    patient_data = []
    # Length of stay: gamma(shape=2, scale=2) truncated to int, clamped to 1..30
    los_days = max(1, min(int(np.random.gamma(shape=2, scale=2)), 30))
    discharge_day = los_days

    # From day 5 onward each day has a 15% chance of becoming the discharge day
    for day in range(5, los_days):
        if random.random() < 0.15:
            discharge_day = day
            break

    # Each day has a 10% chance of extending the stay by 1-5 days (capped at 30).
    # range() is evaluated once, so extensions do not add further iterations.
    for day in range(discharge_day):
        if random.random() < 0.10:
            discharge_day = min(discharge_day + random.randint(1, 5), 30)

    discharge_date = min(admission_date + timedelta(days=discharge_day), april_end)
    current_date = admission_date
    day_counter = 0

    while current_date <= discharge_date and day_counter <= 30:
        vitals = generate_daily_vitals(disease, day_counter, discharge_day)
        clinical_note = get_clinical_note(
            patient_id, disease, day_counter, vitals,
            discharge_day - day_counter, clinical_notes_cache
        )

        patient_data.append({
            "patient_id": patient_id,
            "date": current_date,
            "admission_date": admission_date,
            "discharge_date": discharge_date,
            "disease": disease,
            "day_of_stay": day_counter + 1,
            "glucose": vitals["glucose"],
            "bp_systolic": vitals["bp_systolic"],
            "bp_diastolic": vitals["bp_diastolic"],
            "spo2": vitals["spo2"],
            "temperature": vitals["temperature"],
            "heart_rate": vitals["heart_rate"],
            "clinical_note": clinical_note,
            # NOTE(review): when discharge_date was capped to april_end (midnight)
            # and admission has a non-zero hour, no record's date equals it, so
            # the patient is never marked "Discharged" - confirm this is intended.
            "status": "Discharged" if current_date == discharge_date else "Admitted"
        })

        current_date += timedelta(days=1)
        day_counter += 1

    return patient_data

def generate_daily_vitals(disease, day, total_days):
    """Generate one day's vitals for a patient with the given disease.

    Vitals whose disease range equals the normal range are drawn uniformly
    from it. The others are drawn from a range that is shifted according to
    how far through the stay the patient is.

    Args:
        disease: Key into ``DISEASE_PROFILES``.
        day: Day of stay, counted from 0.
        total_days: Planned length of stay; ``day / total_days`` is the
            improvement fraction (0 when ``total_days`` is not positive).

    Returns:
        Dict mapping each vital name to its value. ``temperature`` and
        ``glucose`` are rounded to one decimal, all other vitals to integers.

    Raises:
        ValueError: Propagated from ``validate_vitals`` when a generated
            value is outside the plausible limits.
    """
    disease_profile = DISEASE_PROFILES[disease]
    vitals = {}
    improvement = day / total_days if total_days > 0 else 0

    for param in disease_profile:
        min_val, max_val = disease_profile[param]
        normal_min, normal_max = NORMAL_RANGES[param]

        if (min_val, max_val) != (normal_min, normal_max):
            # Classify by the profile's upper bound. Testing min_val > normal_max
            # misses elevated ranges that overlap the normal one (Pneumonia heart
            # rate 80-120 vs normal 60-100) and sends them down the "low" branch,
            # which produces values below the profile's own minimum.
            if max_val > normal_max:  # High values (like glucose in diabetes)
                adjusted_min = min_val * (1 - improvement * random.uniform(0.1, 0.3))
                adjusted_min = max(normal_max, adjusted_min)
                adjusted_max = max_val * (1 - improvement * random.uniform(0.1, 0.3))
                adjusted_max = max(adjusted_min + 5, adjusted_max)
            else:  # Low values (like SpO2 in pneumonia)
                adjusted_min = min_val + (normal_min - min_val) * improvement * random.uniform(0.7, 1.0)
                adjusted_min = min(normal_min, adjusted_min)
                adjusted_max = max_val + (normal_max - max_val) * improvement * random.uniform(0.7, 1.0)
                adjusted_max = min(adjusted_min + 5, adjusted_max)

            # Keep the sampling window inside the disease profile's own range
            adjusted_min = max(min_val, adjusted_min)
            adjusted_max = min(max_val, adjusted_max)
            # The bounds can end up inverted after clamping; random.uniform
            # accepts either order and samples between the two values.
            value = random.uniform(adjusted_min, adjusted_max)
        else:
            value = random.uniform(normal_min, normal_max)

        if param in ("temperature", "glucose"):
            value = round(value, 1)
        else:
            # round() without ndigits already returns an int for a float
            value = round(value)

        vitals[param] = value

    validate_vitals(vitals)
    return vitals

def validate_vitals(vitals):
    """Check every vital against its plausible limits.

    Args:
        vitals: Dict containing ``glucose``, ``bp_systolic``, ``bp_diastolic``,
            ``spo2``, ``temperature`` and ``heart_rate``.

    Raises:
        ValueError: When one or more values fall outside their limits; the
            message lists every offending vital in the order checked.
    """
    # (key, lowest allowed, highest allowed, label used in the error message)
    plausible_limits = (
        ('glucose', 20, 600, "Glucose"),
        ('bp_systolic', 50, 250, "Systolic BP"),
        ('bp_diastolic', 30, 150, "Diastolic BP"),
        ('spo2', 70, 100, "SpO2"),
        ('temperature', 35.0, 42.0, "Temperature"),
        ('heart_rate', 30, 220, "Heart rate"),
    )

    problems = [
        f"{label} out of range: {vitals[key]}"
        for key, lowest, highest, label in plausible_limits
        if not lowest <= vitals[key] <= highest
    ]

    if problems:
        raise ValueError(f"Invalid vitals generated: {', '.join(problems)}")

def get_clinical_note(patient_id, disease, day, vitals, days_remaining, cache):
    """Return the clinical note for one patient-day, from cache or via the API.

    Args:
        patient_id: Patient identifier; part of the cache key.
        disease: Disease name inserted into the prompt.
        day: Day of stay counted from 0; part of the cache key.
        vitals: Dict of the day's vitals, inserted into the prompt.
        days_remaining: Days until planned discharge, inserted into the prompt.
        cache: Dict of previously generated notes; updated in place when a
            note is fetched successfully.

    Returns:
        The note text, or ``"Clinical note unavailable"`` when the API call failed.
    """
    # Sentinel returned by generate_note_via_api when every attempt failed
    unavailable = "Clinical note unavailable"

    # NOTE(review): the key ignores disease and vitals, which are drawn at
    # random on each run, so a cache persisted by an earlier run can return a
    # note written for different data - confirm whether that is acceptable.
    cache_key = f"{patient_id}_{day}"

    # A cached failure marker is treated as a miss so the note is retried
    if cache_key in cache and cache[cache_key] != unavailable:
        return cache[cache_key]

    prompt = (
        f"Generate a concise (<25 words) clinical note for a {disease} patient. Based on their vital signs below:"
        f"Day {day+1} of hospitalization. Vitals: Glucose {vitals['glucose']}, "
        f"BP {vitals['bp_systolic']}/{vitals['bp_diastolic']}, "
        f"SpO2 {vitals['spo2']}%, Temp {vitals['temperature']}°C, "
        f"HR {vitals['heart_rate']}. {days_remaining} days remaining."
        f" Note on their alarming vital signs only"
    )

    time.sleep(RATE_LIMIT_DELAY)
    note = generate_note_via_api(prompt)
    # Never cache the failure marker: the cache is written to disk, so a
    # transient outage would otherwise blank this note on every later run.
    if note != unavailable:
        cache[cache_key] = note
    return note

def generate_note_via_api(prompt, max_retries=3, timeout=60):
    """Generate a clinical note using the DeepSeek API, retrying on failure.

    Args:
        prompt: User message sent to the chat-completions endpoint.
        max_retries: Number of attempts made before giving up.
        timeout: Seconds ``requests`` waits on the server before the attempt
            is abandoned and counted as a failure.

    Returns:
        The note text, truncated to its first 25 whitespace-separated words
        when longer, or ``"Clinical note unavailable"`` when every attempt fails.
    """
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": "deepseek-reasoner",
        "messages": [
            {"role": "system", "content": "You are a helpful clinical assistant. Generate very concise medical notes under 25 words."},
            {"role": "user", "content": prompt}
        ],
        "max_tokens": 1000,
        "temperature": 1.1
    }

    for attempt in range(max_retries):
        try:
            # Without a timeout, a stalled connection would block this worker
            # thread (and the whole run) indefinitely.
            response = requests.post(API_URL, headers=headers, json=payload, timeout=timeout)
            response.raise_for_status()
            note = response.json()["choices"][0]["message"]["content"].strip()
            words = note.split()
            return note if len(words) <= 25 else ' '.join(words[:25])
        except Exception as e:
            if attempt == max_retries - 1:
                print(f"API failed after {max_retries} attempts: {e}")
                return "Clinical note unavailable"
            # Linear back-off: 1s, 2s, ... between attempts
            time.sleep(1 * (attempt + 1))

    # Only reached when max_retries is not positive
    return "Clinical note unavailable"

def load_cache():
    """Read the clinical-notes cache from ``CACHE_FILE``.

    Returns:
        The parsed JSON content of the file, or an empty dict when the file
        does not exist or cannot be read or decoded (the error is printed).
    """
    try:
        if not os.path.exists(CACHE_FILE):
            return {}
        with open(CACHE_FILE, 'r') as cache_fh:
            cached_notes = json.load(cache_fh)
    except (json.JSONDecodeError, IOError) as e:
        print(f"Error loading cache: {e}, starting with empty cache")
        return {}
    return cached_notes

def save_cache(cache):
    """Write the clinical-notes cache to ``CACHE_FILE`` as JSON.

    Args:
        cache: JSON-serialisable object to store; any existing file content
            is overwritten.
    """
    with open(CACHE_FILE, mode='w') as cache_fh:
        json.dump(cache, fp=cache_fh)

def save_to_csv(df, filename="hospital_patients_april2025.csv"):
    """Write ``df`` to ``filename`` as CSV without the index column.

    Args:
        df: Object exposing ``to_csv`` (a DataFrame in this notebook).
        filename: Destination path for the CSV file.
    """
    csv_options = {"index": False}
    df.to_csv(filename, **csv_options)
    print(f"Data saved to {filename}")

In [None]:
# Entry point: generate a small cohort, save it as CSV and print a summary.
if __name__ == "__main__":
    print("Generating patient data...")
    patient_df = generate_patient_data(num_patients=10)

    if patient_df.empty:
        # Nothing was generated, so there is nothing to save or summarise
        print("Error: No patient data was generated")
    else:
        save_to_csv(patient_df)
        print("Data generation complete.")
        print(f"Generated data for {patient_df['patient_id'].nunique()} patients")
        print(f"Time period: {patient_df['date'].min().date()} to {patient_df['date'].max().date()}")
        print(f"Disease distribution:\n{patient_df.groupby('disease')['patient_id'].nunique()}")

Generating patient data...


Processing patients:  20%|██        | 2/10 [02:42<09:53, 74.22s/it] 

In [3]:
# Remove the generated artifacts. A missing file is not an error here: the
# generation cell may have stopped before either file was written, and a bare
# os.remove() would then raise FileNotFoundError.
for artifact_path in ("clinical_notes_cache.json", "hospital_patients_april2025.csv"):
    if os.path.exists(artifact_path):
        os.remove(artifact_path)