In [None]:
import yaml

def format_dict(data, indent=0):
    """
    Render nested dicts/lists as indented, human-readable text.

    Dict keys are written as ``key:`` followed by their value one level
    deeper; list items are written as ``- item``; any other value is written
    with ``str`` formatting. Each nesting level adds two spaces.

    Args:
        data: The value to render (dict, list, or scalar).
        indent: Current nesting depth.

    Returns:
        The rendered text, lines joined with newlines.
    """
    pad = "  " * indent

    if isinstance(data, dict):
        lines = []
        for key, value in data.items():
            lines.append(f"{pad}{key}:")
            lines.append(format_dict(value, indent + 1))
        return "\n".join(lines)

    if isinstance(data, list):
        # The nested rendering is stripped so it sits right after the dash.
        return "\n".join(
            f"{pad}- {format_dict(item, indent + 1).strip()}" for item in data
        )

    # Strings, numbers, None, ...
    return f"{pad}{data}"


def readable_string(data):
    """Return *data* as indented text when it is a dict.

    Any value that is not a dict is handed back unchanged.
    """
    return format_dict(data) if isinstance(data, dict) else data


# def get_diagnosis(hd):
#     try:
#         stage = hd["estagio_drc"]
#         drc = ""
#         drc += f"DRC: G{stage['grau']}"
#         if alb := stage.get("albuminuria"):
#             drc += f"/A{alb}"
#         drc += "\nFunção renal atual: " + stage.get("funcao_rim_atual")
#         if et := hd.get("etiologia_doença_de_base"):
#             drc += "\nEtiologia: " + et
#         return drc
#     except TypeError:
#         return hd

def get_diagnosis(hd):
    """Build a short diagnosis string from a diagnostic-hypotheses entry.

    The result is the ``grau`` value found under ``estagio_drc``, followed by
    a space and the ``etiologia_doença_de_base`` value when that value is
    truthy.

    Args:
        hd: Normally a dict holding ``estagio_drc`` (itself a dict with a
            ``grau`` key) and optionally ``etiologia_doença_de_base``.

    Returns:
        The assembled string. If building it raises ``TypeError`` (for
        example *hd* is a plain string or ``None``), *hd* itself is returned
        unchanged.
    """
    try:
        grade = f"{hd['estagio_drc']['grau']}"
        etiology = hd.get("etiologia_doença_de_base")
        if not etiology:
            return grade
        return grade + " " + etiology
    except TypeError:
        # hd (or one of its parts) does not have the expected structure.
        return hd


def get_field(summary, field):
    """Return one field of the nephrology report as text.

    NOTE: a second ``get_field`` with a different signature is defined later
    in this file and rebinds this name once the whole file has run.

    Args:
        summary: Parsed summary whose top-level key is
            ``relatorio_consulta_ambulatorial_nefrologia``.
        field: Name of the field to read from that report.

    Returns:
        For a list-valued field, its entries passed through
        ``readable_string`` and joined with newlines; otherwise the stored
        value as-is.
    """
    value = summary["relatorio_consulta_ambulatorial_nefrologia"][field]
    if not isinstance(value, list):
        return value
    return "\n".join(readable_string(entry) for entry in value)


def get_summary(patient_id):
    """Load the ground-truth medical summary YAML for one patient.

    Args:
        patient_id: Integer patient number; it is zero-padded to three
            digits when building the dataset path.

    Returns:
        The parsed YAML document, as produced by ``yaml.safe_load``.

    Raises:
        FileNotFoundError: If the summary file does not exist.
    """
    patient_dir = f"patient_{patient_id:03d}"
    path = f"Dataset-Hucam-Nefro/{patient_dir}/{patient_dir}_medical_summary.yaml"
    # Explicit UTF-8: the reports use accented keys (e.g. "impressão") and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(path, encoding="utf-8") as fp:
        return yaml.safe_load(fp)
    
def get_prediction(patient_id):
    """Load the predicted report YAML for one patient.

    Args:
        patient_id: Patient number; it is interpolated into the results
            path without zero-padding.

    Returns:
        The parsed YAML document, as produced by ``yaml.safe_load``.

    Raises:
        FileNotFoundError: If the outputs file does not exist.
    """
    # Explicit UTF-8: the reports use accented keys (e.g. "impressão") and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(
        f"results/patient_{patient_id}/outputs.yaml", encoding="utf-8"
    ) as fp:
        return yaml.safe_load(fp)

