In [None]:
# import necessary libraries 
import pandas as pd
import os
import textstat
from openai import OpenAI
import json
import re
import requests
from dotenv import load_dotenv
import math

In [None]:
# import autogen
import autogen
from autogen import ConversableAgent

In [None]:
# Optional inline override for local experimentation only. Leave it empty and
# provide OPENAI_API_KEY through the process environment instead; never commit
# a real key inside the notebook.
_INLINE_OPENAI_API_KEY = ""
if _INLINE_OPENAI_API_KEY:
    # Only touch the environment when a key was actually supplied, so an
    # already-exported OPENAI_API_KEY is not clobbered with an empty string.
    os.environ['OPENAI_API_KEY'] = _INLINE_OPENAI_API_KEY

In [None]:
# Model name used by the direct OpenAI client below and by llm_config.
OPENAI_MODEL = "gpt-4o"
# Key is read from the process environment (None when the variable is unset).
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
CLIENT = OpenAI(api_key=OPENAI_API_KEY)

In [None]:
# LLM settings handed to every autogen agent and to the group chat manager.
llm_config = dict(model=OPENAI_MODEL)

In [None]:
# import prompts
from jh_pfx_prompts import example, icd10_example, single_fewshot_icd10_labeling_prompt, writer_prompt,doctor_prompt, readability_checker_prompt, ICD10_LABELER_INSTRUCTION

In [None]:
# Reading-level labels (plain strings substituted into the agent prompts),
# listed from the lowest grade level to the highest.
# The misspelled identifiers (EIGTH, TWELTH) are kept as-is because other
# code may reference them by name.
FIFTH_GRADE = "5th grade"
SIXTH_GRADE = "6th grade"
SEVENTH_GRADE = "7th grade"
EIGTH_TO_NINTH_GRADE = "8th to 9th grade"
TENTH_TO_TWELTH_GRADE = "10th to 12th grade"
COLLEGE = "College"
COLLEGE_GRADUATE = "College Graduate"
PROFESSIONAL = "Professional"
# Literal "N/A" label.
N_A = "N/A"

In [None]:
# Target reading-ease scores, one per reading level, listed from the lowest
# score to the highest. The scoring cell compares one of these against the
# value returned by textstat.flesch_reading_ease.
professional = 5
college_graduate = 20
college = 40
tenth_to_twelfth_grade = 55
eigth_and_ninth_grade = 65
seventh_grade = 75
sixth_grade = 85
fifth_grade = 95

In [None]:
def adjust_difference(diff, threshold):
    """Return the part of ``diff`` that exceeds ``threshold``, or 0 if none does."""
    return diff - threshold if diff > threshold else 0

In [None]:
def extract_json_gpt3(groupchat):
    """
    Return the most recent message content that parses as JSON.

    Walks ``groupchat.messages`` from newest to oldest and passes each
    message's 'content' (empty string when the key is missing) to
    ``json.loads``. The first successful parse is returned unchanged.
    Messages that raise ``json.JSONDecodeError`` or ``TypeError`` (for
    example a ``None`` content) are skipped. Returns ``None`` when no
    message parses.
    """
    for entry in reversed(groupchat.messages):
        raw_text = entry.get("content", "")
        try:
            parsed = json.loads(raw_text)
        except (json.JSONDecodeError, TypeError):
            # Not JSON (or not text at all): move on to the next older message.
            continue
        else:
            return parsed

    return None


In [None]:
import unicodedata

def extract_json_gpt4o(groupchat):
    """
    Return the most recent JSON payload found in the group chat messages.

    Messages in ``groupchat.messages`` are scanned from newest to oldest.
    For each message the 'content' text is stripped, NFKC-normalized and
    cleared of Markdown code fences ("```json" / "```"). Then:

    1. the whole cleaned text is tried with ``json.loads``;
    2. otherwise a JSON object is decoded starting at each "{" in the text,
       left to right, and the first one that decodes is returned. Decoding
       with the JSON parser (instead of a non-greedy regex) keeps nested
       objects and "}" characters inside strings intact.

    Messages whose 'content' is missing, ``None`` or otherwise not a string
    are skipped.

    Args:
        groupchat: object exposing a ``messages`` sequence of dict-like
            messages.

    Returns:
        The decoded JSON value, or ``None`` when no message contains JSON.
    """
    decoder = json.JSONDecoder()

    for msg in reversed(groupchat.messages):
        content = msg.get("content", "")
        # The key may be present with a None / non-text value; the .get()
        # default only covers a missing key.
        if not isinstance(content, str):
            continue

        # Normalize encoding
        content = unicodedata.normalize("NFKC", content.strip())

        # Remove markdown blocks if they exist
        content = re.sub(r"```json|```", "", content).strip()

        # Try direct JSON parsing first
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            pass  # Fall through to extraction from mixed text

        # Extract JSON from mixed text: try every "{" as a possible start.
        start = content.find("{")
        while start != -1:
            try:
                payload, _end = decoder.raw_decode(content[start:])
                return payload
            except json.JSONDecodeError:
                start = content.find("{", start + 1)

    return None


