In [0]:
df=spark.table('patient_notes')
display(df)

patient_id,patient_name,lab_results,nursing_notes
101,John Doe,Troponin: Elevated (2025-11-10)|ECG: ST-segment elevation (2025-11-10)|BNP: High (2025-11-11),"Patient reports 8/10 chest pain, radiating to left arm.|Vitals stable post-aspirin.|Family anxious, provided update."
102,Jane Smith,X-Ray (Right Hip): Femoral neck fracture (2025-11-09)|Hgb: 9.5 g/dL (Low) (2025-11-10),Patient in significant pain pre-op.|Post-op: Vitals stable. Pain managed with PCA pump.|Physical therapy consult ordered for tomorrow.
103,Robert Brown,Sputum Culture: Pending (2025-11-11)|ABG: Respiratory acidosis (2025-11-11)|Chest X-Ray: Infiltrates in right lower lobe (2025-11-10),Patient has persistent cough and fever (101.5 F).|On 2L nasal cannula. O2 sats at 92%.|Started on IV antibiotics (Ceftriaxone).
104,Maria Garcia,Blood Glucose: 450 mg/dL (High) (2025-11-11)|Urinalysis: Ketones present (2025-11-11)|A1c: 11.2% (2025-11-11),Patient admitted with symptoms of DKA.|Started on insulin drip per protocol.|Diabetes educator consulted.
105,David Lee,"CT Head (Non-contrast): No acute bleed (2025-11-08)|MRI Brain: Acute ischemic stroke, left MCA (2025-11-09)",Patient presents with right-sided weakness and aphasia.|NIH Stroke Scale: 12.|tPA administered in ED. Patient transferred to Neuro ICU.
106,Sarah Chen,Amylase: 950 U/L (High) (2025-11-10)|Lipase: 1200 U/L (High) (2025-11-10),"Admitted with acute-onset abdominal pain, nausea, and vomiting.|Patient made NPO (nothing by mouth).|Receiving IV fluids and pain management (morphine)."
107,Michael Johnson,WBC: 14.2 k/uL (High) (2025-11-11)|Wound Culture: Pending (2025-11-11),"Post-op Day 3 (Appendectomy).|Surgical incision site is red, warm, and draining purulent fluid.|Fever of 102.0 F. Dr. notified. Started on IV Vancomycin for suspected infection."
108,Emily White,RSV Swab: Positive (2025-11-10),6-month-old female with respiratory distress.|Audible wheezing and intercostal retractions.|Placed in crib with cool mist. Suctioned for secretions PRN.
109,Kevin Patel,Urine Drug Screen: Negative (2025-11-09)|TSH: Within normal limits (2025-11-09),Patient admitted for acute depressive episode with suicidal ideation.|Placed on 1-to-1 observation.|Safety plan in place. Patient is cooperative with treatment.
110,Jessica Davis,GBS: Positive (2025-11-01)|Fetal Heart Tones: Stable (140s) (2025-11-11),"Patient is 39 weeks, admitted in active labor.|Epidural placed. Receiving Penicillin G for GBS prophylaxis.|Currently 6 cm dilated, 100% effaced."


In [0]:
# Install required libraries first (run this in a separate cell)
%pip install transformers librosa torch soundfile

# Restart Python kernel to use newly installed packages
dbutils.library.restartPython()

Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
Collecting librosa
  Downloading librosa-0.11.0-py3-none-any.whl.metadata (8.7 kB)
Collecting torch
  Downloading torch-2.9.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collecting soundfile
  Downloading soundfile-0.13.1-py2.py3-none-manylinux_2_28_x86_64.whl.metadata (16 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2025.11.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers)
  Downloading tokenizers-0.22.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.8 kB)
Collecting safetensors>=0.4.3 (from transformers)
  Downloading safetensors-0.6.2-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metada

In [0]:
# 3. PIPELINE STEP 1: LOAD & TRANSCRIBE ALL AUDIO (BRONZE)
# =========================================================
from transformers import pipeline
import librosa
import glob
import os
from pyspark.sql.types import StructType, StructField, StringType
import torch

print("Loading Whisper model for transcription...")

# For Databricks, check if GPU is available
device = 0 if torch.cuda.is_available() else -1

asr_pipeline = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-base",
    device=device
)

# Update path to Databricks Volume
audio_folder = "/Volumes/workspace/default/audio_files/"
audio_files = glob.glob(audio_folder + "*.wav")  # You can change .wav to .mp3 if needed

transcripts_list = []
print(f"Found {len(audio_files)} audio files. Starting transcription...")

# This is your Python batch loop
for audio_file_path in audio_files:
    # Get the patient ID from the filename
    patient_id = os.path.basename(audio_file_path).split('.')[0]
    
    # Load the audio
    speech, sample_rate = librosa.load(audio_file_path, sr=16000)
    
    # Transcribe
    result = asr_pipeline(speech, return_timestamps=True)
    transcript_text = result['text']
    
    # Add the result to our list
    transcripts_list.append((patient_id, transcript_text))
    print(f"  > Transcribed Patient ID: {patient_id}")

# --- Create the first PySpark DataFrame ---
transcript_schema = StructType([
    StructField("patient_id", StringType(), True),
    StructField("transcript_text", StringType(), True)
])

transcripts_df = spark.createDataFrame(transcripts_list, schema=transcript_schema)

print("\n--- Audio Transcripts DataFrame ---")
transcripts_df.show(truncate=False)

Loading Whisper model for transcription...


config.json: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/290M [00:00<?, ?B/s]

