In [34]:
import spacy
from transformers import pipeline
from transformers import AutoConfig
from transformers import AutoModelForSequenceClassification
from transformers import BertTokenizerFast
from collections import Counter
import re
import json
import torch

REVIEWING MODELS

In [35]:
# Model identifiers considered for each stage of the pipeline.
# These are bare string expressions (no assignment), so they bind no names.
# deBERTa : NER
"Helios9/BioMed_NER"

# DistilBART : Summarization
"Mahalingam/DistilBart-Med-Summary"

# RoBERTa : Sentiment Analysis
"hazarri/fine-tuned-roberta-sentiment"

# GEMINI API for SOAP NOTE

'hazarri/fine-tuned-roberta-sentiment'

Load Models

In [36]:
# Load every model once at module level; later cells reference `nlp`,
# `ner_model`, `summarizer_pipeline` and `sentiment_model` by these names.
nlp = spacy.load("en_core_web_sm")
print("Loaded spaCy model: en_core_web_sm")
# NOTE(review): the model id is spelled "BioMED_NER" while the tokenizer id is
# spelled "BioMed_NER" -- confirm the casing difference is intentional.
# aggregation_strategy="simple" is requested so results carry 'entity_group'
# (the key the merge/extraction helpers below read).
ner_model = pipeline("ner", model="Helios9/BioMED_NER",tokenizer="Helios9/BioMed_NER", aggregation_strategy="simple")
print("Loaded NER model: Helios9/BioMED_NER")
summarizer_pipeline = pipeline("summarization", model="Mahalingam/DistilBart-Med-Summary")
print("Loaded Summarization model: Mahalingam/DistilBart-Med-Summary")

sentiment_model = pipeline("sentiment-analysis", model="hazarri/fine-tuned-roberta-sentiment")
print("Loaded Sentiment Analysis model: hazarri/fine-tuned-roberta-sentiment")

Loaded spaCy model: en_core_web_sm


Device set to use cuda:0


Loaded NER model: Helios9/BioMED_NER


Device set to use cuda:0


Loaded Summarization model: Mahalingam/DistilBart-Med-Summary


Device set to use cuda:0


Loaded Sentiment Analysis model: hazarri/fine-tuned-roberta-sentiment


Review the models' labels

In [37]:
model_name = "hazarri/fine-tuned-roberta-sentiment"
# Fetch only the configuration; its id2label dict maps output IDs to label names.
config = AutoConfig.from_pretrained(model_name)
labels = config.id2label

# List every (id, label) pair the model can emit.
print(f"All entity labels supported by '{model_name}':\n")
for i, label in labels.items():
    print(f"ID {i}: {label}")

# Keep the part after the '-' of each label that contains one, de-duplicated and sorted.
unique_entity_types = sorted({name.split('-')[1] for name in labels.values() if '-' in name})
print(f"\nUnique entity categories: {unique_entity_types}")

All entity labels supported by 'hazarri/fine-tuned-roberta-sentiment':

ID 0: Negative
ID 1: Neutral
ID 2: Positive

Unique entity categories: []


In [38]:
model_name = "Helios9/BioMed_NER"
# Fetch only the configuration; its id2label dict maps output IDs to label names.
config = AutoConfig.from_pretrained(model_name)
labels = config.id2label

# List every (id, label) pair the model can emit.
print(f"All entity labels supported by '{model_name}':\n")
for i, label in labels.items():
    print(f"ID {i}: {label}")

# Keep the part after the '-' of each label that contains one, de-duplicated and sorted.
unique_entity_types = sorted({name.split('-')[1] for name in labels.values() if '-' in name})
print(f"\nUnique entity categories: {unique_entity_types}")

All entity labels supported by 'Helios9/BioMed_NER':

ID 0: O
ID 1: B-Activity
ID 2: I-Activity
ID 3: B-Administration
ID 4: I-Administration
ID 5: B-Age
ID 6: I-Age
ID 7: B-Area
ID 8: I-Area
ID 9: B-Biological_attribute
ID 10: I-Biological_attribute
ID 11: B-Biological_structure
ID 12: I-Biological_structure
ID 13: B-Clinical_event
ID 14: I-Clinical_event
ID 15: B-Color
ID 16: I-Color
ID 17: B-Coreference
ID 18: I-Coreference
ID 19: B-Date
ID 20: I-Date
ID 21: B-Detailed_description
ID 22: I-Detailed_description
ID 23: B-Diagnostic_procedure
ID 24: I-Diagnostic_procedure
ID 25: B-Disease_disorder
ID 26: I-Disease_disorder
ID 27: B-Distance
ID 28: I-Distance
ID 29: B-Dosage
ID 30: I-Dosage
ID 31: B-Duration
ID 32: I-Duration
ID 33: B-Family_history
ID 34: I-Family_history
ID 35: B-Frequency
ID 36: I-Frequency
ID 37: B-Height
ID 38: I-Height
ID 39: B-History
ID 40: I-History
ID 41: B-Lab_value
ID 42: I-Lab_value
ID 43: B-Mass
ID 44: I-Mass
ID 45: B-Medication
ID 46: I-Medication
ID 47: 

