In [1]:
# change the directory
import os 
from dotenv import load_dotenv
load_dotenv()
os.chdir(os.getenv('WORKSPACE_DIRECTORY'))


# import other packages
from typing import TypedDict, List, Annotated
from operator import add
from datetime import datetime

# import my packages
from agents.agent import MyAgent
from agents.master import MasterWorkflow
from agents.nurse import NurseAgent
from agents.diagnosis import DiagnosisAgent
from agents.recommender import RecommendationAgent
from agents.receptionist import ReceptionAgent

# imports utils
from utils.vector_database import PDFVectorDatabase
from utils.ddg_search_engine import DDGSearch
from utils.cdc_web_scraper import CDCWebScraper
from utils.google_calendar import GoogleCalendarAPI


#### Mocked Data

In [3]:
class Report(TypedDict):
    """
    Defines the state for the multi-agent workflow
    Includes messages, patient input, and workflow states
    """
    patient_input: str
    messages: Annotated[List[dict], add]
    diagnosis: str
    recommendations: str
    appointment_details: str

mocked_report_empty : Report = {'patient_info': '',
                          'messages' : [],
                          'diagnosis' : '',
                          'recommendations' : '',
                          'appointment_details' : '',
}

mocked_report_patient_filled : Report = {'patient_info': 'The user is a 45-year-old male named John Doe who is experiencing persistent chest pain, shortness of breath, occasional dizziness, and fatigue for the past two weeks. His medical history includes hypertension managed with medication and borderline high cholesterol with no current treatment.',
                          'messages' : [],
                          'diagnosis' : '',
                          'recommendations' : '',
                          'appointment_details' : '',
}

mocked_report_patient_unknwon : Report = {'patient_info': 'The curious fox dashed swiftly through the golden meadow, chasing shadows cast by the setting sun.',
                          'messages' : [],
                          'diagnosis' : '',
                          'recommendations' : '',
                          'appointment_details' : '',
}

mocked_report_diagnosis_filled_part_1 : Report = {'patient_info': 'The user is a 45-year-old male named John Doe who is experiencing persistent chest pain, shortness of breath, occasional dizziness, and fatigue for the past two weeks. His medical history includes hypertension managed with medication and borderline high cholesterol with no current treatment.',
                          'messages' : [],
                          'diagnosis' : 'The possible diagnoses for John Doe include acute coronary syndrome, stable angina, heart failure, hypertensive heart disease, pulmonary embolism, gastroesophageal reflux disease, or anxiety/panic attack, and further tests are needed to confirm the diagnosis.',
                          'recommendations' : '',
                          'appointment_details' : '',
}

mocked_report_diagnosis_filled_part_2 : Report = {'patient_info': 'The user is a 32 year with shortness of breath and acute chest pain',
                          'messages' : [],
                          'diagnosis' : 'The possible diagnoses for this could be Atrial Fibrilation',
                          'recommendations' : '',
                          'appointment_details' : '',
}

mocked_report_recommendation_filled : Report = {'patient_info': 'The user is a 45-year-old male named John Doe who is experiencing persistent chest pain, shortness of breath, occasional dizziness, and fatigue for the past two weeks. His medical history includes hypertension managed with medication and borderline high cholesterol with no current treatment.',
                          'messages' : [],
                          'diagnosis' : 'The possible diagnoses for John Doe include acute coronary syndrome, stable angina, heart failure, hypertensive heart disease, pulmonary embolism, gastroesophageal reflux disease, or anxiety/panic attack, and further tests are needed to confirm the diagnosis.',
                          'recommendations' : 'the patient should get a ECG scan immediately',
                          'appointment_details' : '',
}

mocked_report_all_filled : Report = {'patient_info': 'The user is a 45-year-old male named John Doe who is experiencing persistent chest pain, shortness of breath, occasional dizziness, and fatigue for the past two weeks. His medical history includes hypertension managed with medication and borderline high cholesterol with no current treatment.',
                          'messages' : [],
                          'diagnosis' : 'The possible diagnoses for John Doe include acute coronary syndrome, stable angina, heart failure, hypertensive heart disease, pulmonary embolism, gastroesophageal reflux disease, or anxiety/panic attack, and further tests are needed to confirm the diagnosis.',
                          'recommendations' : 'the patient should get a ECG scan immediately',
                          'appointment_details' : '11/24/2024, 18:00-18:30',
}

mock_patient_info = """
John Doe, a 45-year-old male, reports experiencing persistent chest pain, shortness of breath, occasional dizziness, and fatigue for the past two weeks. His medical history includes hypertension diagnosed five years ago, managed with medication, and borderline high cholesterol with no current treatment. He has a family history of heart disease, as his father suffered a heart attack at age 50. He has no known allergies.
"""

pdf_directory_one = 'data\diagnosis'

#### Test Base LLMs

In [None]:
# PASS
def test_my_agent():
    my_agent = MyAgent()
    my_agent_output = my_agent("hi how are you")
    return my_agent_output
# print(test_my_agent())

Default Agent
Hello! I'm just a computer program, so I don't have feelings, but I'm here and ready to help you. How can I assist you today?


#### Test Master Delegation

In [None]:
# PASS : USER INTERACTION
def test_master_default():
    master_agent = MasterWorkflow()
    output = master_agent.test_call(mocked_report_empty)
    return output
# print(test_master_default())

# PASS : DIAGNOSIS
def test_master_diagnosis():
    master_agent = MasterWorkflow()
    output = master_agent.test_call(mocked_report_patient_filled)
    return output
# print(test_master_diagnosis())

# PASS : RECOMMENDATION
def test_master_recommendation():
    master_agent = MasterWorkflow()
    output = master_agent.test_call(mocked_report_diagnosis_filled_part_2)
    return output
