In [None]:
import json
import os
import re

In [None]:
def score_notes_by_modality(
    notes,
    modality_texts,
    modality_dict,
    tokenizer,
    model,
    extract_keywords_fn,
    note_keywords=None,
    alpha=0.5,
    keyword_boost_val=0.9,
    constraint_penalty_val=-1.0
):
    """Score every note against every modality description and rank the notes.

    For each (note, modality) pair the score is::

        alpha * cosine(note_emb, desc_emb) + (1 - alpha) * prior + boost + penalty

    where ``prior`` is the fraction of the modality's distinct
    ``related_conditions`` found among the note's keywords, ``boost`` is
    ``keyword_boost_val`` when the note mentions a critical keyword, and
    ``penalty`` is ``constraint_penalty_val`` when the note mentions a disease
    whose allowed modalities do not include the one being scored.

    Args:
        notes: Sequence of note strings.
        modality_texts: Mapping of modality name -> description text to embed.
        modality_dict: Mapping of modality name -> dict; the optional
            ``"related_conditions"`` list of each entry feeds the prior.
        tokenizer: Tokenizer exposing ``encode(text, add_special_tokens=...,
            truncation=...)``. If it has ``cls_token_id`` / ``sep_token_id``
            they are added around every chunk.
        model: Callable taking a ``(1, seq_len)`` id tensor and returning an
            indexable whose element 0 is the per-token hidden states.
        extract_keywords_fn: Called with a note to obtain its keywords when
            none were supplied for it.
        note_keywords: Optional per-note keyword lists, aligned with ``notes``.
            A ``None`` entry falls back to ``extract_keywords_fn``.
        alpha: Weight of the embedding similarity versus the keyword prior.
        keyword_boost_val: Bonus added when a critical keyword is present.
        constraint_penalty_val: Value added when a disease/modality constraint
            is violated (negative by default).

    Returns:
        Dict mapping each modality name to a list of
        ``{"note": note, "score": score}`` dicts sorted by descending score.
    """
    import numpy as np
    import torch

    MODALITY_CONSTRAINTS = {
        "glioma": ["Brain MRI", "Brain CT"],
        "pneumonia": ["Chest X-ray", "Chest CT"]
    }
    CRITICAL_KEYWORDS = ['pneumonia', 'glioma']

    # Nothing to rank: skip all model work but keep the output shape.
    if not notes:
        return {modality: [] for modality in modality_texts}

    # Send inputs to wherever the model's weights already are instead of
    # assuming CUDA; fall back to CPU if the model exposes no parameters.
    params_fn = getattr(model, "parameters", None)
    first_param = next(iter(params_fn()), None) if callable(params_fn) else None
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Pooling below reads position 0 of every chunk, so every chunk (not just
    # the first) must start with the CLS token when the tokenizer has one.
    cls_id = getattr(tokenizer, "cls_token_id", None)
    sep_id = getattr(tokenizer, "sep_token_id", None)
    prefix = [cls_id] if cls_id is not None else []
    suffix = [sep_id] if sep_id is not None else []

    def get_embedding(text, chunk_size=128):
        """Return a (1, hidden) array: mean of the position-0 vector of each chunk."""
        body = tokenizer.encode(text, add_special_tokens=False, truncation=False)
        # `or [[]]` keeps one (special-tokens-only) chunk for empty text.
        pieces = [body[i:i + chunk_size] for i in range(0, len(body), chunk_size)] or [[]]
        pooled = []
        for piece in pieces:
            input_ids = torch.tensor(
                [prefix + piece + suffix], dtype=torch.long, device=device
            )
            with torch.no_grad():
                pooled.append(model(input_ids)[0][:, 0, :].cpu())
        return torch.mean(torch.cat(pooled, dim=0), dim=0).unsqueeze(0).numpy()

    def unit_rows(matrix):
        """L2-normalise each row; all-zero rows stay zero (cosine becomes 0)."""
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        norms[norms == 0.0] = 1.0
        return matrix / norms

    def calculate_normalized_prior_note(keywords, modality):
        """Fraction of the modality's distinct related conditions present in `keywords`."""
        # De-duplicate case-insensitively so a full match yields exactly 1.0.
        desc_keywords = {
            k.lower() for k in modality_dict[modality].get("related_conditions", [])
        }
        if not desc_keywords:
            return 0.0
        matched = {k.lower() for k in keywords} & desc_keywords
        return len(matched) / len(desc_keywords)

    def apply_disease_modality_constraint(note_lower, modality):
        """Return the penalty if the note names a disease not allowed for `modality`."""
        for disease, allowed_modalities in MODALITY_CONSTRAINTS.items():
            if disease in note_lower and modality not in allowed_modalities:
                return constraint_penalty_val
        return 0.0

    def compute_keyword_boost(note_lower):
        """Return the boost if any critical keyword occurs in the note."""
        if any(keyword in note_lower for keyword in CRITICAL_KEYWORDS):
            return keyword_boost_val
        return 0.0

    # Prepare keywords: use the supplied list for a note when present,
    # otherwise extract them from the note text.
    keywords_list = []
    for i, note in enumerate(notes):
        if note_keywords and note_keywords[i] is not None:
            keywords_list.append(note_keywords[i])
        else:
            keywords_list.append(extract_keywords_fn(note))

    # Per-note values that do not depend on the modality are computed once.
    note_units = unit_rows(np.concatenate([get_embedding(note) for note in notes], axis=0))
    lowered_notes = [note.lower() for note in notes]
    boosts = [compute_keyword_boost(note_lower) for note_lower in lowered_notes]

    # Scoring: one matrix product per modality gives every note's cosine.
    modality_scores = {}
    for modality, desc in modality_texts.items():
        desc_unit = unit_rows(get_embedding(desc))
        cosines = (note_units @ desc_unit.T)[:, 0]
        results = []
        for i, note in enumerate(notes):
            prior = calculate_normalized_prior_note(keywords_list[i], modality)
            penalty = apply_disease_modality_constraint(lowered_notes[i], modality)
            score = alpha * cosines[i] + (1 - alpha) * prior + boosts[i] + penalty
            results.append({"note": note, "score": score})
        modality_scores[modality] = sorted(results, key=lambda x: x["score"], reverse=True)
    return modality_scores

In [None]:
import torch
from keyword_extractor import MedCATExtractor
from transformers import AutoTokenizer, AutoModel

# Prepare the keyword extractor. Its `extract` method is handed to
# score_notes_by_modality(), which calls it for notes that have no
# precomputed keywords (all of them here, since note_keywords=None).
extractor = MedCATExtractor()
extract_keywords_fn = extractor.extract

# BERT: prepare the embedding model. The checkpoint name is defined once so
# the tokenizer and the model cannot drift apart, and the device falls back
# to CPU when CUDA is not available instead of failing on `.to("cuda")`.
EMBEDDING_MODEL_NAME = "emilyalsentzer/Bio_ClinicalBERT"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL_NAME)
model = AutoModel.from_pretrained(EMBEDDING_MODEL_NAME).to(DEVICE)

# Prepare modality_dict. The encoding is given explicitly so the file is read
# the same way regardless of the platform's default locale.
with open("modal_descrip.json", "r", encoding="utf-8") as f:
    modality_dict = json.load(f)

# Extract only the description text ("formal" field) of each modality.
modality_texts = {k: v["formal"] for k, v in modality_dict.items()}

# Pass everything to score_notes_by_modality().
# NOTE(review): `notes` is not defined in the cells visible here; it
# presumably comes from an earlier cell -- confirm before Restart & Run All.
results = score_notes_by_modality(
    notes=notes,
    modality_texts=modality_texts,
    modality_dict=modality_dict,
    tokenizer=tokenizer,
    model=model,
    extract_keywords_fn=extract_keywords_fn,
    note_keywords=None
)

# Output: for each modality, a list of per-note scores.