# Download packages

In [None]:
!apt-get update -y
!apt-get install -y curl tesseract-ocr libopenjp2-7 libtiff5 ghostscript

# Python dependencies
!pip install flask flask_cors pyngrok requests werkzeug typing
!pip install fitz ocrmypdf PyMuPDF
!pip install ocrmypdf PyMuPDF==1.24.10
!pip install sentence-transformers pymilvus[model] pymilvus[milvus_lite] nltk

# Lock image libraries
!pip install --no-deps --force-reinstall "Pillow==10.3.0" "img2pdf==0.4.4"


0% [Working]
            
Hit:1 https://cli.github.com/packages stable InRelease

0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Connecting to security.ub
                                                                               
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://archive.ubuntu.com/ubuntu jammy-b

In [None]:
from sentence_transformers import SentenceTransformer, models

In [None]:
# Ollama startup: install the CLI if needed, start the server, fetch the model.

# Install Ollama only when no `ollama` executable is already on PATH.
!command -v ollama >/dev/null 2>&1 || (curl -fsSL https://ollama.com/install.sh | sh)

# Start the Ollama server as a background job, discarding its stdout and stderr.
!ollama serve > /dev/null 2>&1 &

# Pull the phi4 model.
# NOTE(review): this runs right after the server is backgrounded, with no wait or
# retry -- confirm the server is reliably accepting requests by this point.
!ollama pull phi4

# Verify the server is alive by querying its tags endpoint on port 11434.
!curl http://localhost:11434/api/tags

# Medical Files Processing (Pre-processing)

## Configs

In [None]:
import os, re, io, json, shutil, time, fitz, ocrmypdf, yaml
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

try:
    from multiprocessing import Pool, cpu_count
    _MP_AVAILABLE = True
except Exception:
    _MP_AVAILABLE = False


# Embedded configuration, parsed once from inline YAML.
#   lab_results_config      -> read by LabResultParser
#   medical_records_config  -> read by MedicalRecordsParser
# NOTE(review): `section_headers.ignored` is not read by any parser code in this
# part of the notebook -- confirm whether it is still needed.
CONFIG = yaml.safe_load("""
lab_results_config:
  test_detection:
    name_stopwords: ["Final", "Report Link"]
  cleanup_words: ["Final", "Updated"]

medical_records_config:
  pdf_cleaning:
    hospital_patterns:
      - "National Cancer Centre"
      - "Singapore General Hospital"
      - ".*Hospital.*"
    footer_patterns:
      - "Requested by:"
      - "computer generated"
      - "printed from"
      - "page"
  section_headers:
    ignored: ["NUR", "PHA", "CTR", "DISTRESS SCREENING NOTE"]
    subsections:
      - "History, Examination and Investigations"
      - "Cancer Risk"
      - "Investigations"
      - "IMPRESSION"
      - "Clinical and Treatment Summary"
      - "DIAGNOSIS SUMMARY"
      - "MANAGEMENT FOR THIS VISIT"
      - "PATIENT STATUS"
  normalization:
    month_map:
      Jan: "01"
      Feb: "02"
      Mar: "03"
      Apr: "04"
      May: "05"
      Jun: "06"
      Jul: "07"
      Aug: "08"
      Sep: "09"
      Oct: "10"
      Nov: "11"
      Dec: "12"
    abbreviation_map:
      ANC: "Absolute Neutrophil Count"
      URTI: "Upper Respiratory Tract Infection"
      CRP: "C-Reactive Protein"
      FBC: "Full Blood Count"
      Hb: "Haemoglobin"
      Plt: "Platelets"
      Het: "Haematocrit"
      PR: "progesterone receptor"
      ER: "estrogen receptor"
      NAD: "no abnormality detected"
      O/E: "On examination:"
      Neoadjuvant: "pre-operative"
      Pt: "Patient"
      s/b: "seen by"
      c/o: "complaints of"
""")


## Lab Results Parser

In [None]:
class LabResultParser:
    """Parse a lab-results PDF into ``{date: [{"lab results": {test_name: details}}]}``.

    Pipeline: strip page header/footer boilerplate, split the remaining text on
    ``dd-Mon-yyyy hh:mm`` stamps, take the first line after each stamp as the
    test header, then merge every entry that shares a date and test name.
    """

    # Page-header boilerplate; compiled with MULTILINE so ^/$ anchor per line.
    _REGEX_HEADER_PATTERNS = [
        r"^\s*(?:National Cancer Centre|Patient Results|Singapore General Hospital|.*Hospital.*)\s*$",
        r"^\s*All results performed dates from.*$",
        r"^\s*Requested By:.*?\d{2}/\d{2}/\d{4}\s+\d{2}:\d{2}\s*$",
        r"^\s*Current Location:.*$",
    ]
    # Page-footer boilerplate; same compilation flags as the header patterns.
    _REGEX_FOOTER_PATTERNS = [
        r"^\s*this is a computer generated report.*$",
        r"^\s*printed from:.*$",
        r"^\s*page:\s*\d+\s*$",
        r"^\s*requested by:.*page\s*\d+\s*of\s*\d+.*$",
        r"End of Report\s*$",
    ]
    # Stamp that starts each test entry, and the date part of that stamp.
    _REGEX_DATETIME_PATTERN = r"(\d{1,2}-[A-Za-z]{3}-\d{4}\s+\d{2}:\d{2})\s*"
    _REGEX_DATE_PATTERN = r"(\d{1,2}-[A-Za-z]{3}-\d{4})"

    def __init__(self):
        """Load the lab-results config section and pre-compile the boilerplate patterns."""
        self.config = CONFIG["lab_results_config"]
        flags = re.IGNORECASE | re.MULTILINE
        self._compiled_header_patterns = [re.compile(p, flags) for p in self._REGEX_HEADER_PATTERNS]
        self._compiled_footer_patterns = [re.compile(p, flags) for p in self._REGEX_FOOTER_PATTERNS]

    def extract_text_no_header_footer(self, pdf_path: Union[str, Path]) -> str:
        """Return the PDF text with header/footer lines and non-ASCII characters removed.

        Pages are cleaned independently and joined with a blank line.
        """
        pages = []
        # The context manager closes the document even if extraction raises.
        with fitz.open(pdf_path) as doc:
            for page in doc:
                text = page.get_text("text")
                for pat in self._compiled_header_patterns:
                    text = pat.sub("", text)
                for pat in self._compiled_footer_patterns:
                    text = pat.sub("", text)
                text = re.sub(r"[^\x00-\x7F\n\r]+", "", text)
                # Collapse runs of 3+ (possibly whitespace-padded) newlines and repeated spaces.
                text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text)
                text = re.sub(r" {2,}", " ", text)
                pages.append(text.strip())
        return "\n\n".join(pages)

    def clean_test_name(self, raw_header: str) -> str:
        """Return the header text before the first configured stopword, whitespace-collapsed.

        Falls back to the whole header when nothing precedes the stopword.
        """
        stopwords = "|".join(self.config["test_detection"]["name_stopwords"])
        m = re.match(rf"(.*?)(?:\s+(?:{stopwords})|$)", raw_header, re.IGNORECASE)
        name = m.group(1).strip() if m and m.group(1) else raw_header
        return re.sub(r"\s+", " ", name).strip()

    def parse_all_tests(self, text: str) -> List[Dict[str, str]]:
        """Split *text* on date-time stamps and return one merged entry per (date, test).

        Each entry has ``date``, ``test_name`` and ``raw_details``. Entries with the
        same date and the same test name (compared ignoring case, whitespace,
        commas and dots) are concatenated with a blank line, whether or not they
        are adjacent in the document, so a repeated test can no longer be
        overwritten later when results are keyed by name. Entries keep
        first-occurrence order.
        """
        datetime_pattern = re.compile(self._REGEX_DATETIME_PATTERN, re.DOTALL)
        date_pattern = re.compile(self._REGEX_DATE_PATTERN)
        # With one capture group, split() yields [prefix, stamp, body, stamp, body, ...].
        parts = datetime_pattern.split(text)

        merged: Dict[str, Dict[str, str]] = {}
        for i in range(1, len(parts), 2):
            stamp = parts[i].strip()
            body = parts[i + 1].strip() if i + 1 < len(parts) else ""
            date_match = date_pattern.match(stamp)
            date = date_match.group(1) if date_match else "UNKNOWN"
            test_name = self.clean_test_name(body.split("\n", 1)[0].strip())
            if not (test_name and body):
                continue
            key = date + "-" + re.sub(r"[,\s\.]", "", test_name).lower()
            existing = merged.get(key)
            if existing is None:
                merged[key] = {"date": date, "test_name": test_name, "raw_details": body}
            else:
                existing["raw_details"] += "\n\n" + body
        return list(merged.values())

    def normalize_test_details(self, text: str) -> str:
        """Remove the configured cleanup words and tidy whitespace around newlines."""
        cleanup_words = "|".join(self.config["cleanup_words"])
        text = re.sub(rf"({cleanup_words})\s*", " ", text, flags=re.IGNORECASE)
        text = re.sub(r"\s*\n\s*", "\n", text)
        return re.sub(r" {2,}", " ", text).strip()

    def build_timeline(self, pdf_path: Union[str, Path]) -> Dict[str, List[Dict]]:
        """Return ``{date: [{"lab results": {test_name: normalized details}}]}`` for one PDF."""
        raw = self.extract_text_no_header_footer(pdf_path)
        grouped = defaultdict(dict)
        for t in self.parse_all_tests(raw):
            grouped[t["date"]][t["test_name"]] = self.normalize_test_details(t["raw_details"])
        return {d: [{"lab results": v}] for d, v in grouped.items()}


## Medical Records Parser

In [None]:
class MedicalRecordsParser:
    """Parse a medical-records PDF into ``{authored_date: [entry, ...]}``.

    The text is stripped of hospital/footer boilerplate lines, cut into DMO note
    sections (from a DMO header line up to its "Last Updated:" line), normalized,
    and split into the sub-sections named in the configuration.
    """

    # Header line of a DMO note; group 1 captures the note type.
    _REGEX_DMO_HEADER = (
        r".{0,10}?DMO\s*(Consult|Correspondence|Pre[- ]?clerk\s*Consult|"
        r"Inpatient\s*(?:Admission\s*Note|Daily\s*Ward\s*Round(?:\s*V\d+)?)|"
        r"Correspondence\s*Note).{0,50}?\[Charted\s*Location:"
    )
    _REGEX_AUTHORED_DATE = r"Authored:\s*(\d{1,2}-[A-Za-z]{3}-\d{4})"
    _REGEX_LAST_UPDATED_DOCTOR = r"Last\s*Updated:.*?by\s+([A-Za-z\s\-]+)\s*\(Doctor\)"
    # A non-word, non-space character repeated at least 9 times (separator rules).
    _REGEX_JUNK_LINE = r"([^\w\s])\1{8,}"
    _REGEX_DATE_NORMALIZE = r"(\d{1,2})-([A-Za-z]{3})-(\d{4})"
    _REGEX_NON_ASCII = r"[^\x00-\x7F]+"
    _REGEX_ALLERGIES = r"Allergies[: ]+(.*)"

    def __init__(self):
        """Load the medical-records config section and compile every pattern once."""
        self.config = CONFIG["medical_records_config"]
        self.DMO_HEADER_REGEX = re.compile(self._REGEX_DMO_HEADER, re.IGNORECASE | re.DOTALL)
        self._authored_date_re = re.compile(self._REGEX_AUTHORED_DATE, re.IGNORECASE)
        self._last_updated_re = re.compile(self._REGEX_LAST_UPDATED_DOCTOR, re.IGNORECASE)
        self._junk_line_re = re.compile(self._REGEX_JUNK_LINE)
        self._date_norm = re.compile(self._REGEX_DATE_NORMALIZE)
        self._non_ascii = re.compile(self._REGEX_NON_ASCII)
        self._allergies_re = re.compile(self._REGEX_ALLERGIES, re.IGNORECASE)
        self.hospitals = self.config["pdf_cleaning"]["hospital_patterns"]
        self.footers = self.config["pdf_cleaning"]["footer_patterns"]
        self.subsections = self.config["section_headers"]["subsections"]
        self.month_map = self.config["normalization"]["month_map"]
        self.abbr_map = self.config["normalization"]["abbreviation_map"]
        # The hospital patterns include a regular expression (".*Hospital.*"), so
        # they are compiled and searched as regexes. A plain substring test can
        # never match that entry.
        self._hospital_res = [re.compile(h, re.IGNORECASE) for h in self.hospitals]

    def _is_boilerplate_line(self, line: str) -> bool:
        """Return True when *line* matches a hospital pattern or contains a footer phrase."""
        if any(rx.search(line) for rx in self._hospital_res):
            return True
        lowered = line.lower()
        return any(f.lower() in lowered for f in self.footers)

    def extract_text_no_header_footer(self, pdf_path: Union[str, Path]) -> str:
        """Return the PDF text with hospital-name and footer lines dropped, page by page."""
        pages = []
        # The context manager closes the document even if extraction raises.
        with fitz.open(pdf_path) as doc:
            for page in doc:
                text = page.get_text("text").replace("#—! ", "")
                kept = [ln for ln in text.splitlines() if not self._is_boilerplate_line(ln)]
                pages.append("\n".join(kept).strip())
        return "\n\n".join(pages)

    def match_dmo_section_header(self, line: str):
        """Search *line*, minus leading non-word characters, for a DMO header; return the match or None."""
        return self.DMO_HEADER_REGEX.search(re.sub(r"^[^\w]*", "", line))

    def extract_dmo_sections(self, text: str) -> List[str]:
        """Return the DMO note sections found in *text*, each as newline-joined lines.

        Lines shorter than 10 characters and separator ("junk") lines are ignored.
        A section starts at a DMO header line and ends at the next line starting
        with "Last Updated:" or at the next header, whichever comes first.
        """
        sections: List[str] = []
        current: List[str] = []
        inside = False
        for raw_line in text.splitlines():
            line = raw_line.strip()
            if len(line) < 10 or self._junk_line_re.fullmatch(line):
                continue
            if self.match_dmo_section_header(line):
                if current:
                    sections.append("\n".join(current))
                current = [line]
                inside = True
                continue
            if not inside:
                continue
            current.append(line)
            if line.upper().startswith("LAST UPDATED:"):
                sections.append("\n".join(current))
                current = []
                inside = False
        if current:
            sections.append("\n".join(current))
        return sections

    def split_into_subsections(self, text: str) -> Dict[str, str]:
        """Split *text* at the configured sub-section headers.

        Text before the first header is stored under "General". When the same
        header occurs more than once, the pieces are joined with a space instead
        of the later piece replacing the earlier one. Empty pieces are dropped.
        """
        headers = self.subsections
        if not headers:
            body = text.strip()
            return {"General": body} if body else {}

        canonical = {h.upper(): h for h in headers}
        pattern = "(" + "|".join(re.escape(h) + ":?" for h in headers) + ")"
        parts = re.split(pattern, text, flags=re.IGNORECASE)

        subs: Dict[str, str] = {}

        def _flush(name: str, pieces: List[str]) -> None:
            body = " ".join(pieces).strip()
            if body:
                subs[name] = f"{subs[name]} {body}" if name in subs else body

        cur, buf = "General", []
        for part in parts:
            cand = part.strip().rstrip(":")
            if cand.upper() in canonical:
                _flush(cur, buf)
                buf = []
                cur = canonical[cand.upper()]
            else:
                buf.append(part)
        _flush(cur, buf)
        return subs

    def normalize_formatting(self, text: str) -> str:
        """Rewrite dates as ISO ``yyyy-mm-dd``, expand abbreviations, and flatten to one ASCII line.

        The month is looked up case-insensitively ("DEC" and "dec" both work). A
        three-letter token that is not a known month leaves the original text
        untouched rather than being turned into a January date.
        """
        def _to_iso(m):
            month = self.month_map.get(m.group(2).capitalize())
            if month is None:
                return m.group(0)
            return f"{m.group(3)}-{month}-{int(m.group(1)):02d}"

        text = self._date_norm.sub(_to_iso, text)
        for abbr, full in self.abbr_map.items():
            text = re.sub(rf"\b{re.escape(abbr)}\b", full, text)
        text = text.replace("\n", " ")
        return self._non_ascii.sub("", text)

    def parse_dmo_metadata(self, sec: str) -> Tuple[str, str, str]:
        """Return ``(authored_date, doctor, section_type)`` for a section; "UNKNOWN" where absent."""
        d = self._authored_date_re.search(sec)
        doc = self._last_updated_re.search(sec)
        lines = sec.splitlines()
        head = self.match_dmo_section_header(lines[0]) if lines else None
        return (
            d.group(1) if d else "UNKNOWN",
            doc.group(1).strip() if doc else "UNKNOWN",
            head.group(1) if head else "UNKNOWN",
        )

    def enrich_dmo_entry(self, sec: Dict[str, Union[str, Dict]]) -> Dict:
        """Return *sec* with ``subsections``, ``allergies`` and ``text`` split per sub-section.

        ``allergies`` is "NKA" when the text says "no known allergies", otherwise
        whatever follows "Allergies:" on the same line, otherwise None.
        NOTE(review): build_timeline passes text whose newlines were already
        replaced by spaces, so the capture runs to the end of the whole section.
        """
        text = sec["text"]
        subs = self.split_into_subsections(text)
        if "no known allergies" in text.lower():
            allergies = "NKA"
        else:
            found = self._allergies_re.search(text)
            allergies = found.group(1).strip() if found else None
        return {**sec, "subsections": list(subs.keys()), "allergies": allergies, "text": subs}

    def build_timeline(self, pdf_path: Union[str, Path]) -> Dict[str, List[Dict]]:
        """Return ``{authored_date: [enriched section, ...]}`` for one PDF.

        The date key is the raw "Authored:" value, read before normalization.
        """
        raw = self.extract_text_no_header_footer(pdf_path)
        timeline = defaultdict(list)
        for section in self.extract_dmo_sections(raw):
            date, doctor, stype = self.parse_dmo_metadata(section)
            entry = {"doctor": doctor, "section_type": stype, "text": self.normalize_formatting(section)}
            timeline[date].append(self.enrich_dmo_entry(entry))
        return dict(timeline)


## Utilities

In [None]:
# OCR + Classification Utilities
def _is_pdf_searchable(p: Union[str, Path]) -> bool:
    """Return True when the PDF already carries more than 100 characters of text.

    Any failure to open or read the file is reported on stdout and treated as
    "not searchable". The document is closed on every path, including when text
    extraction raises.
    """
    try:
        with fitz.open(p) as doc:
            text = "".join(page.get_text("text") for page in doc).strip()
    except Exception as e:
        print(f"Error checking searchability for {Path(p).name}: {e}")
        return False
    return len(text) > 100