# 1. MEDICAL NLP SUMMARIZATION

OPEN TRANSCRIPT

In [39]:
# Read the whole transcript file into one UTF-8 string for the pipeline.
with open('transcript.txt', encoding='utf-8') as file:
    TRANSCRIPT = file.read()

Get Name

In [40]:
def run_spacy_ner(text: str, nlp):
    """Return the first PERSON entity that is not introduced as a doctor.

    Args:
        text: Raw transcript text.
        nlp: spaCy pipeline; invoked as ``nlp(text)``.

    Returns:
        The text of the first PERSON entity without a "Dr"/"Doctor" prefix,
        or an empty string when no such entity exists.
    """
    print("Running spaCy (en_core_web_sm) for General NER \n")

    def has_doctor_prefix(ent, doc) -> bool:
        """Tell whether a 'Dr'/'Doctor' title sits right before the entity."""
        # Token-level check: look at the (up to) two tokens before the entity.
        first_token = ent.start
        preceding_tokens = doc[max(0, first_token - 2): first_token].text.lower().strip()
        if re.search(r"(?:^|\s)(dr\.?|doctor)\s*$", preceding_tokens):
            return True

        # Character-level fallback: inspect the six characters before the
        # entity so forms such as 'Dr.John' / 'Dr-John' are caught as well.
        preceding_chars = text[max(0, ent.start_char - 6): ent.start_char].lower()
        return bool(re.search(r"(?:^|[\s,.-])dr\.?\s*$|doctor\s*$", preceding_chars))

    doc = nlp(text)
    for ent in doc.ents:
        if ent.label_ != "PERSON":
            continue
        if has_doctor_prefix(ent, doc):
            continue
        # The first remaining PERSON is assumed to be the patient.
        return ent.text
    return ""

Get merged entities from Medical NER

In [41]:
def run_helios_ner_with_offsets(text, ner_model):
    """Run NER paragraph by paragraph and report offsets in full-text coordinates.

    The text is split on blank lines ('\\n\\n'); each non-blank chunk is passed
    to ``ner_model`` and the 'start'/'end' of every returned entity is shifted
    by the chunk's position in ``text``.

    Args:
        text: Full transcript.
        ner_model: Callable returning a list of entity dicts with 'start'/'end'.

    Returns:
        A single list with the entities of all chunks, in chunk order.
    """
    print("Running Helios9/BioMED_NER for Medical NER \n ")
    separator = '\n\n'
    collected = []
    chunk_start = 0
    for paragraph in text.split(separator):
        if paragraph.strip():
            for hit in ner_model(paragraph):
                hit['start'] += chunk_start
                hit['end'] += chunk_start
                collected.append(hit)
        # Advance past this chunk plus the separator that split() removed.
        chunk_start += len(paragraph) + 2
    return collected

def merge_adjacent_entities(entities, text):
    """Merge neighbouring same-type entities, then drop low-confidence ones.

    Two entities are merged when they share an 'entity_group' and at most one
    character of ``text`` lies between them (any single character qualifies,
    e.g. '' / ' ' / '-'). The merged entity spans both, takes its 'word' from
    ``text`` and keeps the higher of the two scores.

    The input dicts are never modified: merging is done on shallow copies.

    Args:
        entities: Iterable of dicts with 'entity_group', 'word', 'start',
            'end' and 'score'.
        text: The text the 'start'/'end' offsets refer to.

    Returns:
        A list of new entity dicts ordered by 'start', filtered by the
        per-type minimum score.
    """
    if not entities:
        return []

    # Minimum score needed to keep an entity, per entity type.
    default_min_score = 0.5
    min_score_by_group = {
        "Sign_symptom": 0.6,
        "Medication": 0.4,
        "Disease_disorder": 0.7,
        "Lab_value": 0.1,
    }

    # Copy each dict so merging below cannot alter the caller's data.
    ordered = sorted((dict(entity) for entity in entities), key=lambda item: item['start'])

    merged_entities = []
    current_entity = ordered[0]
    for next_entity in ordered[1:]:
        text_between = text[current_entity['end']:next_entity['start']]
        same_group = next_entity['entity_group'] == current_entity['entity_group']
        if same_group and len(text_between) <= 1:
            # max() keeps the span from shrinking when the next entity lies
            # inside the current one.
            merged_end = max(current_entity['end'], next_entity['end'])
            current_entity['word'] = text[current_entity['start']:merged_end]
            current_entity['end'] = merged_end
            current_entity['score'] = max(current_entity['score'], next_entity['score'])
        else:
            merged_entities.append(current_entity)
            current_entity = next_entity
    merged_entities.append(current_entity)

    return [
        entity for entity in merged_entities
        if entity['score'] >= min_score_by_group.get(entity['entity_group'], default_min_score)
    ]