generation_config.json: 0.00B [00:00, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

normalizer.json: 0.00B [00:00, ?B/s]

added_tokens.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

preprocessor_config.json: 0.00B [00:00, ?B/s]

Device set to use cpu


Found 20 audio files. Starting transcription...


`return_token_timestamps` is deprecated for WhisperFeatureExtractor and will be removed in Transformers v5. Use `return_attention_mask` instead, as the number of frames can be inferred from it.
Using custom `forced_decoder_ids` from the (generation) config. This is deprecated in favor of the `task` and `language` flags/config options.
Transcription using a multilingual Whisper will default to language detection followed by transcription instead of translation to English. This might be a breaking change for your use case. If you want to instead always translate your audio to English, make sure to pass `language='en'`. See https://github.com/huggingface/transformers/pull/28687 for more details.


  > Transcribed Patient ID: 101
  > Transcribed Patient ID: 102
  > Transcribed Patient ID: 103
  > Transcribed Patient ID: 104
  > Transcribed Patient ID: 105
  > Transcribed Patient ID: 106
  > Transcribed Patient ID: 107
  > Transcribed Patient ID: 108
  > Transcribed Patient ID: 109
  > Transcribed Patient ID: 110
  > Transcribed Patient ID: 111
  > Transcribed Patient ID: 112
  > Transcribed Patient ID: 113
  > Transcribed Patient ID: 114
  > Transcribed Patient ID: 115
  > Transcribed Patient ID: 116
  > Transcribed Patient ID: 117
  > Transcribed Patient ID: 118
  > Transcribed Patient ID: 119
  > Transcribed Patient ID: 120

--- Audio Transcripts DataFrame ---
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
display(transcripts_df)

patient_id,transcript_text
101,"Michael 107, subject to post-operation day 3, appendectomy, reports worsening incision pain and fever, objective fever of 102, white count 14.2, incision site is red, warm and draining fluid, assessment, surgical site infection, plan send wood culture, start bankomacinal, consult surgery."
102,"Jessica, subject 32 year old female, third and weak pregnant in active labor, contractions every 3 to 5 minutes, object 2, fetal heart tones, stable, patient is 6 centimeter dilated, 100% effaced, GBS positive, assessment, active labor, plan, admit to L and D, start Penthaline G for GBS, notice Anastasia."
103,"Patient John 101, subject to 58-year-old male history of hyperattention presents with severe crushing chest pain, objective TCG shows ST segment elevation, troponin is marked elevated BNB high assessment, acute myocardial infraction, plan admit to cardiac unit, consult cardiology."
104,"David Onzerofel, subject to 67-year-old male, acute right-side weakness, symptoms started on our ego. Object to CT head is negative obliter, MRI confirms left MCA ishemic stroke, NIH score is 11th. Assessment, acute ishemic stroke, plan, administer TBA per protocol, and shorten neuro ICU."
105,"Emily 108, subject 2, 6 month old female, with respiratory distress and poor feeding, objective, audible phasing and intercostal retractions, nasal swab is born to, for RSV, as a cement bronchialitis, secondary to RSV, plan, add me to pdiatrics, provide cool mist and nasal suctioning."
106,"Kevin 109 Subjective 29-year-old male admitted for acute depressive episode with pause-bass you suicidal ideation objective urine, dry, drug screen negative patient is cooperative but has a flat effect assessment major depressive disorder with suicidal ideation plan admit to psychiatrist start one to one observation"
107,"The patient is a fortifier year old male presenting with a three-day history of severe chest pain and shortness of breath. He describes the pain as a crushing pressure. He has a history of hypertension and type 2 diabetes. Vital strengths are stable. ECG shows ST segment elevation in the anterior leads, proponent levels are elevated. Patient is diagnosed with an acute myocondyl infection. We will admit the patient to the cardiac unit for continuous monitoring, we will start him on aspirin, lysinopryl and start him, we will also consult cardiology for an urgent cardia characterization."
108,"Note 4 James Smith 102 Subjective 72-year-old female presented after a fall, severe right-hit pain unable to bear weight, objective, right leg shortened and externally rotated, X-ray confirmed femoral neck fracture, HGB is 9.5, assessment, accurate right-hit fracture, plan and pivot for surgery, or the consult for fixation and manage pain with PCA post-op."
109,"Sara, 35 year old, female, acute, epigastric pain, radiating to back with nausea, objective analyze his 950, liposis 1200, assessment, acute pancreas, plan, start morphine, order right upper quadrant, ultrasound."
110,"For our brown 103 subject 68 year old male with a 3 day history of worsening cough fever 101.5 shortness of breath objective o2 saturation 92% on 2 liters chest x ray right lower low infiltrates abg shows respiratory acidosis assessment pneumonia plan admit to floor start 4, septary, eggsone and ezytromycin, continue oxygen."


In [0]:
df = spark.read.table("patient_notes")
from pyspark.sql.functions import col, concat_ws
from transformers import pipeline

# =========================================================
# AI PREPROCESSING: EXTRACT FROM NURSING NOTES (SILVER)
# =========================================================

print("\n--- Initializing Question Answering (QA) AI Model (on CPU) ---")
# Load a model fine-tuned for question answering
# device=-1 explicitly tells it to use the CPU
qa_pipeline = pipeline(
    "question-answering", 
    model="distilbert-base-cased-distilled-squad", 
    device=-1  # Use CPU
)

# --- Step 1: Combine notes from your *existing* 'df' DataFrame ---
# This line is changed to use 'df' as the source
nursing_notes_text_df = df.select(
    col("patient_id"),
    concat_ws(" ", col("nursing_notes")).alias("notes_context") # Join all notes with a space
)

# --- Step 2: Collect the notes to the driver node ---
print("Collecting notes to run AI extraction...")
patients_to_process = nursing_notes_text_df.collect()

# --- Step 3: Define your questions (your desired columns) ---
questions = {
    "patient_problem": "What is the patient's primary problem or complaint?",
    "vitals": "What are the patient's vitals?",
    "assessment": "What is the assessment?",
    "plan": "What is the plan?"
}

# --- Step 4: Loop, ask questions, and get AI-extracted answers ---
extracted_data = []

for row in patients_to_process:
    patient_id = row["patient_id"]
    context = row["notes_context"]
    
    result = {"patient_id": patient_id}
    
    # Ask each question to the AI model
    for column_name, question_text in questions.items():
        qa_result = qa_pipeline(question=question_text, context=context)
        
        # We save the extracted answer.
        if qa_result['score'] > 0.1: # You can tune this confidence threshold
            result[column_name] = qa_result['answer']
        else:
            result[column_name] = None # No good answer found
            
    extracted_data.append(result)
    print(f" > Processed Patient ID: {patient_id}")

# --- Step 5: Create the final Silver DataFrame ---
# Convert your list of Python dictionaries into a new Spark DataFrame
ai_nursing_silver_df = spark.createDataFrame(extracted_data)

print("\n--- AI-Extracted Nursing Notes Silver Table ---")
ai_nursing_silver_df.show(truncate=False)


--- Initializing Question Answering (QA) AI Model (on CPU) ---


Device set to use cpu


Collecting notes to run AI extraction...
 > Processed Patient ID: 101
 > Processed Patient ID: 102
 > Processed Patient ID: 103
 > Processed Patient ID: 104
 > Processed Patient ID: 105
 > Processed Patient ID: 106
 > Processed Patient ID: 107
 > Processed Patient ID: 108
 > Processed Patient ID: 109
 > Processed Patient ID: 110
 > Processed Patient ID: 201
 > Processed Patient ID: 202
 > Processed Patient ID: 203
 > Processed Patient ID: 205
 > Processed Patient ID: 206
 > Processed Patient ID: 207
 > Processed Patient ID: 208
 > Processed Patient ID: 209
 > Processed Patient ID: 210

--- AI-Extracted Nursing Notes Silver Table ---
+---------------------------------------+----------+---------------------------------------------+-------------------------------------------------------------------+---------------------------------------------------+
|assessment                             |patient_id|patient_problem                              |plan                                      

In [0]:
display(ai_nursing_silver_df)

assessment,patient_id,patient_problem,plan,vitals
8/10 chest pain,101,chest pain,Family anxious,
Vitals stable,102,pain,,
92%,103,cough and fever,Started on IV antibiotics,
Diabetes educator consulted,104,DKA,Started on insulin drip per protocol,
NIH Stroke Scale: 12,105,aphasia,NIH Stroke Scale: 12,
Receiving IV fluids and pain management,106,acute-onset abdominal pain,Receiving IV fluids and pain management,
,107,infection,,
Suctioned for secretions PRN,108,respiratory distress,Suctioned for secretions PRN,
,109,suicidal ideation,Safety plan,
Epidural placed,110,"Patient is 39 weeks, admitted in active labor",Receiving Penicillin G for GBS prophylaxis,


In [0]:
import re  # Import regular expressions for cleaning
from pyspark.sql.functions import col, concat_ws, regexp_extract
from transformers import pipeline

# Load your DataFrame from the table
df = spark.read.table("patient_notes")

# =========================================================
# AI PREPROCESSING: EXTRACT FROM NURSING NOTES (SILVER)
# HYBRID APPROACH (REGEX + AI + CATCH-ALL)
# =========================================================

print("\n--- Initializing Question Answering (QA) AI Model (on CPU) ---")
qa_pipeline = pipeline(
    "question-answering",
    model="distilbert-base-cased-distilled-squad",
    device=-1  # Use CPU
)

# --- Step 1: Pre-process the notes from 'df' ---
nursing_notes_hybrid_df = df.select(
    col("patient_id"),
    # Create the single text blob for the AI
    concat_ws(" ", col("nursing_notes")).alias("notes_context"),

    # --- USE REGEX FOR VITALS ---
    # We'll extract any text after "Vitals: " and before the next period "."
    regexp_extract(
        concat_ws(" ", col("nursing_notes")),
        r"Vitals: (.*?)\.",
        1
    ).alias("vitals_extract") # Renamed to avoid confusion
)

# --- Step 2: Collect the data to the driver node ---
print("Collecting notes to run AI extraction...")
patients_to_process = nursing_notes_hybrid_df.collect()

# --- Step 3: Define *only* the hard questions for the AI ---
questions = {
    "patient_problem": "What is the patient's primary problem or complaint?",
    "assessment": "What is the assessment?",
    "plan": "What is the plan?"
}

# --- Step 4: Loop, ask questions, and get AI-extracted answers ---
extracted_data = []

for row in patients_to_process:
    patient_id = row["patient_id"]
    context = row["notes_context"]
    
    result = {"patient_id": patient_id}
    
    # This list will hold all the pieces of text we extract
    extracted_pieces = []
    
    # --- 1. Get Vitals (Regex) ---
    vitals_text = row["vitals_extract"] if row["vitals_extract"] else None
    if vitals_text:
        result["vitals"] = vitals_text
        # Add the extracted text AND the keyword to the removal list
        extracted_pieces.append(vitals_text)
        extracted_pieces.append("Vitals:")
    else:
        result["vitals"] = None

    # --- 2. Get AI Answers ---
    for column_name, question_text in questions.items():
        qa_result = qa_pipeline(question=question_text, context=context)
        
        if qa_result['score'] > 0.1: # You can tune this confidence threshold
            answer = qa_result['answer']
            result[column_name] = answer
            # Add the AI's answer to the removal list
            extracted_pieces.append(answer)
        else:
            result[column_name] = None

    
    extracted_data.append(result)
    print(f" > Processed Patient ID: {patient_id}")

# --- Step 5: Create the final Silver DataFrame ---
ai_nursing_silver_df = spark.createDataFrame(extracted_data)

# Reorder columns to be more logical
column_order = [
    "patient_id",
    "patient_problem",
    "vitals",
    "assessment",
    "plan"
]
ai_nursing_silver_df = ai_nursing_silver_df.select(column_order)


print("\n--- AI-Extracted Nursing Notes Silver Table (Hybrid + Other) ---")
display(ai_nursing_silver_df)


--- Initializing Question Answering (QA) AI Model (on CPU) ---


Device set to use cpu


Collecting notes to run AI extraction...
 > Processed Patient ID: 101
 > Processed Patient ID: 102
 > Processed Patient ID: 103
 > Processed Patient ID: 104
 > Processed Patient ID: 105
 > Processed Patient ID: 106
 > Processed Patient ID: 107
 > Processed Patient ID: 108
 > Processed Patient ID: 109
 > Processed Patient ID: 110
 > Processed Patient ID: 201
 > Processed Patient ID: 202
 > Processed Patient ID: 203
 > Processed Patient ID: 205
 > Processed Patient ID: 206
 > Processed Patient ID: 207
 > Processed Patient ID: 208
 > Processed Patient ID: 209
 > Processed Patient ID: 210

--- AI-Extracted Nursing Notes Silver Table (Hybrid + Other) ---


patient_id,patient_problem,vitals,assessment,plan
101,chest pain,,8/10 chest pain,Family anxious
102,pain,,Vitals stable,
103,cough and fever,,92%,Started on IV antibiotics
104,DKA,,Diabetes educator consulted,Started on insulin drip per protocol
105,aphasia,,NIH Stroke Scale: 12,NIH Stroke Scale: 12
106,acute-onset abdominal pain,,Receiving IV fluids and pain management,Receiving IV fluids and pain management
107,infection,,,
108,respiratory distress,,Suctioned for secretions PRN,Suctioned for secretions PRN
109,suicidal ideation,,,Safety plan
110,"Patient is 39 weeks, admitted in active labor",,Epidural placed,Receiving Penicillin G for GBS prophylaxis


In [0]:
from pyspark.sql.functions import col, explode, split, regexp_extract, regexp_replace, trim, when
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Load your base DataFrame

# OR if it's already a table:
df = spark.read.table("patient_notes")

# =========================================================
# PREPROCESSING: CREATE LAB RESULTS SILVER TABLE
# =========================================================

print("\n--- Creating Silver Table: Lab Results ---")

# --- Step 1: Split the lab_results by pipe (|) to create an array ---
df_split = df.withColumn(
    "lab_results_array",
    split(col("lab_results"), "\\|")
)

# --- Step 2: Explode the array to get one row per lab result ---
df_exploded = df_split.withColumn(
    "lab_result_raw",
    explode(col("lab_results_array"))
).filter(col("lab_result_raw").isNotNull())

# --- Step 3: Parse each lab result string ---
# Format: "Test: Result (Date)" where date is in YYYY-MM-DD format
lab_results_silver_df = df_exploded.select(
    col("patient_id"),
    col("patient_name"),
    # Extract test name (everything before the colon)
    trim(regexp_extract(col("lab_result_raw"), "^([^:]+):", 1)).alias("test"),
    # Extract result (everything between colon and date pattern, or end of string)
    # Remove the date pattern at the end if it exists
    trim(
        regexp_replace(
            regexp_extract(col("lab_result_raw"), ":\\s*(.+)", 1),
            "\\s*\\(\\d{4}-\\d{2}-\\d{2}\\)\\s*$",
            ""
        )
    ).alias("result"),
    # Extract date ONLY if it matches YYYY-MM-DD pattern in parentheses
    when(
        regexp_extract(col("lab_result_raw"), "\\((\\d{4}-\\d{2}-\\d{2})\\)", 1) != "",
        regexp_extract(col("lab_result_raw"), "\\((\\d{4}-\\d{2}-\\d{2})\\)", 1)
    ).otherwise("NA").alias("date")
).filter(col("test") != "")

# Show the result
print("\n--- Lab Results Silver Table ---")
lab_results_silver_df.display(50, truncate=False)

# Optional: Write to Delta table
# lab_results_silver_df.write.format("delta").mode("overwrite").saveAsTable("lab_results_silver")



--- Creating Silver Table: Lab Results ---

--- Lab Results Silver Table ---


patient_id,patient_name,test,result,date
101,John Doe,Troponin,Elevated,2025-11-10
101,John Doe,ECG,ST-segment elevation,2025-11-10
101,John Doe,BNP,High,2025-11-11
102,Jane Smith,X-Ray (Right Hip),Femoral neck fracture,2025-11-09
102,Jane Smith,Hgb,9.5 g/dL (Low),2025-11-10
103,Robert Brown,Sputum Culture,Pending,2025-11-11
103,Robert Brown,ABG,Respiratory acidosis,2025-11-11
103,Robert Brown,Chest X-Ray,Infiltrates in right lower lobe,2025-11-10
104,Maria Garcia,Blood Glucose,450 mg/dL (High),2025-11-11
104,Maria Garcia,Urinalysis,Ketones present,2025-11-11


In [0]:
# Save the transcripts results you just created
transcripts_df.write.format("delta").mode("overwrite").saveAsTable("transcripts_bronze")

In [0]:
# Save the lab results you just created
lab_results_silver_df.write.format("delta").mode("overwrite").saveAsTable("lab_results_silver")

In [0]:
# Save the nursing notes from the AI (QA) model
# (You may need to re-run the cell that creates 'ai_nursing_silver_df' first)
ai_nursing_silver_df.write.format("delta").mode("overwrite").saveAsTable("nursing_notes_silver")

In [0]:
from pyspark.sql.functions import col, concat, lit, collect_list, concat_ws, coalesce

# --- 1. Load All Our Silver Tables ---
labs_df = spark.read.table("lab_results_silver")
nursing_df = spark.read.table("nursing_notes_silver") # This table is now correct
transcripts_df = spark.read.table("transcripts_bronze")
patients_df = spark.read.table("patient_notes").select("patient_id", "patient_name")

# --- 2. Aggregate Lab Results ---
labs_with_string = labs_df.withColumn(
    "lab_string", 
    concat(col("test"), lit(": "), col("result"))
)
labs_agg_df = labs_with_string.groupBy("patient_id").agg(
    concat_ws(" | ", collect_list("lab_string")).alias("lab_summary")
)

# --- 3. Join All Data Sources ---
final_join_df = patients_df.join(
    transcripts_df.select("patient_id", "transcript_text"), "patient_id", "left"
).join(
    nursing_df, "patient_id", "left"
).join(
    labs_agg_df, "patient_id", "left"
)

# --- 4. Create the Final SOAP Note Prompt ---
# *** MODIFIED: Removed 'other_clinical_details' line ***
soap_prompt_df = final_join_df.withColumn(
    "Full_History_Text",
    concat_ws(
        "\n\n",  # Separate each section with a double newline
        
        lit("--- SUBJECTIVE ---"),
        coalesce(col("patient_problem"), lit("N/A")),
        
        lit("--- OBJECTIVE ---"),
        concat(
            lit("DOCTOR'S DICTATION: "), coalesce(col("transcript_text"), lit("N/A")),
            lit("\nVITALS: "), coalesce(col("vitals"), lit("N/A"))
        ),
        
        lit("--- KEY LABS ---"),
        coalesce(col("lab_summary"), lit("N/A")),
        
        lit("--- ASSESSMENT ---"),
        coalesce(col("assessment"), lit("N/A")),
        
        lit("--- PLAN ---"),
        coalesce(col("plan"), lit("N/A"))
    )
).select("patient_id", "patient_name", "Full_History_Text")

# --- 5. Show the Final Result ---
print("\n--- Gold Layer Pre-processing Complete (SOAP Format) ---")
display(soap_prompt_df)


--- Gold Layer Pre-processing Complete (SOAP Format) ---


patient_id,patient_name,Full_History_Text
101,John Doe,"--- SUBJECTIVE --- chest pain --- OBJECTIVE --- DOCTOR'S DICTATION: Michael 107, subject to post-operation day 3, appendectomy, reports worsening incision pain and fever, objective fever of 102, white count 14.2, incision site is red, warm and draining fluid, assessment, surgical site infection, plan send wood culture, start bankomacinal, consult surgery. VITALS: N/A --- KEY LABS --- Troponin: Elevated | ECG: ST-segment elevation | BNP: High --- ASSESSMENT --- 8/10 chest pain --- PLAN --- Family anxious"
102,Jane Smith,"--- SUBJECTIVE --- pain --- OBJECTIVE --- DOCTOR'S DICTATION: Jessica, subject 32 year old female, third and weak pregnant in active labor, contractions every 3 to 5 minutes, object 2, fetal heart tones, stable, patient is 6 centimeter dilated, 100% effaced, GBS positive, assessment, active labor, plan, admit to L and D, start Penthaline G for GBS, notice Anastasia. VITALS: N/A --- KEY LABS --- X-Ray (Right Hip): Femoral neck fracture | Hgb: 9.5 g/dL (Low) --- ASSESSMENT --- Vitals stable --- PLAN --- N/A"
103,Robert Brown,"--- SUBJECTIVE --- cough and fever --- OBJECTIVE --- DOCTOR'S DICTATION: Patient John 101, subject to 58-year-old male history of hyperattention presents with severe crushing chest pain, objective TCG shows ST segment elevation, troponin is marked elevated BNB high assessment, acute myocardial infraction, plan admit to cardiac unit, consult cardiology. VITALS: N/A --- KEY LABS --- Sputum Culture: Pending | ABG: Respiratory acidosis | Chest X-Ray: Infiltrates in right lower lobe --- ASSESSMENT --- 92% --- PLAN --- Started on IV antibiotics"
104,Maria Garcia,"--- SUBJECTIVE --- DKA --- OBJECTIVE --- DOCTOR'S DICTATION: David Onzerofel, subject to 67-year-old male, acute right-side weakness, symptoms started on our ego. Object to CT head is negative obliter, MRI confirms left MCA ishemic stroke, NIH score is 11th. Assessment, acute ishemic stroke, plan, administer TBA per protocol, and shorten neuro ICU. VITALS: N/A --- KEY LABS --- Blood Glucose: 450 mg/dL (High) | Urinalysis: Ketones present | A1c: 11.2% --- ASSESSMENT --- Diabetes educator consulted --- PLAN --- Started on insulin drip per protocol"
105,David Lee,"--- SUBJECTIVE --- aphasia --- OBJECTIVE --- DOCTOR'S DICTATION: Emily 108, subject 2, 6 month old female, with respiratory distress and poor feeding, objective, audible phasing and intercostal retractions, nasal swab is born to, for RSV, as a cement bronchialitis, secondary to RSV, plan, add me to pdiatrics, provide cool mist and nasal suctioning. VITALS: N/A --- KEY LABS --- CT Head (Non-contrast): No acute bleed | MRI Brain: Acute ischemic stroke, left MCA --- ASSESSMENT --- NIH Stroke Scale: 12 --- PLAN --- NIH Stroke Scale: 12"
106,Sarah Chen,"--- SUBJECTIVE --- acute-onset abdominal pain --- OBJECTIVE --- DOCTOR'S DICTATION: Kevin 109 Subjective 29-year-old male admitted for acute depressive episode with pause-bass you suicidal ideation objective urine, dry, drug screen negative patient is cooperative but has a flat effect assessment major depressive disorder with suicidal ideation plan admit to psychiatrist start one to one observation VITALS: N/A --- KEY LABS --- Amylase: 950 U/L (High) | Lipase: 1200 U/L (High) --- ASSESSMENT --- Receiving IV fluids and pain management --- PLAN --- Receiving IV fluids and pain management"
107,Michael Johnson,"--- SUBJECTIVE --- infection --- OBJECTIVE --- DOCTOR'S DICTATION: The patient is a fortifier year old male presenting with a three-day history of severe chest pain and shortness of breath. He describes the pain as a crushing pressure. He has a history of hypertension and type 2 diabetes. Vital strengths are stable. ECG shows ST segment elevation in the anterior leads, proponent levels are elevated. Patient is diagnosed with an acute myocondyl infection. We will admit the patient to the cardiac unit for continuous monitoring, we will start him on aspirin, lysinopryl and start him, we will also consult cardiology for an urgent cardia characterization. VITALS: N/A --- KEY LABS --- WBC: 14.2 k/uL (High) | Wound Culture: Pending --- ASSESSMENT --- N/A --- PLAN --- N/A"
108,Emily White,"--- SUBJECTIVE --- respiratory distress --- OBJECTIVE --- DOCTOR'S DICTATION: Note 4 James Smith 102 Subjective 72-year-old female presented after a fall, severe right-hit pain unable to bear weight, objective, right leg shortened and externally rotated, X-ray confirmed femoral neck fracture, HGB is 9.5, assessment, accurate right-hit fracture, plan and pivot for surgery, or the consult for fixation and manage pain with PCA post-op. VITALS: N/A --- KEY LABS --- RSV Swab: Positive --- ASSESSMENT --- Suctioned for secretions PRN --- PLAN --- Suctioned for secretions PRN"
109,Kevin Patel,"--- SUBJECTIVE --- suicidal ideation --- OBJECTIVE --- DOCTOR'S DICTATION: Sara, 35 year old, female, acute, epigastric pain, radiating to back with nausea, objective analyze his 950, liposis 1200, assessment, acute pancreas, plan, start morphine, order right upper quadrant, ultrasound. VITALS: N/A --- KEY LABS --- Urine Drug Screen: Negative | TSH: Within normal limits --- ASSESSMENT --- N/A --- PLAN --- Safety plan"
110,Jessica Davis,"--- SUBJECTIVE --- Patient is 39 weeks, admitted in active labor --- OBJECTIVE --- DOCTOR'S DICTATION: For our brown 103 subject 68 year old male with a 3 day history of worsening cough fever 101.5 shortness of breath objective o2 saturation 92% on 2 liters chest x ray right lower low infiltrates abg shows respiratory acidosis assessment pneumonia plan admit to floor start 4, septary, eggsone and ezytromycin, continue oxygen. VITALS: N/A --- KEY LABS --- GBS: Positive | Fetal Heart Tones: Stable (140s) --- ASSESSMENT --- Epidural placed --- PLAN --- Receiving Penicillin G for GBS prophylaxis"


In [0]:
soap_prompt_df.write.format("delta").mode("overwrite").saveAsTable("discharge_summary_gold")

print("Successfully saved 'discharge_summary_gold' table.")

Successfully saved 'discharge_summary_gold' table.


In [0]:
import re
from pyspark.sql.functions import col
from transformers import pipeline

# --- 1. Load the Data We Need ---
# This is the DataFrame we created in the previous step
# It has patient_id, patient_name, and Full_History_Text
df_for_patient = spark.read.table("discharge_summary_gold") # Or use soap_prompt_df if it's still in memory

# --- 2. Initialize the AI Models ---
print("\n--- Initializing AI Models for Patient Use Case (on CPU) ---")

# Model 1: Named Entity Recognition (NER) for "Knowledge"
ner_pipeline = pipeline(
    "ner",
    model="d4data/biomedical-ner-all",
    aggregation_strategy="simple",
    device=-1  # Use CPU
)

# Model 2: Question Answering (QA) for "Action"
qa_pipeline = pipeline(
    "question-answering",
    model="distilbert-base-cased-distilled-squad",
    device=-1  # Use CPU
)
print("--- AI Models Initialized ---")

# --- 3. Collect Data to Process ---
print("Collecting data to process for patient portal...")
patients_to_process = df_for_patient.collect()

# --- 4. Define the "Action" Questions ---
action_questions = {
    "doctor_advice": "What advice or instructions did the doctor give the patient?",
    "follow_up": "What is the follow-up plan?"
}

# --- 5. Loop, Extract Knowledge & Action ---
patient_portal_data = []

for row in patients_to_process:
    patient_id = row["patient_id"]
    patient_name = row["patient_name"]
    full_text = row["Full_History_Text"]
    
    result = {
        "patient_id": patient_id,
        "patient_name": patient_name
    }
    
    # --- Part 1: "Knowledge" (NER) ---
    ner_results = ner_pipeline(full_text)
    
    key_terms = []
    for entity in ner_results:
        entity_type = entity['entity_group']
        entity_word = entity['word'].strip().lower() # Clean up the word
        
        # We only want to show the patient the most important terms
        if entity_type in ["Medication", "Disease", "Symptom", "Treatment"]:
            if len(entity_word) > 2 and "n/a" not in entity_word:
                key_terms.append(f"{entity_type}: {entity_word}")
    
    # Get a unique, sorted list
    result["key_terms"] = " | ".join(sorted(list(set(key_terms))))

    # --- Part 2: "Action" (QA) ---
    advice_list = []
    for col_name, question in action_questions.items():
        qa_result = qa_pipeline(question=question, context=full_text)
        
        if qa_result['score'] > 0.05: # Lower threshold to catch more advice
            # Clean the answer
            answer = qa_result['answer'].strip(" .")
            if "N/A" not in answer:
                advice_list.append(answer)
    
    result["doctor_advice"] = " | ".join(sorted(list(set(advice_list))))
            
    patient_portal_data.append(result)
    print(f" > Processed Patient ID: {patient_id}")

# --- 6. Create the Final "Patient Portal" DataFrame ---
patient_portal_df = spark.createDataFrame(patient_portal_data)

print("\n--- Final 'Patient Portal' Table ---")
display(patient_portal_df)

# --- 7. Save Your Final Table ---
patient_portal_df.write.format("delta").mode("overwrite").saveAsTable("patient_portal_gold")


--- Initializing AI Models for Patient Use Case (on CPU) ---


Device set to use cpu
Device set to use cpu


--- AI Models Initialized ---
Collecting data to process for patient portal...
 > Processed Patient ID: 101
 > Processed Patient ID: 102
 > Processed Patient ID: 103
 > Processed Patient ID: 104
 > Processed Patient ID: 105
 > Processed Patient ID: 106
 > Processed Patient ID: 107
 > Processed Patient ID: 108
 > Processed Patient ID: 109
 > Processed Patient ID: 110
 > Processed Patient ID: 201
 > Processed Patient ID: 202
 > Processed Patient ID: 203
 > Processed Patient ID: 205
 > Processed Patient ID: 206
 > Processed Patient ID: 207
 > Processed Patient ID: 208
 > Processed Patient ID: 209
 > Processed Patient ID: 210

--- Final 'Patient Portal' Table ---


doctor_advice,key_terms,patient_id,patient_name
assessment | send wood culture,,101,John Doe
"admit to L and D | assessment, active labor, plan, admit to L and D",Medication: pen,102,Jane Smith
"admit to cardiac unit, consult cardiology | consult cardiology",Medication: antibiotics,103,Robert Brown
"administer TBA per protocol | administer TBA per protocol, and shorten neuro ICU",Medication: insulin,104,Maria Garcia
add me to pdiatrics | audible phasing and intercostal retractions,,105,David Lee
admit to psychiatrist start one to one observation | cooperative,,106,Sarah Chen
,Medication: ##pryl | Medication: lysino,107,Michael Johnson
consult for fixation and manage pain with PCA post-op | plan and pivot for surgery,Medication: secret,108,Emily White
"Safety plan | objective analyze his 950, liposis 1200, assessment",,109,Kevin Patel
continue oxygen,Medication: ##tromy | Medication: ##zy | Medication: eggs | Medication: penicillin,110,Jessica Davis


In [0]:
import re
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
from transformers import pipeline

# --- 1. Load the Data ---
df_for_patient = spark.read.table("discharge_summary_gold")

# --- 2. Initialize AI Models ---
print("\n--- Initializing AI Models for Patient Portal ---")

# NER for extracting medical entities
ner_pipeline = pipeline(
    "ner",
    model="d4data/biomedical-ner-all",
    aggregation_strategy="simple",
    device=-1
)

# Summarization for better advice extraction
summarizer = pipeline(
    "summarization",
    model="facebook/bart-large-cnn",
    device=-1
)

print("--- AI Models Initialized ---")

# --- 3. Collect Data ---
print("Collecting patient data...")
patients_to_process = df_for_patient.collect()

# --- 4. Process Each Patient ---
patient_portal_data = []

for row in patients_to_process:
    patient_id = row["patient_id"]
    patient_name = row["patient_name"]
    full_text = row["Full_History_Text"]
    
    print(f"\n--- Processing Patient: {patient_id} - {patient_name} ---")
    
    # Initialize result dictionary with proper column order
    result = {
        "patient_id": patient_id,
        "patient_name": patient_name,
        "medications_prescribed": "",
        "conditions_identified": "",
        "symptoms_noted": "",
        "doctor_instructions": "",
        "follow_up_actions": ""
    }
    
    # --- STEP 1: Extract Medical Entities with Better Handling ---
    try:
        ner_results = ner_pipeline(full_text)
        
        medications = []
        conditions = []
        symptoms = []
        
        for entity in ner_results:
            entity_type = entity['entity_group']
            entity_word = entity['word'].strip()
            
            # Clean up the entity word - remove hashtags and extra spaces
            entity_word = entity_word.replace('#', '').replace('##', '')
            
            # Only keep meaningful terms (length > 2 characters)
            if len(entity_word) > 2:
                if entity_type == "Medication":
                    medications.append(entity_word)
                elif entity_type in ["Disease", "Disease_disorder"]:
                    conditions.append(entity_word)
                elif entity_type == "Symptom":
                    symptoms.append(entity_word)
        
        # Deduplicate and format
        result["medications_prescribed"] = ", ".join(sorted(set(medications))) if medications else "None documented"
        result["conditions_identified"] = ", ".join(sorted(set(conditions))) if conditions else "None documented"
        result["symptoms_noted"] = ", ".join(sorted(set(symptoms))) if symptoms else "None documented"
        
    except Exception as e:
        print(f"  [WARNING] NER failed for patient {patient_id}: {str(e)}")
        result["medications_prescribed"] = "Error extracting medications"
        result["conditions_identified"] = "Error extracting conditions"
        result["symptoms_noted"] = "Error extracting symptoms"
    
    # --- STEP 2: Extract Doctor Instructions ---
    try:
        # Look for instruction patterns in the text
        instruction_patterns = [
            r"(?:plan|instructions?|advice|prescribed|ordered|started on|continue)[:\s]+([^.|]+)",
            r"(?:patient (?:should|advised to|instructed to))[:\s]+([^.|]+)",
            r"(?:discharge (?:instructions?|plan))[:\s]+([^.|]+)"
        ]
        
        instructions = []
        for pattern in instruction_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            instructions.extend([match.strip() for match in matches if len(match.strip()) > 10])
        
        if instructions:
            # Deduplicate and limit to top 3 most relevant
            unique_instructions = list(set(instructions))[:3]
            result["doctor_instructions"] = " | ".join(unique_instructions)
        else:
            result["doctor_instructions"] = "Please consult your discharge paperwork for detailed instructions"
            
    except Exception as e:
        print(f"  [WARNING] Instruction extraction failed for patient {patient_id}: {str(e)}")
        result["doctor_instructions"] = "Please consult your discharge paperwork"
    
    # --- STEP 3: Extract Follow-up Actions ---
    try:
        # Look for follow-up patterns
        followup_patterns = [
            r"(?:follow[- ]?up)[:\s]+([^.|]+)",
            r"(?:return to|see|consult|appointment with)[:\s]+([^.|]+)",
            r"(?:scheduled for|referred to)[:\s]+([^.|]+)"
        ]
        
        followups = []
        for pattern in followup_patterns:
            matches = re.findall(pattern, full_text, re.IGNORECASE)
            followups.extend([match.strip() for match in matches if len(match.strip()) > 5])
        
        if followups:
            unique_followups = list(set(followups))[:3]
            result["follow_up_actions"] = " | ".join(unique_followups)
        else:
            result["follow_up_actions"] = "No specific follow-up documented. Contact your provider if symptoms worsen."
            
    except Exception as e:
        print(f"  [WARNING] Follow-up extraction failed for patient {patient_id}: {str(e)}")
        result["follow_up_actions"] = "Please review your discharge instructions"
    
    patient_portal_data.append(result)
    print(f"  ✓ Completed processing for Patient ID: {patient_id}")

# --- 5. Create DataFrame with Proper Schema ---
schema = StructType([
    StructField("patient_id", StringType(), True),
    StructField("patient_name", StringType(), True),
    StructField("medications_prescribed", StringType(), True),
    StructField("conditions_identified", StringType(), True),
    StructField("symptoms_noted", StringType(), True),
    StructField("doctor_instructions", StringType(), True),
    StructField("follow_up_actions", StringType(), True)
])

patient_portal_df = spark.createDataFrame(patient_portal_data, schema=schema)

# --- 6. Display Results ---
print("\n" + "="*80)
print("PATIENT PORTAL - FINAL OUTPUT")
print("="*80)
display(patient_portal_df)
patient_portal_df.write.format("delta").mode("overwrite").saveAsTable("patient_portal_gold")



--- Initializing AI Models for Patient Portal ---


Device set to use cpu
Device set to use cpu


--- AI Models Initialized ---
Collecting patient data...

--- Processing Patient: 101 - John Doe ---
  ✓ Completed processing for Patient ID: 101

--- Processing Patient: 102 - Jane Smith ---
  ✓ Completed processing for Patient ID: 102

--- Processing Patient: 103 - Robert Brown ---
  ✓ Completed processing for Patient ID: 103

--- Processing Patient: 104 - Maria Garcia ---
  ✓ Completed processing for Patient ID: 104

--- Processing Patient: 105 - David Lee ---
  ✓ Completed processing for Patient ID: 105

--- Processing Patient: 106 - Sarah Chen ---
  ✓ Completed processing for Patient ID: 106

--- Processing Patient: 107 - Michael Johnson ---
  ✓ Completed processing for Patient ID: 107

--- Processing Patient: 108 - Emily White ---
  ✓ Completed processing for Patient ID: 108

--- Processing Patient: 109 - Kevin Patel ---
  ✓ Completed processing for Patient ID: 109

--- Processing Patient: 110 - Jessica Davis ---
  ✓ Completed processing for Patient ID: 110

--- Processing Patien

patient_id,patient_name,medications_prescribed,conditions_identified,symptoms_noted,doctor_instructions,follow_up_actions
101,John Doe,None documented,None documented,None documented,"--- Family anxious | send wood culture, start bankomacinal, consult surgery",surgery
102,Jane Smith,pen,fracture,None documented,Please consult your discharge paperwork for detailed instructions,No specific follow-up documented. Contact your provider if symptoms worsen.
103,Robert Brown,antibiotics,respiratory acid,None documented,"admit to cardiac unit, consult cardiology | --- Started on IV antibiotics",cardiology
104,Maria Garcia,insulin,None documented,None documented,--- Started on insulin drip per protocol,No specific follow-up documented. Contact your provider if symptoms worsen.
105,David Lee,None documented,has,None documented,--- NIH Stroke Scale: 12,No specific follow-up documented. Contact your provider if symptoms worsen.
106,Sarah Chen,None documented,"depressive episode, major depressive disorder",None documented,admit to psychiatrist start one to one observation VITALS: N/A --- KEY LABS --- Amylase: 950 U/L (High) | --- Receiving IV fluids and pain management,No specific follow-up documented. Contact your provider if symptoms worsen.
107,Michael Johnson,"lysino, pryl",ocond,None documented,Please consult your discharge paperwork for detailed instructions,cardiology for an urgent cardia characterization
108,Emily White,secret,None documented,None documented,"--- Suctioned for secretions PRN | and pivot for surgery, or the consult for fixation and manage pain with PCA post-op",for fixation and manage pain with PCA post-op
109,Kevin Patel,None documented,None documented,None documented,--- Safety plan,No specific follow-up documented. Contact your provider if symptoms worsen.
110,Jessica Davis,"eggs, penicillin, tromy",None documented,None documented,"admit to floor start 4, septary, eggsone and ezytromycin, continue oxygen | --- Receiving Penicillin G for GBS prophylaxis",No specific follow-up documented. Contact your provider if symptoms worsen.


In [0]:
# --- 8. Show sample for one patient ---
print("\n--- SAMPLE OUTPUT FOR ONE PATIENT ---")
sample = patient_portal_df.limit(1).collect()[0]
print(f"\nPatient: {sample['patient_name']} (ID: {sample['patient_id']})")
print(f"\n📋 Medications Prescribed:\n   {sample['medications_prescribed']}")
print(f"\n🏥 Conditions Identified:\n   {sample['conditions_identified']}")
print(f"\n🤒 Symptoms Noted:\n   {sample['symptoms_noted']}")
print(f"\n💊 Doctor's Instructions:\n   {sample['doctor_instructions']}")
print(f"\n📅 Follow-up Actions:\n   {sample['follow_up_actions']}")
print("\n" + "="*80)


--- SAMPLE OUTPUT FOR ONE PATIENT ---

Patient: John Doe (ID: 101)

📋 Medications Prescribed:
   None documented

🏥 Conditions Identified:
   None documented

🤒 Symptoms Noted:
   None documented

💊 Doctor's Instructions:
   ---

Family anxious | send wood culture, start bankomacinal, consult surgery

📅 Follow-up Actions:
   surgery



In [0]:
"""
USE CASE 4: HOSPITAL READMISSION RISK PREDICTION
================================================
Predict the probability of a patient being readmitted within 30 days
using Spark ML (Logistic Regression & Random Forest)
"""

from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

print("="*80)
print("USE CASE 4: HOSPITAL READMISSION RISK PREDICTION")
print("="*80)

# =========================================================
# STEP 1: FEATURE ENGINEERING
# =========================================================
print("\n--- Step 1: Loading and Engineering Features ---")

# Load your existing tables
lab_results_df = spark.read.table("lab_results_silver")
patient_notes_df = spark.read.table("patient_notes")
discharge_summary_df = spark.read.table("discharge_summary_gold")

# --- Feature 1: Count of Lab Tests per Patient ---
lab_test_counts = lab_results_df.groupBy("patient_id").agg(
    F.count("test").alias("num_lab_tests"),
    F.sum(F.when(F.lower(F.col("result")).contains("high"), 1).otherwise(0)).alias("num_high_results"),
    F.sum(F.when(F.lower(F.col("result")).contains("low"), 1).otherwise(0)).alias("num_low_results")
)

# --- Feature 2: Extract Age from Patient Name (if available) or create synthetic ---
# For demonstration, let's create age groups based on patient_id patterns
patient_features = patient_notes_df.select(
    "patient_id",
    "patient_name"
).withColumn(
    "age_group",
    F.when(F.col("patient_id").cast("int") <= 105, "senior")  # 101-105
     .when(F.col("patient_id").cast("int") <= 110, "adult")   # 106-110
     .otherwise("middle_aged")  # 201+
)

# --- Feature 3: Extract Medication Count from Discharge Summary ---
medications_count = discharge_summary_df.select(
    "patient_id",
    F.size(F.split(F.col("Full_History_Text"), "medication|prescribed|started on")).alias("medication_mentions")
)

# --- Feature 4: Severity Score from Nursing Notes ---
severity_features = patient_notes_df.select(
    "patient_id",
    F.length("nursing_notes").alias("notes_length"),
    # Count severity indicators
    (F.lower(F.col("nursing_notes")).rlike("pain|distress|critical|severe|urgent")).cast("int").alias("has_severity_keywords"),
    # Count positive indicators
    (F.lower(F.col("nursing_notes")).rlike("stable|improved|comfortable|recovering")).cast("int").alias("has_positive_keywords")
)

# --- Feature 5: CREATE SYNTHETIC READMISSION LABEL ---
# In real scenarios, this would come from actual readmission data
# For demonstration, we'll create labels based on risk factors
readmission_labels = patient_notes_df.select("patient_id").withColumn(
    "readmitted_30days",
    # Patients with IDs 102, 104, 107, 201, 205 are "readmitted" for demo
    F.when(F.col("patient_id").isin(["102", "104", "107", "201", "205"]), 1).otherwise(0)
)

# --- STEP 2: JOIN ALL FEATURES ---
print("\n--- Step 2: Joining All Features ---")

ml_dataset = patient_features \
    .join(lab_test_counts, "patient_id", "left") \
    .join(medications_count, "patient_id", "left") \
    .join(severity_features, "patient_id", "left") \
    .join(readmission_labels, "patient_id", "left") \
    .fillna(0)  # Fill nulls with 0

print("Feature Engineering Complete. Sample:")
ml_dataset.show(5, truncate=False)

# =========================================================
# STEP 3: PREPARE DATA FOR ML
# =========================================================
print("\n--- Step 3: Preparing Data for Machine Learning ---")

# Convert categorical feature (age_group) to numeric
indexer = StringIndexer(inputCol="age_group", outputCol="age_group_indexed")

# Select numeric features
feature_cols = [
    "age_group_indexed",
    "num_lab_tests",
    "num_high_results",
    "num_low_results",
    "medication_mentions",
    "notes_length",
    "has_severity_keywords",
    "has_positive_keywords"
]

# Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

# Scale features (important for logistic regression)
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# Split data into training and test sets (80/20)
train_df, test_df = ml_dataset.randomSplit([0.8, 0.2], seed=42)

print(f"Training set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")

# =========================================================
# STEP 4: TRAIN LOGISTIC REGRESSION MODEL
# =========================================================
print("\n--- Step 4: Training Logistic Regression Model ---")

# Create Logistic Regression model
lr = LogisticRegression(
    featuresCol="features",
    labelCol="readmitted_30days",
    maxIter=100,
    regParam=0.01
)

# Create pipeline
pipeline_lr = Pipeline(stages=[indexer, assembler, scaler, lr])

# Train model
model_lr = pipeline_lr.fit(train_df)

# Make predictions
predictions_lr = model_lr.transform(test_df)

print("\n--- Logistic Regression Predictions ---")
predictions_lr.select(
    "patient_id",
    "patient_name",
    "readmitted_30days",
    "prediction",
    "probability"
).show(truncate=False)

# =========================================================
# STEP 5: TRAIN RANDOM FOREST MODEL
# =========================================================
print("\n--- Step 5: Training Random Forest Model ---")

# Create Random Forest model
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="readmitted_30days",
    numTrees=50,
    maxDepth=5,
    seed=42
)