def _convert_scanned_pdf(fp: Union[str, Path], out: Union[str, Path]) -> None:
    """Run OCR on *fp* with ocrmypdf and write the result to *out*.

    Progress is printed before and after; errors from ocrmypdf propagate to the caller.
    """
    source_name = Path(fp).name
    print(f"Starting OCR for: {source_name}...")
    # Options are forwarded to ocrmypdf.ocr unchanged.
    ocr_options = {
        "skip_text": True,
        "tesseract_pagesegmode": 6,
        "tesseract_oem": 3,
        "optimize": 1,
        "progress_bar": False,
    }
    ocrmypdf.ocr(fp, out, **ocr_options)
    print(f"✅ OCR complete: {Path(out).name}")

def _process_ocr_task(t: Tuple[Path, Path]) -> Tuple[str, Path, bool]:
    """Make one PDF available in the output directory, running OCR when needed.

    Args:
        t: ``(source_pdf, output_dir)``.

    Returns:
        ``(file_name, resulting_path, ok)``. On failure ``resulting_path`` is the
        source path and ``ok`` is False.
    """
    src, out = t
    name = src.name
    if not src.exists():
        return (name, src, False)

    tgt = out / name
    if _is_pdf_searchable(src):
        # Already has a text layer: a plain copy is enough.
        shutil.copy2(src, tgt)
        return (name, tgt, True)

    # Scanned document: the OCR output is written under an "OCR_" prefix.
    tgt = out / f"OCR_{name}"
    try:
        _convert_scanned_pdf(src, tgt)
    except Exception as e:
        print(f"ERROR: {e}")
        return (name, src, False)
    return (name, tgt, True)

def _classify_file_type(p: Path) -> str:
    """Classify a PDF as "Lab Results" or "Medical Records" from its first page.

    The file is a lab report when the second text line of page one contains
    "patient results" (case-insensitive); anything else is a medical record.
    Returns "Unknown" when the file cannot be opened or read. The document is
    closed on every path.
    """
    try:
        with fitz.open(p) as doc:
            first_page_text = doc[0].get_text("text").lower()
    except Exception as e:
        print(f"Error classifying {p.name}: {e}")
        return "Unknown"
    lines = first_page_text.splitlines()
    second_line = lines[1] if len(lines) > 1 else ""
    return "Lab Results" if "patient results" in second_line else "Medical Records"

def _process_single_file(fd: Tuple[str, Path, str]) -> Dict[str, Any]:
    """Parse one classified PDF with the parser that matches its type.

    Args:
        fd: ``(file_name, path, file_type)``.

    Returns:
        A dict with ``original_filename``, ``file_type`` and ``structured_data``.
        ``structured_data`` is ``{"error": ...}`` for an unrecognised type or
        when parsing raises.
    """
    name, path, ftype = fd
    try:
        if ftype == "Lab Results":
            parser = LabResultParser()
        elif ftype == "Medical Records":
            parser = MedicalRecordsParser()
        else:
            parser = None

        if parser is None:
            data = {"error": "Unknown file type"}
        else:
            data = parser.build_timeline(path)
    except Exception as e:
        data = {"error": str(e)}
    return {"original_filename": name, "file_type": ftype, "structured_data": data}


## Orchestrator

In [None]:
class PDFUploadProcessor:
    """Orchestrate OCR, classification, parsing and timeline merging for a folder of PDFs.

    Processed PDFs and the combined timeline JSON are written to
    ``<upload_dir>/processed_pdfs``.
    """

    def __init__(self, upload_dir: str):
        """Collect the ``*.pdf`` files of *upload_dir*.

        Raises:
            ValueError: if *upload_dir* is not an existing directory.
        """
        self.input_dir = Path(upload_dir)
        if not self.input_dir.is_dir():
            raise ValueError(f"{upload_dir} not a valid directory")
        self.output_dir = self.input_dir / "processed_pdfs"
        self.uploaded = list(self.input_dir.glob("*.pdf"))
        self.results = []
        if not self.uploaded:
            print("No PDF files found in input directory.")

    def convert_files_to_searchable_pdfs(self, multi=False):
        """Copy searchable PDFs and OCR scanned ones into the output directory.

        Args:
            multi: use a process pool when multiprocessing is available.
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        tasks = [(f, self.output_dir) for f in self.uploaded]
        print(f"\n--- OCR Conversion (Parallel={multi}) ---")
        if multi and _MP_AVAILABLE:
            with Pool(cpu_count()) as pool:
                pool.map(_process_ocr_task, tasks)
        else:
            for t in tasks:
                _process_ocr_task(t)
        print("OCR Conversion Complete")

    def extract_and_parse_documents(self, multi=False):
        """Classify and parse every PDF; store and return the list of result dicts.

        PDFs are taken from the output directory. When it holds none (the OCR
        step was skipped), the original uploads are parsed directly instead of
        returning an empty result.
        """
        files = list(self.output_dir.glob("*.pdf"))
        if not files:
            files = list(self.uploaded)
        if not files:
            print("No files found for parsing.")
            return []
        print(f"\n--- Content Extraction & Parsing (Parallel={multi}) ---")
        if multi and _MP_AVAILABLE:
            # One pool serves both stages.
            with Pool(cpu_count()) as pool:
                types = pool.map(_classify_file_type, files)
                tasks = [(f.name, f, t) for f, t in zip(files, types)]
                res = pool.map(_process_single_file, tasks)
        else:
            types = [_classify_file_type(f) for f in files]
            tasks = [(f.name, f, t) for f, t in zip(files, types)]
            res = [_process_single_file(t) for t in tasks]
        self.results = res
        print("Parsing Complete")
        return res

    def create_combined_patient_timeline(self) -> Dict[str, List[Dict[str, Any]]]:
        """Merge all parsed files into ``{date: [event, ...]}`` and save it as JSON.

        Medical-record entries are copied into the event as-is; lab entries are
        wrapped as ``{"tests": [entry]}``. Files whose parsing failed are skipped.

        Raises:
            ValueError: if nothing has been parsed yet, or no event was extracted.
        """
        if not self.results:
            raise ValueError("Run extract_and_parse_documents() first.")
        print("\n--- Creating Unified Patient Timeline ---")
        unified = defaultdict(list)
        for r in self.results:
            t = r["file_type"]
            data = r["structured_data"]
            name = r["original_filename"]
            if "error" in data:
                continue
            for date, records in data.items():
                if not isinstance(records, list):
                    continue
                for rec in records:
                    event = {"record_type": t, "source_file": name}
                    event.update(rec if t == "Medical Records" else {"tests": [rec]})
                    unified[date].append(event)
        final = dict(unified)
        if not final:
            raise ValueError("No extracted information.")
        # The output directory does not exist yet when the OCR step was skipped.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        out = self.output_dir / "combined_patient_timeline.json"
        with open(out, "w", encoding="utf-8") as f:
            json.dump(final, f, ensure_ascii=False, indent=4)
        print(f"Timeline saved to {out}")
        return final


# ============================================
# ▶️ Notebook Convenience Runner
# ============================================
def process_uploads(upload_dir: str, run_ocr=True, multi=True) -> Dict[str, Any]:
    """Run the whole pipeline on *upload_dir* and report the elapsed time.

    Args:
        upload_dir: directory holding the uploaded PDFs.
        run_ocr: run the OCR/copy step before parsing.
        multi: passed to the OCR and parsing steps to request parallel execution.

    Returns:
        ``{"results": <per-file parse results>, "timeline": <combined timeline>}``.
    """
    started = time.time()
    processor = PDFUploadProcessor(upload_dir)
    if run_ocr:
        processor.convert_files_to_searchable_pdfs(multi)
    parsed = processor.extract_and_parse_documents(multi)
    combined = processor.create_combined_patient_timeline()
    elapsed = time.time() - started
    print(f"\n Total processing time: {elapsed:.2f}s")
    return {"results": parsed, "timeline": combined}


# RAG

In [None]:
# Import required libraries
import json
import re
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
from dataclasses import dataclass
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Check and download required NLTK data
def ensure_nltk_data():
    """Make sure an NLTK sentence/word tokenizer model is available.

    Tries ``punkt_tab`` first (newer NLTK releases) and falls back to ``punkt``.
    For each resource the local data path is checked before a download is
    attempted. The download's boolean result is honoured, so a failed download
    moves on to the fallback instead of being reported as success.

    Returns:
        True when one of the tokenizer resources is available, False otherwise.
    """
    for resource in ("punkt_tab", "punkt"):
        try:
            nltk.data.find(f"tokenizers/{resource}")
            return True
        except LookupError:
            pass  # not installed locally; try to download it below
        try:
            if nltk.download(resource):
                return True
        except Exception as e:
            print(f"NLTK download of '{resource}' failed: {e}")
    return False

In [None]:
# Data structures for chunking
@dataclass
class TextChunk:
    """A piece of text plus its word count and free-form metadata.

    After construction, a ``word_count`` of 0 is replaced by the number of
    whitespace-separated words in ``text``, and a missing ``metadata`` becomes
    an empty dict.
    """
    text: str
    word_count: int
    metadata: Dict = None

    def __post_init__(self):
        # 0 means "not supplied": derive the count from the text itself.
        if self.word_count == 0:
            self.word_count = len(self.text.split())
        # Each instance gets its own dict rather than sharing a mutable default.
        self.metadata = {} if self.metadata is None else self.metadata

def clean_text(text: str) -> str:
    """Collapse every run of whitespace (newlines included) to one space and trim the ends."""
    # `\s+` already covers newlines, so a single substitution is enough.
    single_spaced = re.sub(r'\s+', ' ', text)
    return single_spaced.strip()

class TextChunker:
    """
    Chunker that splits text into fixed-size token windows with overlap
    """

    def __init__(self, chunk_size: int = 256, overlap: int = 8):
        """
        Initialize the TextChunker

        Args:
            chunk_size: Maximum size of each chunk (in tokens)
            overlap: Number of tokens to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.chunks: List[TextChunk] = []

    def chunk_by_fixed_size(self, text: str) -> List[TextChunk]:
        """
        Chunk text by fixed token size with overlap

        The text is cleaned with clean_text and tokenized with word_tokenize.
        Each chunk's text is its tokens joined by single spaces.

        Args:
            text: Input text to chunk

        Returns:
            List of TextChunk objects

        Raises:
            ValueError: if the text needs more than one chunk but the window
                cannot advance (overlap >= chunk_size, or chunk_size <= 0).
                Without this check the loop would never terminate.
        """
        tokens = word_tokenize(clean_text(text))
        total = len(tokens)

        chunks = []
        start = 0
        while start < total:
            end = min(start + self.chunk_size, total)
            window = tokens[start:end]
            chunks.append(TextChunk(text=' '.join(window), word_count=len(window)))
            if end >= total:
                break
            # Step back by `overlap` tokens so consecutive chunks share context.
            next_start = end - self.overlap
            if next_start <= start:
                raise ValueError(
                    f"overlap ({self.overlap}) must be smaller than chunk_size "
                    f"({self.chunk_size}) and chunk_size must be positive"
                )
            start = next_start

        return chunks

In [None]:
def process_all_medical_records(medical_data: dict, max_tokens: int = 256, overlap: int = 8) -> List[TextChunk]:
    """
    Turn a combined patient timeline into text chunks ready for embedding.

    Every (record, text category) pair yields one chunk when its text fits in
    ``max_tokens`` tokens, otherwise several overlapping chunks produced by
    TextChunker. Each chunk carries the record's date, doctor, section type,
    category, subsections, allergies and record index, plus its position
    (``chunk``) and the number of chunks for that category (``total_chunks``).

    Args:
        medical_data (dict): Mapping of date -> list of record dicts
        max_tokens (int): Maximum tokens per chunk
        overlap (int): Number of overlapping tokens between chunks

    Returns:
        List[TextChunk]: All chunks, in timeline order
    """
    chunker = TextChunker(chunk_size=max_tokens, overlap=overlap)
    all_chunks = []

    for date, records in medical_data.items():
        print(f"Processing date: {date}")

        for record_idx, record in enumerate(records):
            if not isinstance(record, dict):
                continue

            if record.get('record_type', '') == 'Lab Results':
                # Lab records carry no clinician metadata; each test becomes a category.
                doctor, section_type, subsections, allergies = '', 'Lab Results', [], ''
                text_content = {}
                for test in record.get('tests', []):
                    for test_name, details in test.get('lab results', {}).items():
                        text_content[test_name] = details
            else:
                # Everything else is handled as a medical record.
                doctor = record.get('doctor', '')
                section_type = record.get('section_type', '')
                subsections = record.get('subsections', [])
                allergies = record.get('allergies') or ''
                text_content = record.get('text', {})

            if not isinstance(text_content, dict):
                continue

            for category, category_text in text_content.items():
                if not category_text or not category_text.strip():
                    continue

                cleaned_text = clean_text(category_text)
                tokens_len = len(word_tokenize(cleaned_text))

                base_metadata = {
                    "date": date,
                    "doctor": doctor,
                    "section_type": section_type,
                    "text_category": category,
                    "subsections": subsections,
                    "allergies": allergies,
                    "record_index": record_idx
                }

                if tokens_len <= max_tokens:
                    # Short enough to keep whole.
                    all_chunks.append(
                        TextChunk(
                            text=cleaned_text,
                            word_count=tokens_len,
                            metadata={**base_metadata, "chunk": 1, "total_chunks": 1}
                        )
                    )
                    continue

                # Too long: split, then stamp each piece with its position.
                category_chunks = chunker.chunk_by_fixed_size(cleaned_text)
                total_chunks = len(category_chunks)
                for chunk_idx, chunk in enumerate(category_chunks, 1):
                    chunk.metadata = {
                        **base_metadata,
                        "chunk": chunk_idx,
                        "total_chunks": total_chunks
                    }
                all_chunks.extend(category_chunks)

    print(f"Completed. Total chunks: {len(all_chunks)}")
    return all_chunks

In [None]:
def prepare_chunks_for_embedding(chunks: List[TextChunk]) -> List[Dict]:
    """Convert TextChunk objects into ``{"id", "text", "metadata"}`` dicts for embedding.

    * ``id`` is ``"<date>_<category>_chunk_<n>_<index>"`` when the chunk has a
      date, otherwise the 1-based position in *chunks*.
    * ``text`` is the chunk text prefixed with ``"<date>, <category>: "``. Parts
      missing from the metadata are left out of the prefix instead of raising
      KeyError.
    * ``metadata`` is a copy of the chunk metadata with ``word_count`` added, so
      the count computed during chunking is not lost downstream.
    """
    prepared_chunks = []

    for i, chunk in enumerate(chunks):
        metadata = dict(chunk.metadata or {})

        if 'date' in metadata:
            # The running index keeps ids unique even when date, category and
            # chunk number repeat across records.
            category = metadata.get('text_category', 'unknown')
            chunk_num = metadata.get('chunk', 1)
            chunk_id = f"{metadata['date']}_{category}_chunk_{chunk_num}_{i}"
        else:
            chunk_id = i + 1

        prefix_parts = [
            str(part)
            for part in (metadata.get('date'), metadata.get('text_category'))
            if part
        ]
        prefix = ", ".join(prefix_parts)
        text = f"{prefix}: {chunk.text}" if prefix else chunk.text

        metadata.setdefault('word_count', chunk.word_count)

        prepared_chunks.append({
            'id': chunk_id,
            'text': text,
            'metadata': metadata,
        })

    return prepared_chunks

In [None]:
# Embedding Generation using Bio_ClinicalBERT
# from sentence_transformers import SentenceTransformer, models
import torch
import time

def build_bioclinical_sentence_model(max_seq_len: int = 384):
    """Build a SentenceTransformer from Bio_ClinicalBERT followed by mean-token pooling.

    Args:
        max_seq_len: value passed as the transformer module's ``max_seq_length``.
    """
    transformer = models.Transformer(
        "emilyalsentzer/Bio_ClinicalBERT",
        max_seq_length=max_seq_len,
    )
    embedding_dim = transformer.get_word_embedding_dimension()
    # Only the mean-tokens mode is enabled; the CLS and max modes are switched off.
    mean_pooling = models.Pooling(
        embedding_dim,
        pooling_mode_mean_tokens=True,
        pooling_mode_cls_token=False,
        pooling_mode_max_tokens=False,
    )
    return SentenceTransformer(modules=[transformer, mean_pooling])

def generate_embeddings(prepared_chunks: List[Dict], model_name: str = "emilyalsentzer/Bio_ClinicalBERT") -> List[Dict]:
    """Embed the ``text`` of every prepared chunk with the Bio_ClinicalBERT sentence model.

    Args:
        prepared_chunks: dicts with at least a ``text`` key.
        model_name: kept for interface compatibility. The model is always the one
            built by build_bioclinical_sentence_model; a different name is
            reported and ignored.

    Returns:
        A new list in which each chunk dict is extended with ``embedding`` (list
        of floats), ``embedding_model`` and ``embedding_dimension``. An empty
        input returns an empty list without loading the model.
    """
    loaded_model_name = "emilyalsentzer/Bio_ClinicalBERT"

    if not prepared_chunks:
        return []

    if model_name != loaded_model_name:
        print(f"Note: model_name '{model_name}' is ignored; embeddings use {loaded_model_name}")
    print(f"Loading model: {loaded_model_name}")
    model = build_bioclinical_sentence_model()

    texts = [chunk['text'] for chunk in prepared_chunks]
    print(f"Processing {len(texts)} text chunks...")

    embeddings = model.encode(
        texts,
        convert_to_tensor=True,
        show_progress_bar=False
    )
    # Move to CPU and convert to numpy so the vectors can be turned into plain lists.
    embeddings = embeddings.cpu().numpy()

    return [
        {
            **chunk,
            'embedding': embedding.tolist(),  # plain list for JSON serialization
            'embedding_model': loaded_model_name,
            'embedding_dimension': len(embedding),
        }
        for chunk, embedding in zip(prepared_chunks, embeddings)
    ]

In [None]:
# Vector Database Storage using Milvus Lite
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import uuid