Print merged entities (debug)

In [42]:
def print_merged_entities(merged_entities):
    """Debug helper: print type, text and two-decimal score of every entity."""
    print("\n Found Merged Entities : \n ")
    report_lines = (
        f"{item['entity_group']}: {item['word']} (Score: {item['score']:.2f})"
        for item in merged_entities
    )
    for report_line in report_lines:
        print(report_line)

Keyword Extraction

In [43]:
def extract_medical_keywords(merged_entities, top_k=12):
    """Return the highest-scoring distinct medical keywords.

    Only entities whose type is one of Disease_disorder, Medication,
    Therapeutic_procedure, Diagnostic_procedure or Dosage are considered.
    Keywords are compared after stripping and lower-casing; when the same
    keyword occurs more than once, only its best-scoring occurrence is kept.

    Args:
        merged_entities: Iterable of dicts with 'entity_group', 'word', 'score'.
        top_k: Maximum number of keywords to return.

    Returns:
        Up to ``top_k`` keyword strings, ordered by descending score.
    """
    keyword_entity_types = {
        'Disease_disorder',
        'Medication',
        'Therapeutic_procedure',
        'Diagnostic_procedure',
        'Dosage'
    }

    # keyword -> best record seen so far. A dict keyed by the keyword string
    # makes the duplicate check an actual string comparison.
    best_by_keyword = {}
    for entity in merged_entities:
        entity_group = entity['entity_group']
        if entity_group not in keyword_entity_types:
            continue
        entity_word = entity['word'].strip().lower()
        known = best_by_keyword.get(entity_word)
        if known is None or entity['score'] > known['score']:
            best_by_keyword[entity_word] = {
                'keyword': entity_word,
                'score': entity['score'],
                'type': entity_group
            }

    # Highest score first; sorted() is stable, so ties keep first-seen order.
    ranked = sorted(best_by_keyword.values(), key=lambda kw: kw['score'], reverse=True)
    return [kw['keyword'] for kw in ranked[:top_k]]


Filter Negated Entities

In [44]:
def segment_dialogue_turns(transcript_text):
    """Split a transcript into speaker turns with exact character offsets.

    Only lines that (after stripping) start with "Physician:" or "Patient:"
    produce a turn; blank lines and any other lines are skipped.

    Offsets are computed from the running position of each line rather than
    by searching for the line's text, so a turn is never attributed to
    identical text that appears earlier inside a skipped line.

    Args:
        transcript_text: Full transcript with '\\n'-separated lines.

    Returns:
        List of dicts {"speaker", "text", "start", "end"} where
        ``transcript_text[start:end]`` is the stripped line (speaker label
        included) and "text" is the utterance without the label.
    """
    speaker_labels = ("Physician", "Patient")
    dialogue_turns = []
    next_line_offset = 0  # index in transcript_text where the next raw line begins

    for raw_line in transcript_text.split('\n'):
        line_offset = next_line_offset
        next_line_offset += len(raw_line) + 1  # +1 for the '\n' removed by split()

        line = raw_line.strip()
        if not line:
            continue

        speaker = next(
            (label for label in speaker_labels if line.startswith(label + ":")),
            None,
        )
        if speaker is None:
            # Not a labelled line (possibly a continuation): skipped.
            continue

        utterance = line[len(speaker) + 1:].strip()
        leading_whitespace = len(raw_line) - len(raw_line.lstrip())
        start = line_offset + leading_whitespace
        end = start + len(line)
        dialogue_turns.append({"speaker": speaker, "text": utterance, "start": start, "end": end})

    return dialogue_turns