# print(test_master_recommendation())

Master Agent


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mCurrent Report:
{
    'patient_info': '',
    'messages': [],
    'diagnosis': '',
    'recommendations': '',
    'appointment_details': '',
}
Thought: The 'patient_info' is empty and needs to be filled. The NURSE is responsible for filling it out.
Action: NURSE
Final Answer: NURSE[0m

[1m> Finished chain.[0m
NURSE


KeyboardInterrupt: 

#### Test Nurse Agent and Tools

In [None]:
# PASS
def test_nurse_patient_info():
    "my symptoms are shortness of breath and acute chest pain"
    "32"
    "i do not have any medical history"
    nurse_agent = NurseAgent()
    output = nurse_agent(mocked_report_empty)
    return output
# print(test_nurse_patient_info())

# PASS
def test_nurse_appointment():
    nurse_agent = NurseAgent()
    output = nurse_agent(mocked_report_recommendation_filled)
    return output
# print(test_nurse_appointment())

#### Test Diagnosis Agent and Tools

In [None]:
# PASS
def test_diagnose_filled():
    diagnosis_agent = DiagnosisAgent(pdf_directory_one)
    output = diagnosis_agent(mocked_report_patient_filled)
    return output
# print(test_diagnose_filled())

# PASS
def test_diagnose_unknown():
    diagnosis_agent = DiagnosisAgent(pdf_directory_one)
    output = diagnosis_agent(mocked_report_patient_unknwon)
    return output
# print(test_diagnose_unknown())

#### Test Recommendation Agent and Tools

In [None]:
# PASS
def test_recommendation_filled():
    recommendation_agent = RecommendationAgent()
    output = recommendation_agent(mocked_report_diagnosis_filled_part_1)
    return output#
# print(test_recommendation_filled())

#### Test Reception Agent and Tools

In [None]:
# PASS
def test_reception():
    reception_agent = ReceptionAgent()
    output = reception_agent(mocked_report_all_filled)
    return output
# print(test_reception())
    

#### Test Utils

In [6]:
# test vector database
def test_vector_database():
    # PASS
    vector_database = PDFVectorDatabase()
    # PASS
    vector_database.add_pdf_directory(pdf_directory_one)
    # PASS
    vector_database.load_pdfs()
    # PASS
    vector_database.build_database()
    # PASS
    output = vector_database.search_with_metadata(query="something is wrong with my cardiovascular")
    return type(output)
# test_vector_database()

# test the search engine
def test_search_engine():
    # PASS
    search_engine = DDGSearch()
    # PASS
    search_result = search_engine.search("atrial fibrillation symptoms and palpitations, fatigue, dizziness, and history of hypothyroidism")
    return search_result
# test_search_engine()

# test webscraper
def test_webscraper():
    # PASS
    web_scraper = CDCWebScraper()
    # PASS
    new_page = web_scraper.update_and_extract_search_url("Coronary Artery Disease")
    # PASS
    first_source = web_scraper.select_source()
    # PASS
    content = web_scraper.extract_content_from_source()
    # PASS
    web_scraper.close()
    return content
# print(test_webscraper())

# test google calendar 
def test_google_calendar():
    # PASS
    google_calendar = GoogleCalendarAPI()
    # AUTHENTICATED
    google_calendar.authenticate()
    # PASS
    start_time = datetime(2024, 11, 25, 9, 0, 0) 
    end_time = datetime(2024, 11, 25, 10, 0, 0) 
    # PASS
    google_calendar.create_event(summary="My Appointment", 
                                 description="ECG Consultation",
                                 start_time=start_time,
                                 end_time=end_time)

test_google_calendar()

#### Main Workflow

In [2]:
# creates class 
# creates class 
final_workflow = MasterWorkflow() # called once

# starts with initial form
initiate = final_workflow.run()
"i have shortness of breath and acute chest pain"
"45"
"i have no medical histor"

Master Agent


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mCurrent Report:
{
    'patient_info': '',
    'messages': [],
    'diagnosis': '',
    'recommendations': '',
    'appointment_details': '',
}
Thought: The 'patient_info' is empty and needs to be filled. The NURSE is responsible for filling it out.
Action: NURSE
Final Answer: NURSE[0m

[1m> Finished chain.[0m
Nurse Agent


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mCurrent Patient Report Status: {'patient_info': '', 'messages': [], 'diagnosis': '', 'recommendations': '', 'appointment_details': ''}

Thought: The current patient form status indicates that there is no patient info or information gathered yet. I need to start by collecting the user's symptoms, age, and medical history before proceeding to any appointment details.
Reasoning: Gathering the user's symptoms, age, and medical history is essential for understanding their health condition and determining the appropriate next steps. Once I

'i have no medical history'

In [2]:
# creates class 
final_workflow = MasterWorkflow() # called once

# starts with initial form
initiate = final_workflow.run()
"i have shortness of breath and acute chest pain"
"45"
"i have no medical history"

Master Agent


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mCurrent Report:
{
    'patient_info': '',
    'messages': [],
    'diagnosis': '',
    'recommendations': '',
    'appointment_details': '',
}
Thought: The 'patient_info' is empty and needs to be filled. The NURSE is responsible for filling it out.
Action: NURSE
Final Answer: NURSE[0m

[1m> Finished chain.[0m
Nurse Agent


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mCurrent Patient Report Status: {'patient_info': '', 'messages': [], 'diagnosis': '', 'recommendations': '', 'appointment_details': ''}

Thought: The current patient form status indicates that there is no patient info gathered yet. I need to start by collecting the user's symptoms, age, and medical history before proceeding to any appointment details.
Reasoning: Gathering the user's symptoms, age, and medical history is essential for understanding their health condition and determining the appropriate next steps.
Action: user_input
Ac

'i have no medical history'