# Create pipeline
pipeline_rf = Pipeline(stages=[indexer, assembler, scaler, rf])

# Train model
model_rf = pipeline_rf.fit(train_df)

# Make predictions
predictions_rf = model_rf.transform(test_df)

print("\n--- Random Forest Predictions ---")
predictions_rf.select(
    "patient_id",
    "patient_name",
    "readmitted_30days",
    "prediction",
    "probability"
).show(truncate=False)

# =========================================================
# STEP 6: EVALUATE MODELS
# =========================================================
print("\n--- Step 6: Model Evaluation ---")

# Binary Classification Evaluator (for AUC-ROC)
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="readmitted_30days",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

# Multiclass Evaluator (for accuracy)
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="readmitted_30days",
    predictionCol="prediction",
    metricName="accuracy"
)

# Evaluate Logistic Regression
lr_auc = evaluator_auc.evaluate(predictions_lr)
lr_accuracy = evaluator_acc.evaluate(predictions_lr)

print(f"\n📊 LOGISTIC REGRESSION PERFORMANCE:")
print(f"   AUC-ROC: {lr_auc:.4f}")
print(f"   Accuracy: {lr_accuracy:.4f}")

# Evaluate Random Forest
rf_auc = evaluator_auc.evaluate(predictions_rf)
rf_accuracy = evaluator_acc.evaluate(predictions_rf)