def filter_negated_entities_spacy(transcript_text, entity_phrases, nlp, dialogue_turns):
    """
    Cross-turn dialogue negation handling.

    An entity is dropped as soon as one of its occurrences is judged negated:
    - the turn containing it holds a negation cue (intra-turn), or
    - it occurs in a Physician turn and the immediately following Patient turn
      holds a negation cue (inter-turn), or
    - no turn contains it and its spaCy sentence holds a negation cue.

    Cues are matched as whole words/phrases (case-insensitive), so "no" does
    not fire inside "know" and multi-word cues such as "free of" can match.

    Args:
    - transcript_text (str): Full transcript text.
    - entity_phrases (iterable of str): Extracted medical entities to filter.
    - nlp: spaCy language model.
    - dialogue_turns (list of dict): List of {"speaker": ..., "text": ..., "start": ..., "end": ...} dicts.

    Returns:
    - Set of entity_phrases NOT negated.

    """

    negation_cues = {"no", "not", "without", "denies", "absent", "never", "none", "free of", "not present"}
    # One whole-word pattern for every cue; longer cues are listed first so a
    # multi-word cue is preferred over its single-word prefix.
    cue_pattern = re.compile(
        r"\b(?:" + "|".join(
            re.escape(cue) for cue in sorted(negation_cues, key=len, reverse=True)
        ) + r")\b",
        re.IGNORECASE,
    )

    def has_negation_cue(fragment):
        """True when any negation cue occurs as a whole word/phrase."""
        return cue_pattern.search(fragment) is not None

    filtered_entities = set()
    doc = nlp(transcript_text)

    for entity in entity_phrases:
        found_negation = False

        # Find all occurrences of the entity phrase (case-insensitive)
        for match in re.finditer(re.escape(entity), transcript_text, re.IGNORECASE):
            e_start, e_end = match.start(), match.end()
            span = doc.char_span(e_start, e_end)
            if span is None:
                # Unable to map char span to spaCy span, skip this occurrence
                continue

            # Find dialogue turn containing this entity occurrence
            current_turn = None
            next_turn = None
            for i, turn in enumerate(dialogue_turns):
                if turn["start"] <= e_start < turn["end"]:
                    current_turn = turn
                    if i + 1 < len(dialogue_turns):
                        next_turn = dialogue_turns[i + 1]
                    break

            if current_turn is None:
                # Fallback: no dialogue turn found, check the enclosing sentence
                found_negation = has_negation_cue(span.sent.text)
            else:
                speaker = current_turn["speaker"].lower()

                # Intra-turn negation: cue anywhere in the same utterance
                found_negation = has_negation_cue(current_turn["text"])

                # Inter-turn negation: physician mentions it, the next patient turn negates
                if (not found_negation and speaker == "physician"
                        and next_turn is not None
                        and next_turn["speaker"].lower() == "patient"):
                    found_negation = has_negation_cue(next_turn["text"])

                # Patient turn: also look at the transcript text from the turn
                # start up to the entity (covers callers whose turn "text"
                # differs from the transcript slice)
                if not found_negation and speaker == "patient":
                    found_negation = has_negation_cue(transcript_text[current_turn["start"]:e_start])

            if found_negation:
                break

        if not found_negation:
            filtered_entities.add(entity)

    return filtered_entities


Get Transcript Summary

In [45]:
def run_bart_summarizer(text: str, summarizer_pipeline):
    """Summarise ``text`` and return the summary string.

    The pipeline is called with max_length=150, min_length=120 and
    do_sample=False; the 'summary_text' of its first result is returned.
    """
    print("Running BART Summarizer \n")
    generation_kwargs = {"max_length": 150, "min_length": 120, "do_sample": False}
    outputs = summarizer_pipeline(text, **generation_kwargs)
    first_output = outputs[0]
    return first_output['summary_text']

Define OUTPUT SETS

In [46]:
# Module-level sets named after the three extracted summary fields; each starts empty.
symptoms, diagnosis, treatments = set(), set(), set()

Get SYMPTOMS Set

In [47]:

def get_symptoms_set(merged_entities, transcript_text):
    """Build the set of symptom phrases for one transcript.

    A run of consecutive Biological_structure entities is combined with a
    Sign_symptom entity that sits directly before the run or directly after
    it (gap of at most 3 characters), giving "<structure> <symptom>" phrases.
    Sign_symptom entities not directly preceded by a Biological_structure are
    added on their own. All phrases are lower-cased.

    The result is a fresh set on every call, so symptoms from an earlier
    transcript never leak into a later one.

    Args:
        merged_entities: List of entity dicts ('entity_group', 'word',
            'start', 'end'), expected in text order.
        transcript_text: Text the offsets refer to.

    Returns:
        A new set of symptom strings.
    """
    found_symptoms = set()
    i = 0
    while i < len(merged_entities):
        entity = merged_entities[i]

        # Biological_structure: gather the run of consecutive structures.
        if entity['entity_group'] == "Biological_structure":
            bio_structs_indices = [i]
            j = i + 1
            while j < len(merged_entities) and merged_entities[j]['entity_group'] == "Biological_structure":
                bio_structs_indices.append(j)
                j += 1
            # Sign_symptom immediately before the first structure?
            symptom_indices = []
            if i - 1 >= 0 and merged_entities[i - 1]['entity_group'] == "Sign_symptom" and \
                merged_entities[i]['start'] - merged_entities[i - 1]['end'] <= 3:  # small threshold for punctuation/space
                    symptom_indices.append(i - 1)
            # Sign_symptom immediately after the last structure?
            if j < len(merged_entities) and merged_entities[j]['entity_group'] == "Sign_symptom" and \
                merged_entities[j]['start'] - merged_entities[bio_structs_indices[-1]]['end'] <= 3:
                    symptom_indices.append(j)
            # Pair every structure in the run with each neighbouring symptom.
            for sym_idx in symptom_indices:
                symptom_word = transcript_text[merged_entities[sym_idx]['start']:merged_entities[sym_idx]['end']].strip().lower()
                for bio_idx in bio_structs_indices:
                    bio_word = transcript_text[merged_entities[bio_idx]['start']:merged_entities[bio_idx]['end']].strip().lower()
                    merged_symptom = f"{bio_word} {symptom_word}"
                    found_symptoms.add(merged_symptom)
            i = bio_structs_indices[-1] + 1
            continue

        # Standalone Sign_symptom (not directly preceded by a structure).
        if entity['entity_group'] == "Sign_symptom":
            prev_bio = (i > 0 and merged_entities[i-1]['entity_group'] == "Biological_structure"
                        and merged_entities[i-1]['end'] >= entity['start'] - 1)
            if not prev_bio:
                found_symptoms.add(entity['word'].strip().lower())
        i += 1
    return found_symptoms