class MilvusVectorStore:
    """Thin wrapper around one Milvus Lite collection of text-chunk embeddings."""

    def __init__(self, collection_name: str = "medical_rag_embeddings", db_file: str = "./milvus_lite.db"):
        """Remember the collection name and database file; no connection is opened here."""
        self.collection_name = collection_name
        self.db_file = db_file
        self.collection = None

    def connect(self):
        """Connect the "default" alias to the database file; return True on success."""
        try:
            connections.connect("default", uri=self.db_file)
            print(f"Connected to Milvus Lite at {self.db_file}")
            return True
        except Exception as e:
            print(f"Connection failed: {e}")
            return False

    def create_collection(self, embedding_dim: int = 768):
        """(Re)create the collection and its vector index.

        An existing collection with the same name is dropped first, so all
        previously stored vectors are lost.
        """
        if utility.has_collection(self.collection_name):
            utility.drop_collection(self.collection_name)
            print(f"Removed existing collection: {self.collection_name}")

        fields = [
            FieldSchema(name="id", dtype=DataType.VARCHAR, max_length=200, is_primary=True),
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=10000),
            FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),
            FieldSchema(name="date", dtype=DataType.VARCHAR, max_length=50),
            FieldSchema(name="chunk_number", dtype=DataType.INT64),
            FieldSchema(name="word_count", dtype=DataType.INT64),
        ]
        schema = CollectionSchema(fields, f"Medical RAG embeddings collection with {embedding_dim}D vectors")
        self.collection = Collection(self.collection_name, schema)

        index_params = {
            "index_type": "FLAT",
            "metric_type": "COSINE",
            "params": {}
        }
        self.collection.create_index("embedding", index_params)

    def _build_columns(self, embedded_chunks: List[Dict]) -> List[List]:
        """Return the per-field column lists, in schema order, for *embedded_chunks*.

        ``word_count`` comes from the chunk metadata when present; otherwise it is
        the number of whitespace-separated words in the stored text, so the column
        is no longer always 0.
        """
        ids, texts, embeddings, dates, chunk_numbers, word_counts = [], [], [], [], [], []
        for chunk in embedded_chunks:
            metadata = chunk['metadata']
            text = chunk['text'][:9999]  # stay below the VARCHAR limit of the schema
            word_count = metadata.get('word_count')
            if word_count is None:
                word_count = len(text.split())

            # A random UUID is used only when the chunk carries no id.
            ids.append(str(chunk['id'] if 'id' in chunk else uuid.uuid4()))
            texts.append(text)
            embeddings.append(chunk['embedding'])
            dates.append(metadata.get('date', 'unknown'))
            chunk_numbers.append(metadata.get('chunk', 1))
            word_counts.append(word_count)
        return [ids, texts, embeddings, dates, chunk_numbers, word_counts]

    def insert_embeddings(self, embedded_chunks: List[Dict]):
        """
        Insert embedded chunks into the collection, flush, and load it for search.

        Args:
            embedded_chunks: List of chunks with embeddings

        Returns:
            The insert result, or None when there is no collection, nothing to
            insert, or the insert fails.
        """
        if self.collection is None:
            print("Collection not initialized. Call create_collection() first.")
            return None
        if not embedded_chunks:
            return None

        data = self._build_columns(embedded_chunks)
        try:
            insert_result = self.collection.insert(data)
            self.collection.flush()
            print(f"Data inserted ({len(embedded_chunks)} chunks)")
            self.load_collection()
            return insert_result
        except Exception as e:
            print(f"Insert failed: {e}")
            return None

    def load_collection(self):
        """Load the collection when one exists."""
        if self.collection is not None:
            self.collection.load()

    def search_similar(self, query_embedding: List[float], top_k: int = 8, date_filter: str = None):
        """Return the *top_k* cosine-nearest chunks, optionally restricted to one date.

        Returns an empty list when the collection has not been created.
        """
        if self.collection is None:
            print("Collection not initialized")
            return []

        search_params = {"metric_type": "COSINE", "params": {}}
        expr = f'date == "{date_filter}"' if date_filter else None
        return self.collection.search(
            [query_embedding],
            "embedding",
            search_params,
            limit=top_k,
            expr=expr,
            output_fields=["text", "date", "chunk_number", "word_count"]
        )

    def get_collection_stats(self):
        """Flush, then print and return the number of stored vectors (0 without a collection)."""
        if self.collection is None:
            return 0
        self.collection.flush()
        stats = self.collection.num_entities
        print(f"Collection '{self.collection_name}' contains {stats} vectors")
        return stats

In [None]:
# RAG Pipeline - Retrieval System for Medical Form Fields
# from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict
import json

class MedicalRAGRetriever:
    """Retrieve stored chunks that are most similar to free-text queries."""

    def __init__(self, vector_store, embedding_model_name: str = "emilyalsentzer/Bio_ClinicalBERT"):
        """Bind the retriever to a vector store and build its embedding model.

        Args:
            vector_store: Object exposing ``search_similar(embedding, top_k=...)``.
            embedding_model_name: Only used in the startup message below.
                NOTE(review): the model itself comes from
                ``build_bioclinical_sentence_model()``, which is called with no
                arguments — confirm this name matches what that helper loads.
        """
        self.vector_store = vector_store

        # Build the encoder, then place it on the GPU when one is available.
        encoder = build_bioclinical_sentence_model()
        target_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.embedding_model = encoder.to(target_device)

        print(f"RAG Retriever initialized ({embedding_model_name})")

    def generate_query_embedding(self, query: str) -> List[float]:
        """Encode ``query`` and return the embedding as a plain Python list."""
        query_tensor = self.embedding_model.encode(query, convert_to_tensor=True)
        as_array = query_tensor.cpu().numpy()
        return as_array.tolist()

    def retrieve_for_queries(self, queries: List[str], top_k: int) -> Dict:
        """Search the vector store once per query and merge the hits.

        Args:
            queries: Query strings; each one is embedded and searched separately.
            top_k: Number of hits requested from the store for every query.

        Returns:
            Dict with the input ``queries``, the de-duplicated
            ``retrieved_chunks``, their texts joined by blank lines as
            ``aggregated_text``, and ``chunk_count``.
        """
        collected = []
        for position, query in enumerate(queries, start=1):
            print(f"Processing query {position}/{len(queries)}: {query[:50]}...")

            # Embed the query and ask the store for its top_k nearest chunks.
            hits = self.vector_store.search_similar(
                self.generate_query_embedding(query), top_k=top_k
            )
            collected.extend(self._process_search_results(hits))

        # Keep the first occurrence of every chunk_id, in encounter order.
        first_by_id = {}
        for chunk in collected:
            first_by_id.setdefault(chunk['chunk_id'], chunk)
        unique_chunks = list(first_by_id.values())

        return {
            'queries': queries,
            'retrieved_chunks': unique_chunks,
            'aggregated_text': "\n\n".join(chunk['text'] for chunk in unique_chunks),
            'chunk_count': len(unique_chunks),
        }

    def _process_search_results(self, results) -> List[Dict]:
        """Flatten nested search hits into a list of plain chunk dicts.

        ``results`` is iterated as groups of hits; each hit must expose
        ``id``, ``score`` and an ``entity`` with a ``get(field, default)``.
        """
        return [
            {
                'text': hit.entity.get('text', ''),
                'score': float(hit.score),
                'date': hit.entity.get('date', 'unknown'),
                'chunk_number': hit.entity.get('chunk_number', 0),
                'word_count': hit.entity.get('word_count', 0),
                'chunk_id': hit.id,
            }
            for hits in results
            for hit in hits
        ]

In [None]:
def retrieve_rag(timeline, field_sets, top_k=2, chunk_size=256, overlap=8):
    """Index the timeline's records and retrieve chunks for every field set.

    Pipeline: chunk the records, prepare and embed the chunks, store the
    embeddings in a ``MilvusVectorStore`` collection, then run each field
    set's queries through a ``MedicalRAGRetriever``.

    Args:
        timeline: Medical records passed to ``process_all_medical_records``.
        field_sets: Mapping of field-set key to its list of query strings.
        top_k: Hits requested per query.
        chunk_size: Forwarded to ``process_all_medical_records``.
        overlap: Forwarded to ``process_all_medical_records``.

    Returns:
        Dict mapping each field-set key to the result of
        ``MedicalRAGRetriever.retrieve_for_queries``, or ``None`` when the
        vector store ended up without a usable collection.
    """
    ensure_nltk_data()

    # Chunk -> prepare -> embed.
    all_processed_chunks = process_all_medical_records(timeline, chunk_size, overlap)
    prepared_for_embedding = prepare_chunks_for_embedding(all_processed_chunks)
    embedded_chunks = generate_embeddings(prepared_for_embedding)

    # NOTE: the former "sample chunk" preview indexed embedded_chunks[0]
    # unconditionally (IndexError on an empty list) and its result was never
    # used, so it has been dropped.

    vector_store = MilvusVectorStore()

    if vector_store.connect():
        # Fall back to 768 dimensions when there is no chunk to read it from.
        embedding_dim = embedded_chunks[0]['embedding_dimension'] if embedded_chunks else 768
        vector_store.create_collection(embedding_dim)

        insert_result = vector_store.insert_embeddings(embedded_chunks)
        if insert_result:
            vector_store.load_collection()
            vector_store.get_collection_stats()
        else:
            print("Failed to insert embeddings")
    else:
        print("Could not connect to Milvus")

    # Without a collection there is nothing to search.
    if not getattr(vector_store, 'collection', None):
        print("Vector store not available. Run the database setup cell first.")
        return None

    retriever = MedicalRAGRetriever(vector_store)
    return {
        field_num: retriever.retrieve_for_queries(field_queries, top_k)
        for field_num, field_queries in field_sets.items()
    }

### NTUC RAG Prompt

In [None]:
# Retrieval queries for the NTUC form, grouped into five field sets.
# Every string is one query; ``ntuc_queries`` maps field-set number -> list.
NTUC_FIELD_JSON_1 = [
    "Over what period do your records extend? Start date (dd/mm/yyyy):",
    "Over what period do your records extend? End date (dd/mm/yyyy):",
    "When did the Insured first consult you for this condition? (dd/mm/yyyy)",
    "When you first saw the Insured, what were the symptoms presented and their duration?, symptoms presented, Duration of symptom, Date symptoms first occurred (dd/mm/yyyy)",
]
NTUC_FIELD_JSON_2 = [
    "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you?",
    "Details of other doctors consulted (rows), Name of doctor, Name and address of clinic / hospital, Date(s) of consultation (dd/mm/yyyy), Diagnosis made",
    "Histological diagnosis of disease, Date of diagnosis (dd/mm/yyyy), Doctor and clinic / hospital where diagnosis was first made, Date Insured was first informed of diagnosis (dd/mm/yyyy)",
    "Was a biopsy of the tumour performed?, Biopsy date (dd/mm/yyyy), If No, how was the diagnosis confirmed?",
    "Site or organ involved",
    "Staging of tumour, Has the cancer spread beyond the layer of cells?, Was the disease completely localised?, Was there invasion of adjacent tissues?, Were regional lymph nodes involved?, Were there distant metastases?, Metastases details",
    "Is the condition carcinoma-in-situ?",
    "Is the condition Pre-malignant / non-invasive",
    "Is the condition Borderline / suspicious malignancy",
    "Is the condition Cervical dysplasia CIN1-3 (without CIS)",
]
NTUC_FIELD_JSON_3 = [
    "Is the condition Carcinoma-in-situ of biliary system",
    # Fixed: a missing space had fused "condition" and "Hyperkeratoses".
    "Is the condition Hyperkeratoses, basal/squamous skin cancers",
    "Is the condition Bladder cancer T1N0M0 or below, Bladder papillary micro-carcinoma",
    "Is the condition Prostate cancer T1N0M0, T1, or a equivalent or lesser classification?, T1 subclass",
    "Is the condition Thyriod cancer T1N0M0 or below?, Thyriod diameter, Is Thyroid papillary micro-carcinoma?, Thyroid papillary micro-carcinoma size",
    "If the diagnosis is Leukaemia, state type, Leukaemia RAI staging",
    # Fixed: the trailing comma was missing here, so Python's implicit string
    # concatenation merged the Melanoma and GIST queries into a single entry.
    "If the diagnosis is Melanoma, size/thickness (Breslow mm), Melanoma Clark level, Has the condition caused invasion beyond the epidermis?",
    "If the condition is GIST TNM, classification, GIST mitotic count (HPF)",
    "Has the patient received treatment for this illness?, Treatment type, Date of treatment (dd/mm/yyyy), Duration of treatment, Has active treatment and therapy been rejected in favour of symptoms relief, Active treatment rejection reason",
    "Was radical surgery done?, Radical surgery code/table, Radical surgery date (dd/mm/yyyy)",
]
NTUC_FIELD_JSON_4 = [
    "Is the Insured still on follow-up at your clinic? (follow-up/discharge), Next appointment date (dd/mm/yyyy), Discharge date (dd/mm/yyyy)",
    "Is the Insured terminally ill (i.e. death expected within 12 months)?, Terminal illness evaluation, Terminal illness assessment date (dd/mm/yyyy)",
    "Is the Insured referred to hospice care? Hospice name, Hospice inpatient admission date (dd/mm/yyyy), Hospice daycare start date (dd/mm/yyyy), Hospice care type",
    # Fixed: "Name of doctor" and "Name and Address ..." were fused together.
    "Details of Doctors and clinics / hospitals consulted for this condition, Name of doctor, Name and Address of Clinic/Hospital, Date(s) of consultation (dd/mm/yyyy), Diagnosis made",
    "Has insured ever had Malignant, pre-malignant or other related conditions or risk factors?, Malignant, pre-malignant or other related conditions or risk factors details",
    "Details of Medical history that would have increased the risk of cancer",
    "Details of Family history that would have increased the risk of Cancer",
    "Details of habits related to Smoking habits",
    "Details of habits related to Alcohol consumption habits",
    "Is the tumour or cancer in any way caused directly or indirectly by alcohol or drug abuse?, Alcohol/drug abuse details",
]
NTUC_FIELD_JSON_5 = [
    "Tumour caused by HIV or AIDS?, HIV antibody status, HIV/AIDS diagnosis date (dd/mm/yyyy)",
    "Any other significant health conditions",
    "Details of other health conditions (rows), Diagnosis, Name of doctor, Name/address of clinic/hospital, Date of diagnosis (dd/mm/yyyy), Duration of condition, Treatment received",
]

# Field-set number -> queries for that set.
ntuc_queries = {
    1: NTUC_FIELD_JSON_1,
    2: NTUC_FIELD_JSON_2,
    3: NTUC_FIELD_JSON_3,
    4: NTUC_FIELD_JSON_4,
    5: NTUC_FIELD_JSON_5,
}


### GE RAG PROMPT

In [None]:
# Retrieval queries for the GE form, grouped into four field sets.
# Every string is one query; ``ge_queries`` maps field-set number -> list.

# Field set 1: first consultation, presenting symptoms, first diagnosis.
GE_FIELD_JSON_1 = [
    "Date when insured first consulted you for cancer (ddmmyyyy)",
    "Symptoms presented and the date they first appeared?",
    "Source of above information",
    "What is the source of the above information? If Referring Doctor / Others, specify name & address",
    "Date when Cancer was FIRST diagnosed (ddmmyyyy)",
    "Diagnosis was first made by (name of Doctor)",
]

# Field set 2: diagnosis, staging, surgery.
GE_FIELD_JSON_2 = [
    "Actual diagnosis",
    "Date when insured first became aware of this illness (ddmmyyyy)",
    "Was the illness suffered by Life Assured caused directly or indirectly by alcohol or drug abuse?",
    "If illness caused directly or indirectly by alcohol or drug abuse, please give details",
    "What is the staging of the tumour?",
    "Please state the tumour classification (eg TMN classification etc)",
    "Was the cancer completely localised?",
    "Was there invasion of tissues?",
    "Were regional lymph nodes involved?",
    "Were there distant metastases?",
    "Did the Life Assured undergo any surgery?",
    "Date of surgery (ddmmyyyy)",
    "Surgical procedure performed",
    "Was there any other mode of treatment, other than surgery, which could be undertaken to treat the Life Assured’s condition?",
    "Type of treatment other than surgery that could be undertaken to treat condition",
]

# Field set 3: other treatment, leukaemia/melanoma specifics, HIV, cognition.
GE_FIELD_JSON_3 = [
    "Has the Life Assured underwent other mode of treatment?",
    "Date of other treatment (ddmmyyyy)",
    "Reason for no other mode of treatment",
    "What other forms of treatment did the Life Assured undergo (eg chemotherapy, radiotherapy etc)?",
    "If diagnosis is leukaemia, please provide the type of leukaemia",
    "If the diagnosis is malignant melanoma, please give full details of size, thickness (Breslow classification) and/or depth of invasion (Clark level)",
    "Is the diagnosis related to Human Immunodeficiency Virus (HIV) or Acquired Immune Deficiency Syndrome (AIDS)?",
    "Date of diagnosis for HIV/AIDS (ddmmyyyy)",
    "Life Assured’s mental and cognitive abiliites",
    "Is Life Assured mentally incapacitated?",
]

# Field set 4: other conditions, family history, habits, free-form notes.
GE_FIELD_JSON_4 = [
    "Does Life Assured have any other medical conditions?",
    "Medical conditions",
    "Does Life Assured have any family history?",
    "Family History",
    "Details of the Life Assured’s habits in relation to cigarette smoking, including the duration of smoking habit, number of cigarettes smoked per day and source of information",
    "Details of the Life Assured’s habit in relation to alcohol consumption including the amount of alcohol consumption per day and source of information",
    "Please provide any other information which may be of assistance to us in assessing this claim",
]

# Number the field sets from 1 in declaration order.
ge_queries = dict(enumerate(
    (GE_FIELD_JSON_1, GE_FIELD_JSON_2, GE_FIELD_JSON_3, GE_FIELD_JSON_4),
    start=1,
))

# LLM

## Meta Rules

In [None]:
# Shared instruction preamble for the extraction prompts: output format
# ({"value", "confidence"} per field), Yes/No defaults, source-section
# priority, family-history exclusion and date handling.
# Fixed: a stray "ß" at the end of the "default absence state" rule was removed.
META_RULES = """
You are a precise clinical information extraction assistant.
Return ONLY valid JSON. Do not include markdown, comments, or explanations.

META RULES (apply to all fields):
- Output MUST be a single JSON object with ALL target keys present.
- Every field must be represented as an object with two keys:
- "value": the extracted string or "Yes"/"No"
- "confidence": a floating-point probability between 0 and 1 (e.g., 0.85), which should reflect probability that YOUR answer is correct. you are to generate this value
- Yes/No questions:
  - "value" MUST be "Yes" or "No".
  - If the notes explicitly confirm → "Yes".
  - If there is explicit denial (e.g., “no history of…”, “denies…”) OR the field is unrelated or not mentioned at all in the notes -> "No"
  - If “No” is chosen purely because of absence of mention,
    confidence should be capped at 0.5 to reflect uncertainty.
  - Always consider “No” the *default absence state*, not an explicit negation.
- Extraction hierarchy for patient tumour site/histology/staging:
1) DIAGNOSIS SUMMARY → use exact strings for “Primary Site”, “Diagnosis”.
2) CLINICAL AND TREATMENT SUMMARY → use ONLY the segment BEFORE the token "family history:".
3) Pathology/biopsy sections
4) Imaging IMPRESSION lines that clearly refer to the patient, not relatives.
- Family history handling:
- Ignore content AFTER "family history:" when filling patient fields.
- Text with family markers ["family history", "FH", "mother", "father", "sibling", "aunt",
"uncle", "cousin", "grandmother", "grandfather", "relative"] must NOT be used for patient fields.
- You WILL still extract family-history/risk-factor answers when those specific questions come later.
- For non-binary fields:
- If a value is unknown or not stated, use "" for "value" (not null).
- Still provide a "confidence" score (e.g., 0.5 if very uncertain).
- If conflicting mentions exist, the LAST appointment’s explicit patient diagnosis/site wins.
- Use exact phrases from the notes where possible (e.g., “HER2 3+ (Positive)”).
- Keep dates exactly as they appear in the notes; do not reformat.
- Do NOT invent or infer information beyond the yes/no rule.
- Do NOT add extra keys or structures.

Remember, return ONLY valid JSON. DO NOT include markdown, comments, or explanations.
"""