print(f"\n📊 RANDOM FOREST PERFORMANCE:")
print(f"   AUC-ROC: {rf_auc:.4f}")
print(f"   Accuracy: {rf_accuracy:.4f}")

# =========================================================
# STEP 7: CREATE FINAL RISK PREDICTION TABLE
# =========================================================
print("\n--- Step 7: Creating Final Risk Prediction Table ---")

# Use the best model (let's use Random Forest for this example)
final_predictions = model_rf.transform(ml_dataset)

# Extract probability of readmission (probability vector's second element)
readmission_risk_df = final_predictions.select(
    "patient_id",
    "patient_name",
    F.col("prediction").cast("int").alias("predicted_readmission"),
    F.col("probability").getItem(1).alias("readmission_probability")
).withColumn(
    "risk_category",
    F.when(F.col("readmission_probability") >= 0.7, "HIGH RISK")
     .when(F.col("readmission_probability") >= 0.4, "MEDIUM RISK")
     .otherwise("LOW RISK")
).orderBy(F.col("readmission_probability").desc())

print("\n--- FINAL READMISSION RISK PREDICTIONS ---")
readmission_risk_df.show(truncate=False)

# Save to Delta table
readmission_risk_df.write.format("delta").mode("overwrite").saveAsTable("readmission_risk_predictions")

