In [18]:
# --- CELL 1: All Installations ---
# Installs the PDF, FHIR, date-parsing and widget dependencies into the kernel's
# environment through the %pip magic (one pip invocation per line).
# NOTE(review): the import cell also uses paddleocr, pytesseract, cv2, PIL,
# matplotlib, torch and transformers, none of which are installed here --
# presumably they are preinstalled; confirm before running on a fresh environment.
print("Installing all required packages...")
%pip install -q fpdf reportlab
# Only fhir.resources carries a version constraint; the other packages are unpinned.
%pip install -q "fhir.resources>=7.0.0"
%pip install -q python-dateutil
%pip install -q ipywidgets
print("✅ All packages installed.")

Installing all required packages...
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
✅ All packages installed.


In [1]:
from paddleocr import PaddleOCR # main OCR dependencies
from matplotlib import pyplot as plt # plot images
import cv2 #opencv
import os
import pytesseract
from PIL import Image
# --- CELL 2: All Imports ---
print("Importing all libraries...")

# Core & Data Handling
import os
import re
import json
from pathlib import Path
from datetime import datetime
from dateutil.parser import parse
import uuid

# NLP / Machine Learning
import torch
from transformers import pipeline

# FHIR Resources
from fhir.resources.bundle import Bundle, BundleEntry
from fhir.resources.patient import Patient
from fhir.resources.condition import Condition
from fhir.resources.observation import Observation
from fhir.resources.medicationrequest import MedicationRequest
from fhir.resources.humanname import HumanName
from fhir.resources.identifier import Identifier
from fhir.resources.codeableconcept import CodeableConcept
from fhir.resources.coding import Coding
from fhir.resources.quantity import Quantity
from fhir.resources.reference import Reference
import ipywidgets as widgets
from IPython.display import display, FileLink

print("✅ All libraries imported.")

Importing all libraries...
✅ All libraries imported.


In [7]:
# --- CELL 3: OCR & Data Extraction System (V3) ---
print("\n--- CELL 3: Defining OCR and Data Extraction Functions ---")

# --- OCR Setup ---
# Build the OCR engine only once per kernel session: re-running this cell
# keeps the instance that is already bound to `paddle_ocr`.
try:
    paddle_ocr
except NameError:
    paddle_ocr = PaddleOCR(use_angle_cls=True, lang='en')
    print("✅ PaddleOCR model loaded.")

def _ocr_page_texts(page):
    """Return the recognized strings of one page result, whichever layout it uses."""
    if not page:
        # None / empty page: nothing was recognized on it.
        return []
    if hasattr(page, "keys"):
        # Mapping layout returned by `predict()`: the recognized strings are
        # stored under the 'rec_texts' key.
        texts = page["rec_texts"] if "rec_texts" in page else []
        return [str(text) for text in texts]
    # Legacy list layout: every line is [box, (text, score)].
    return [str(line[1][0]) for line in page]


def run_medical_ocr(image_path):
    """Run PaddleOCR on a file and return the recognized text.

    Args:
        image_path: Path of the image (or multi-page document) to recognize.

    Returns:
        The recognized lines of every page joined with newlines, or an empty
        string when nothing was recognized or the OCR call failed (the error
        is printed, not raised).
    """
    try:
        # The 'cls' argument is no longer supported in this call.
        result = paddle_ocr.predict(image_path)

        # `predict()` yields one entry per page; collect all of them instead of
        # only the first so multi-page uploads are not truncated.
        texts = []
        for page in (result or []):
            texts.extend(_ocr_page_texts(page))
        return "\n".join(texts)
    except Exception as e:
        # Best effort by design: callers treat "" as "OCR failed".
        print(f"❌ PaddleOCR failed: {e}")
        return ""