In [None]:
def label_icd10s(pfx_outputs_json):
    """
    Ask the OpenAI model for ICD-10 codes for each generated PFx.

    Relies on module-level names: ``df_fewshot`` and ``icd10_example`` (to
    build the few-shot examples), ``single_fewshot_icd10_labeling_prompt``,
    ``CLIENT`` and ``OPENAI_MODEL``.

    Args:
        pfx_outputs_json: iterable of dict-like records, each with a 'PFx'
            entry holding the explanation text to label.

    Returns:
        A list with one entry per input record, in input order: the JSON
        parsed from the model reply by ``extract_json_gpt4o``, or ``None``
        when the prompt could not be built for that record or the reply
        contained no JSON.
    """
    # Build the few-shot examples once; they are identical for every record.
    pfx_icd10_fewshot_examples = "".join(
        icd10_example.format(**row) for _, row in df_fewshot.iterrows()
    )

    pfx_icd10_codes = []
    for pfx_output in pfx_outputs_json:
        try:
            prompt = single_fewshot_icd10_labeling_prompt.format(
                examples=pfx_icd10_fewshot_examples,
                PFx=pfx_output['PFx']
            )
        except Exception as e:
            # Report the failure with its cause and keep a placeholder so the
            # output stays aligned with the input instead of dropping the
            # whole batch.
            print("ERROR: %s (%s: %s)" % (pfx_output, type(e).__name__, e))
            pfx_icd10_codes.append(None)
            continue

        # NOTE(review): the prompt is sent as a second "system" message;
        # confirm this is intended rather than a "user" message.
        response = CLIENT.chat.completions.create(
            model=OPENAI_MODEL,
            temperature=0.0,
            messages=[
                {"role": "system", "content": "You are an ICD10 medical coder for incidental findings."},
                {"role": "system", "content": prompt}
            ],
            stream=False,
        )

        # extract_json_gpt4o expects an object with a 'messages' list of
        # dicts, so wrap the single reply in that shape.
        reply = response.choices[0].message
        wrapper = type("Wrapper", (), {})()
        wrapper.messages = [{"role": reply.role, "content": reply.content}]

        # The original called an undefined `extract_json`; the GPT-4o-aware
        # extractor defined in this notebook is the one that exists.
        pfx_icd10_codes.append(extract_json_gpt4o(wrapper))

    return pfx_icd10_codes


In [None]:
def get_last_agent_response(messages, agent_name):
    """
    Return the content of the most recent message sent by ``agent_name``.

    Args:
        messages: iterable of dict-like chat messages, oldest first.
        agent_name: value to match against each message's 'name' entry.

    Returns:
        The 'content' of the last matching message, or ``None`` when the
        agent has no message (messages without a 'name' are ignored).
    """
    # Walk newest-to-oldest so the *last* response wins, as the name promises.
    for message in reversed(list(messages)):
        if message.get('name') == agent_name:
            return message.get("content")
    return None

In [None]:
# Evaluation cases; the agent-construction cell reads the
# 'Incidental_Finding' and 'ICD10_code' columns from this frame.
EVAL_DATA_PATH = 'pfx_evaluation_data.csv'
df_eval = pd.read_csv(EVAL_DATA_PATH)

In [None]:
# Few-shot examples; label_icd10s formats every row of this frame into the
# ICD-10 labeling prompt.
FEWSHOT_DATA_PATH = 'pfx_fewshot_examples_college.csv'
df_fewshot = pd.read_csv(FEWSHOT_DATA_PATH)

In [None]:
# Create the writer, doctor, readability checker, and ICD-10 labeler agents.
def _make_agent(name, system_message):
    """Build a ConversableAgent with the settings shared by all agents here."""
    return ConversableAgent(
        name=name,
        system_message=system_message,
        llm_config=llm_config,
        code_execution_config=False,
        human_input_mode="NEVER",
    )


# The slice selects the single row at position 4 of df_eval; the agents are
# rebuilt on every iteration, so the last processed row wins.
for i, row in df_eval.iloc[4:5].iterrows():
    finding = row['Incidental_Finding']

    writer = _make_agent(
        "Writer",
        writer_prompt.format(Incidental_Finding=finding, Reading_Level=SIXTH_GRADE),
    )

    doctor = _make_agent(
        "Doctor",
        doctor_prompt.format(Incidental_Finding=finding, ICD10_code=row["ICD10_code"]),
    )

    readability_checker = _make_agent(
        "Readability_Checker",
        readability_checker_prompt.format(reading_level=SIXTH_GRADE),
    )

    icd10_labeler = _make_agent("ICD10_Labeler", ICD10_LABELER_INSTRUCTION)
    