print("\n✓ Readmission risk predictions saved to table: readmission_risk_predictions")

# =========================================================
# STEP 8: FEATURE IMPORTANCE (Random Forest)
# =========================================================
print("\n--- Step 8: Feature Importance Analysis ---")

# Extract the Random Forest model from the pipeline
rf_model = model_rf.stages[-1]

# Get feature importances
feature_importance = list(zip(feature_cols, rf_model.featureImportances.toArray()))
feature_importance_sorted = sorted(feature_importance, key=lambda x: x[1], reverse=True)

print("\n📊 TOP FEATURES PREDICTING READMISSION:")
for i, (feature, importance) in enumerate(feature_importance_sorted, 1):
    print(f"   {i}. {feature}: {importance:.4f}")

# =========================================================
# STEP 9: SUMMARY STATISTICS
# =========================================================
print("\n" + "="*80)
print("READMISSION RISK PREDICTION SUMMARY")
print("="*80)

summary_stats = readmission_risk_df.groupBy("risk_category").agg(
    F.count("*").alias("num_patients"),
    F.avg("readmission_probability").alias("avg_risk_score")
).orderBy(F.desc("avg_risk_score"))

summary_stats.show()

print("\n✓ Use Case 4 Complete: Hospital Readmission Risk Prediction")
print("="*80)