# --- NER Setup ---
# Load the token-classification model once per kernel session; re-running the
# cell reuses the pipeline already bound to `ner_pipeline`.
if 'ner_pipeline' not in globals():
    # Use device=0 for GPU if available, otherwise device=-1 for CPU
    device = 0 if torch.cuda.is_available() else -1
    ner_pipeline = pipeline(
        "ner",
        model="d4data/biomedical-ner-all",
        tokenizer="d4data/biomedical-ner-all",
        device=device,
        # Replaces the deprecated `grouped_entities=True`: sub-word tokens are
        # merged into whole entities carrying 'entity_group', 'word' and 'score'.
        aggregation_strategy="simple",
    )
    print(f"✅ BioBERT NER model loaded on device: {'GPU' if device == 0 else 'CPU'}.")

# --- Extraction Helper Functions ---
def safe_search(pattern, text, group=1):
    """Return one stripped capture group of the first match, or None.

    The search is case-insensitive and lets `.` span newlines.
    """
    found = re.search(pattern, text, flags=re.DOTALL | re.IGNORECASE)
    if found is None:
        return None
    return found.group(group).strip()

def extract_section(text, start_keywords, end_keywords):
    """Return the text that follows a start keyword up to the next end keyword.

    Keywords are matched literally and case-insensitively. The section stops
    right before a newline that is followed by one of the end keywords, or at
    the end of the text. Returns None when no start keyword occurs.
    """
    starts = '|'.join(map(re.escape, start_keywords))
    ends = '|'.join(map(re.escape, end_keywords))
    section_re = re.compile(
        rf'(?:{starts})[\s:]*(.*?)(?=\n(?:{ends})|$)',
        re.IGNORECASE | re.DOTALL,
    )
    hit = section_re.search(text)
    if hit is None:
        return None
    return hit.group(1).strip()

# --- Main Extraction Logic ---
def extract_medical_data(report_text, ner_pipeline, max_ner_chars=4000):
    """Detect the document type and extract patient info, vitals and NER entities.

    Args:
        report_text: OCR text of the report; None or "" is treated as empty text.
        ner_pipeline: Callable that takes a string and returns a list of dicts
            with 'entity_group', 'word' and 'score' keys.
        max_ner_chars: Only this many leading characters are sent to the NER
            model (keeps inference time bounded).

    Returns:
        dict with keys 'document_type', 'patient_info', 'report_meta',
        'conditions', 'medications' and 'vitals'. Fields that were not found
        are None (patient info) or simply absent from the lists.
    """
    report_text = report_text or ""

    # Simple document type detection
    if re.search(r"COVID-19|PCR|MOLECULAR\s+DIAGNOSTIC", report_text, re.IGNORECASE):
        doc_type = "lab_report"
    elif re.search(r"DISCHARGE\s+SUMMARY", report_text, re.IGNORECASE):
        doc_type = "discharge_summary"
    else:
        doc_type = "clinical_report"

    print(f"Detected document type: {doc_type}")

    # For this consolidated example, we will use a generic extractor.
    print("Processing with generic clinical report extractor...")
    extracted = {
        "document_type": doc_type, "patient_info": {}, "report_meta": {},
        "conditions": [], "medications": [], "vitals": [],
    }

    # Patient Info
    patient_info = extracted["patient_info"]
    # The captured name may contain spaces/tabs but must stay on one line;
    # otherwise the label of the following line ("Age", "MRN", ...) is swallowed.
    patient_info['name'] = safe_search(r"(?:Patient|Name)\s*:\s*([A-Za-z .\t-]+)", report_text)
    patient_info['id'] = safe_search(r"(?:MRN|Patient ID)\s*:\s*([A-Za-z0-9-]+)", report_text)
    patient_info['gender'] = safe_search(r"(?:Sex|Gender)\s*:\s*(Male|Female|M|F)", report_text)
    patient_info['dob'] = safe_search(r"(?:DOB|Date of Birth)\s*:\s*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})", report_text)

    # Vitals. The leading \b keeps a label from matching the tail of another
    # word: without it the temperature pattern matched "Weight: 70" or "Count: 5".
    vital_patterns = {
        'BP': r"\bBP\s*[:=]\s*(\d+/\d+)",
        'HR': r"\bHR\s*[:=]\s*(\d+)",
        'RR': r"\bRR\s*[:=]\s*(\d+)",
        # One number with an optional decimal part ('.' or ',' separator).
        'TEMP': r"\b(?:T|Temp(?:erature)?)\s*[:=]\s*(\d+(?:[.,]\d+)?)",
    }
    for vital, pattern in vital_patterns.items():
        value = safe_search(pattern, report_text)
        if value:
            extracted["vitals"].append({"name": vital, "value": value})

    # NER for Conditions and Medications. Labels are compared upper-cased.
    # NOTE(review): the configured model presumably emits 'Disease_disorder' and
    # 'Medication' rather than 'DISEASE'/'DRUG' -- verify against its model card.
    condition_labels = {"DISEASE", "PROBLEM", "DISEASE_DISORDER"}
    medication_labels = {"DRUG", "CHEMICAL", "MEDICATION"}
    if report_text.strip():
        try:
            ner_results = ner_pipeline(report_text[:max_ner_chars])  # Limit text for performance
            for entity in ner_results:
                if entity['score'] <= 0.8:  # Confidence threshold
                    continue
                entity_text = str(entity['word']).strip()
                if not entity_text:
                    continue
                entity_group = str(entity['entity_group']).upper()
                if entity_group in condition_labels:
                    target = extracted["conditions"]
                elif entity_group in medication_labels:
                    target = extracted["medications"]
                else:
                    continue
                if entity_text not in target:
                    target.append(entity_text)
        except Exception as e:
            # Best effort: regex-based fields are still returned if NER fails.
            print(f"Error during NER processing: {e}")

    return extracted