Get DIAGNOSIS Set

In [48]:
def get_diagnosis_set(merged_entities):
    """Collect the Disease_disorder entities of one transcript.

    Each entity's 'word' is stripped and capitalised. A fresh set is built on
    every call, so diagnoses from an earlier transcript never carry over.

    Args:
        merged_entities: Iterable of dicts with 'entity_group' and 'word'.

    Returns:
        A new set of diagnosis strings.
    """
    return {
        entity['word'].strip().capitalize()
        for entity in merged_entities
        if entity['entity_group'] == "Disease_disorder"
    }

Get TREATMENT Set

In [49]:
def get_treatment_set(merged_entities, transcript_text, window_chars=30, max_gap_words=3):
    """Collect treatments, attaching a preceding quantity where one is close by.

    Treatments are Medication and Therapeutic_procedure entities. For each
    one, the first unused quantity-type entity (Lab_value, Dosage, Duration,
    Frequency, Detailed_description) that ends before the treatment, starts
    within ``window_chars`` characters before it, and is separated from it by
    at most ``max_gap_words`` words is combined as
    "<quantity> units of <treatment>". Otherwise the treatment is added alone.
    All phrases are lower-cased.

    A fresh set is built on every call, so treatments from an earlier
    transcript never carry over.

    Args:
        merged_entities: Iterable of dicts with 'entity_group', 'word',
            'start', 'end'.
        transcript_text: Text the offsets refer to.
        window_chars: How far before a treatment a quantity may start.
        max_gap_words: Maximum number of words allowed between the two.

    Returns:
        A new set of treatment strings.
    """
    quantity_groups = {"Lab_value", "Dosage", "Duration", "Frequency", "Detailed_description"}
    treatment_groups = {"Medication", "Therapeutic_procedure"}
    quantity_entities = [ent for ent in merged_entities if ent['entity_group'] in quantity_groups]
    treatment_entities = [ent for ent in merged_entities if ent['entity_group'] in treatment_groups]

    found_treatments = set()
    used_quantities = set()  # ids of quantities already attached to a treatment

    for treat_ent in treatment_entities:
        found_link = False
        window_start = treat_ent['start'] - window_chars

        for quant_ent in quantity_entities:
            # Quantity must lie inside the window, before the treatment, and be unused.
            if quant_ent['end'] < treat_ent['start'] and quant_ent['start'] >= window_start and id(quant_ent) not in used_quantities:

                # Only link when the text in between is short (e.g. " sessions of ").
                text_between = transcript_text[quant_ent['end']:treat_ent['start']]
                if len(text_between.strip().split()) <= max_gap_words:
                    combined_phrase = f"{quant_ent['word']} units of {treat_ent['word']}"
                    found_treatments.add(combined_phrase.strip().lower())
                    used_quantities.add(id(quant_ent))
                    found_link = True
                    break  # one quantity per treatment

        # No quantity linked: keep the treatment by itself.
        if not found_link:
            found_treatments.add(treat_ent['word'].strip().lower())

    return found_treatments

Get CURRENT_STATUS & PROGNOSIS

In [50]:
def get_current_status(bart_summary):
    """Extract a current-status clause from the generated summary.

    The patterns are tried in order (case-insensitive); the captured clause of
    the first pattern that matches is stripped, capitalised and returned.
    An empty string is returned when nothing matches.
    """
    status_patterns = (
        r"(currently.*?)(?:\.|,|;|$)",
        r"(still.*?)(?:\.|,|;|$)",
        r"(occasional.*?)(?:\.|,|;|$)",
        r"(patient is.*?stable.*?)(?:\.|,|;|$)",
        r"(symptoms have (improved|resolved|persisted).*?)(?:\.|,|;|$)",
        r"(no new symptoms?.*?)(?:\.|,|;|$)",
        r"(functioning well.*?)(?:\.|,|;|$)",
        r"(able to.*?perform daily activities.*?)(?:\.|,|;|$)",
        r"(signs of improvement.*?)(?:\.|,|;|$)",
        r"(ongoing treatment.*?)(?:\.|,|;|$)",
    )

    # Lazy generator: patterns after the first hit are never evaluated.
    attempts = (re.search(pattern, bart_summary, re.IGNORECASE) for pattern in status_patterns)
    first_hit = next((hit for hit in attempts if hit), None)
    if first_hit is None:
        return ""
    return first_hit.group(1).strip().capitalize()