def load_yaml(file_path):
    """Parse the UTF-8 encoded YAML file at *file_path* and return the result."""
    with open(file_path, mode="r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed


def get_field(data, field_path):
    """Safely retrieve a nested field from a dictionary.

    Args:
        data: The dictionary to read from.
        field_path: Dot-separated key path, e.g. ``"a.b.c"``.

    Returns:
        The value found at the end of the path, or an empty dict when a key
        is missing or when the path runs into a value that is not a dict
        (``None``, a string, a list, ...) before it is exhausted.
    """
    for key in field_path.split("."):
        if not isinstance(data, dict):
            # Cannot descend any further; report the field as missing
            # instead of raising AttributeError on ``.get``.
            return {}
        data = data.get(key, {})
    return data

In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModel
import torch

# Load the model and tokenizer once at module level; the evaluation code
# below reads the `tokenizer` and `model` globals to embed the texts it compares.
model_name = "dmis-lab/biobert-v1.1"
# Alternative checkpoint (currently disabled):
# model_name = "Clinical-AI-Apollo/Medical-NER"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

def _embedding_similarity(text_a, text_b):
    """Return the absolute cosine similarity between two texts' embeddings.

    Each text is tokenized (with truncation) and run through the module-level
    ``model``; the token vectors of ``last_hidden_state`` are averaged into a
    single vector per text before the two vectors are compared.
    """
    encoded_a = tokenizer(text_a, return_tensors="pt", truncation=True, padding=True)
    encoded_b = tokenizer(text_b, return_tensors="pt", truncation=True, padding=True)

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        vector_a = model(**encoded_a).last_hidden_state.mean(dim=1)
        vector_b = model(**encoded_b).last_hidden_state.mean(dim=1)

    return torch.nn.functional.cosine_similarity(vector_a, vector_b)[0].abs().item()


def _join_entries(entries):
    """Render a report field (normally a list of entries) as a single text."""
    if isinstance(entries, str):
        # Iterating a plain string would emit one character per line.
        return entries
    return "\n".join([readable_string(entry) for entry in entries])


def _etiology_text(diagnosis):
    """Return everything after the first space of *diagnosis*, or None if it has no space."""
    _, separator, remainder = diagnosis.partition(" ")
    return remainder if separator else None


def evaluate_patient(patient_id):
    """Evaluate a single patient's predictions against the ground truth.

    Loads the ground-truth summary and the predicted report, then compares
    them on CKD stage, diagnosis, conduct and impression. Text fields are
    compared with the absolute cosine similarity of their embeddings.

    Args:
        patient_id: Patient number passed to ``get_summary`` and
            ``get_prediction``.

    Returns:
        A dict with the keys ``patient_id``, ``true_stage``,
        ``estimated_stage``, ``GT Etiology``, ``Pred Etiology``,
        ``GT Conduct``, ``Pred Conduct``, ``GT Impression``,
        ``Pred Impression``, ``diagnosis_mss``, ``conduct_cosine`` and
        ``impression_cosine``. Both stage values are ``None`` when either one
        cannot be read; ``impression_cosine`` is ``None`` (and the impression
        values are returned as loaded) when either impression is empty.

    Raises:
        KeyError: If a required top-level section is missing from either file.
    """
    # Load YAML files
    ground_truth = get_summary(patient_id)
    prediction = get_prediction(patient_id)
    report = ground_truth["relatorio_consulta_ambulatorial_nefrologia"]

    # Extract "grau" (CKD stage). A missing key or a non-dict section on
    # either side leaves both stages unset.
    try:
        true_grau = report["hipoteses_diagnosticas"]["estagio_drc"]["grau"]
        pred_grau = prediction["hipoteses_diagnosticas"]["estagio_drc"]["grau"]
    except (KeyError, TypeError):
        true_grau = None
        pred_grau = None

    # Evaluate diagnosis (medical semantic similarity). Only the predicted
    # text is prefixed before embedding.
    pred_diagnosis = get_diagnosis(prediction["hipoteses_diagnosticas"])
    gt_diagnosis = get_diagnosis(report["hipoteses_diagnosticas"])
    mss_diagnosis = _embedding_similarity(
        gt_diagnosis, "Origem da doença renal: " + pred_diagnosis
    )

    # Evaluate conducts
    gt_conduct = _join_entries(get_field(report, "conduta"))
    pred_conduct = _join_entries(get_field(prediction, "conduta"))
    cond_cosine = _embedding_similarity(gt_conduct, pred_conduct)

    # Evaluate impression; skipped when either side is empty.
    gt_impression = report["impressão"]
    pred_impression = prediction["impressão"]
    if gt_impression and pred_impression:
        gt_impression = _join_entries(gt_impression)
        pred_impression = _join_entries(pred_impression)
        imp_cosine = _embedding_similarity(gt_impression, pred_impression)
    else:
        imp_cosine = None

    # Compile results
    return {
        "patient_id": patient_id,
        "true_stage": true_grau,
        "estimated_stage": pred_grau,
        "GT Etiology": _etiology_text(gt_diagnosis),
        "Pred Etiology": _etiology_text(pred_diagnosis),
        "GT Conduct": gt_conduct,
        "Pred Conduct": pred_conduct,
        "GT Impression": gt_impression,
        "Pred Impression": pred_impression,
        "diagnosis_mss": mss_diagnosis,
        "conduct_cosine": cond_cosine,
        "impression_cosine": imp_cosine,
    }


# Evaluate patients 1 through 16 and gather one result row per patient.
results = []
for patient_id in range(1, 17):
    print(f"Evaluating patient {patient_id}")
    patient_results = evaluate_patient(patient_id)
    if not patient_results:
        # Nothing to record for this patient.
        continue
    results.append(patient_results)

# One DataFrame row per evaluated patient.
df_results = pd.DataFrame(results)