print("✅ CELL 3 Complete: OCR and Extraction functions defined.")


--- CELL 3: Defining OCR and Data Extraction Functions ---
✅ CELL 3 Complete: OCR and Extraction functions defined.


In [3]:
# --- CELL 4: Custom-to-FHIR Transformer ---
print("\n--- CELL 4: Defining Custom-to-FHIR Transformer ---")

def safe_parse_date(date_string):
    """Turn a free-form date string into an ISO date string (YYYY-MM-DD).

    Slashes are treated like dashes and ambiguous dates are read day-first.
    Returns None for empty input or when parsing fails for any reason.
    """
    if not date_string:
        return None
    try:
        normalized = date_string.replace("/", "-")
        parsed = parse(normalized, dayfirst=True, yearfirst=False)
        return parsed.date().isoformat()
    except Exception:
        return None

def _normalize_fhir_gender(raw):
    """Map a free-text gender value onto male/female/other/unknown, or None if blank."""
    token = str(raw).strip().lower() if raw else ""
    if not token:
        return None
    if token in ("other", "unknown"):
        return token
    if token[0] == "m":
        return "male"
    if token[0] == "f":
        return "female"
    # Anything unrecognized must not be silently recorded as a specific sex.
    return "unknown"


def create_patient_resource(patient_info):
    """Convert the extracted patient_info dict into a FHIR Patient resource.

    Args:
        patient_info: dict that may carry 'name', 'gender', 'dob' and 'id';
            missing, None or blank entries are skipped.

    Returns:
        A Patient with a freshly generated UUID as id and whichever of name,
        gender, birthDate and MRN identifier could be populated.
    """
    patient = Patient()
    patient.id = str(uuid.uuid4())

    if patient_info.get('name'):
        patient.name = [HumanName(text=patient_info['name'])]

    gender = _normalize_fhir_gender(patient_info.get('gender'))
    if gender:
        patient.gender = gender

    if patient_info.get('dob'):
        birth_date = safe_parse_date(patient_info.get('dob'))
        # Leave birthDate unset when the value could not be parsed.
        if birth_date:
            patient.birthDate = birth_date

    if patient_info.get('id'):
        patient.identifier = [Identifier(system="http://hospital.example/mrn", value=patient_info['id'])]
    return patient