## NTUC META PROMPT

In [None]:
# Prompt fragment for NTUC field set 1 (registered under key 1 of
# NTUC_FIELD_JSON_SCHEMAS): a JSON skeleton whose inline `//` comments carry
# per-field extraction guidance. Leaves are {"value", "confidence"} pairs;
# the symptoms entry is a list of row objects (0 to 2 rows per its rules).
# The text is runtime data sent as-is, so keys and wording must not be edited
# casually.
NTUC_FIELD_JSON_WITH_INLINE_1 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
"Over what period do your records extend? Start date (dd/mm/yyyy)": {
    "value": "",                // Find the EARLIEST "Visit/Appointment Date" or "Authored:" date
    "confidence": 0.0
},
"Over what period do your records extend? End date (dd/mm/yyyy)": {
    "value": "",                // Find the LATEST "Visit/Appointment Date" or "Authored:" date.
    "confidence": 0.0
},
"When did the Insured first consult you for this condition? (dd/mm/yyyy)": {
    "value": "",                // Earliest consult date associated with the condition
    "confidence": 0.0
},

"When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2)": [
    // Rules:
    // - If no symptoms are stated → return [].
    // - If exactly one symptom is stated → return ONE object (length = 1).
    // - If two or more symptoms are stated → return TWO objects (take the first two).
    // - Keep date strings exactly as in the notes; do not reformat.
    {
        "Symptom presented": {
            // CRITICAL: Initial symptom that led to the diagnosis.
            // DO NOT extract treatment side effects (e.g., "nausea", "mucositis", "rash") that appear after a treatment is mentioned.
            "value": "",
            "confidence": 0.0
        },
        "Duration of symptom": {
            "value": "",               // Copy duration associated with this symptom
            "confidence": 0.0
        },
        "Date of onset (dd/mm/yyyy)": {
            "value": "",               // Keep date text as-is
            "confidence": 0.0
        }
    }
    // Add a second object ONLY if there is a second symptom to capture.
]
}
"""

# Prompt fragment for NTUC field set 2 (registered under key 2 of
# NTUC_FIELD_JSON_SCHEMAS): prior consultations (list of 0 to 3 rows),
# histological diagnosis, biopsy, site, TNM-derived staging questions and the
# first four "is the condition ..." flags. Guidance lives in inline `//`
# comments. The text is runtime data; keys must stay exactly as written.
NTUC_FIELD_JSON_WITH_INLINE_2 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
"Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you?": {
    "value": "",           // Allowed values: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Details of other doctors consulted (rows 0..3)": [
    // Rules:
    // - If no consultation → return []
    // - If consultations exist → return 1..3 row objects (as many as are clearly present, capped at 3)
    {
        "Name of doctor": {
            "value": "",
            "confidence": 0.0
        },
        "Name and address of clinic / hospital": {
            "value": "",
            "confidence": 0.0
        },
        "Date(s) of consultation (dd/mm/yyyy)": {
            "value": "",
            "confidence": 0.0
        },
        "Diagnosis made": {
            "value": "",
            "confidence": 0.0
        }
    }
    // Add a second/third object ONLY if multiple consultations are clearly present
],
"Histological diagnosis": {
    "value": "",           // Extract overall diagnosis from DIAGNOSIS SUMMARY
    "confidence": 0.0
},
"Date of diagnosis (dd/mm/yyyy)": {
    "value": "",           // Copy date associated with diagnosis given
    "confidence": 0.0
},
"Doctor/clinic where diagnosis was first made": {
    "value": "",           // Look for doctor/clinic names for the first diagnosis or biopsy.
    "confidence": 0.0
},
"Date Insured was first informed of diagnosis (dd/mm/yyyy)": {
    "value": "",
    "confidence": 0.0
},
"Was a biopsy of the tumour performed?": {
    "value": "",           // "Yes" if "biopsy", "cnb", "TBLB", or "Surgical resection" is mentioned.
    "confidence": 0.0
},
"Biopsy date (dd/mm/yyyy)": {
    "value": "",           // Only if previous == "Yes"; keep original date format of biopsy
    "confidence": 0.0
},
"If No, how was the diagnosis confirmed?": {
    "value": "",           // Only if previous == "No"; copy explanation verbatim if present
    "confidence": 0.0
},
"Site or organ involved": {
    "value": "",           // Extract from "Primary Site" in "DIAGNOSIS SUMMARY". Patient site ONLY.
    "confidence": 0.0
},
"Staging": {
    "value": "",           // Extract TNM staging from "DIAGNOSIS SUMMARY" or "CLINICAL AND TREATMENT SUMMARY".
    "confidence": 0.0
},
"Has the cancer spread beyond the layer of cells?": {
    "value": "",           // "Yes" if diagnosis is "Invasive" or staging is T1 or higher. "No" if "in-situ".
    "confidence": 0.0
},
"Was the disease completely localised?": {
    "value": "",           // "Yes" if staging is N0 AND M0. "No" if N is 1+ OR M is 1+.
    "confidence": 0.0
},
"Was there invasion of adjacent tissues?": {
    "value": "",           // "Yes" if staging is T4.
    "confidence": 0.0
},
"Were regional lymph nodes involved?": {
    "value": "",           // "N0" means "No". "N1" or "N2" means "Yes". Prioritize final staging over speculative notes.
    "confidence": 0.0
},
"Were there distant metastases?": {
    "value": "",           // "M0" means "No". "M1" means "Yes". Prioritize final staging.
    "confidence": 0.0
},
"Metastases details": {
    "value": "",           // Only if previous == "Yes"; copy verbatim details (e.g., "liver and lung metastases")
    "confidence": 0.0
},
"Is the condition carcinoma-in-situ?": {
    "value": "",           // Allowed: "Yes" / "No"
    "confidence": 0.0
},
"Pre-malignant / non-invasive": {
    "value": "",           // Allowed: "Yes" / "No"
    "confidence": 0.0
},
"Borderline / suspicious malignancy": {
    "value": "",           // Allowed: "Yes" / "No"
    "confidence": 0.0
},
"Cervical dysplasia CIN1-3 (without CIS)": {
    "value": "",           // Allowed: "Yes" / "No"
    "confidence": 0.0
}
}
"""

# Prompt fragment for NTUC field set 3 (registered under key 3 of
# NTUC_FIELD_JSON_SCHEMAS): tumour-type-specific flags (biliary, skin,
# bladder, prostate, thyroid, leukaemia, melanoma, GIST), treatment rows
# (list of 0 to 3), radical surgery and reconstructive surgery.
# NOTE(review): guidance markers switch from `//` to `#` starting at the
# treatment rows — both read as comments to a human, but confirm the model
# treats them the same.
# NOTE(review): several keys look misspelled ("Thyriod", "Radical surgery
# date date", "reconstrucive"). They are left untouched because keys are
# runtime text that downstream code may match on — verify before renaming.
# NOTE(review): the mastectomy/reconstructive keys have no matching query in
# NTUC_FIELD_JSON_3 — confirm whether retrieval should cover them.
NTUC_FIELD_JSON_WITH_INLINE_3 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
"Carcinoma-in-situ of biliary system": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Hyperkeratoses, basal/squamous skin cancers": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Bladder cancer T1N0M0 or below": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Bladder papillary micro-carcinoma": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Is Prostate cancer T1N0M0, T1, or a equivalent or lesser classification?": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"T1 subclass": {
    "value": "",           // Only fill if previous == "Yes" (T1a / T1b / T1c)
    "confidence": 0.0
},
"Is Thyriod cancer T1N0M0 or below?": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Thyriod diameter": {
    "value": "",           // Only fill if previous == "Yes"
    "confidence": 0.0
},
"Is Thyroid papillary micro-carcinoma?": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"Thyroid papillary micro-carcinoma size": {
    "value": "",           // Only fill if previous == "Yes"
    "confidence": 0.0
},
"Leukaemia type": {
    "value": "",           // Leave blank if not applicable
    "confidence": 0.0
},
"Leukaemia RAI staging": {
    "value": "",           // Leave blank if not applicable
    "confidence": 0.0
},
"Melanoma size/thickness (Breslow mm)": {
    "value": "",           // Leave blank if not applicable
    "confidence": 0.0
},
"Melanoma Clark level": {
    "value": "",           // Leave blank if not applicable
    "confidence": 0.0
},
"Has the condition caused invasion beyond the epidermis?": {
    "value": "",           // Allowed values: "Yes" / "No"
    "confidence": 0.0
},
"GIST TNM classification": {
    "value": "",           // Only if diagnosis is Gastro-Intestinal Stroma Tumour (GIST)
    "confidence": 0.0
},
"GIST mitotic count (HPF)": {
    "value": "",           // Only if diagnosis is GIST
    "confidence": 0.0
},
"Has the patient received treatment for this illness? (rows 0..3)": [
    # Rules:
    # - If no treatment → return []
    # - If treatment exists → return 1..3 row objects (capped at 3)
    # - Keep date strings exactly as in notes
    {
        "Treatment type": {
            "value": "",
            "confidence": 0.0
        },
        "Date of treatment (dd/mm/yyyy)": {
            "value": "",
            "confidence": 0.0
        },
        "Duration of treatment": {
            "value": "",
            "confidence": 0.0
        }
    }
    # Add a second/third object ONLY if multiple distinct treatments are clearly present
],
"Has active treatment and therapy been rejected in favour of symptoms relief": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Active treatment rejection reason": {
    "value": "",           # Only if previous == "Yes"; copy verbatim
    "confidence": 0.0
},
"Was radical surgery done?": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Radical surgery code/table": {
    "value": "",           # Only if previous == "Yes"; copy verbatim
    "confidence": 0.0
},
"Radical surgery date date (dd/mm/yyyy)": {
    "value": "",           # Only if previous == "Yes"; keep original date format
    "confidence": 0.0
},
"For mastectomy cases, was reconstrucive surgery done or recommended?": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Reconstructive surgery date (dd/mm/yyyy)": {
    "value": "",           # Only if previous == "Yes"; keep original date format
    "confidence": 0.0
}
}
"""

# Prompt fragment for NTUC field set 4 (registered under key 4 of
# NTUC_FIELD_JSON_SCHEMAS): follow-up/discharge, terminal illness, hospice
# care, doctors consulted (list of 0 to 3 rows), risk factors, medical and
# family history, smoking/alcohol habits and alcohol/drug causation.
# The two "Hospice care type" fields are tick-box style: only "Yes" or ""
# are allowed per their inline guidance.
# NOTE(review): guidance markers switch from `//` to `#` starting at the
# "Malignant, pre-malignant ..." field — confirm the model treats both alike.
NTUC_FIELD_JSON_WITH_INLINE_4 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
"Is the Insured still on follow-up at your clinic?": {
    "value": "",           // Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Next appointment date (dd/mm/yyyy)": {
    "value": "",           // Only if follow-up == "Yes"; leave blank if "No" or not stated
    "confidence": 0.0
},
"Discharge date (dd/mm/yyyy)": {
    "value": "",           // Only if follow-up == "No"; leave blank if "Yes" or not stated
    "confidence": 0.0
},
"Is the Insured terminally ill (i.e. death expected within 12 months)?": {
    "value": "",           // Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Terminal illness evaluation": {
    "value": "",           // Only if terminally ill == "Yes"; verbatim if present
    "confidence": 0.0
},
"Terminal illness assessment date (dd/mm/yyyy)": {
    "value": "",           // Only if terminally ill == "Yes"; keep original date format
    "confidence": 0.0
},
"Is the Insured referred to hospice care?": {
    "value": "",           // Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Hospice name": {
    "value": "",           // Only if hospice care == "Yes"; copy verbatim
    "confidence": 0.0
},
"Hospice care type - Inpatient": {
    "value": "",           // Allowed: "Yes" / ""; "Yes" if inpatient ticked. Only for this field, not inpatient, do not give "No"
    "confidence": 0.0
},
"Hospice inpatient admission date (dd/mm/yyyy)": {
    "value": "",           // Only if Hospice care type - Inpatient == "Yes"
    "confidence": 0.0
},
"Hospice care type - Day care": {
    "value": "",           // Allowed: "Yes" / ""; "Yes" if day care ticked. Only for this field, not day care, do not give "No"
    "confidence": 0.0
},
"Hospice daycare start date (dd/mm/yyyy)": {
    "value": "",           // Only if Hospice care type - Day care == "Yes"
    "confidence": 0.0
},
"Doctors/hospitals consulted for this condition (rows 0..3)": [
    // Rules:
    // - If no other doctors/hospitals consulted → return []
    // - If other doctors/hospitals consulted → return 1..3 row objects (as many as are clearly present, capped at 3)
    {
        "Name of doctor": {
            "value": "",
            "confidence": 0.0
        },
        "Name and Address of Clinic/Hospital": {
            "value": "",
            "confidence": 0.0
        },
        "Date(s) of consultation (dd/mm/yyyy)": {
            "value": "",
            "confidence": 0.0
        },
        "Diagnosis made": {
            "value": "",
            "confidence": 0.0
        }
    }
    // Add a second/third object only if multiple consultations are clearly present
],
"Malignant, pre-malignant or other related conditions or risk factors?": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Malignant, pre-malignant or other related conditions or risk factors details": {
    "value": "",           # Only if previous == "Yes"; include diagnosis, dates, doctors, source info
    "confidence": 0.0
},
"Medical history that would have increased the risk of cancer": {
    "value": "",           # Nature of illness, date of diagnosis, source of info
    "confidence": 0.0
},
"Family history that would have increased the risk of Cancer": {
    "value": "",           # Relationship, nature of illness, date, source
    "confidence": 0.0
},
"Smoking habits": {
    "value": "",           # Past and present, duration, # cigarettes/day, source of info
    "confidence": 0.0
},
"Alcohol consumption habits": {
    "value": "",           # Type, amount/day, duration, source of info
    "confidence": 0.0
},
"Is the tumour or cancer in any way caused directly or indirectly by alcohol or drug abuse?": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Alcohol/drug abuse details": {
    "value": "",           # Only if previous == "Yes"; copy verbatim if present
    "confidence": 0.0
}
}
"""

# Prompt fragment for NTUC field set 5 (registered under key 5 of
# NTUC_FIELD_JSON_SCHEMAS): HIV/AIDS causation and other significant health
# conditions (list of 0 to 3 rows).
# NOTE(review): this template marks guidance with `#` throughout, unlike the
# `//` used in templates 1 and 2 — confirm the model treats both alike.
NTUC_FIELD_JSON_WITH_INLINE_5 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
"Tumour caused by HIV or AIDS?": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"HIV antibody status": {
    "value": "",           # Only if previous == "Yes"; leave blank if "No" or unknown
    "confidence": 0.0
},
"HIV/AIDS diagnosis date (dd/mm/yyyy)": {
    "value": "",           # Only if previous == "Yes"; keep original date format
    "confidence": 0.0
},
"Any other significant health conditions": {
    "value": "",           # Allowed: "Yes" / "No" / "" (unknown)
    "confidence": 0.0
},
"Details of other health conditions (rows 0..3)": [
    # Rules:
    # - If no health conditions → return []
    # - If conditions exist → return 1..3 row objects (as many as are clearly present, capped at 3)
    {
        "Diagnosis": {
            "value": "",
            "confidence": 0.0
        },
        "Name of doctor": {
            "value": "",
            "confidence": 0.0
        },
        "Name/address of clinic/hospital": {
            "value": "",
            "confidence": 0.0
        },
        "Date of diagnosis (dd/mm/yyyy)": {
            "value": "",
            "confidence": 0.0
        },
        "Duration of condition": {
            "value": "",
            "confidence": 0.0
        },
        "Treatment received": {
            "value": "",
            "confidence": 0.0
        }
    }
    # Add a second/third object ONLY if multiple conditions are clearly present
]
}
"""

In [None]:
# Field-set number -> NTUC prompt schema, numbered from 1 in declaration order.
NTUC_FIELD_JSON_SCHEMAS = dict(enumerate(
    (
        NTUC_FIELD_JSON_WITH_INLINE_1,
        NTUC_FIELD_JSON_WITH_INLINE_2,
        NTUC_FIELD_JSON_WITH_INLINE_3,
        NTUC_FIELD_JSON_WITH_INLINE_4,
        NTUC_FIELD_JSON_WITH_INLINE_5,
    ),
    start=1,
))

## GE META PROMPT

In [None]:
# Prompt fragment for GE field set 1: first consultation date, presenting
# symptoms (list of 0 to 3 rows), source of information (list of 0 to 2
# rows), first diagnosis date and diagnosing doctor. Guidance lives in
# inline `//` comments; keys are runtime text and are left untouched.
# Fixed: the trailing "add another object" hints now agree with the row caps
# stated in each list's rules (symptoms up to three, sources up to two).
# NOTE(review): "Format as DD/MM/YYYY" on the first-diagnosis date conflicts
# with the instruction elsewhere to keep dates as written, and that key says
# (DD/MM/YYYY) while the matching retrieval query says (ddmmyyyy) — confirm
# which is intended.
GE_FIELD_JSON_WITH_INLINE_1 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
    "Date when insured first consulted you for cancer (ddmmyyyy)": {
        "value": "",                // Find the EARLIEST "Visit/Appointment Date" or "Authored:" date
        "confidence": 0.0
    },

    "Please state symptoms presented and date symptoms first appeared (rows 0..3)": [
        // Rules:
        // - If no symptoms are stated → return [].
        // - If exactly one symptom is stated → return ONE object (length = 1).
        // - If three or more symptoms are stated → return THREE objects (take the first three).
        // - Keep date strings exactly as in the notes; do not reformat.
        {
            "Symptom": {
              // CRITICAL: Initial symptom that led to the diagnosis.
              // DO NOT extract treatment side effects (e.g., "nausea", "mucositis", "rash") that appear after a treatment is mentioned.
                "value": "",
                "confidence": 0.0
            },
            "Duration of symptom": {
                "value": "",               // Copy duration associated with this symptom
                "confidence": 0.0
            },
            "Date symptoms first started (dd/mm/yyyy)": {
                "value": "",               // Keep date text as-is
                "confidence": 0.0
            }
        }
        // Add a second/third object ONLY if multiple distinct symptoms are clearly present
    ],

    "Source of above information": {
        "value": "",                // State the source. Allowed: "Patient" / "Referring Doctor" / "Others" (unknown)
        "confidence": 0.0
    },

    "What is the source of the above information? If Referring Doctor / Others, specify name & address (rows 0..2)": [
        // Rules:
        // - If no referring doctors or Others are stated → return [].
        // - If one referring doctor is stated → return ONE object (length = 1).
        // - If two or more are stated → return up to TWO objects (take the first two).
        {
            "Name": {
                "value": "",               // Name of the referring doctor.
                "confidence": 0.0
            },
            "Address": {
                "value": "",               // Address of the referring doctor.
                "confidence": 0.0
            }
      }
      // Add a second object ONLY if there are multiple distinct sources to capture.

    ],

    "Date when Cancer was FIRST diagnosed (DD/MM/YYYY)": {
        "value": "",                // Date of initial cancer diagnosis. Format as DD/MM/YYYY.
        "confidence": 0.0
    },
    "Diagnosis was first made by (name of Doctor)": {
        "value": "",                // Name of the doctor who made the initial diagnosis.
        "confidence": 0.0
    }
}
"""