In [None]:
def state_transition(last_speaker, groupchat):
    """
    Pick the next speaker for the group chat from the previous one.

    Order: manager -> writer -> icd10_labeler -> doctor. After the doctor,
    the writer speaks again when the doctor's message contains
    "INACCURATE", otherwise the readability checker speaks. After the
    readability checker, ``None`` is returned when its message contains
    "All done!", otherwise the writer speaks again. Any other speaker
    yields ``None``.
    """
    history = groupchat.messages

    if last_speaker is manager:
        return writer
    if last_speaker is writer:
        return icd10_labeler
    if last_speaker is icd10_labeler:
        return doctor

    if last_speaker is doctor:
        # A rejected draft goes back to the writer for another attempt.
        verdict = history[-1]["content"]
        return writer if "INACCURATE" in verdict else readability_checker

    if last_speaker is readability_checker:
        feedback = history[-1]["content"]
        return None if "All done!" in feedback else writer

    return None

In [None]:
# Assemble the group chat: speakers are chosen by state_transition and the
# conversation is capped at 20 rounds.
chat_agents = [writer, icd10_labeler, doctor, readability_checker]
groupchat = autogen.GroupChat(
    agents=chat_agents,
    messages=[],
    max_round=20,
    speaker_selection_method=state_transition,
)

# The manager drives the chat and is the first "last speaker" seen by
# state_transition.
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

In [None]:
# Opening message for the chat. The text (including the trailing space
# before the line break) is kept exactly as originally written.
kickoff_message = (
    "Please play your specified role in \n"
    "generating a patient friendly explanation of an inicidental MRI finding."
)
groupchat_result = manager.initiate_chat(manager, message=kickoff_message)

In [None]:
# Pull the most recent JSON payload out of the finished conversation.
chat = extract_json_gpt4o(groupchat)
# Fail here with a clear message: a None payload would otherwise surface
# two cells later as an unrelated TypeError/KeyError.
if chat is None:
    raise ValueError("No JSON payload was found in the group chat messages.")

In [None]:
chat

In [None]:
# Wrap the parsed payload in a one-element list to build the results frame;
# later cells read its 'ICD10_code', 'PFx_ICD10_code' and 'PFx' columns.
result = pd.DataFrame([chat])

In [None]:
result

In [None]:
# Ask label_icd10s for the ICD-10 codes of the single parsed PFx record.
result_icd10_labels = label_icd10s([chat])

In [None]:
result_icd10_labels

In [None]:
# First value of each labeler reply, or "" when the reply is empty/missing.
result['_0_agent_icd10_codes'] = [
    next(iter(labels.values())) if labels else ""
    for labels in result_icd10_labels
]
# Does the record's ICD10_code equal the labeler's code / the PFx_ICD10_code?
result["_0_icd10_matches"] = result["ICD10_code"] == result["_0_agent_icd10_codes"]
result["_0_pfx_icd10_matches"] = result["ICD10_code"] == result["PFx_ICD10_code"]
# Flesch reading-ease score of the generated explanation text.
result["_0_flesch"] = result["PFx"].apply(textstat.flesch_reading_ease)

In [None]:
desired_reading_ease = sixth_grade
# Shortfall in reading ease that is tolerated before the log penalty applies.
threshold = 10 if desired_reading_ease >= 55 else 20

# Calculate accuracy scores (fraction of rows whose codes match)
accuracy_icd10_matches = sum(result["_0_icd10_matches"]) / len(result.index)
accuracy_pfx_matches = sum(result["_0_pfx_icd10_matches"]) / len(result.index)

# Extract scalar value for flesch_score (first row of the Series)
flesch_score_scalar = result["_0_flesch"].iloc[0]

# Combined accuracy term used by both overall scores
total_icd10_matches = accuracy_icd10_matches + accuracy_pfx_matches

# Absolute distance between the measured and the desired reading ease
readability_difference = abs(flesch_score_scalar - desired_reading_ease)

# Compute the overall score
overall_score = total_icd10_matches * 0.8 + 0.2 * (1 / (readability_difference + 1))

# Signed shortfall: positive when the text scores below the desired ease.
readability_difference_log = desired_reading_ease - flesch_score_scalar
# Only the part of the shortfall beyond the threshold is penalized; with no
# excess the penalty is log(1) / log(20) == 0.
readability_difference_with_threshold = adjust_difference(readability_difference_log, threshold)
readability_difference_p = math.log(1 + readability_difference_with_threshold) / math.log(20)

# The previous formula added 0.2 * the raw signed shortfall, which rewarded
# text scoring further below the target and left the penalty above unused.
# NOTE(review): the readability term now mirrors overall_score (0.2 when
# there is no penalty); confirm this is the intended log-score formula.
log_overall_score = total_icd10_matches * 0.8 + 0.2 * (1 - readability_difference_p)

grades = pd.DataFrame([{
    "accuracy_agent_icd10": float(accuracy_icd10_matches),
    "accuracy_pfx_icd10": float(accuracy_pfx_matches),
    "readability_difference": readability_difference,
    "overall_score": overall_score,
    "log_overall_score": log_overall_score,
}])
# Drop grade columns left over from a previous run of this cell so that
# re-executing it does not append duplicate columns to `result`.
result = pd.concat(
    [result.drop(columns=grades.columns, errors="ignore"), grades],
    axis=1,
)


In [None]:
grades

In [None]:
result