def create_condition_resources(diagnoses, patient_ref):
    """Build one active FHIR Condition per diagnosis text.

    Each Condition gets its own UUID, points at `patient_ref` as subject and
    carries the diagnosis as free text in `code`.
    """
    conditions = []
    for diag_text in diagnoses:
        active_status = CodeableConcept(
            coding=[Coding(system="http://terminology.hl7.org/CodeSystem/condition-clinical", code="active")]
        )
        condition = Condition(
            id=str(uuid.uuid4()),
            subject=patient_ref,
            clinicalStatus=active_status,
            code=CodeableConcept(text=diag_text),
        )
        conditions.append(condition)
    return conditions

def _vital_to_float(text):
    """Return `text` as a float, or None when it is not a valid number."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


def _parse_vital_number(text):
    """Return the first numeric token of `text` as a float (',' read as decimal point), or None."""
    found = re.search(r'([0-9.,]+)', str(text))
    if not found:
        return None
    return _vital_to_float(found.group(1).replace(',', '.'))


def create_observation_resources(vitals, patient_ref):
    """Convert the extracted vitals list into FHIR Observation resources.

    Args:
        vitals: list of {'name': ..., 'value': ...} dicts; names other than
            BP, HR, RR and TEMP are ignored.
        patient_ref: Reference used as the subject of every Observation.

    Returns:
        list of Observations. Blood pressure becomes a panel with systolic and
        diastolic components; the others carry a single valueQuantity. Vitals
        whose value cannot be read as a number are skipped instead of raising.
    """
    # Component backbone elements are a separate class in the observation
    # module; `Observation` itself has no `Component` attribute.
    from fhir.resources.observation import ObservationComponent

    resources = []
    loinc_map = {'BP': '85354-9', 'HR': '8867-4', 'RR': '9279-1', 'TEMP': '8310-5'}
    for vital in vitals:
        name = vital.get('name')
        if name not in loinc_map:
            continue
        raw_value = str(vital.get('value') or '')

        obs = Observation(
            id=str(uuid.uuid4()), status="final", subject=patient_ref,
            category=[CodeableConcept(coding=[Coding(system="http://terminology.hl7.org/CodeSystem/observation-category", code="vital-signs")])],
            code=CodeableConcept(coding=[Coding(system="http://loinc.org", code=loinc_map[name])], text=name)
        )

        has_value = False
        if name == 'BP':
            # "systolic/diastolic" -> two components (LOINC 8480-6 / 8462-4)
            readings = [_vital_to_float(part) for part in raw_value.split('/')]
            if len(readings) == 2 and None not in readings:
                obs.component = [
                    ObservationComponent(code=CodeableConcept(coding=[Coding(system="http://loinc.org", code="8480-6")]), valueQuantity=Quantity(value=readings[0], unit="mmHg")),
                    ObservationComponent(code=CodeableConcept(coding=[Coding(system="http://loinc.org", code="8462-4")]), valueQuantity=Quantity(value=readings[1], unit="mmHg"))
                ]
                has_value = True
        else:
            number = _parse_vital_number(raw_value)
            if number is not None:
                obs.valueQuantity = Quantity(value=number, unit='C' if name == 'TEMP' else '/min')
                has_value = True

        # Only keep observations that actually carry a measurement.
        if has_value:
            resources.append(obs)
    return resources
    
def create_medication_resources(medications, patient_ref):
    """Convert a list of medication names into FHIR MedicationRequest resources.

    Args:
        medications: iterable of medication names (free text).
        patient_ref: Reference used as the subject of every request.

    Returns:
        list of active MedicationRequests with intent "order", each with its
        own UUID and the medication given as a text-only concept.
    """
    # The notebook installs fhir.resources>=7.0.0 (FHIR R5 models), where the
    # medication is a CodeableReference under `medication`; the older
    # `medicationCodeableConcept` field is rejected by those models.
    from fhir.resources.codeablereference import CodeableReference

    requests = []
    for med_text in medications:
        requests.append(MedicationRequest(
            id=str(uuid.uuid4()), status="active", intent="order", subject=patient_ref,
            medication=CodeableReference(concept=CodeableConcept(text=med_text)),
        ))
    return requests

def convert_custom_to_fhir(custom_data):
    """Convert the extracted data dictionary into a FHIR "collection" Bundle.

    Args:
        custom_data: dict with a 'patient_info' dict and optional 'conditions',
            'vitals' and 'medications' lists.

    Returns:
        A Bundle whose first entry is the Patient, followed by the Condition,
        Observation and MedicationRequest resources; None (after printing a
        message) when `custom_data` is empty or has no patient information.
    """
    if not custom_data or not custom_data.get('patient_info'):
        print("❌ FHIR Conversion Failed: No patient information found.")
        return None

    patient = create_patient_resource(custom_data['patient_info'])
    # NOTE(review): entries use urn:uuid fullUrls while this reference is the
    # relative "Patient/<id>" form -- confirm consumers can resolve it.
    patient_ref = Reference(reference=f"Patient/{patient.id}")

    resources = [
        patient,
        *create_condition_resources(custom_data.get('conditions') or [], patient_ref),
        *create_observation_resources(custom_data.get('vitals') or [], patient_ref),
        *create_medication_resources(custom_data.get('medications') or [], patient_ref),
    ]

    # Bundle entries are instances of BundleEntry (imported next to Bundle);
    # the Bundle class has no `Entry` attribute.
    entries = [
        BundleEntry(resource=res, fullUrl=f"urn:uuid:{res.id}")
        for res in resources
    ]
    bundle = Bundle(type="collection", id=str(uuid.uuid4()), entry=entries)

    print(f"✅ FHIR Bundle created with {len(entries)} resources.")
    return bundle

print("✅ CELL 4 Complete: FHIR Transformer functions defined.")


--- CELL 4: Defining Custom-to-FHIR Transformer ---
✅ CELL 4 Complete: FHIR Transformer functions defined.


In [2]:
# --- CELL 5: Single-Button Interactive Pipeline ---
print("\n--- CELL 5: Initializing Simplified UI Pipeline ---")

# We need FPDF for PDF generation
from fpdf import FPDF
from datetime import datetime
from IPython.display import FileLink, clear_output
import ipywidgets as widgets
import json

# --- 1. Helper Function to Generate a PDF Report ---
def create_medical_report_pdf(data, pdf_path):
    """Generate a human-readable PDF summary from the extracted data.

    Args:
        data: dict with optional 'patient_info' (dict) and 'vitals',
            'conditions', 'medications' (lists); missing or None sections are
            rendered as "No data found".
        pdf_path: Destination path of the PDF file.

    The built-in "Arial" font is used, which can only encode Latin-1 text, so
    every string is converted up front and unsupported characters become '?'.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=10)

    def latin1_safe(text):
        # Sanitize before handing text to the library: the encoding error is not
        # reliably raised by cell()/multi_cell() themselves, so catching it
        # around those calls does not protect the final output() step.
        return str(text).encode('latin-1', 'replace').decode('latin-1')

    def add_heading(title):
        pdf.set_font("Arial", 'B', 14)
        pdf.cell(0, 10, latin1_safe(title), 0, 1, 'L')
        pdf.set_font("Arial", size=10)

    def add_empty_notice():
        pdf.cell(0, 7, "  - No data found -", 0, 1)
        pdf.ln(5)

    # Helper to add a titled key/value section
    def add_section(title, content_dict):
        add_heading(title)
        if not content_dict:
            add_empty_notice()
            return
        for key, value in content_dict.items():
            shown = "N/A" if value is None else str(value)
            line = f"  {str(key).replace('_', ' ').title()}: {shown}"
            pdf.cell(0, 7, latin1_safe(line), 0, 1)
        pdf.ln(5)

    # Helper for bullet lists
    def add_list_section(title, content_list):
        add_heading(title)
        if not content_list:
            add_empty_notice()
            return
        for item in content_list:
            pdf.multi_cell(0, 7, latin1_safe(f"  - {str(item)}"))
        pdf.ln(5)

    # --- Build the PDF ---
    pdf.set_font("Arial", 'B', 16)
    pdf.cell(0, 10, "Medical Report Summary", 0, 1, 'C')
    pdf.ln(10)

    vitals = data.get("vitals") or []
    add_section("Patient Information", data.get("patient_info"))
    add_list_section("Vitals", [f"{v['name']}: {v['value']}" for v in vitals])
    add_list_section("Conditions / Diagnoses", data.get("conditions"))
    add_list_section("Medications", data.get("medications"))

    pdf.output(pdf_path)