# GE form prompt schema, part 2: diagnosis, tumour staging and surgery fields.
# The `//` comments are guidance for the model only. Once they are removed the
# template must itself be valid JSON (every member comma-separated), otherwise
# the model is shown a malformed example to imitate.
GE_FIELD_JSON_WITH_INLINE_2 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
    "Actual diagnosis": {
        "value": "",                // State the final medical diagnosis from the diagnosis summary
        "confidence": 0.0
    },
    "Date when insured first became aware of this illness (ddmmyyyy)": {
        "value": "",                // Date patient was first aware of the illness.
        "confidence": 0.0
    },

    "Was the illness suffered by Life Assured caused directly or indirectly by alcohol or drug abuse?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },

    "If illness caused directly or indirectly by alcohol or drug abuse, please give details": {
        "value": "",                // Only if previous == "Yes"; include details
        "confidence": 0.0
    },
    "What is the staging of the tumour?": {
        "value": "",                // e.g., Stage II, Stage IV.
        "confidence": 0.0
    },
    "Please state the tumour classification (eg TMN classification etc)": {
        "value": "",                // Extract the TNM code (e.g., "pT2NO", "T1cN0M0").
        "confidence": 0.0
    },
    "Was the cancer completely localised?": {
        "value": "",               //"Yes" if staging is N0 AND M0. "No" if N is 1+ OR M is 1+.
        "confidence": 0.0
    },
    "Was there invasion of tissues?": {
        "value":"",                // "Yes" if staging is T4
        "confidence": 0.0
    },
    "Were regional lymph nodes involved?": {
        "value": "",                // "N0" means "No". "N1" or "N2" means "Yes".
        "confidence": 0.0
    },
    "Were there distant metastases?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },
    "Did the Life Assured undergo any surgery?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },
    "Date of surgery (ddmmyyyy)": {
        "value": "",                // Only if previous == "Yes"; keep original date format
        "confidence": 0.0
    },
    "Surgical procedure performed": {
        "value": "",                // If yes, name of the surgical procedure.
        "confidence": 0.0
    },
    "Was there any other mode of treatment, other than surgery, which could be undertaken to treat the Life Assured’s condition?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },
    "Type of treatment other than surgery that could be undertaken to treat condition": {
        "value": "",               // Only if previous == "Yes";
        "confidence": 0.0
    }
}
"""

# GE form prompt schema, part 3: non-surgical treatment, leukaemia/melanoma
# specifics, HIV/AIDS and mental-capacity fields.
# The `//` comments are guidance for the model only. Once they are removed the
# template must itself be valid JSON (every member comma-separated), otherwise
# the model is shown a malformed example to imitate.
GE_FIELD_JSON_WITH_INLINE_3 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{

    "Has the Life Assured underwent other mode of treatment?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },

    "Date of other treatment (ddmmyyyy)": {
        "value": "",                // Only if previous == "Yes"; keep original date format
        "confidence": 0.0
    },
    "Reason for no other mode of treatment": {
        "value": "",                // If no other mode of treatment, provide reason
        "confidence": 0.0
    },
    "What other forms of treatment did the Life Assured undergo (eg chemotherapy, radiotherapy etc)?": {
        "value": "",                // List other treatments like chemotherapy, radiotherapy.
        "confidence": 0.0
    },
    "If diagnosis is leukaemia, please provide the type of leukaemia": {
        "value": "",                // Specify type, e.g., AML, CML.
        "confidence": 0.0
    },
    "If the diagnosis is malignant melanoma, please give full details of size, thickness (Breslow classification) and/or depth of invasion (Clark level)": {
        "value": "",                // Provide specifics for melanoma, e.g., "Breslow 0.5mm, Clark Level II".
        "confidence": 0.0
    },
    "Is the diagnosis related to Human Immunodeficiency Virus (HIV) or Acquired Immune Deficiency Syndrome (AIDS)?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },
    "Date of diagnosis for HIV/AIDS (ddmmyyyy)": {
        "value": "",                // Only if previous == "Yes"; keep original date format
        "confidence": 0.0
    },
    "Life Assured’s mental and cognitive abiliites": {
        "value": "",                // Summary of patient's mental/cognitive state.
        "confidence": 0.0
    },
    "Is Life Assured mentally capable?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    }
}


"""
# GE form prompt schema, part 4: other medical conditions, family history,
# smoking/alcohol habits and free-text remarks.
# The `//` comments are guidance for the model only. Once they are removed the
# template must itself be valid JSON (every member comma-separated), otherwise
# the model is shown a malformed example to imitate.
GE_FIELD_JSON_WITH_INLINE_4 = """
FIELD JSON WITH INLINE META (guidance in comments — DO NOT return comments):

{
    "Does Life Assured have any other medical conditions?": {
        "value": "",                // Check "PMHx" or "Other Med History". "nil" = "No". "HTN" etc. = "Yes".
        "confidence": 0.0
    },

    "Medical conditions, date of diagnosis, name & address of treating doctor (rows 0..3)": [
        // Rules:
        // - If no other conditions are stated → return [].
        // - If one other condition is stated → return ONE object (length = 1).
        // - If three or more are stated → return up to THREE objects (take the first three).
        {
            "Medical condition": {
                "value": "",
                "confidence": 0.0
            },
            "Diagnosis date (dd/mm/yyyy)": {
                "value": "",
                "confidence": 0.0
            },
            "Name & address of treating doctor": {
                "value": "",
                "confidence": 0.0
            }
        }
        // Add a second and third object ONLY if there are multiple distinct medical conditions
    ],

    "Does Life Assured have any family history?": {
        "value": "",                // Allowed: "Yes" / "No" / "" (unknown)
        "confidence": 0.0
    },

    "Family History (rows 0..3)": [
        // Rules:
        // - If no family history is stated → return [].
        // - If one family member is stated → return ONE object (length = 1).
        // - If two or more are stated → return up to THREE objects (take the first three).
        {
            "Relationship to Life Assured": {
                "value": "",
                "confidence": 0.0
            },
            "Family history condition": {
                "value": "",
                "confidence": 0.0
            },
            "Age of onset": {
                "value": "",
                "confidence": 0.0
            }
        }
        // Add a second and third object ONLY if there are multiple distinct family history
    ],
    "Details of the Life Assured’s habits in relation to cigarette smoking, including the duration of smoking habit, number of cigarettes smoked per day and source of information": {
        "value": "",                // e.g., "Smoker, 10 years, 1 pack/day" or "Non-smoker".
        "confidence": 0.0
    },
    "Details of the Life Assured’s habit in relation to alcohol consumption including the amount of alcohol consumption per day and source of information": {
        "value": "",                // e.g., "Social drinker, 2-3 units/week" or "No alcohol consumption".
        "confidence": 0.0
    },
    "Please provide any other information which may be of assistance to us in assessing this claim": {
        "value": "",                // Any other relevant medical notes or summary.
        "confidence": 0.0
    }

}
"""


In [None]:
# GE prompt schemas keyed 1..4, in the order the templates are defined;
# build_prompt looks a schema up in a mapping like this one by page number.
GE_FIELD_JSON_SCHEMAS = dict(
    enumerate(
        (
            GE_FIELD_JSON_WITH_INLINE_1,
            GE_FIELD_JSON_WITH_INLINE_2,
            GE_FIELD_JSON_WITH_INLINE_3,
            GE_FIELD_JSON_WITH_INLINE_4,
        ),
        start=1,
    )
)

## BUILD PROMPT

In [None]:
# NTUC_FIELD_JSON_SCHEMAS
# GE_FIELD_JSON_SCHEMAS

def build_prompt(i_txt: str, page_num: int, field_json_schemas: dict):
    """Assemble the full extraction prompt for one form page.

    Args:
        i_txt: Retrieved note excerpts to embed between the ``<<<`` / ``>>>``
            markers.
        page_num: Key used to look up the schema in ``field_json_schemas``.
        field_json_schemas: Mapping of page number to schema text. A page
            number that is not present yields an empty schema section.

    Returns:
        The stripped ``META_RULES`` text, a newline, then the user
        instructions with the excerpts and schema filled in.
    """
    rules_text = META_RULES.strip()
    page_schema = field_json_schemas.get(page_num, "")

    # The prompt body stays flush-left so no indentation leaks into the text.
    user_text = f"""
You are given the retrieval results from RAG, where it details the most relevant sections of doctor's records for a specific patient in Singapore. They are excerpts from different sections found in the appointment notes with relevant dates and information:
RETRIREVED TEXT:
<<<
{i_txt}
>>>


Task:
- Fill the JSON schema below using ONLY information from the notes, and also include the confidence score which should reflect probability that YOUR answer is correct
- If no information exists, output "".
- Try your best to fill in the JSON as completely as possible, even if it is not accurate, you may score it a low confidence score.
- Return JSON only.

Remember, return ONLY valid JSON. DO NOT include markdown, comments, or explanations. No comments at all.

JSON schema:
{page_schema}
""".strip()

    return f"{rules_text}\n{user_text}"

# Post-Processing

## Clean LLM output

In [None]:
import re
import json

def process_llm_output(input_text: str) -> dict:
    """
    Clean raw LLM output and merge every JSON object found in it into one
    flattened dict.

    The text may contain several top-level JSON objects (for example one per
    page), page markers, Markdown code fences and ``//`` comments. Each object
    is repaired on a best-effort basis, parsed, flattened into
    space-separated keys and merged; later objects overwrite earlier ones on
    key collisions. Objects that still cannot be parsed are skipped.

    Args:
        input_text (str): raw LLM text.

    Returns:
        dict: The merged and flattened JSON object ({} when nothing parses).
    """

    # --- Helpers ---
    def strip_line_comments(text: str) -> str:
        """Drop ``//`` comments, ignoring any ``//`` inside a JSON string.

        Everything after the comment marker is removed, including quoted
        words inside the comment such as ``// Allowed: "Yes" / "No"``.
        String state is reset on every line because a JSON string cannot
        contain a raw newline.
        """
        kept = []
        for line in text.split("\n"):
            in_str = False
            escaped = False
            cut = len(line)
            for idx, ch in enumerate(line):
                if in_str:
                    if escaped:
                        escaped = False
                    elif ch == "\\":
                        escaped = True
                    elif ch == '"':
                        in_str = False
                elif ch == '"':
                    in_str = True
                elif ch == "/" and line.startswith("//", idx):
                    cut = idx
                    break
            kept.append(line[:cut])
        return "\n".join(kept)

    def clean_llm_output(text: str) -> str:
        """Remove page markers, code fences, comments and trailing commas."""
        text = re.sub(r"--- Page \d+ ---", "", text)
        text = text.replace("```json", "").replace("```", "")
        text = strip_line_comments(text)
        text = re.sub(r",\s*([}\]])", r"\1", text)
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n\s*\n", "\n", text)
        return text.strip()

    def extract_json_objects(text: str) -> list:
        """Return each top-level ``{...}`` span; a truncated last one is kept as-is."""
        chunks = []
        depth = 0
        start = None
        in_str = False
        escaped = False
        for pos, ch in enumerate(text):
            if in_str:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_str = False
                continue
            if ch == '"':
                # Quotes in prose outside any object must not hide braces.
                if depth > 0:
                    in_str = True
            elif ch == "{":
                if depth == 0:
                    start = pos
                depth += 1
            elif ch == "}" and depth > 0:
                # A stray '}' at depth 0 is ignored so it cannot push the
                # depth negative and cause the next object to be missed.
                depth -= 1
                if depth == 0:
                    chunks.append(text[start:pos + 1])
                    start = None
        if start is not None:
            # Output was cut off mid-object; repair_json closes it later.
            chunks.append(text[start:])
        return chunks

    def close_open_brackets(s: str) -> str:
        """Append the closers a truncated JSON text is missing, innermost first."""
        closers = []
        in_str = False
        escaped = False
        for ch in s:
            if in_str:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_str = False
                continue
            if ch == '"':
                in_str = True
            elif ch == "{":
                closers.append("}")
            elif ch == "[":
                closers.append("]")
            elif ch in "}]" and closers and closers[-1] == ch:
                closers.pop()
        if in_str:
            # Cut off inside a string: drop a dangling backslash and close it.
            if escaped:
                s = s[:-1]
            s += '"'
        return s + "".join(reversed(closers))

    def repair_json(s: str) -> str:
        """Close unbalanced brackets, then remove trailing commas."""
        return re.sub(r",\s*([}\]])", r"\1", close_open_brackets(s))

    def parse_chunk(chunk: str):
        """Parse one extracted chunk; return None when it cannot be salvaged."""
        fixed = repair_json(chunk)
        try:
            return json.loads(fixed)
        except json.JSONDecodeError:
            pass
        # Second attempt on the outermost '{' ... '}' span only.
        first = fixed.find("{")
        last = fixed.rfind("}")
        if first == -1 or last <= first:
            return None
        try:
            return json.loads(repair_json(fixed[first:last + 1]))
        except json.JSONDecodeError:
            # Deliberate best effort: an unreadable chunk is dropped.
            return None

    def flatten_json(obj: dict, parent_key: str = "", sep: str = " ") -> dict:
        """Flatten nested dicts/lists; list items get a 1-based ``(n)`` suffix."""
        items = {}
        for k, v in obj.items():
            new_key = f"{parent_key}{sep}{k}".strip()
            if isinstance(v, dict):
                items.update(flatten_json(v, new_key, sep=sep))
            elif isinstance(v, list):
                for idx, elem in enumerate(v, 1):
                    if isinstance(elem, dict):
                        items.update(flatten_json(elem, f"{new_key} ({idx})", sep=sep))
                    else:
                        items[f"{new_key} ({idx})"] = elem
            else:
                items[new_key] = v
        return items

    def fix_short_dates(data: dict) -> dict:
        """Expand two-digit years in 'ddmmyy' and 'dd/mm/yy' string values.

        The century is always taken to be 20yy.
        """
        fixed = {}
        for k, v in data.items():
            if isinstance(v, str):
                if re.fullmatch(r"\d{6}", v):
                    v = f"{v[:2]}{v[2:4]}20{v[4:]}"
                elif re.fullmatch(r"\d{2}/\d{2}/\d{2}", v):
                    dd, mm, yy = v.split("/")
                    v = f"{dd}/{mm}/20{yy}"
            fixed[k] = v
        return fixed

    # --- Clean, extract, parse, merge ---
    if not input_text:
        return {}

    merged = {}
    for chunk in extract_json_objects(clean_llm_output(input_text)):
        obj = parse_chunk(chunk)
        if isinstance(obj, dict):
            merged.update(flatten_json(obj))

    return fix_short_dates(merged)

## Utilities for Mapping

In [None]:
import json
import re

# Three-letter English month abbreviations mapped to two-digit month numbers.
_MONTH_ABBREVIATIONS = {
    "Jan": "01",
    "Feb": "02",
    "Mar": "03",
    "Apr": "04",
    "May": "05",
    "Jun": "06",
    "Jul": "07",
    "Aug": "08",
    "Sep": "09",
    "Oct": "10",
    "Nov": "11",
    "Dec": "12",
}


def split_date(date_str):
    """Split a date string into a ``(dd, mm, yyyy)`` tuple of strings.

    Recognised layouts, matched at the start of the text:
    ``dd/mm/yyyy``, ``yyyy-mm-dd``, ``dd-Mon-yyyy`` (month abbreviation in any
    letter case), ``dd-mm-yyyy`` and ``dd-mm-yy`` (expanded to ``20yy``).

    Args:
        date_str: Date text. Falsy values give three empty strings; other
            non-string values are converted with ``str()`` first.

    Returns:
        ``(dd, mm, yyyy)``. Unrecognised text is returned whole in the first
        slot with empty month and year.
    """
    if not date_str:
        return "", "", ""
    text = str(date_str).strip()

    # dd/mm/yyyy
    m = re.match(r"(\d{2})/(\d{2})/(\d{4})", text)
    if m:
        return m.group(1), m.group(2), m.group(3)

    # yyyy-mm-dd
    m = re.match(r"(\d{4})-(\d{2})-(\d{2})", text)
    if m:
        return m.group(3), m.group(2), m.group(1)

    # dd-mmm-yyyy; an unknown abbreviation falls through to the fallback
    # instead of yielding None as the month.
    m = re.match(r"(\d{2})-([A-Za-z]{3})-(\d{4})", text)
    if m:
        month_num = _MONTH_ABBREVIATIONS.get(m.group(2).capitalize())
        if month_num is not None:
            return m.group(1), month_num, m.group(3)

    # dd-mm-yyyy; must be tried before dd-mm-yy, which would otherwise match
    # its first eight characters and invent the year "20" + first two digits.
    m = re.match(r"(\d{2})-(\d{2})-(\d{4})", text)
    if m:
        return m.group(1), m.group(2), m.group(3)

    # dd-mm-yy (two-digit year, assumed to be 20yy)
    m = re.match(r"(\d{2})-(\d{2})-(\d{2})(?!\d)", text)
    if m:
        return m.group(1), m.group(2), "20" + m.group(3)

    # fallback
    return text, "", ""

def set_field_with_confidence(field, combined, key_base):
    """Copy a text answer and its confidence from ``combined`` into ``field``.

    Reads ``"<key_base> value"`` and ``"<key_base> confidence"``; a missing
    entry is treated as an empty string. The confidence is stored as text.
    """
    score = combined.get(f"{key_base} confidence", "")
    field["field_value"] = combined.get(f"{key_base} value", "")
    field["confidence"] = "" if score == "" else str(score)


def set_date_with_confidence(field, combined, key_base, name):
    """Fill one part of a split date field, plus its confidence.

    The date stored under ``"<key_base> value"`` is split with ``split_date``.
    Which part is written depends on the first of ``(dd)``, ``(mm)``,
    ``(yyyy)`` found in ``name``; when none is present ``field_value`` is left
    untouched. The confidence is always written, as text.
    """
    score = combined.get(f"{key_base} confidence", "")
    parts = split_date(combined.get(f"{key_base} value", ""))

    for marker, part in zip(("(dd)", "(mm)", "(yyyy)"), parts):
        if marker in name:
            field["field_value"] = part
            break

    field["confidence"] = "" if score == "" else str(score)


def set_checkbox_with_confidence(field, combined, key_base):
    """Tick a Yes/No checkbox field when it matches the extracted answer.

    ``field_value`` becomes ``"Yes"`` (ticked) when the field name contains
    the same word, "Yes" or "No", as the answer under ``"<key_base> value"``;
    otherwise it is cleared. The confidence is stored as text.
    """
    answer = combined.get(f"{key_base} value", "")
    score = combined.get(f"{key_base} confidence", "")

    ticked = ("Yes" in field["field_name"] and answer == "Yes") or (
        "No" in field["field_name"] and answer == "No"
    )
    field["field_value"] = "Yes" if ticked else ""
    field["confidence"] = "" if score == "" else str(score)