def get_prognosis(bart_summary):
    """Extract a prognosis clause from the generated summary.

    The patterns are tried in order (case-insensitive); the captured clause of
    the first pattern that matches is stripped, capitalised and returned.
    An empty string is returned when nothing matches.

    Specific phrasings are listed before the generic patterns they contain
    ("good prognosis" before "prognosis"; "expected to recover", "likely to
    recover", "on track for ... recovery" and "recovery anticipated within"
    before "recovery"/"recover"). With the generic patterns first, those
    specific patterns could never be selected.
    """
    patterns = [
        r"(good prognosis.*?)(?:\.|,|;|$)",
        r"(prognosis.*?)(?:\.|,|;|$)",
        r"(expect.?full recovery.?)(?:\.|,|;|$)",
        r"(expected to recover.*?)(?:\.|,|;|$)",
        r"(likely to recover.*?)(?:\.|,|;|$)",
        r"(on track for.*?recovery.*?)(?:\.|,|;|$)",
        r"(recovery anticipated within.*?)(?:\.|,|;|$)",
        r"(full recovery.*?)(?:\.|,|;|$)",
        r"(recovery.*?)(?:\.|,|;|$)",
        r"(recover.*?)(?:\.|,|;|$)",
        r"(no long-term.*?)(?:\.|,|;|$)",
        r"(no evidence of.*?long-term damage.*?)(?:\.|,|;|$)",
        r"(expected to improve.*?)(?:\.|,|;|$)",
        r"(no chronic complications.*?)(?:\.|,|;|$)"
    ]

    for p in patterns:
        match = re.search(p, bart_summary, re.IGNORECASE)
        if match:
            return match.group(1).strip().capitalize()
    return ""



 GENERATE SUMMARY

In [51]:
def summary_generator(transcript_text, merged_entities, patient_name):
    """Assemble, print and return the structured medical summary.

    Extracts diagnosis / symptom / treatment sets and keywords from the
    entities, summarises the transcript, derives status and prognosis from
    that summary, and removes negated diagnoses, symptoms and treatments.
    Uses the module-level ``summarizer_pipeline`` and ``nlp``.

    Dialogue turns are segmented from ``transcript_text`` (the argument), so
    turn offsets always refer to the same text the entities were found in.

    Args:
        transcript_text: Transcript to summarise.
        merged_entities: Entity dicts for that transcript.
        patient_name: Name placed in the "Patient_Name" field.

    Returns:
        The summary dict (also printed as indented JSON).
    """
    diagnosis = get_diagnosis_set(merged_entities)
    symptoms = get_symptoms_set(merged_entities, transcript_text)
    treatments = get_treatment_set(merged_entities, transcript_text)

    bart_summary = run_bart_summarizer(transcript_text, summarizer_pipeline)

    current_status = get_current_status(bart_summary)
    prognosis = get_prognosis(bart_summary)
    medical_keywords_found = extract_medical_keywords(merged_entities, top_k=12)

    # Segment the transcript passed in, not the module-level TRANSCRIPT.
    dialogue_turns = segment_dialogue_turns(transcript_text)
    filtered_diagnosis = filter_negated_entities_spacy(transcript_text, diagnosis, nlp, dialogue_turns)
    filtered_symptoms = filter_negated_entities_spacy(transcript_text, symptoms, nlp, dialogue_turns)
    filtered_treatments = filter_negated_entities_spacy(transcript_text, treatments, nlp, dialogue_turns)

    print("\n GENERATED MEDICAL SUMMARY : \n")
    # Each list field holds one comma-joined, alphabetically sorted string.
    summary = {
        "Patient_Name": patient_name,
        "Symptoms": [", ".join(sorted([s.capitalize() for s in filtered_symptoms]))],
        "Diagnosis": [", ".join(sorted([d.capitalize() for d in filtered_diagnosis]))],
        "Treatment": [", ".join(sorted([t.capitalize() for t in filtered_treatments]))],
        "Current_Status": current_status,
        "Prognosis": prognosis,
        "Medical_Keywords": [", ".join(sorted([m.capitalize() for m in medical_keywords_found]))],
        "Summary": bart_summary
    }
    print(json.dumps(summary, indent=2))
    return summary

# 2. SENTIMENT ANALYSIS