# --- 2. Define the UI Widgets ---
# Log area that the processing callback prints into
output_area = widgets.Output()

# Single-file picker restricted to PDF and common image formats
upload_widget = widgets.FileUpload(
    description="Upload Report",
    accept=".pdf,.png,.jpg,.jpeg",
    multiple=False,
)

# Button that starts the pipeline for the uploaded file
process_btn = widgets.Button(
    description="Process Uploaded File",
    tooltip='Run the full OCR and FHIR pipeline',
    button_style='success',
)

# --- 3. Define the Main Callback Function ---
def process_uploaded_file(btn):
    """Run OCR -> extraction -> FHIR conversion -> export for the uploaded file.

    Triggered by the 'Process' button (`btn` is the clicked widget and is not
    used). All progress and error messages are printed into `output_area`.
    The upload is written to a temporary file in the working directory, which
    is always removed afterwards, and the upload widget is cleared.
    """
    with output_area:
        clear_output(wait=True)

        if not upload_widget.value:
            print("⚠️ Please upload a file first.")
            return

        # --- File Handling ---
        uploaded_file_dict = upload_widget.value[0]
        # The file name comes from the browser and is untrusted: keep only its
        # last path component so it cannot point outside the working directory.
        raw_name = str(uploaded_file_dict['name'])
        safe_name = Path(raw_name.replace("\\", "/")).name or "upload"
        temp_file_path = Path(f"temp_{safe_name}")

        base_name = f"{temp_file_path.stem}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        try:
            # Written inside the try so a failed write is reported and cleaned up too.
            with open(temp_file_path, "wb") as f:
                f.write(uploaded_file_dict['content'])
            print(f"✅ File saved locally: {temp_file_path}")

            # --- Step 1: Run OCR (from Cell 3) ---
            print("🔹 Running PaddleOCR...")
            ocr_text = run_medical_ocr(str(temp_file_path))
            if not ocr_text:
                print("❌ OCR Failed. Cannot continue.")
                return
            print("✅ OCR Completed.")

            # --- Step 2: Run Extraction (from Cell 3) ---
            print("🔹 Extracting medical entities...")
            custom_data = extract_medical_data(ocr_text, ner_pipeline)
            print("✅ Custom data extracted.")

            # --- Step 3: Run FHIR Conversion (from Cell 4) ---
            print("🔹 Converting to FHIR Bundle...")
            fhir_bundle = convert_custom_to_fhir(custom_data)
            if fhir_bundle:
                print("✅ FHIR conversion complete.")
            else:
                # The converter already printed why; do not claim success.
                print("⚠️ FHIR conversion skipped; continuing without a FHIR bundle.")

            # --- Step 4: Save all files ---
            custom_json_path = f"{base_name}_custom.json"
            with open(custom_json_path, "w", encoding="utf-8") as f:
                json.dump(custom_data, f, indent=2)
            print(f"📄 Custom JSON saved: {custom_json_path}")

            if fhir_bundle:
                fhir_json_path = f"{base_name}_fhir.json"
                # NOTE(review): newer pydantic-v2-based builds of fhir.resources
                # presumably expose model_dump_json(), older ones json() --
                # prefer the former when present; confirm against the installed version.
                serialize = getattr(fhir_bundle, "model_dump_json", None) or fhir_bundle.json
                with open(fhir_json_path, "w", encoding="utf-8") as f:
                    f.write(serialize(indent=2))
                print(f"📄 FHIR Bundle saved: {fhir_json_path}")

            pdf_path = f"{base_name}_report.pdf"
            create_medical_report_pdf(custom_data, pdf_path)
            print(f"📄 PDF Report generated: {pdf_path}")

            # --- Step 5: Display download links ---
            print("\n--- Download Your Files ---")
            display(FileLink(custom_json_path))
            if fhir_bundle:
                display(FileLink(fhir_json_path))
            display(FileLink(pdf_path))

        except Exception as e:
            print(f"\n❌❌❌ An error occurred during processing: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Clean up the temporary file; it may not exist if the write failed,
            # and a cleanup error must not hide the original one.
            temp_file_path.unlink(missing_ok=True)

            # Set value to an empty tuple () to clear the widget
            upload_widget.value = ()

            # NOTE(review): `_counter` looks like a leftover from an older
            # ipywidgets FileUpload API -- confirm it is still needed.
            upload_widget._counter = 0

# --- 4. Link Callback and Display UI ---
# Stack the controls vertically: picker, button, then the log area
ui = widgets.VBox(children=[upload_widget, process_btn, output_area])

# Run the pipeline whenever the button is clicked
process_btn.on_click(process_uploaded_file)

print("✅ Ready to process.")
display(ui)


--- CELL 5: Initializing Simplified UI Pipeline ---
✅ Ready to process.


VBox(children=(FileUpload(value=(), accept='.pdf,.png,.jpg,.jpeg', description='Upload Report'), Button(button…

In [10]:
# Installs reportlab, pdf2image, pillow, ipywidgets and python-dateutil.
# NOTE(review): reportlab, ipywidgets and python-dateutil are already installed by the
# "CELL 1: All Installations" cell; only pdf2image and pillow are new here.
%pip install -q reportlab pdf2image pillow ipywidgets python-dateutil 

Note: you may need to restart the kernel to use updated packages.


In [12]:
# NOTE(review): fpdf is already installed by the "CELL 1: All Installations" cell
# (`%pip install -q fpdf reportlab`), so this cell looks redundant -- confirm and remove.
%pip install fpdf


Collecting fpdf
  Downloading fpdf-1.7.2.tar.gz (39 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: fpdf
[33m  DEPRECATION: Building 'fpdf' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to the source tree of 'fpdf'. Discussion can be found at https://github.com/pypa/pip/issues/6334[0m[33m
[0m  Building wheel for fpdf (setup.py) ... [?25ldone
[?25h  Created wheel for fpdf: filename=fpdf-1.7.2-py2.py3-none-any.whl size=40758 sha256=9c8284cd405ebb77505f861c3390368e6b7a436ed2003dd47b3f69975eb82bd6
  Stored in directory: /home/aditya73/.cache/pip/wheels/f9/95/ba/f418094659025eb9611f17cbcaf2334236bf39a0c3453ea455
Successfully built fpdf
Installing collected package

In [None]:
# Scratch cell: print the text recognized in the first entry of an OCR result.
# NOTE(review): `result` is not assigned at notebook level in any visible cell --
# presumably left over from an interactive OCR call; confirm before relying on it.
page_result = result[0]

# Recognized strings and the scores stored alongside them
texts, scores = page_result['rec_texts'], page_result['rec_scores']

# Scores are paired with the text here, but only the text is printed
for text, score in zip(texts, scores):
    print(f"{text}")

Output()

FileUpload(value=(), accept='.pdf,.png,.jpg,.jpeg,.webp', description='Upload')

Button(button_style='success', description='Process Uploaded File', style=ButtonStyle())