def set_delete_with_confidence(field, combined, key_base):
    """Mark the Yes/No option that does NOT apply with an ``"X"``.

    A field whose name contains "Yes" is marked when the answer under
    ``"<key_base> value"`` is "No", and a field whose name contains "No" is
    marked when the answer is "Yes". Any other combination clears the field.
    The confidence is stored as text.
    """
    answer = combined.get(f"{key_base} value", "")
    score = combined.get(f"{key_base} confidence", "")

    struck = ("Yes" in field["field_name"] and answer == "No") or (
        "No" in field["field_name"] and answer == "Yes"
    )
    field["field_value"] = "X" if struck else ""
    field["confidence"] = "" if score == "" else str(score)

# only for GE form, where unique field needs to be deleted
def set_source_with_confidence(field, combined, key_base):
    """Mark the information-source options that were NOT selected with ``"X"``.

    The answer under ``"<key_base> value"`` is expected to be one of
    "Patient", "Referring Doctor" or "Others". A field whose name mentions
    either of the two other options gets ``"X"``; every other field, and any
    unrecognised answer, is cleared. The confidence is stored as text.
    """
    options = ("Patient", "Referring Doctor", "Others")

    selected = combined.get(f"{key_base} value", "")
    score = combined.get(f"{key_base} confidence", "")
    label = field["field_name"]

    mark = ""
    if selected in options:
        rejected = [option for option in options if option != selected]
        if any(option in label for option in rejected):
            mark = "X"

    field["field_value"] = mark
    field["confidence"] = "" if score == "" else str(score)

## Mapping

### MAPPER - NTUC