In [52]:
def predict_sentiment(text: str) -> str:
    """
    Classify one utterance with the module-level ``sentiment_model``.

    The raw label of the first prediction is mapped Negative -> Anxious,
    Neutral -> Neutral, Positive -> Reassured; any other label, and empty or
    whitespace-only input, yields "Neutral".
    """
    if not (text and text.strip()):
        return "Neutral"

    clinical_label_for = {
        "Negative": "Anxious",
        "Neutral": "Neutral",
        "Positive": "Reassured",
    }
    # The pipeline returns a list of dicts, e.g. [{'label': 'Positive', 'score': 0.98}]
    predictions = sentiment_model(text, truncation=True, max_length=256)
    top_label = predictions[0]["label"]
    return clinical_label_for.get(top_label, "Neutral")

def detect_intent(text: str) -> str:
    """
    Rule-based intent detection.

    The lower-cased text is tested against keyword rules in priority order:
    concern, then reassurance, then symptom reporting. The first rule that
    matches decides the intent; "Neutral/Other" is returned when none does.
    """
    lowered = text.lower()
    intent_rules = (
        (r"\b(worried|anxious|concerned|fear|scared)\b", "Expressing concern"),
        (r"\b(hope|get better|reassure|okay|improve|recover)\b", "Seeking reassurance"),
        (r"\b(pain|ache|hurt|symptom)\b", "Reporting symptoms"),
    )
    for keyword_pattern, intent in intent_rules:
        if re.search(keyword_pattern, lowered):
            return intent
    return "Neutral/Other"


In [53]:
def get_sentiment_analysis(transcript_text: str):
    """Aggregate sentiment and intent over the patient's utterances.

    Every Patient turn is scored with ``predict_sentiment`` and
    ``detect_intent``. For each, the most common non-neutral label wins; the
    neutral label is used only when no other label occurs.

    Returns:
        Dict with "Sentiment" and "Intent" (also printed as indented JSON).
    """
    # Only patient turns are analysed.
    patient_utterances = [
        turn["text"]
        for turn in segment_dialogue_turns(transcript_text)
        if turn["speaker"] == "Patient"
    ]

    # Score each utterance once: (sentiment, intent) pairs.
    scored = [(predict_sentiment(utt), detect_intent(utt)) for utt in patient_utterances]
    sentiments = [sentiment for sentiment, _ in scored]
    intents = [intent for _, intent in scored]

    def majority_vote(labels, neutral_label):
        """Most common label ignoring ``neutral_label``; neutral if none remain."""
        informative = Counter(lbl for lbl in labels if lbl != neutral_label)
        if not informative:
            return neutral_label
        return informative.most_common(1)[0][0]

    final_sentiment = majority_vote(sentiments, "Neutral")
    final_intent = majority_vote(intents, "Neutral/Other")

    print("\n GENERATED SENTIMENT ANALYSIS : \n")
    sentiment_summary = {
        "Sentiment": final_sentiment,
        "Intent":   final_intent
    }
    print(json.dumps(sentiment_summary, indent=2))
    return sentiment_summary


In [54]:
!pip install google-genai pydantic



# 3. SOAP NOTE GENERATION

In [55]:
import os
from typing import Optional
from pydantic import BaseModel, Field
from google import genai
from google.genai import types

# Get your api key
# SECURITY: never write the key into the notebook. Supply it through the
# GEMINI_API_KEY environment variable; if it is missing, prompt for it without
# echoing. Any key that was ever committed in this cell must be treated as
# leaked and revoked.
if not os.environ.get("GEMINI_API_KEY"):
    from getpass import getpass  # local import: only needed for the interactive fallback
    os.environ["GEMINI_API_KEY"] = getpass("Enter GEMINI_API_KEY: ")

# Define a strict JSON schema using Pydantic (auto-converted to JSON Schema by the SDK)
# "S" section of the SOAP note schema; both string fields default to "".
# (Plain comments are used instead of a class docstring to leave the model unchanged.)
class Subjective(BaseModel):
    Chief_Complaint: str = Field(default="")
    History_of_Present_Illness: str = Field(default="")

# "O" section of the SOAP note schema; both string fields default to "".
class Objective(BaseModel):
    Physical_Exam: str = Field(default="")
    Observations: str = Field(default="")

# "A" section of the SOAP note schema; both string fields default to "".
class Assessment(BaseModel):
    Diagnosis: str = Field(default="")
    Severity: str = Field(default="")

# "P" section of the SOAP note schema; both string fields default to "".
class Plan(BaseModel):
    Treatment: str = Field(default="")
    Follow_Up: str = Field(default="")

# Top-level SOAP note: one required field per section, each named after
# (and typed as) the section model defined earlier in this cell.
class SOAPNote(BaseModel):
    Subjective: Subjective
    Objective: Objective
    Assessment: Assessment
    Plan: Plan