USE CASE 4: HOSPITAL READMISSION RISK PREDICTION

--- Step 1: Loading and Engineering Features ---

--- Step 2: Joining All Features ---
Feature Engineering Complete. Sample:
+----------+------------+---------+-------------+----------------+---------------+-------------------+------------+---------------------+---------------------+-----------------+
|patient_id|patient_name|age_group|num_lab_tests|num_high_results|num_low_results|medication_mentions|notes_length|has_severity_keywords|has_positive_keywords|readmitted_30days|
+----------+------------+---------+-------------+----------------+---------------+-------------------+------------+---------------------+---------------------+-----------------+
|101       |John Doe    |senior   |3            |1               |0              |1                  |116         |1                    |1                    |0                |
|102       |Jane Smith  |senior   |2            |0               |1              |1                  |134        

[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-7925695283119308>, line 168[0m
[1;32m    165[0m pipeline_rf [38;5;241m=[39m Pipeline(stages[38;5;241m=[39m[indexer, assembler, scaler, rf])
[1;32m    167[0m [38;5;66;03m# Train model[39;00m
[0;32m--> 168[0m model_rf [38;5;241m=[39m pipeline_rf[38;5;241m.[39mfit(train_df)
[1;32m    170[0m [38;5;66;03m# Make predictions[39;00m
[1;32m    171[0m predictions_rf [38;5;241m=[39m model_rf[38;5;241m.[39mtransform(test_df)

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m call_succeeded [38;5;241m=[39m [38;5;28;01mFalse[39;00m
[1;32m     29[0m [38;5;28;01mtry[39;00m:
[0;32m---> 30[0m     result [38;5;241m=[39m

HOSPITAL READMISSION PREDICTION

In [0]:
"""
USE CASE 4: HOSPITAL READMISSION RISK PREDICTION (Using Existing Pipeline Tables)
==================================================================================
Predict the probability of a patient being readmitted within 30 days
using Spark ML - ONLY reading from existing Delta tables
"""

from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

print("="*80)
print("USE CASE 4: HOSPITAL READMISSION RISK PREDICTION")
print("Using Existing Pipeline Tables Only - No Reprocessing!")
print("="*80)

# =========================================================
# STEP 1: LOAD EXISTING DELTA TABLES
# =========================================================
print("\n--- Step 1: Loading Existing Delta Tables ---")

transcripts_bronze = spark.read.table("transcripts_bronze")
lab_results_silver = spark.read.table("lab_results_silver")
nursing_notes_silver = spark.read.table("nursing_notes_silver")
discharge_summary_gold = spark.read.table("discharge_summary_gold")
patient_portal_gold = spark.read.table("patient_portal_gold")

print(f"✓ Loaded transcripts_bronze: {transcripts_bronze.count()} records")
print(f"✓ Loaded lab_results_silver: {lab_results_silver.count()} records")
print(f"✓ Loaded nursing_notes_silver: {nursing_notes_silver.count()} records")
print(f"✓ Loaded discharge_summary_gold: {discharge_summary_gold.count()} records")
print(f"✓ Loaded patient_portal_gold: {patient_portal_gold.count()} records")

# =========================================================
# STEP 2: FEATURE ENGINEERING FROM EXISTING TABLES
# =========================================================
print("\n--- Step 2: Feature Engineering from Existing Tables ---")

# --- Feature Set 1: Lab Results Analysis ---
lab_features = lab_results_silver.groupBy("patient_id").agg(
    F.count("test").alias("total_lab_tests"),
    F.countDistinct("test").alias("unique_tests"),
    F.sum(F.when(F.lower(F.col("result")).rlike("high|elevated|abnormal"), 1).otherwise(0)).alias("abnormal_high_count"),
    F.sum(F.when(F.lower(F.col("result")).rlike("low|decreased"), 1).otherwise(0)).alias("abnormal_low_count"),
    F.sum(F.when(F.col("date") == "NA", 1).otherwise(0)).alias("missing_dates_count")
)

print("Lab Features:")
lab_features.show(5, truncate=False)

# --- Feature Set 2: Nursing Notes Complexity ---
nursing_features = nursing_notes_silver.select(
    "patient_id",
    F.length("nursing_notes").alias("notes_length"),
    # Severity indicators
    (F.lower(F.col("nursing_notes")).rlike("pain|distress|critical|severe|acute|emergency")).cast("int").alias("severity_mentioned"),
    # Complication indicators
    (F.lower(F.col("nursing_notes")).rlike("infection|complication|deteriorat|worsen")).cast("int").alias("complications_mentioned"),
    # Positive progress indicators
    (F.lower(F.col("nursing_notes")).rlike("stable|improving|improved|recovering|better")).cast("int").alias("positive_progress"),
    # Count vital signs mentions (indicator of monitoring intensity)
    (F.size(F.split(F.lower(F.col("nursing_notes")), "vital")) - 1).alias("vital_signs_mentions")
)

print("\nNursing Notes Features:")
nursing_features.show(5, truncate=False)

# --- Feature Set 3: Patient Portal Medications ---
medications_features = patient_portal_gold.select(
    "patient_id",
    # Count number of medications
    F.when(F.col("medications_prescribed") == "None documented", 0)
     .otherwise(F.size(F.split(F.col("medications_prescribed"), ","))).alias("medication_count"),
    # Count number of conditions
    F.when(F.col("conditions_identified") == "None documented", 0)
     .otherwise(F.size(F.split(F.col("conditions_identified"), ","))).alias("condition_count"),
    # Check if follow-up exists
    (F.col("follow_up_actions").rlike("consult|appointment|follow")).cast("int").alias("has_followup")
)

print("\nMedication & Condition Features:")
medications_features.show(5, truncate=False)

# --- Feature Set 4: Discharge Summary Complexity ---
discharge_features = discharge_summary_gold.select(
    "patient_id",
    F.length("Full_History_Text").alias("discharge_text_length"),
    # Count medical terms (indicator of case complexity)
    (F.size(F.split(F.lower(F.col("Full_History_Text")), "diagnosis|treatment|procedure")) - 1).alias("medical_terms_count")
)

print("\nDischarge Summary Features:")
discharge_features.show(5, truncate=False)

# --- Feature Set 5: Transcript Indicators ---
transcript_features = transcripts_bronze.select(
    "patient_id",
    F.length("transcript_text").alias("transcript_length"),
    # Detect emotional distress in audio
    (F.lower(F.col("transcript_text")).rlike("worried|scared|anxious|concerned|afraid")).cast("int").alias("emotional_distress"),
    # Detect comprehension issues
    (F.lower(F.col("transcript_text")).rlike("don't understand|confused|unclear|repeat")).cast("int").alias("comprehension_issues")
)

print("\nTranscript Features:")
transcript_features.show(5, truncate=False)

# =========================================================
# STEP 3: JOIN ALL FEATURES
# =========================================================
print("\n--- Step 3: Joining All Features into Master Table ---")

ml_dataset = lab_features \
    .join(nursing_features, "patient_id", "left") \
    .join(medications_features, "patient_id", "left") \
    .join(discharge_features, "patient_id", "left") \
    .join(transcript_features, "patient_id", "left") \
    .fillna(0)  # Fill any remaining nulls with 0

# Add patient name for reference
ml_dataset = ml_dataset.join(
    patient_portal_gold.select("patient_id", "patient_name"),
    "patient_id",
    "left"
)

print("\nMaster Feature Table:")
ml_dataset.show(5, truncate=False)

# =========================================================
# STEP 4: CREATE SYNTHETIC READMISSION LABELS
# =========================================================
print("\n--- Step 4: Creating Readmission Labels (Synthetic for Demo) ---")

# In production, this would come from actual readmission data
# For demo: patients with high risk factors = readmitted
ml_dataset = ml_dataset.withColumn(
    "readmitted_30days",
    F.when(
        (F.col("abnormal_high_count") >= 2) |
        (F.col("complications_mentioned") == 1) |
        (F.col("medication_count") >= 3) |
        (F.col("severity_mentioned") == 1),
        1
    ).otherwise(0)
)

# Show label distribution
print("\nReadmission Label Distribution:")
ml_dataset.groupBy("readmitted_30days").count().show()

# =========================================================
# STEP 5: PREPARE DATA FOR ML
# =========================================================
print("\n--- Step 5: Preparing Data for Machine Learning ---")

# Select feature columns (all numeric)
feature_cols = [
    "total_lab_tests",
    "unique_tests",
    "abnormal_high_count",
    "abnormal_low_count",
    "missing_dates_count",
    "notes_length",
    "severity_mentioned",
    "complications_mentioned",
    "positive_progress",
    "vital_signs_mentions",
    "medication_count",
    "condition_count",
    "has_followup",
    "discharge_text_length",
    "medical_terms_count",
    "transcript_length",
    "emotional_distress",
    "comprehension_issues"
]

# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features_raw")

# Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features")

# Split data (80/20)
train_df, test_df = ml_dataset.randomSplit([0.8, 0.2], seed=42)

print(f"Training set: {train_df.count()} patients")
print(f"Test set: {test_df.count()} patients")

# =========================================================
# STEP 6: TRAIN RANDOM FOREST MODEL
# =========================================================
print("\n--- Step 6: Training Random Forest Classifier ---")

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="readmitted_30days",
    numTrees=100,
    maxDepth=5,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

# Train model
print("Training model...")
model = pipeline.fit(train_df)
print("✓ Model trained successfully")

# =========================================================
# STEP 7: MAKE PREDICTIONS
# =========================================================
print("\n--- Step 7: Making Predictions ---")

# Predict on all data
predictions = model.transform(ml_dataset)

# Extract readmission probability
readmission_predictions = predictions.select(
    "patient_id",
    "patient_name",
    "readmitted_30days",
    F.col("prediction").cast("int").alias("predicted_readmission"),
    F.col("probability").getItem(1).alias("readmission_risk_score")
).withColumn(
    "risk_category",
    F.when(F.col("readmission_risk_score") >= 0.7, "🔴 HIGH RISK")
     .when(F.col("readmission_risk_score") >= 0.4, "🟡 MEDIUM RISK")
     .otherwise("🟢 LOW RISK")
).orderBy(F.col("readmission_risk_score").desc())

print("\n--- READMISSION RISK PREDICTIONS ---")
readmission_predictions.show(20, truncate=False)

# =========================================================
# STEP 8: MODEL EVALUATION
# =========================================================
print("\n--- Step 8: Model Evaluation ---")

test_predictions = model.transform(test_df)

# AUC-ROC
evaluator_auc = BinaryClassificationEvaluator(
    labelCol="readmitted_30days",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = evaluator_auc.evaluate(test_predictions)

# Accuracy
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="readmitted_30days",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator_acc.evaluate(test_predictions)

print(f"\n📊 MODEL PERFORMANCE:")
print(f"   AUC-ROC: {auc:.4f}")
print(f"   Accuracy: {accuracy:.4f}")

# =========================================================
# STEP 9: FEATURE IMPORTANCE
# =========================================================
print("\n--- Step 9: Feature Importance Analysis ---")

rf_model = model.stages[-1]
feature_importance = list(zip(feature_cols, rf_model.featureImportances.toArray()))
feature_importance_sorted = sorted(feature_importance, key=lambda x: x[1], reverse=True)

print("\n📊 TOP 10 FEATURES PREDICTING READMISSION:")
for i, (feature, importance) in enumerate(feature_importance_sorted[:10], 1):
    print(f"   {i}. {feature}: {importance:.4f}")

# =========================================================
# STEP 10: SAVE RESULTS TO GOLD LAYER
# =========================================================
print("\n--- Step 10: Saving Results to Gold Layer ---")

# Save predictions
readmission_predictions.write.format("delta").mode("overwrite").saveAsTable("readmission_risk_gold")
print("✓ Saved to table: readmission_risk_gold")

# =========================================================
# STEP 11: SUMMARY DASHBOARD
# =========================================================
print("\n" + "="*80)
print("READMISSION RISK PREDICTION SUMMARY")
print("="*80)

summary = readmission_predictions.groupBy("risk_category").agg(
    F.count("*").alias("num_patients"),
    F.avg("readmission_risk_score").alias("avg_risk_score"),
    F.min("readmission_risk_score").alias("min_risk_score"),
    F.max("readmission_risk_score").alias("max_risk_score")
).orderBy(F.desc("avg_risk_score"))

summary.display(truncate=False)

print("\n✓ Use Case 4 Complete: Hospital Readmission Risk Prediction")
print("✓ All data sourced from existing pipeline tables")
print("="*80)

USE CASE 4: HOSPITAL READMISSION RISK PREDICTION
Using Existing Pipeline Tables Only - No Reprocessing!

--- Step 1: Loading Existing Delta Tables ---
✓ Loaded transcripts_bronze: 20 records
✓ Loaded lab_results_silver: 31 records
✓ Loaded nursing_notes_silver: 19 records
✓ Loaded discharge_summary_gold: 19 records
✓ Loaded patient_portal_gold: 19 records

--- Step 2: Feature Engineering from Existing Tables ---
Lab Features:
+----------+---------------+------------+-------------------+------------------+-------------------+
|patient_id|total_lab_tests|unique_tests|abnormal_high_count|abnormal_low_count|missing_dates_count|
+----------+---------------+------------+-------------------+------------------+-------------------+
|101       |3              |3           |2                  |0                 |0                  |
|102       |2              |2           |0                  |1                 |0                  |
|103       |3              |3           |0                  |1   

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7925695283119309>, line 69[0m
[1;32m     55[0m nursing_features [38;5;241m=[39m nursing_notes_silver[38;5;241m.[39mselect(
[1;32m     56[0m     [38;5;124m"[39m[38;5;124mpatient_id[39m[38;5;124m"[39m,
[1;32m     57[0m     F[38;5;241m.[39mlength([38;5;124m"[39m[38;5;124mnursing_notes[39m[38;5;124m"[39m)[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mnotes_length[39m[38;5;124m"[39m),
[0;32m   (...)[0m
[1;32m     65[0m     (F[38;5;241m.[39msize(F[38;5;241m.[39msplit(F[38;5;241m.[39mlower(F[38;5;241m.[39mcol([38;5;124m"[39m[38;5;124mnursing_notes[39m[38;5;124m"[39m)), [38;5;124m"[39m[38;5;124mvital[39m[38;5;124m"[39m)) [38;5;241m-[39m [38;5;241m1[39m)[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mvital_signs_mentions[39m[38;5;124m"[39

USE CASE 5

In [0]:
# =========================================================
# DISCHARGE READINESS INDICATORS
# =========================================================
print("\n" + "="*80)
print("INSIGHT 6: DISCHARGE READINESS ASSESSMENT")
print("="*80)

discharge_readiness = patient_portal_gold.select(
    "patient_id",
    "patient_name",
    "follow_up_actions"
).join(
    nursing_notes_silver.select(
        "patient_id",
        (F.lower(F.col("nursing_notes")).rlike("stable|improved|recovering")).cast("int").alias("positive_indicators"),
        (F.lower(F.col("nursing_notes")).rlike("pain|distress|complication")).cast("int").alias("concerning_indicators")
    ),
    "patient_id"
).withColumn(
    "has_followup_plan",
    (F.col("follow_up_actions").rlike("appointment|consult|follow")).cast("int")
).withColumn(
    "discharge_readiness_score",
    (F.col("positive_indicators") * 2) - F.col("concerning_indicators") + F.col("has_followup_plan")
).withColumn(
    "discharge_status",
    F.when(F.col("discharge_readiness_score") >= 3, "✅ READY")
     .when(F.col("discharge_readiness_score") >= 1, "⏳ MONITOR")
     .otherwise("⚠️ NOT READY")
)

print("\n📋 Discharge Readiness Summary:")
discharge_readiness.groupBy("discharge_status").count().orderBy("discharge_status").show()

print("\n👥 Patient Discharge Status:")
discharge_readiness.select(
    "patient_id", "patient_name", "discharge_readiness_score", "discharge_status"
).orderBy(F.desc("discharge_readiness_score")).show(truncate=False)



INSIGHT 6: DISCHARGE READINESS ASSESSMENT

📋 Discharge Readiness Summary:


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7925695283119312>, line 33[0m
[1;32m      8[0m discharge_readiness [38;5;241m=[39m patient_portal_gold[38;5;241m.[39mselect(
[1;32m      9[0m     [38;5;124m"[39m[38;5;124mpatient_id[39m[38;5;124m"[39m,
[1;32m     10[0m     [38;5;124m"[39m[38;5;124mpatient_name[39m[38;5;124m"[39m,
[0;32m   (...)[0m
[1;32m     29[0m      [38;5;241m.[39motherwise([38;5;124m"[39m[38;5;124m⚠️ NOT READY[39m[38;5;124m"[39m)
[1;32m     30[0m )
[1;32m     32[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124m📋 Discharge Readiness Summary:[39m[38;5;124m"[39m)
[0;32m---> 33[0m discharge_readiness[38;5;241m.[39mgroupBy([38;5;124m"[39m[38;5;124mdischarge_status[39m[38;5;124m"[39m)[38;5;241m.[39mcount()[38;5;241m.[39morderBy([38;5;124m"[39m[3