In [None]:
def map_combined_to_fields_income(combined, form_fields):
    for field in form_fields["fields"]:
        name = field["field_name"]

        # --- Doctors/hospitals consulted (explicit mappings) ---
        if "Please provide the date(s) of consultations at listed clinics/hospitals to which the Insured has attended for this condition (1)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (1) Date(s) of consultation (dd/mm/yyyy)")
        elif "Please provide the name of doctor(s) which the Insured has been referred to for this condition (1)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (1) Name of doctor")
        elif "Please provide the name and address of clinics/hospitals to which the Insured has attended for this condition (1)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (1) Name and Address of Clinic/Hospital")
        elif "Please provide details of diagnosis made during the consultation(s) at listed clinics/hospitals to which the Insured has attended for this condition (1)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (1) Diagnosis made")

        if "Please provide the date(s) of consultations at listed clinics/hospitals to which the Insured has attended for this condition (2)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (2) Date(s) of consultation (dd/mm/yyyy)")
        elif "Please provide the name of doctor(s) which the Insured has been referred to for this condition (2)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (2) Name of doctor")
        elif "Please provide the name and address of clinics/hospitals to which the Insured has attended for this condition (2)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (2) Name and Address of Clinic/Hospital")
        elif "Please provide details of diagnosis made during the consultation(s) at listed clinics/hospitals to which the Insured has attended for this condition (2)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (2) Diagnosis made")

        if "Please provide the date(s) of consultations at listed clinics/hospitals to which the Insured has attended for this condition (3)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (3) Date(s) of consultation (dd/mm/yyyy)")
        elif "Please provide the name of doctor(s) which the Insured has been referred to for this condition (3)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (3) Name of doctor")
        elif "Please provide the name and address of clinics/hospitals to which the Insured has attended for this condition (3)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (3) Name and Address of Clinic/Hospital")
        elif "Please provide details of diagnosis made during the consultation(s) at listed clinics/hospitals to which the Insured has attended for this condition (3)" in name:
            set_field_with_confidence(field, combined, "Doctors/hospitals consulted for this condition (rows 0..3) (3) Diagnosis made")

        elif "Has the Insured ever had any malignant, pre-malignant or other related conditions or risk factors? If “Yes”, please provide details, including diagnosis, date of diagnosis, dates of consultation, name and address of doctor/ clinic and source of information" in name:
            set_field_with_confidence(field, combined, "Malignant, pre-malignant or other related conditions or risk factors details")
        elif "Has the Insured ever had any malignant, pre-malignant or other related conditions or risk factors?" in name:
            set_checkbox_with_confidence(field, combined, "Malignant, pre-malignant or other related conditions or risk factors?")

        # --- Period of records ---
        elif "Over what period do your records extend? Start date" in name:
            set_date_with_confidence(field, combined, "Over what period do your records extend? Start date (dd/mm/yyyy)", name)

        elif "Over what period do your records extend? End date" in name:
            set_date_with_confidence(field, combined, "Over what period do your records extend? End date (dd/mm/yyyy)", name)

        # --- First consultation ---
        elif "When did the Insured first consult you" in name:
            set_date_with_confidence(field, combined, "When did the Insured first consult you for this condition? (dd/mm/yyyy)", name)

        elif "duration of symptoms" in name and "(1)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (1) Duration of symptom")

        elif "date of onset" in name and "(1)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (1) Date of onset (dd/mm/yyyy)")

        elif "symptoms presented" in name and "(1)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (1) Symptom presented")

        elif "duration of symptoms" in name and "(2)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (2) Duration of symptom")

        elif "date of onset" in name and "(2)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (2) Date of onset (dd/mm/yyyy)")

        elif "symptoms presented" in name and "(2)" in name:
            set_field_with_confidence(field, combined, "When you first saw the Insured, what were the symptoms presented and their duration? (rows 0..2) (2) Symptom presented")

        # --- Other doctors consulted ---
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Name of Doctor (1)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (1) Name of doctor")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and  "clinic / hospital (1)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (1) Name and address of clinic / hospital")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Date(s) of consultation (dd/mm/yyyy) (1)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (1) Date(s) of consultation (dd/mm/yyyy)")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Diagnosis made (1)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (1) Diagnosis made")

        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Name of Doctor (2)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (2) Name of doctor")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and  "clinic / hospital (2)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (2) Name and address of clinic / hospital")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Date(s) of consultation (dd/mm/yyyy) (2)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (2) Date(s) of consultation (dd/mm/yyyy)")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Diagnosis made (2)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (2) Diagnosis made")

        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Name of Doctor (3)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (3) Name of doctor")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and  "clinic / hospital (3)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (3) Name and address of clinic / hospital")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Date(s) of consultation (dd/mm/yyyy) (3)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (3) Date(s) of consultation (dd/mm/yyyy)")
        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you? If “Yes”," and "Diagnosis made (3)" in name:
            set_field_with_confidence(field, combined, "Details of other doctors consulted (rows 0..3) (3) Diagnosis made")

        elif "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you?" in name:
            set_checkbox_with_confidence(field, combined, "Did the Insured consult any other doctors for this illness or its symptoms before he/she consulted you?")

        # --- Histological diagnosis ---
        elif "histological diagnosis" in name.lower():
            set_field_with_confidence(field, combined, "Histological diagnosis")

        elif "Date of diagnosis (dd/mm/yyyy):" in name:
            set_date_with_confidence(field, combined, "Date of diagnosis (dd/mm/yyyy)", name)

        elif "where the diagnosis was first made" in name:
            set_field_with_confidence(field, combined, "Doctor/clinic where diagnosis was first made")

        elif "when the Insured was first informed of the diagnosis" in name:
            set_date_with_confidence(field, combined, "Date Insured was first informed of diagnosis (dd/mm/yyyy)", name)

        # --- Biopsy ---
        elif "date of biopsy" in name:
            set_date_with_confidence(field, combined, "Biopsy date (dd/mm/yyyy)", name)
        elif "Was a biopsy of the tumour performed? If “No”, please state why and how the diagnosis was confirmed" in name:
            set_field_with_confidence(field, combined, "If No, how was the diagnosis confirmed?")
        elif "Was a biopsy of the tumour performed?" in name:
            set_checkbox_with_confidence(field, combined, "Was a biopsy of the tumour performed?")

        # --- Tumour details ---
        elif "site or organ involved" in name:
            set_field_with_confidence(field, combined, "Site or organ involved")

        elif "is the staging of the tumour?" in name:
            set_field_with_confidence(field, combined, "Staging")

        elif "Has the cancer spread" in name:
            set_checkbox_with_confidence(field, combined, "Has the cancer spread beyond the layer of cells?")
        elif "Was the disease completely localised" in name:
            set_checkbox_with_confidence(field, combined, "Was the disease completely localised?")
        elif "Was there invasion of adjacent tissues" in name:
            set_checkbox_with_confidence(field, combined, "Was there invasion of adjacent tissues?")
        elif "Were regional lymph nodes involved" in name:
            set_checkbox_with_confidence(field, combined, "Were regional lymph nodes involved?")
        elif "Were there distant metastases? If “Yes”, please provide full details, including site of any metastases, etc" in name:
            set_field_with_confidence(field, combined, "Metastases details")
        elif "Were there distant metastases" in name:
            set_checkbox_with_confidence(field, combined, "Were there distant metastases?")

        # --- Special conditions (CIS, premalignant, etc.) ---
        elif "Is the condition carcinoma-in-situ?" in name:
            set_checkbox_with_confidence(field, combined, "Is the condition carcinoma-in-situ?")
        elif "Is the condition pre-malignant or non-invasive?" in name:
            set_checkbox_with_confidence(field, combined, "Pre-malignant / non-invasive")
        elif "borderline malignancy" in name:
            set_checkbox_with_confidence(field, combined, "Borderline / suspicious malignancy")
        elif "Cervical Dysplasia" in name:
            set_checkbox_with_confidence(field, combined, "Cervical dysplasia CIN1-3 (without CIS)")

        elif "Carcinoma-in-situ of the Biliary" in name:
            set_checkbox_with_confidence(field, combined, "Carcinoma-in-situ of biliary system")
        elif "Hyperkeratoses" in name:
            set_checkbox_with_confidence(field, combined, "Hyperkeratoses, basal/squamous skin cancers")
        elif "Bladder Cancer" in name:
            set_checkbox_with_confidence(field, combined, "Bladder cancer T1N0M0 or below")
        elif "Papillary Micro-carcinoma of the Bladder" in name:
            set_checkbox_with_confidence(field, combined, "Bladder papillary micro-carcinoma")

        elif "Prostate cancer" in name:
            set_checkbox_with_confidence(field, combined, "Is Prostate cancer T1N0M0, T1, or a equivalent or lesser classification?")
        elif "Thyroid Cancer" in name:
            set_checkbox_with_confidence(field, combined, "Is Thyriod cancer T1N0M0 or below?")
        elif "size in diameter (cm)" in name and "Thyroid" in name:
            set_field_with_confidence(field, combined, "Thyriod diameter")

        elif "Papillary Micro-carcinoma of the Thyroid" in name:
            set_checkbox_with_confidence(field, combined, "Is Thyroid papillary micro-carcinoma?")
        elif "size in diameter (cm)" in name and "Papillary Micro-carcinoma" in name:
            set_field_with_confidence(field, combined, "Thyroid papillary micro-carcinoma size")

        # --- Leukaemia / Melanoma / GIST ---
        elif "If the diagnosis is leukaemia, please state type of leukaemia" in name:
            set_field_with_confidence(field, combined, "Leukaemia type")
        elif "If the diagnosis is leukaemia, please state type of RAI staging" in name:
            set_field_with_confidence(field, combined, "Leukaemia RAI staging")
        elif "malignant melanoma" in name and "Breslow" in name:
            set_field_with_confidence(field, combined, "Melanoma size/thickness (Breslow mm)")
        elif "malignant melanoma" in name and "Clark" in name:
            set_field_with_confidence(field, combined, "Melanoma Clark level")
        elif "Has the condition caused invasion beyond the epidermis" in name:
            set_checkbox_with_confidence(field, combined, "Has the condition caused invasion beyond the epidermis?")
        elif "GIST" in name and "classification" in name:
            set_field_with_confidence(field, combined, "GIST TNM classification")
        elif "GIST" in name and "Mitotic" in name:
            set_field_with_confidence(field, combined, "GIST mitotic count (HPF)")

        # --- Treatments ---

        elif "Please provide full details of all type of treatment provided (1)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (1) Treatment type")
        elif "Please provide full details of date of treatment provided (dd/mm/yyyy) (1)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (1) Date of treatment (dd/mm/yyyy)")
        elif "Please provide full details of duration of treatment provided (1)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (1) Duration of treatment")

        elif "Please provide full details of all type of treatment provided (2)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (2) Treatment type")
        elif "Please provide full details of date of treatment provided (dd/mm/yyyy) (2)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (2) Date of treatment (dd/mm/yyyy)")
        elif "Please provide full details of duration of treatment provided (2)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (2) Duration of treatment")

        elif "Please provide full details of all type of treatment provided (3)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (3) Treatment type")
        elif "Please provide full details of date of treatment provided (dd/mm/yyyy) (3)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (3) Date of treatment (dd/mm/yyyy)")
        elif "Please provide full details of duration of treatment provided (3)" in name:
            set_field_with_confidence(field, combined, "Has the patient received treatment for this illness? (rows 0..3) (3) Duration of treatment")

        # --- Active treatment rejection ---
        elif "Has active treatment and therapy" in name:
            set_checkbox_with_confidence(field, combined, "Has active treatment and therapy been rejected in favour of symptoms relief")
        elif "Active treatment rejection reason" in name:
            set_field_with_confidence(field, combined, "Active treatment rejection reason")

        # --- Surgeries ---
        elif "Was radical surgery (total and complete removal of the affected organ) done? If “Yes”, please state the name of the surgery, surgical code/table" in name:
            set_field_with_confidence(field, combined, "Radical surgery code/table")
        elif "Was radical surgery (total and complete removal of the affected organ) done? If “Yes”, please state the date surgery was performed" in name:
            set_date_with_confidence(field, combined, "Radical surgery date (dd/mm/yyyy)", name)
        elif "Was radical surgery (total and complete removal of the affected organ) done?" in name:
            set_checkbox_with_confidence(field, combined, "Was radical surgery done?")

        elif "For mastectomy cases, was reconstructive surgery done or recommended? If “Yes”, please state date surgery was performed" in name:
            set_date_with_confidence(field, combined, "Reconstructive surgery date (dd/mm/yyyy)", name)
        elif "For mastectomy cases, was reconstructive surgery done or recommended?" in name:
            set_checkbox_with_confidence(field, combined, "For mastectomy cases, was reconstructive surgery done or recommended?")

        elif "reconstructive surgery" in name:
            set_date_with_confidence(field, combined, "Reconstructive surgery date (dd/mm/yyyy)", name)

        # --- Follow-up / Discharge ---
        elif "Is the Insured still on follow-up at your clinic? If “Yes”, please provide state date of next appointment (dd/mm/yyyy)" in name:
            set_date_with_confidence(field, combined, "Next appointment date (dd/mm/yyyy)", name)
        elif "Is the Insured still on follow-up at your clinic? If \"No”, please provide date of discharge (dd/mm/yyyy)" in name:
            set_date_with_confidence(field, combined, "Discharge date (dd/mm/yyyy)", name)
        elif "Is the Insured still on follow-up" in name:
            set_checkbox_with_confidence(field, combined, "Is the Insured still on follow-up at your clinic?")

        # --- Terminal illness ---
        elif "Is the Insured terminally ill, ie death is expected within 12 months? If “Yes”, please provide details on the basis of your evaluation" in name:
            set_field_with_confidence(field, combined, "Terminal illness evaluation")
        elif "the Insured terminally ill, ie death is expected within 12 months? If “Yes”, please indicate the date on which the Insured is assessed to be terminally ill" in name:
            set_date_with_confidence(field, combined, "Terminal illness assessment date (dd/mm/yyyy)", name)
        elif "Is the Insured terminally ill, ie death is expected within 12 months?" in name:
            set_checkbox_with_confidence(field, combined, "Is the Insured terminally ill (i.e. death expected within 12 months)?")

        # --- Hospice ---
        elif "name of hospice" in name:
            set_field_with_confidence(field, combined, "Hospice name")

        elif "Is the Insured referred to hospice care? If inpatient, please state date of admission" in name:
            set_date_with_confidence(field, combined, "Hospice inpatient admission date (dd/mm/yyyy)", name)
        elif "Is the Insured referred to hospice care? If yes, please state if it is inpatient" in name:
            set_field_with_confidence(field, combined, "Hospice care type - Inpatient")

        elif "Is the Insured referred to hospice care? If day care, please state start date (dd/mm/yyyy)" in name:
            set_date_with_confidence(field, combined, "Hospice daycare start date (dd/mm/yyyy)", name)
        elif "Is the Insured referred to hospice care? If yes, please state if it is day care" in name:
            set_field_with_confidence(field, combined, "Hospice care type - Day care")


        elif "hospice care" in name and ("Yes" in name or "No" in name):
            set_checkbox_with_confidence(field, combined, "Is the Insured referred to hospice care?")

        # --- Family / Medical / Lifestyle ---

        elif "Please give details of the Insured’s medical history which would have increased the risk of Cancer (including nature of illness, date of diagnosis and source of information)" in name:
            set_field_with_confidence(field, combined, "Medical history that would have increased the risk of cancer")
        elif "Please give details of the Insured’s family history which would have increased the risk of Cancer (including the relationship, nature of illness, date of  diagnosis and source of information)" in name:
            set_field_with_confidence(field, combined, "Family history that would have increased the risk of Cancer")
        elif "Please give details of the Insured’s habits in relation to past and present smoking, including the duration of smoking habits, number of cigarettes smoked  per day and source of this information" in name:
            set_field_with_confidence(field, combined, "Smoking habits")
        elif "Please give details of the Insured’s habits in relation to alcohol consumption, including the type of alcohol, amount of alcohol consumption per day,  duration of such consumption and source of this information" in name:
            set_field_with_confidence(field, combined, "Alcohol consumption habits")

        elif "Is the tumour or cancer in any way caused directly or indirectly by alcohol or drug abuse?" in name:
            set_checkbox_with_confidence(field, combined, "Is the tumour or cancer in any way caused directly or indirectly by alcohol or drug abuse?")


        # --- HIV / AIDS ---
        elif "Is the tumour in the presence of Human Immunodeficiency Virus (HIV) or Acquired Immune Deficiency Syndrome (AIDS)? If “Yes” please state HIV antibody status" in name:
            set_field_with_confidence(field, combined, "HIV antibody status")
        elif "Is the tumour in the presence of Human Immunodeficiency Virus (HIV) or Acquired Immune Deficiency Syndrome (AIDS)? If “Yes” please state date of diagnosis for HIV/AIDS (dd/mm/yyyy)" in name:
            set_date_with_confidence(field, combined, "HIV/AIDS diagnosis date (dd/mm/yyyy)", name)
        elif "Is the tumour in the presence of Human Immunodeficiency Virus (HIV) or Acquired Immune Deficiency Syndrome (AIDS)?" in name:
            set_checkbox_with_confidence(field, combined, "Tumour caused by HIV or AIDS?")


        # --- Other significant health conditions ---

        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of diagnosis (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Diagnosis")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide name of doctor that diagnosed (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Name of doctor")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Name and address of clinic/ hospital (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Name/address of clinic/hospital")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of date of diagnosis (dd/mm/yyyy) (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Date of diagnosis (dd/mm/yyyy)")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Duration of condition (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Duration of condition")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of treatment received (1)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (1) Treatment received")

        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of diagnosis (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Diagnosis")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide name of doctor that diagnosed (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Name of doctor")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Name and address of clinic/ hospital (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Name/address of clinic/hospital")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of date of diagnosis (dd/mm/yyyy) (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Date of diagnosis (dd/mm/yyyy)")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Duration of condition (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Duration of condition")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of treatment received (2)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (2) Treatment received")

        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of diagnosis (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Diagnosis")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide name of doctor that diagnosed (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Name of doctor")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Name and address of clinic/ hospital (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Name/address of clinic/hospital")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of date of diagnosis (dd/mm/yyyy) (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Date of diagnosis (dd/mm/yyyy)")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of Duration of condition (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Duration of condition")
        elif "Does Insured have or ever had any other significant health condition(s)? If “Yes”, please provide details of treatment received (3)" in name:
            set_field_with_confidence(field, combined, "Details of other health conditions (rows 0..3) (3) Treatment received")

        elif "Does Insured have or ever had any other significant health condition(s)?" in name:
            set_checkbox_with_confidence(field, combined, "Any other significant health conditions")

    return form_fields

### MAPPER - GE

In [None]:
def map_combined_to_fields_ge(combined, form_fields):
    """Fill every field of the GE claim form from the combined answers.

    Each entry of ``form_fields["fields"]`` is compared, by substring test on
    its ``"field_name"``, against an ordered list of rules. The first rule
    whose text occurs in the field name decides which setter is called and
    which ``combined`` key that setter is given. Follow-up questions
    ('... If "Yes", please ...') are listed before the bare question they
    extend, because the bare question text is a substring of the follow-up
    text. A field matching no rule gets an empty ``"field_value"`` and
    ``"confidence"``.

    Args:
        combined: Extracted answers; handed unchanged to the setter helpers
            (presumably keyed by the labels used below -- verify against the
            helpers).
        form_fields: Dict whose ``"fields"`` list holds the field dicts; they
            are updated in place.

    Returns:
        The same ``form_fields`` object that was passed in.
    """
    # Short aliases keep the rule table readable.
    text = set_field_with_confidence
    delete = set_delete_with_confidence
    source = set_source_with_confidence

    # Bare questions that also prefix their follow-up questions.
    source_q = "What is the source of the above information?"
    alcohol_q = (
        "Was the illness suffered by Life Assured caused directly or "
        "indirectly by alcohol or drug abuse?"
    )
    surgery_q = "Did the Life Assured undergo any surgery?"
    other_mode_q = (
        "Was there any other mode of treatment, other than surgery, which "
        "could be undertaken to treat the Life Assured's condition?"
    )
    underwent_q = "Has the Life Assured underwent other mode of treatment?"
    hiv_q = (
        "Is the diagnosis related to Human Immunodeficiency Virus (HIV) or "
        "Acquired Immune Deficiency Syndrome (AIDS)?"
    )
    medical_q = "Does the Life Assured have any other medical conditions?"
    family_q = "Does the Life Assured have any family history?"

    # Key prefixes shared by the numbered table rows.
    symptoms_key = (
        "Please state symptoms presented and date symptoms first appeared "
        "(rows 0..3)"
    )
    source_key = (
        "What is the source of the above information? If Referring Doctor / "
        "Others, specify name & address (rows 0..2)"
    )
    medical_key = (
        "Medical conditions, date of diagnosis, name & address of treating "
        "doctor (rows 0..3)"
    )
    family_key = "Family History (rows 0..3)"

    # Ordered (text to find in field name, setter, key in `combined`) rules.
    # Numbered rows are generated from a single template per table so that
    # every row tests its own "(n)" suffix.

    # --- Page 1 ---
    rules = [
        ("Date when insured first consulted you for cancer", text,
         "Date when insured first consulted you for cancer (ddmmyyyy)"),
    ]
    for n in (1, 2, 3):
        rules += [
            (f"Please state symptoms presented ({n})", text,
             f"{symptoms_key} ({n}) Symptom"),
            (f"Please state duration of symptoms presented ({n})", text,
             f"{symptoms_key} ({n}) Duration of symptom"),
            (f"Please state the date that the symptoms first appeared ({n})", text,
             f"{symptoms_key} ({n}) Date symptoms first started (dd/mm/yyyy)"),
        ]
    for n in (1, 2):
        rules += [
            (f'{source_q} If "Referring Doctor / Others", please specify name ({n})', text,
             f"{source_key} ({n}) Name"),
            (f'{source_q} If "Referring Doctor / Others", please specify address ({n})', text,
             f"{source_key} ({n}) Address"),
        ]
    rules += [
        (source_q, source, "Source of above information"),
        ("Diagnosis was first made by (name of Doctor)", text,
         "Diagnosis was first made by (name of Doctor)"),
        ("Date when Cancer was FIRST diagnosed", text,
         "Date when Cancer was FIRST diagnosed (ddmmyyyy)"),

        # --- Page 2 ---
        ("Actual diagnosis", text, "Actual diagnosis"),
        ("Date when insured first became aware of this illness (ddmmyyyy)", text,
         "Date when insured first became aware of this illness (ddmmyyyy)"),
        (f'{alcohol_q} If "yes", please give details', text,
         "If illness caused directly or indirectly by alcohol or drug abuse, please give details"),
        (alcohol_q, delete, alcohol_q),
        ("staging of the tumour", text, "What is the staging of the tumour?"),
        ("tumour classification", text,
         "Please state the tumour classification (eg TMN classification etc)"),
        ("Was the cancer completely localised?", delete,
         "Was the cancer completely localised?"),
        ("Was there invasion of tissues?", delete, "Was there invasion of tissues?"),
        ("Were regional lymph nodes involved?", delete,
         "Were regional lymph nodes involved?"),
        ("Were there distant metastases?", delete, "Were there distant metastases?"),
        (f'{surgery_q} If "Yes", please indicate the surgical procedure performed', text,
         "Surgical procedure performed"),
        (f'{surgery_q} If "Yes", state the date of surgery (ddmmyyyy)', text,
         "Date of surgery (ddmmyyyy)"),
        (surgery_q, delete, surgery_q),
        (f'{other_mode_q} If "YES", please specify type of treatment', text,
         "Type of treatment other than surgery that could be undertaken to treat condition"),
        # The key uses a typographic apostrophe, unlike the field name.
        (other_mode_q, delete,
         "Was there any other mode of treatment, other than surgery, which could be undertaken to treat the Life Assured’s condition?"),

        # --- Page 3 ---
        (f'{underwent_q} If "Yes", please state date of treatment (ddmmyyyy)', text,
         "Date of other treatment (ddmmyyyy)"),
        (f'{underwent_q} If "No", please state why not', text,
         "Reason for no other mode of treatment"),
        (underwent_q, delete, "Has the Life Assured undergone other mode of treatment?"),
        ("What other forms of treatment did the Life Assured undergo", text,
         "What other forms of treatment did the Life Assured undergo (eg chemotherapy, radiotherapy etc)?"),
        ("If diagnosis is leukaemia", text,
         "If diagnosis is leukaemia, please provide the type of leukaemia"),
        ("malignant melanoma", text,
         "If the diagnosis is malignant melanoma, please give full details of size, thickness (Breslow classification) and/or depth of invasion (Clark level)"),
        (f'{hiv_q} If "Yes", please provide the date of diagnosis for HIV / AIDS (ddmmyyyy)', text,
         "Date of diagnosis for HIV/AIDS (ddmmyyyy)"),
        (hiv_q, delete, hiv_q),
        ("Please describe the Life Assured's mental and cognitive abilities", text,
         "Life Assured’s mental and cognitive abilities"),
        # NOTE(review): the trailing space is part of the match text; confirm
        # it mirrors the form's field name.
        ("Is the Life Assured mentally capable in accordance to the Mental Capacity Act (Chapter 177A of Singapore)? ", delete,
         "Is Life Assured mentally capable?"),
    ]

    # --- Page 4 ---
    for n in (1, 2, 3):
        rules += [
            (f'{medical_q} If "YES", please state medical condition ({n})', text,
             f"{medical_key} ({n}) Medical condition"),
            (f'{medical_q} If "YES", please state date of diagnosis (dd/mm/yyyy) ({n})', text,
             f"{medical_key} ({n}) Diagnosis date (dd/mm/yyyy)"),
            (f'{medical_q} If "YES", please state name & address of treating doctor ({n})', text,
             f"{medical_key} ({n}) Name & address of treating doctor"),
        ]
    rules.append((medical_q, delete, "Does Life Assured have any other medical conditions?"))
    for n in (1, 2, 3):
        rules += [
            (f'{family_q} If "Yes", please provide details of the nature of condition ({n})', text,
             f"{family_key} ({n}) Family history condition"),
            (f'{family_q} If "Yes", please provide details including relationship to the Life Assured ({n})', text,
             f"{family_key} ({n}) Relationship to Life Assured"),
            (f'{family_q} If "Yes", please provide details of the age of onset ({n})', text,
             f"{family_key} ({n}) Age of onset"),
        ]
    rules += [
        (family_q, delete, "Does Life Assured have any family history?"),
        ("Please give details of the Life Assured's habits in relation to cigarette smoking, including the duration of smoking habit, number of cigarettes smoked per day and source of information", text,
         "Details of the Life Assured’s habits in relation to cigarette smoking, including the duration of smoking habit, number of cigarettes smoked per day and source of information"),
        ("Please give details of the Life Assured's habit in relation to alcohol consumption including the amount of alcohol consumption per day and source of information", text,
         "Details of the Life Assured’s habit in relation to alcohol consumption including the amount of alcohol consumption per day and source of information"),
        ("Please provide any other information which may be of assistance to us in assessing this claim", text,
         "Please provide any other information which may be of assistance to us in assessing this claim"),
    ]

    for field in form_fields["fields"]:
        name = field["field_name"]
        for needle, setter, key in rules:
            if needle in name:
                setter(field, combined, key)
                break
        else:
            # --- Default for unmapped fields ---
            field["field_value"] = ""
            field["confidence"] = ""

    return form_fields


In [None]:
import re
import json

def process_llm_output(input_text: str) -> dict:
    """
    Clean raw LLM output and merge every JSON object found in it into one flat dict.

    The text is stripped of page markers, Markdown code fences, ``//`` comments
    (outside string literals) and trailing commas. Each top-level ``{...}``
    object is then extracted, repaired if it was truncated, parsed, flattened
    and merged in order of appearance, so later objects overwrite earlier ones
    when keys clash. Objects that still fail to parse after repair are skipped.

    Flattening joins nested keys with a single space and numbers list elements
    from 1, e.g. ``{"a": {"b": [5]}}`` becomes ``{"a b (1)": 5}``.

    Args:
        input_text (str): raw LLM text.

    Returns:
        dict: The merged and flattened JSON object (empty if nothing parsed).
    """
    if not input_text:
        return {}

    # --- Helpers ---
    def clean_llm_output(text: str) -> str:
        """Remove page markers, code fences, // comments and trailing commas."""
        text = re.sub(r"--- Page \d+ ---", "", text)
        text = re.sub(r"```json", "", text)
        text = re.sub(r"```", "", text)

        def _strip_comments(match):
            s = match.group(0)
            # Quoted strings are returned untouched so "http://..." survives.
            if s.startswith('"'):
                return s
            return re.sub(r"//.*", "", s)

        # Tokenise each line into string literals and non-string runs.
        text = re.sub(r'"(?:\\.|[^"\\])*"|[^"\n]+', _strip_comments, text)
        text = re.sub(r",\s*([}\]])", r"\1", text)
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n\s*\n", "\n", text)
        return text.strip()

    def extract_json_objects(text: str):
        """Return every top-level ``{...}`` span; an unterminated last one is returned as-is."""
        objs = []
        in_str = False
        escape = False
        depth = 0
        start = None
        for i, ch in enumerate(text):
            if ch == '"' and not escape:
                in_str = not in_str
            if not in_str:
                if ch == '{':
                    if depth == 0:
                        start = i
                    depth += 1
                elif ch == '}' and depth > 0:
                    # A stray '}' at depth 0 is ignored; letting depth go
                    # negative would hide every object that follows it.
                    depth -= 1
                    if depth == 0 and start is not None:
                        objs.append(text[start:i + 1])
                        start = None
            escape = (ch == "\\" and not escape)
        if start is not None:
            # Truncated object: repair_json() closes it later.
            objs.append(text[start:])
        return objs

    def close_open_brackets(s: str) -> str:
        """Append closers for brackets left open outside strings, innermost first."""
        pending = []
        in_str = False
        escape = False
        for ch in s:
            if in_str:
                if escape:
                    escape = False
                elif ch == "\\":
                    escape = True
                elif ch == '"':
                    in_str = False
            elif ch == '"':
                in_str = True
            elif ch == '{':
                pending.append('}')
            elif ch == '[':
                pending.append(']')
            elif ch in '}]' and pending and pending[-1] == ch:
                pending.pop()
        return s + "".join(reversed(pending))

    def repair_json(s: str) -> str:
        """Close unbalanced brackets in nesting order, then drop trailing commas."""
        # Closing first lets the comma rule also catch a comma that was the
        # last character before the truncation point.
        s = close_open_brackets(s)
        return re.sub(r",\s*([}\]])", r"\1", s)

    def try_parse(s: str):
        """Return the decoded JSON value, or None when *s* is not valid JSON."""
        try:
            return json.loads(s)
        except ValueError:
            return None

    def flatten_json(obj: dict, parent_key: str = "", sep: str = " ") -> dict:
        """Flatten nested dicts/lists into a single-level dict with joined keys."""
        items = {}
        for k, v in obj.items():
            new_key = f"{parent_key}{sep}{k}".strip()
            if isinstance(v, dict):
                items.update(flatten_json(v, new_key, sep=sep))
            elif isinstance(v, list):
                for idx, elem in enumerate(v, 1):
                    if isinstance(elem, dict):
                        items.update(flatten_json(elem, f"{new_key} ({idx})", sep=sep))
                    else:
                        items[f"{new_key} ({idx})"] = elem
            else:
                items[new_key] = v
        return items

    # --- Clean, extract, repair, merge ---
    merged = {}
    for chunk in extract_json_objects(clean_llm_output(input_text)):
        obj = try_parse(repair_json(chunk))
        if obj is None:
            # Last resort: cut the raw chunk back to its last closing brace so
            # a half-written trailing member is discarded, then repair again.
            last = chunk.rfind('}')
            if last > 0:
                obj = try_parse(repair_json(chunk[:last + 1]))
        if not isinstance(obj, dict):
            continue
        merged.update(flatten_json(obj))

    return merged

# Fill Form

In [None]:
import json
import fitz  # PyMuPDF library for PDF manipulation
from typing import Dict, Tuple, Union, List
import io

# HELPER FUNCTION: Set checkbox/radio button state
def _set_on_off(widget, should_check: bool) -> None:
    """
    Tick or clear a checkbox / radio-button widget.

    The widget's own ``on_state()`` / ``off_state()`` values are used when
    those methods exist; otherwise "Yes" and "Off" are assumed. Any failure is
    reported with a warning print instead of being raised.

    Args:
        widget: PyMuPDF widget object (checkbox or radio button)
        should_check: True to select the widget, False to clear it
    """
    try:
        # Resolve both state names up front, exactly as the widget reports them.
        if hasattr(widget, "on_state"):
            checked_state = widget.on_state()
        else:
            checked_state = "Yes"

        if hasattr(widget, "off_state"):
            cleared_state = widget.off_state()
        else:
            cleared_state = "Off"

        if should_check:
            widget.field_value = checked_state
        else:
            widget.field_value = cleared_state

        # Push the new value into the PDF.
        widget.update()
    except Exception as e:
        print(f"[warn] could not set widget '{widget.field_name}': {e}")

# HELPER FUNCTION: Auto-fit text to widget width
def _fit_text_to_width(widget: fitz.Widget, value: str, max_fs: float = 11.0,
                       min_fs: float = 6.0, pad: float = 2.0, fontname: str = "Helv") -> None:
    """
    Write *value* into a text widget, shrinking the font so the text fits its width.

    The font size starts at ``max_fs`` and is scaled down proportionally when
    the longest line (by character count) would be wider than the widget minus
    padding; it never goes below ``min_fs``. Only width is considered.

    Args:
        widget: PyMuPDF text widget object
        value: Text string to be inserted
        max_fs: Maximum font size (default 11pt)
        min_fs: Minimum font size (default 6pt)
        pad: Padding on each side in points (default 2pt)
        fontname: Font used both for measuring and for the widget (default "Helv")
    """
    # Horizontal room left once `pad` is taken off both sides (never below 1pt).
    usable_width = max(1.0, fitz.Rect(widget.rect).width - 2 * pad)

    # For multi-line values only the line with the most characters is measured.
    candidate_lines = (value or "").splitlines() or [""]
    widest_line = max(candidate_lines, key=len)

    # Width that line would occupy at the largest allowed size.
    natural_width = fitz.get_text_length(widest_line, fontname=fontname, fontsize=max_fs)

    # Keep the maximum size when it fits; otherwise scale down, clamped at min_fs.
    font_size = (
        max_fs
        if natural_width <= usable_width
        else max(min_fs, (usable_width * max_fs) / max(1.0, natural_width))
    )

    widget.text_font = fontname
    widget.text_fontsize = font_size
    widget.field_value = value
    widget.update()  # Commit font and value to the PDF

# MAIN FUNCTION: Fill PDF form and return as bytes (Flask-ready)
def fill_pdf_form(
    pdf_source: Union[str, bytes],
    form_data: Union[dict, str, List[dict]],
    flatten: bool = False
) -> bytes:
    """
    Fills a PDF form with data and returns the filled PDF as bytes.
    Perfect for Flask routes that serve PDFs directly.

    Each field dict may carry "page" (1-based), "field_name" and "field_value".
    A widget is filled from the entry matching its (page, name); failing that,
    from the entry matching its name alone, but only when that name occurs
    exactly once in the data. Widgets with no matching entry are left untouched.

    Args:
        pdf_source: Either a file path (str) or PDF bytes
        form_data: Either:
                   - A dict with "fields" key containing list of field dicts
                   - A single field dict (has a "field_name" key)
                   - A list of field dicts directly
                   - A JSON string representing any of the above
        flatten: If True, converts form fields to static content (non-editable)

    Returns:
        bytes: The filled PDF as bytes, ready to be sent via Flask

    Raises:
        ValueError: If form_data is not a dict, list, or JSON string decoding
            to one of those (invalid JSON also surfaces as a ValueError).

    Example usage in Flask:
        @app.route('/fill-form', methods=['POST'])
        def fill_form():
            pdf_bytes = fill_pdf_form(
                pdf_source='template.pdf',
                form_data=request.json,
                flatten=False
            )
            return send_file(
                io.BytesIO(pdf_bytes),
                mimetype='application/pdf',
                as_attachment=True,
                download_name='filled_form.pdf'
            )
    """

    # STEP 1: Parse form data from various input formats
    data = json.loads(form_data) if isinstance(form_data, str) else form_data
    if isinstance(data, list):
        # Direct list of fields
        fields = data
    elif isinstance(data, dict):
        if "fields" in data:
            fields = data["fields"] or []
        elif "field_name" in data:
            # A single field mapping: wrap it so the loop below sees one entry
            fields = [data]
        else:
            fields = []
    else:
        raise ValueError("form_data must be dict, list, or JSON string")

    # STEP 2: Build lookup maps for efficient field matching
    # Map (page_number, field_name) -> field_value for exact page+name matches
    exact_map: Dict[Tuple[int, str], str] = {}
    # Map field_name -> list of values (for fields appearing on multiple pages)
    name_map: Dict[str, list] = {}

    for it in fields:
        if not isinstance(it, dict):
            continue  # Ignore malformed entries instead of aborting the fill
        try:
            page = int(it.get("page") or 0)
        except (TypeError, ValueError):
            page = 0  # Unusable page number: fall back to name-only matching
        name = (it.get("field_name") or "").strip()
        val = it.get("field_value", "")

        # Store in exact map if both page and name are available
        if page and name:
            exact_map[(page, name)] = val
        # Store in name map for all fields with names
        if name:
            name_map.setdefault(name, []).append(val)

    # STEP 3: Open the PDF document from file path or bytes
    if isinstance(pdf_source, bytes):
        doc = fitz.open(stream=pdf_source, filetype="pdf")
    else:
        doc = fitz.open(pdf_source)

    try:
        # STEP 4: Iterate through all pages and widgets (form fields)
        toggle_types = (fitz.PDF_WIDGET_TYPE_CHECKBOX, fitz.PDF_WIDGET_TYPE_RADIOBUTTON)
        for page in doc:
            page_no = page.number + 1  # PyMuPDF uses 0-based indexing, convert to 1-based

            for w in (page.widgets() or []):
                fname = (w.field_name or "").strip()
                if not fname:
                    continue  # Skip widgets without names

                # Exact (page, name) match first; otherwise a name-only match,
                # accepted only when the name is unambiguous in the data.
                value = None
                if (page_no, fname) in exact_map:
                    value = exact_map[(page_no, fname)]
                else:
                    vals = name_map.get(fname)
                    if vals and len(vals) == 1:
                        value = vals[0]

                if value is None:
                    continue  # No value found for this field, skip it

                val_str = str(value)
                t = w.field_type
                try:
                    # TEXT FIELD: auto-fit the font so the text is not cut off
                    if t == fitz.PDF_WIDGET_TYPE_TEXT:
                        _fit_text_to_width(w, val_str, max_fs=11.0, min_fs=6.0)

                    # CHECKBOX / RADIO BUTTON: the LLM outputs "Yes" when the
                    # widget should be selected; anything else clears it
                    elif t in toggle_types:
                        _set_on_off(w, val_str.strip().lower() == "yes")

                    # COMBOBOX: Dropdown with optional text input
                    elif t == fitz.PDF_WIDGET_TYPE_COMBOBOX:
                        choices = w.choice_values or []
                        # Check if the combobox allows custom text entry
                        is_editable_flag = getattr(fitz, "PDF_CH_FIELD_IS_EDIT", 1 << 18)
                        is_editable = bool((w.field_flags or 0) & is_editable_flag)

                        # Only set value if it's in the choices OR the field is editable
                        if is_editable or (choices and val_str in choices):
                            w.field_value = val_str
                            w.update()

                    # LISTBOX: only values present in the choice list are set
                    elif t == fitz.PDF_WIDGET_TYPE_LISTBOX:
                        choices = w.choice_values or []
                        if choices and val_str in choices:
                            w.field_value = val_str
                            w.update()

                    # SIGNATURE: Digital signature field (skip filling)
                    elif t == fitz.PDF_WIDGET_TYPE_SIGNATURE:
                        pass  # Signature fields require special handling

                except Exception as e:
                    # Best effort: one bad widget must not abort the whole form
                    print(f"[warn] failed to set '{fname}' on page {page_no}: {e}")

        # STEP 5: Optionally flatten the PDF (make fields non-editable)
        if flatten:
            if hasattr(doc, "bake"):
                # "Bake" converts interactive fields to static content
                doc.bake(widgets=True, annots=False)
            else:
                print("[info] Flatten skipped: your PyMuPDF version lacks Document.bake().")

        # STEP 6: Serialise to bytes instead of writing a file
        return doc.tobytes(deflate=True)
    finally:
        # Release the document even when filling or serialising fails
        doc.close()

# Flask

In [None]:
import os, io, json, base64, threading, uuid, traceback
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
from flask_cors import CORS
from pyngrok import ngrok

# Flask App Setup + CORS
app = Flask(__name__)
# CORS is applied to every route ("/*") for GET/POST/OPTIONS.
# NOTE(review): the origins list contains "*" next to the localhost entries while
# supports_credentials=True — confirm against the Flask-CORS documentation that
# this combination behaves as intended before exposing the tunnel publicly.
CORS(app, supports_credentials=True, resources={
    r"/*": {
        "origins": ["http://localhost:3000", "https://localhost:3000", "*"],
        "allow_headers": [
            "Content-Type",
            "Authorization",
            "ngrok-skip-browser-warning",
        ],
        "expose_headers": ["Content-Disposition", "Content-Type"],
        "methods": ["GET", "POST", "OPTIONS"],
    }
})

# Root directory under which ask() creates one sub-directory per job.
BASE_DIR = "/tmp/app"
os.makedirs(BASE_DIR, exist_ok=True)
# In-memory job registry keyed by job_id. Each value is a dict with a "status"
# key plus "result" and/or "error"; it is written by ask() and
# process_pipeline() and read by get_result(). Nothing here persists it.
jobs = {}

# Flask Utilities
def read_json(path):
    """Load and return the JSON document stored at *path* (read as UTF-8)."""
    with open(path, mode="r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload

def write_json(data, path):
    """Serialise *data* to *path* as UTF-8 JSON, 2-space indented, non-ASCII kept as-is."""
    dump_options = {"indent": 2, "ensure_ascii": False}
    with open(path, mode="w", encoding="utf-8") as handle:
        json.dump(data, handle, **dump_options)

# Helper Functions: Query Ollama LLM
def query_ollama(prompt: str) -> str:
    """
    Send *prompt* to the local Ollama ``/api/generate`` endpoint (model "phi4").

    The reply is read as a stream of JSON lines; the "response" value of every
    line that has one is concatenated, and the whole text is returned stripped
    of surrounding whitespace. Lines without a "response" key are ignored.
    """
    import requests

    endpoint = "http://localhost:11434/api/generate"
    request_body = {"model": "phi4", "prompt": prompt}

    pieces = []
    with requests.post(endpoint, json=request_body, stream=True) as reply:
        for raw_line in reply.iter_lines():
            if not raw_line:
                continue  # skip empty keep-alive lines
            event = json.loads(raw_line.decode("utf-8"))
            if "response" in event:
                pieces.append(event["response"])
    return "".join(pieces).strip()

# Main Logic: run_llm_extraction_with_rag()
def run_llm_extraction_with_rag(timeline: dict, insurer_type: str):
    """
    RAG + LLM extraction using unified timeline from PDFUploadProcessor.

    Retrieval is run with the insurer-specific query set, one prompt is sent to
    the LLM per retrieved segment (segments are looked up under the integer
    keys 1..N, N capped at 5), and the concatenated replies are parsed by
    process_llm_output().

    Args:
        timeline: Unified timeline; a falsy value short-circuits to ``{}``.
        insurer_type: "NTUC" or "GE".

    Returns:
        dict: Flattened fields extracted by the LLM, or ``{}`` when there is
        nothing to query.

    Raises:
        ValueError: If insurer_type is neither "NTUC" nor "GE".
    """
    import concurrent.futures

    if not timeline:
        print("Empty timeline — skipping RAG.")
        return {}

    # STEP 1: Select insurer configuration
    if insurer_type == "NTUC":
        print("Running NTUC RAG retrieval ...")
        all_retrieval_results = retrieve_rag(timeline, ntuc_queries)
        field_json_schemas = NTUC_FIELD_JSON_SCHEMAS
    elif insurer_type == "GE":
        print("Running GE RAG retrieval ...")
        all_retrieval_results = retrieve_rag(timeline, ge_queries)
        field_json_schemas = GE_FIELD_JSON_SCHEMAS
    else:
        raise ValueError(f"Unknown insurer_type = {insurer_type}")

    # STEP 2: Work out which segments to query
    # NOTE(review): the cap of 5 is inherited from the original code —
    # presumably the number of prompt pages; confirm.
    max_segments = 5
    n_pages = min(len(all_retrieval_results or {}), max_segments)
    page_ids = [i for i in range(1, n_pages + 1) if i in all_retrieval_results]
    if not page_ids:
        # Without this guard ThreadPoolExecutor(max_workers=0) raises ValueError.
        print("No RAG retrieval results — skipping LLM extraction.")
        return {}

    # STEP 3: LLM per page / segment
    def query_page(i, retrieved_text):
        prompt = build_prompt(retrieved_text, i, field_json_schemas)
        response = query_ollama(prompt)
        return i, f"\n--- Page {i} ---\n{response}"

    def run_all(page_ids, use_multithreading=True):
        """Query every page id and return {page_id: labelled LLM reply}."""
        results = {}
        if use_multithreading:
            with concurrent.futures.ThreadPoolExecutor(max_workers=len(page_ids)) as executor:
                futures = [
                    executor.submit(
                        query_page, i, all_retrieval_results[i].get("aggregated_text", "")
                    )
                    for i in page_ids
                ]
                for f in concurrent.futures.as_completed(futures):
                    i, output = f.result()
                    results[i] = output
        else:
            for i in page_ids:
                text = all_retrieval_results[i].get("aggregated_text", "")
                i, output = query_page(i, text)
                results[i] = output
        return results

    results = run_all(page_ids, use_multithreading=True)

    # Re-assemble in page order so later pages win on key clashes when merged.
    final_text = "\n".join(results[i] for i in sorted(results))
    combined_fields = process_llm_output(final_text)
    return combined_fields

# API ROUTES
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: always answers HTTP 200 with {"status": "ok"}."""
    body = jsonify({"status": "ok"})
    return body, 200


@app.route("/ask", methods=["POST"])
def ask():
    """
    Accept an upload, store it in a fresh job directory and start processing.

    Expects multipart files "input_pdfs" (one or more), "template_pdf" and
    "form_fields_json". Responds 400 when any of them is missing; otherwise
    registers the job as "pending", starts process_pipeline() in a daemon
    thread and returns ``{"job_id": ...}`` immediately.
    """
    # accept multiple input PDFs; drop empty parts that carry no filename
    input_pdfs = [f for f in request.files.getlist("input_pdfs") if f and f.filename]
    template_pdf = request.files.get("template_pdf")
    form_fields_json = request.files.get("form_fields_json")

    if not input_pdfs or not template_pdf or not form_fields_json:
        return jsonify({"error": "Missing one or more files"}), 400

    # Create the job directory only after validation so rejected requests
    # leave nothing behind on disk.
    job_id = str(uuid.uuid4())
    job_dir = os.path.join(BASE_DIR, job_id)
    os.makedirs(job_dir, exist_ok=True)

    # save all PDF inputs
    # secure_filename() can return "" (e.g. for names with no ASCII letters);
    # fall back to a generated name so save() never targets the directory.
    for idx, f in enumerate(input_pdfs, 1):
        safe_name = secure_filename(f.filename) or f"input_{idx}.pdf"
        f.save(os.path.join(job_dir, safe_name))

    # NOTE(review): template and JSON are stored next to the inputs, and the
    # same directory is later handed to the pipeline as its input directory —
    # confirm that process_uploads() ignores them.
    template_name = secure_filename(template_pdf.filename) or "template.pdf"
    template_path = os.path.join(job_dir, template_name)
    template_pdf.save(template_path)

    json_name = secure_filename(form_fields_json.filename) or "form_fields.json"
    json_path = os.path.join(job_dir, json_name)
    form_fields_json.save(json_path)

    # detect insurer from the template's file name
    # NOTE(review): the "ge" test is a plain substring match, so it also hits
    # names such as "page.pdf" — confirm the naming convention of templates.
    fname = template_pdf.filename.lower()
    if "income" in fname:
        insurer_type = "NTUC"
    elif "ge" in fname or "greateastern" in fname:
        insurer_type = "GE"
    else:
        insurer_type = "UNKNOWN"
    print(f"[{job_id}] Insurer detected: {insurer_type}")

    jobs[job_id] = {"status": "pending", "result": None, "error": None}

    # job_dir is passed twice: once as the input directory, once as the output directory
    thread = threading.Thread(
        target=process_pipeline,
        args=(job_id, job_dir, template_path, json_path, job_dir, insurer_type),
        daemon=True,
    )
    thread.start()
    return jsonify({"job_id": job_id})


@app.route("/result/<job_id>", methods=["GET"])
def get_result(job_id):
    """
    Report the state of a job.

    Returns 404 for an unknown job id. Otherwise returns the status: "error"
    with the stored message, "completed" with the base64 PDF and the filled
    field data, or "pending" for any other state.
    """
    record = jobs.get(job_id)
    if not record:
        return jsonify({"error": "Job not found"}), 404

    state = record["status"]
    if state == "completed":
        outcome = record["result"]
        payload = {
            "status": "completed",
            "pdf_b64": outcome["pdf_b64"],
            "form_fields_filled": outcome["json"],
        }
        return jsonify(payload)

    if state == "error":
        return jsonify({"status": "error", "error": record["error"]})

    # Anything else is still in progress.
    return jsonify({"status": "pending"})


@app.route("/download/<job_id>", methods=["GET"])
def download(job_id):
    """
    Send the filled PDF of a job as a file attachment.

    Returns 404 when job_id is not a canonical UUID string (the format ask()
    generates) or when no filled PDF exists for it.
    """
    # job_id comes straight from the URL and is joined into a filesystem path,
    # so anything that is not exactly a UUID (e.g. "..") is rejected first.
    try:
        is_canonical = str(uuid.UUID(job_id)) == job_id
    except ValueError:
        is_canonical = False
    if not is_canonical:
        return jsonify({"error": "PDF not found"}), 404

    job_dir = os.path.join(BASE_DIR, job_id)
    filled_path = os.path.join(job_dir, "filled_template.pdf")
    if not os.path.exists(filled_path):
        return jsonify({"error": "PDF not found"}), 404
    return send_file(filled_path, as_attachment=True, download_name=f"filled_{job_id}.pdf")

# PROCESSING PIPELINE 
def process_pipeline(job_id, input_dir, template_path, json_path, job_dir, insurer_type):
    """
    Background worker for one job: OCR -> RAG/LLM extraction -> mapping -> PDF fill.

    Intermediate artefacts (combined_fields.json, form_fields_filled.json,
    filled_template.pdf) are written to job_dir. The outcome is published in
    the module-level ``jobs`` dict: status "completed" with the base64 PDF and
    the filled field data, or status "error" with the exception message.
    Exceptions never propagate out of this function.

    Args:
        job_id: Key of the job in ``jobs``.
        input_dir: Directory holding the uploaded input documents.
        template_path: Path of the PDF form template to fill.
        json_path: Path of the JSON file describing the template's fields.
        job_dir: Directory that receives the output files.
        insurer_type: "NTUC" or "GE"; anything else fails the job.
    """
    try:
        print(f"[{job_id}] Starting pipeline for {insurer_type}...")

        # Fail fast: an unsupported form would otherwise only be rejected
        # after the whole OCR pass has already run.
        if insurer_type not in ("NTUC", "GE"):
            raise ValueError("Unsupported insurer form.")

        form_fields = read_json(json_path)

        # STEP 1: OCR + Preprocessing (multi-file)
        result = process_uploads(input_dir, run_ocr=True, multi=True)
        timeline = result["timeline"]

        # stop if no extracted info
        if not timeline:
            raise ValueError("No extracted information. Please check the uploaded files.")
        print(f"[{job_id}] Pre-processing complete — Timeline ready with {len(timeline)} entries.")

        # STEP 2: RAG + LLM extraction
        combined_fields = run_llm_extraction_with_rag(timeline, insurer_type)
        combined_path = os.path.join(job_dir, "combined_fields.json")
        write_json(combined_fields, combined_path)
        print(f"[{job_id}] LLM extraction done. Fields: {len(combined_fields.keys())}")

        # STEP 3: Field mapping (insurer-specific)
        if insurer_type == "NTUC":
            filled_fields = map_combined_to_fields_income(combined_fields, form_fields)
        elif insurer_type == "GE":
            filled_fields = map_combined_to_fields_ge(combined_fields, form_fields)
        else:
            raise ValueError("Unsupported insurer form.")

        filled_json_path = os.path.join(job_dir, "form_fields_filled.json")
        write_json(filled_fields, filled_json_path)
        print(f"[{job_id}] Mapping done.")

        # STEP 4: Fill PDF (also kept on disk for the /download route)
        filled_pdf_path = os.path.join(job_dir, "filled_template.pdf")
        pdf_bytes = fill_pdf_form(template_path, filled_fields)
        with open(filled_pdf_path, "wb") as f:
            f.write(pdf_bytes)
        print(f"[{job_id}] PDF filled successfully.")

        # STEP 5: Encode final output straight from memory
        b64_pdf = base64.b64encode(pdf_bytes).decode("utf-8")

        jobs[job_id] = {
            "status": "completed",
            "result": {"pdf_b64": b64_pdf, "json": filled_fields},
            "error": None,
        }
        print(f"[{job_id}] Completed successfully.")

    except Exception as e:
        # Top-level boundary of the worker thread: record the failure for
        # get_result() instead of letting the thread die silently.
        traceback.print_exc()
        jobs[job_id] = {"status": "error", "result": None, "error": str(e)}
        print(f"[{job_id}] Error: {e}")

# Start Flask + ngrok
def run_flask():
    """Run the Flask app on all interfaces, port 5000, with threaded request handling."""
    bind_host, bind_port = "0.0.0.0", 5000
    app.run(host=bind_host, port=bind_port, threaded=True)

# Register the ngrok auth token. The empty string is a placeholder and has to
# be replaced with a real key.
ngrok.set_auth_token("") # fill with ngrok key
# Open an HTTP tunnel to local port 5000 (the port run_flask() binds) and
# print its public URL.
public_tunnel = ngrok.connect(5000, "http")
public_url = public_tunnel.public_url
print("Public BASE_URL:", public_url)

# Serve the Flask app from a daemon thread so this cell returns immediately;
# a daemon thread does not keep the process alive on its own.
threading.Thread(target=run_flask, daemon=True).start()