def generate_soap_with_gemini(transcript_text: str,
                              model_name: str = "gemini-2.5-flash") -> dict:
    """Generate a SOAP note for the transcript as a plain dict.

    The request asks for JSON output bound to the ``SOAPNote`` schema with a
    scribe-style system instruction and temperature 0.2. The parsed response
    object is preferred; if it is missing or cannot be dumped, the response's
    JSON text is decoded instead.

    Args:
        transcript_text: Transcript sent as the request contents.
        model_name: Gemini model identifier.

    Returns:
        The SOAP note as a dict.

    Raises:
        ValueError: If the response carries neither a usable parsed object nor
            any text (chained to the parsing error, when there was one), or if
            the text is not valid JSON (``json.JSONDecodeError``).
    """
    # System instruction to constrain behavior
    system_instruction = (
        "You are a medical scribe. Use only the provided transcript to produce a concise, "
        "clinically coherent SOAP note in complete sentences. Do not add extraneous text. "
        "If a field is not explicitly mentioned, leave it as an empty string."
    )

    client = genai.Client()  # Reads GEMINI_API_KEY from env by default

    # Configure strict JSON output bound to the SOAPNote schema
    config = types.GenerateContentConfig(
        system_instruction=system_instruction,
        response_mime_type="application/json",
        response_schema=SOAPNote,  # The SDK converts this to JSON Schema
        temperature=0.2,
    )

    # Call the model
    response = client.models.generate_content(
        model=model_name,
        contents=transcript_text,
        config=config,
    )

    # Prefer the parsed Pydantic object. This stays best-effort, but the
    # failure is remembered instead of being discarded.
    structured_error = None
    try:
        parsed = response.parsed  # -> SOAPNote instance when response_schema is set
        if parsed:
            # Pydantic v2: model_dump; v1: dict()
            return parsed.model_dump()
    except Exception as exc:
        structured_error = exc

    # Fallback to JSON text; fail with a clear message when there is none.
    raw_text = response.text
    if not raw_text:
        raise ValueError(
            "Gemini returned neither a parsed SOAP note nor any JSON text"
        ) from structured_error
    return json.loads(raw_text)



# Create pipeline

Save JSON

In [56]:
from pathlib import Path
def save_json(data: dict, output_path: str):
    """Write ``data`` to ``output_path`` as indented UTF-8 JSON.

    Missing parent directories are created; non-ASCII characters are written
    as-is rather than escaped.
    """
    destination = Path(output_path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    with destination.open('w', encoding='utf-8') as handle:
        handle.write(serialized)

Final pipeline

In [57]:
def run_pipeline(transcript_text):
    """Run every stage on one transcript and save the three JSON reports.

    Stages: patient-name lookup, medical NER plus entity merging, structured
    summary, patient sentiment/intent, and SOAP note generation. Results are
    written below ``output/``. Uses the module-level ``nlp`` and ``ner_model``.
    """
    patient_name = run_spacy_ner(transcript_text, nlp)

    raw_entities = run_helios_ner_with_offsets(transcript_text, ner_model)
    merged_entities = merge_adjacent_entities(raw_entities, transcript_text)

    patient_summary = summary_generator(transcript_text, merged_entities, patient_name)
    patient_sentiment_intent = get_sentiment_analysis(transcript_text)

    print("\n GENERATED SOAP NOTE : \n")
    soap_json = generate_soap_with_gemini(transcript_text)
    print(json.dumps(soap_json, indent=2))

    # Persist each report, in the same order they were produced.
    reports = (
        ("output/patient_summary.json", patient_summary),
        ("output/patient_sentiment_intent.json", patient_sentiment_intent),
        ("output/patient_soap_summary.json", soap_json),
    )
    for report_path, payload in reports:
        save_json(payload, report_path)



In [58]:
# Run the full pipeline on the transcript loaded into TRANSCRIPT.
if __name__ == "__main__":
    run_pipeline(TRANSCRIPT)

Running spaCy (en_core_web_sm) for General NER 



Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Running Helios9/BioMED_NER for Medical NER 
 
Running BART Summarizer 


 GENERATED MEDICAL SUMMARY : 

{
  "Patient_Name": "Jones",
  "Symptoms": [
    "Back pain, Neck pain, Stiffness"
  ],
  "Diagnosis": [
    "Whiplash injury"
  ],
  "Treatment": [
    "Painkillers, Ten units of physiotherapy"
  ],
  "Current_Status": "Still gets occasional backaches",
  "Prognosis": "Full recovery within six months of the accident",
  "Medical_Keywords": [
    "Mobility, Painkillers, Physical examination, Physiotherapy, Range of movement, Whiplash injury, X-rays"
  ],
  "Summary": "Patient had a car accident on September 1st. Another car hit her from behind and pushed her car into the one in front. She hit her head on the steering wheel. She had to take painkillers and physiotherapy to help with the stiffness and discomfort. She still gets occasional backaches, but it's nothing like before. She took a week off work and is back to her usual routine. She should make a full recovery within six months