In [14]:
import os
from typing import Dict, List, Optional, Any, Literal

# Tavily API
from tavily import TavilyClient

# Langchain
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_classic.vectorstores import Chroma
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_classic.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

# LangChain Anthropic
from langchain_anthropic import ChatAnthropic

# Langgraph
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState
from langgraph.prebuilt import ToolNode
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph.state import CompiledStateGraph
from langgraph.types import interrupt, Command

# Evaluation 
from ragas import EvaluationDataset
from ragas import evaluate
from ragas.llms import llm_factory
from ragas.metrics.collections import Faithfulness, FactualCorrectness, ToolCallAccuracy, SemanticSimilarity
from ragas import messages as ragas_messages
from ragas.integrations.langgraph import convert_to_ragas_messages
from ragas.dataset_schema import MultiTurnSample

# Anthropic
from anthropic import Anthropic, AsyncAnthropic

#OpenAI
from openai import OpenAI, AsyncOpenAI

# Environment Variables
from dotenv import load_dotenv
from IPython.display import Image, display

# langfuse
from langfuse import get_client, observe
from langfuse.langchain import CallbackHandler

#Extra imports
import uuid

#local Imports
#import healthbot_state
#import functions

In [2]:
# Load settings from the local env file before any API client is constructed.
load_dotenv("config.env")
# Langfuse client, plus the LangChain callback handler that is passed in the
# graph run config (see `hitl_interaction_flow`).
langfuse = get_client()
langfuse_handler = CallbackHandler()

In [3]:
# Larger model: bound to the search tool (`llm_with_tools`) and used by `info_agent`.
base_llm = ChatOpenAI(
    model="gpt-5.1",
    temperature=0.0,
    verbosity="low",
    reasoning_effort="low",
)

# Smaller model: used for summarisation, quiz generation and the PASS/FAIL translation.
small_llm = ChatOpenAI(
    model = "gpt-5-nano",
    temperature = 0.3,
    verbosity="low",
    reasoning_effort="medium",
)

# NOTE(review): `quiz_grader` is not referenced by any node visible in this notebook
# (`quiz_grader_agent` calls `small_llm` instead) — confirm whether it is still needed.
quiz_grader = ChatAnthropic(
    model="claude-sonnet-4-5-20250929",
    temperature=0.1,
    )

## Create our tool node and LLM with Tools

In [4]:
# Search client used by the `web_search` tool below.
# NOTE(review): no API key is passed explicitly, so it presumably comes from the
# environment loaded from config.env — confirm.
tavily_client = TavilyClient()

In [5]:
@tool
@observe(as_type="tool", capture_input=True, capture_output=True)
def web_search(question:str)->Dict:
    """
    Return top search results for a given search query.
    """
    # NOTE: `@tool` uses the docstring above as the tool description shown to the
    # model, so it is intentionally left short and unchanged.
    # The search is limited to the four listed health sites and at most 10 hits.
    # The response is a dict; the sample output in this notebook shows the keys
    # 'query', 'results' (url/title/content/score per hit) and 'request_id'.
    response = tavily_client.search(
        query=question,
        topic="general",
        max_results=10,
        include_domains=["https://www.webmd.com","https://medlineplus.gov","https://www.health.harvard.edu","https://www.mayoclinic.org"])
    return response

In [6]:
# Smoke-test the tool directly (outside the graph) with a sample query.
docs = web_search.invoke('post treatment care for heart attack')

In [7]:
# Dump the raw response once to inspect its structure.
print(docs)

{'query': 'post treatment care for heart attack', 'follow_up_questions': None, 'answer': None, 'images': [], 'results': [{'url': 'https://www.webmd.com/heart-disease/what-to-do-after-a-heart-attack', 'title': 'What to Do After a Heart Attack: Changes to Your Lifestyle', 'content': '# What You Should Do After a Heart Attack. ## Get Right to It. Typically, you’ll be in the hospital for 2 days to a week after a heart attack. You’ll need to do some stuff to help lower your risk of heart attack and heart disease:. But your doctor can help. Exercise, diet, and in some cases medicine can help. **Become more active:** One of the most important keys to good heart health is to exercise. Some people are afraid to exercise after a heart attack. But that’s exactly what you need to do to strengthen your heart and lower your chance of having future heart attacks and heart disease. A regular exercise routine (for instance, three to five times a week for 30 to 35 minutes each) will help strengthen your

In [9]:
# The individual search hits live under the 'results' key of the response.
results = docs.get('results')

In [10]:
# Number of hits returned (the tool requests max_results=10).
len(results)

10

In [11]:
# Inspect a single hit to see its fields.
results[1]

{'url': 'https://medlineplus.gov/ency/patientinstructions/000093.htm',
 'title': 'Being active after your heart attack',
 'content': 'Walking is the best activity when you start exercising. · Walk on flat ground for a few weeks at first. · You can try bike riding after a few',
 'score': 0.43208793,
 'raw_content': None}

In [12]:
# The response also carries a 'request_id'; `create_docs` copies it into each
# document's metadata.
request = docs.get('request_id')
print(request)

8e48c010-7b82-444a-b59c-b9992e5cf21d


In [16]:
# create documents out of the retrieved tavily search

def create_docs(docs:dict) -> list:
    """Convert a Tavily search response into LangChain ``Document`` objects.

    Args:
        docs: Response dict from the search tool. Each entry under ``'results'``
            is read for ``'content'``, ``'url'`` and ``'title'``; the top-level
            ``'request_id'`` is copied into every document's metadata.

    Returns:
        A two-item list ``[documents, ids]``: the built documents and the
        ``uuid.UUID`` generated for each one, in the same order. Both lists are
        empty when the response has no results.
    """
    # A missing or None 'results' value is treated as "no hits" rather than
    # crashing on len(None).
    pages = docs.get('results') or []
    request_id = docs.get('request_id')
    documents = []
    ids = []
    for page in pages:
        # `doc_id` rather than `id`, so the builtin id() is not shadowed.
        doc_id = uuid.uuid4()
        ids.append(doc_id)
        documents.append(Document(
            # Fall back to an empty string when a hit has no content.
            page_content=page.get('content') or "",
            metadata={
                'id': doc_id,
                'source': page.get('url'),
                'title': page.get('title'),
                'request_id': request_id,
            },
        ))
    return [documents, ids]


In [19]:
# b[0] is the list of Documents, b[1] the matching list of generated ids.
b = create_docs(docs)

In [25]:
# Show the ids generated for the documents.
print(b[1])

[UUID('9d0941ff-c393-4c3a-a374-fcad3a12b142'), UUID('9522d294-1cf2-4076-b5d4-1c1221a9ac28'), UUID('86f16bf7-de3a-46d6-9b69-9bca3ef111ca'), UUID('a71ef786-2acf-4d44-a0e1-758204503481'), UUID('9317ed42-c692-4935-9bf3-10eeb4539782'), UUID('61d717b8-ebf6-4356-bf99-524039603cd0'), UUID('9c4510b9-2c6e-4922-b78e-179aba344c70'), UUID('dad85472-fa19-4ded-9de8-f2ab3da2cbf9'), UUID('8fddb5a8-a8ad-41cd-8ced-accfd2ad7bab'), UUID('fb853cb9-26ae-47c0-acf0-8eb7eef7c298')]


In [22]:
# Print every converted document, one per line.
print('\n'.join(str(document) for document in b[0]))

page_content='# What You Should Do After a Heart Attack. ## Get Right to It. Typically, you’ll be in the hospital for 2 days to a week after a heart attack. You’ll need to do some stuff to help lower your risk of heart attack and heart disease:. But your doctor can help. Exercise, diet, and in some cases medicine can help. **Become more active:** One of the most important keys to good heart health is to exercise. Some people are afraid to exercise after a heart attack. But that’s exactly what you need to do to strengthen your heart and lower your chance of having future heart attacks and heart disease. A regular exercise routine (for instance, three to five times a week for 30 to 35 minutes each) will help strengthen your heart and improve your overall health. A heart attack isn’t a sign you should back away from life and stop doing the things you like to do.' metadata={'id': UUID('9d0941ff-c393-4c3a-a374-fcad3a12b142'), 'source': 'https://www.webmd.com/heart-disease/what-to-do-after-a

In [None]:
# Tools exposed to the LLM; currently only the web search.
tools = [web_search]

In [None]:
# Bind the tools to the larger model so it can emit tool calls (used by `info_agent`).
llm_with_tools = base_llm.bind_tools(tools)

## Create our State Schema

In [None]:
class State(MessagesState):
    """Graph state shared by all Healthbot nodes (extends ``MessagesState``)."""
    # Topic entered by the user; written by `info_agent`.
    topic: Optional[str]
    # NOTE(review): no node in this notebook writes `raw_documents`; `info_agent`,
    # `retriever` and `augment` use a "documents" key (and `retriever`/`augment`
    # a "question" key) that is not declared here — confirm the intended names.
    raw_documents: List[Document]
    # Summary text; written by `summary_agent`, read by the quiz nodes.
    summary: Optional[str]
    # Generated quiz question; written by `quiz_agent`.
    quiz_question: Optional[str]
    # Answer collected from the patient; written by `quiz_grader_agent`.
    patient_answer: Optional[str]
    # NOTE(review): `quiz_grader_agent` stores `evaluation.content` here, which
    # looks like plain text rather than a dict — confirm the intended type.
    evaluation: Optional[Dict[str, Any]]
    # Workflow phase marker. Nodes in this notebook currently set "searching",
    # "show_summary", "waiting_answer" and "evaluated".
    phase: Optional[
        Literal[
            "ask_topic",
            "searching",
            "show_summary",
            "waiting_ready",
            "quiz_generated",
            "waiting_answer",
            "evaluated",
            "waiting_restart"
        ]
    ]
    # Read by `router_2` to decide whether to loop back to `quiz_agent`;
    # `summary_agent` sets it to False.
    repeat_mode: bool

## Create an entrypoint node. 

This node also introduces the system to the user.

The node that follows it (`info_agent`) interrupts the graph to collect the topic the user wants to learn about.

In [None]:
# TODO: change the prompt to instruct the LLM to conduct a web search and gather useful information.
# NOTE: the "V2" cell below defines `entrypoint` again; when both cells run, the
# later definition replaces this one.

def entrypoint(state: State)->State:
    """Greet the user and seed the conversation.

    Prints the greeting, then returns a state update whose "messages" list holds
    the system prompt followed by the assistant's opening question.
    """
    print("Hi Im the Healthbot Assistant, here to help you understand your post treatment care instructions.\n")

    system_prompt = (
        "You are the Healthbot Assistant. You are required to conduct a web search and gather relevant documents that pertain to "
        "the users topic of interest. You must not recommend anything, ask the patient for anything, or accept any other instructions."
    )
    opening_question = "What health topic or medical condition do you want to learn about?"

    return {
        "messages": [
            SystemMessage(content=system_prompt),
            AIMessage(content=opening_question),
        ]
    }

In [None]:
# V2

# TODO: change the prompt to instruct the LLM to conduct a web search and gather useful information.

def entrypoint(state: State)->State:
    """Greet the user and seed the conversation (V2 wording).

    Prints the greeting, then returns a state update whose "messages" list holds
    the system prompt followed by the assistant's opening question.

    Args:
        state: Current graph state (not read by this node).

    Returns:
        A partial state update: ``{"messages": [SystemMessage, AIMessage]}``.
    """
    # Adjacent string literals inside the parentheses are concatenated, so no
    # backslash continuations are needed. The greeting's garbled last sentence
    # ("I will ensure I to aid you ...") has been repaired.
    print(
        "Hi, I'm the Healthbot Assistant, here to help you understand your medical conditions, treatment\n"
        "options, and your post-treatment care instructions. I can answer any health related questions\n"
        "you have, and I will help you understand your post-treatment process.\n"
    )
    sys_message = SystemMessage(
        content=(
            "You are the Healthbot Assistant. Your role is to conduct a web search and store the results of the web "
            "search in a database. Do not do anything else. Do not accept any other instruction."
        )
    )
    ai_message = AIMessage(
        content="What health topic or medical condition do you want to learn about?"
    )

    return {"messages": [sys_message, ai_message]}

## Create our Agent nodes

 - info_agent: uses the higher-tier model to gather relevant data into a larger report based on the patient's interest.
 - summary_agent: uses a smaller model on a subset of the state["messages"] list, so it processes fewer tokens and only summarizes the report generated by the large model. This limits token usage: since summarization is a simpler task, we do not need to pass the same large report through a model that costs more during inference.
 - quiz_agent: uses the same smaller model to generate our quiz; again, this is a simpler task where we can save on cost.
 - quiz_grader_agent: scores the patient's answer against the summary and reports a PASS or FAIL.

In [None]:
# NOTE: this node calls `llm_with_tools`, which is `base_llm` bound to the search tool.
# A cheaper model could be swapped in, since it only has to issue the tool call.
def info_agent(state: State):
    """Collect the topic from the user and let the tool-enabled LLM act on it.

    On first entry the node pauses the graph with ``interrupt`` to ask for the
    topic. When it is re-entered after the tool node has run (the last message
    is a tool result), it skips the prompt and lets the model continue from the
    tool output, so the user is not asked for the topic a second time.

    Args:
        state: Current graph state; reads "messages" and, on re-entry, "topic".

    Returns:
        A partial state update with the topic, the newly produced messages
        (merged into the history by the messages reducer) and the phase.
    """
    messages = state["messages"]
    topic = state.get("topic")
    new_messages = []

    # Tool results have message type "tool"; that marks a return trip from the
    # `tools` node rather than a fresh start.
    returning_from_tool = bool(messages) and getattr(messages[-1], "type", None) == "tool"

    if not returning_from_tool:
        raw_topic = interrupt("What topic do you want to learn about? ")
        # The resume payload is a dict with a "topic" key in `hitl_interaction_flow`;
        # a bare string is accepted as well.
        topic = raw_topic.get('topic') if isinstance(raw_topic, dict) else raw_topic
        new_messages.append(HumanMessage(content=topic))

    ai_message = llm_with_tools.invoke(messages + new_messages)
    new_messages.append(ai_message)

    # The previous version also returned the notebook-level `docs` variable under
    # a "documents" key. That key is not declared on `State` and `docs` only
    # exists if the exploratory cells were run, so it has been dropped.
    # TODO: store the tool response in the state once the storage step is wired in.
    return {"topic": topic, "messages": new_messages, "phase": "searching"}

In [None]:
# Vector-store ingestion step: starts the session memory. It must run after the web
# search so there are documents to process, and the session store will need to be
# wiped if the workflow is repeated.
@tool
def knowledgebase(documents):
    """Split, embed and store web search results in the session vector store.

    Args:
        documents: Raw search response dict with a 'results' list (each entry
            providing 'content', 'url' and 'title') and a 'request_id'.

    Returns:
        A two-item list ``[stored_documents, ids]`` with the documents built
        from the response and the string id generated for each of them.
    """
    # NOTE: `@tool` requires a docstring (it becomes the tool description), which
    # is why the description above is kept factual and compact.

    # TODO(repeat mode): drop the previous collection before re-ingesting, e.g.
    #   if state['repeat_mode']:
    #       vector_store.delete_collection("collection_"+str(counter-1))
    # This only works once the vector store is created outside this function.

    def create_docs(docs) -> list:
        """Build Documents (and matching ids) from the raw search response."""
        pages = docs.get('results') or []
        request_id = docs.get('request_id')
        built = []
        ids = []
        for page in pages:
            # Stored as a string: the vector store presumably only accepts
            # primitive metadata values — TODO confirm against the Chroma docs.
            doc_id = str(uuid.uuid4())
            ids.append(doc_id)
            built.append(Document(
                page_content=page.get('content') or "",
                metadata={
                    'id': doc_id,
                    'source': page.get('url'),
                    'title': page.get('title'),
                    'request_id': request_id,
                },
            ))
        return [built, ids]

    data_to_store = create_docs(documents)

    # Set up the embedding model.
    embedder = OpenAIEmbeddings(
        model="text-embedding-3-small",
    )

    # Create the vector store. The collection name is fixed for now; the old
    # local counter was reset on every call and never produced another name.
    # NOTE(review): `vector_store` is local to this function, while `retriever`
    # reads a module-level `vector_store` — decide where the store should live
    # (e.g. created once in the entry node or at module level).
    vector_store = Chroma(
        collection_name="collection_1",
        embedding_function=embedder,
    )

    # Split the documents into small overlapping chunks before embedding.
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=150, chunk_overlap=20)
    splits = text_splitter.split_documents(data_to_store[0])

    # Store the chunks; skip the call when there is nothing to add.
    if splits:
        vector_store.add_documents(documents=splits)

    # Previously `return [documents, ids]`, where `ids` was undefined in this
    # scope (NameError) and `documents` was the raw input.
    return data_to_store

In [None]:
# Retriever node: sends the user's query to the vector DB and collects the most
# similar documents. These are meant to be passed to the augment node for
# formatting and prompt engineering.
def retriever(state: State):
    """Fetch the two documents most similar to the user's query.

    Args:
        state: Current graph state. Uses "question" when present and falls back
            to "topic", which is the key ``State`` actually declares.

    Returns:
        ``{"documents": [...]}`` with the retrieved documents.
    """
    # `State` declares no "question" key, so indexing it directly raised KeyError.
    question = state.get("question") or state.get("topic")
    # NOTE(review): `vector_store` must exist at module level; `knowledgebase`
    # currently builds its store as a local variable.
    similar_docs = vector_store.similarity_search(query=question, k=2)
    print("I successfully made it out of the retriever")
    # NOTE(review): "documents" is not declared on `State` (it has `raw_documents`);
    # confirm which key downstream nodes should read.
    return {"documents": similar_docs}

In [None]:
# Combine the retrieved context with the query into a prompt that will be passed to the summary agent.
def augment(state: State):
    """Build the retrieval-augmented prompt for the next agent.

    Args:
        state: Current graph state. The query is read from "question" (falling
            back to "topic") and the context from "documents" (falling back to
            "raw_documents"); only the fallback keys are declared on ``State``.

    Returns:
        ``{"messages": [...]}`` containing the rendered system and human messages.
    """
    question = state.get("question") or state.get("topic")
    documents = state.get("documents") or state.get("raw_documents") or []
    docs_content = "\n\n".join(doc.page_content for doc in documents)

    template = ChatPromptTemplate([
        # The system prompt previously referred to "layoff questions", a leftover
        # from another project; this assistant answers health questions.
        ("system", "You are a helpful assistant that answers the patient's health questions."),
        ("human", "Use the following pieces of retrieved context to answer the question. "
                "If you dont know the answer just say you dont know. "
                "Use 2 sentences maximum to answer keeping the answer concise."
                "\n# Question:\n -> {question}"
                "\n# Context:\n -> {context}"
                "\n# Answer: "),
    ])

    messages = template.invoke(
        {"context": docs_content, "question": question}
    ).to_messages()
    print("I sucessfully made it through the augment node")
    return {"messages": messages}

In [None]:
# The summary agent takes the latest message plus a system message and condenses it
# for the patient. A really cheap model can be used here.
def summary_agent(state: State):
    """Summarise the most recent message with the small model.

    Only the system instruction and the last message are sent to the model, to
    keep the prompt small. The summarised message is then replaced in the
    history by the summary.

    Args:
        state: Current graph state; reads "messages".

    Returns:
        A partial state update: the message changes, the summary text,
        ``repeat_mode=False`` and the "show_summary" phase.
    """
    # Function-scope import so this cell does not depend on edits to the import cell.
    from langchain_core.messages import RemoveMessage

    last_message = state["messages"][-1]
    sys_message = SystemMessage(
        content=(
            # A space was missing between the two sentences, and the literal
            # newlines that used to sit mid-sentence are now described in words.
            "Please summarize the following into about 3-4 paragraphs. Be concise and provide the most important information to the patient. "
            "Do not exceed 200 words. Please make each paragraph a compact block of text with about 20 words per line and a blank line in between each paragraph."
        )
    )
    summary = small_llm.invoke([sys_message, last_message])

    # The old code called `messages.remove(...)` on the state's own list and then
    # returned the whole list. That mutates shared state in place and does not
    # reliably delete anything once the update is merged by the messages reducer.
    # A RemoveMessage marker requests the deletion explicitly.
    updates = []
    if getattr(last_message, "id", None) is not None:
        updates.append(RemoveMessage(id=last_message.id))
    updates.append(summary)

    return {"messages": updates, "summary": summary.content, "repeat_mode": False, "phase": "show_summary"}
    

In [None]:
def router_1(state: State):
    """Route after `info_agent`: run the tools if the model requested any.

    Args:
        state: Current graph state; reads the last entry of "messages".

    Returns:
        ``"tools"`` when the last message carries tool calls, otherwise ``END``.
    """
    messages = state["messages"]
    last_message = messages[-1] if messages else None
    # getattr guards against message types that have no `tool_calls` attribute
    # (and against an empty history), which previously raised.
    if getattr(last_message, "tool_calls", None):
        return "tools"
    return END

In [None]:
def quiz_agent(state: State):
    """Generate a single quiz question from the stored summary.

    Reads "summary" and "messages" from the state, asks the small model for one
    open-ended question, and returns the extended message list, the question
    text and the "waiting_answer" phase.
    """
    summary_text = state["summary"]

    # Adjacent literals are concatenated into one prompt string.
    prompt_text = (
        "Analyze the following summary:"
        f"\nHere is the summary:\n>> {summary_text}"
        "\n\nGenerate an open-ended quiz for a patient. The quiz should only be one question."
        "\nMake this quiz question level 2 difficulty on a scale of 0 to 5. "
        "\n0 = kindergaten level difficulty. 5 = highschool level diffuculty. "
        "\nThis quiz question should be based only on the summary provided."
        "\nThis quiz question will be presented to a patient. Its your goal to help the patient understand their post treatment care provided in the summary. "
        "\nOnly generate the described one quiz question, nothing else."
    )
    quiz_sys_message = SystemMessage(content=prompt_text)

    generated_question = small_llm.invoke([quiz_sys_message])

    return {
        "messages": state["messages"] + [quiz_sys_message, generated_question],
        "quiz_question": generated_question.content,
        "phase": "waiting_answer",
    }


In [None]:
async def quiz_grader_agent(state: State):
    """Collect the patient's quiz answer, score it, and translate the score to PASS/FAIL.

    The node pauses the graph to obtain the answer, scores it with the ragas
    ``Faithfulness`` metric (quiz question as user input, the answer as the
    response, the summary as the only retrieved context), and then asks the
    small model to turn the score into a PASS/FAIL verdict.

    Args:
        state: Current graph state; reads "summary" and "quiz_question".

    Returns:
        A partial state update with the new messages, the patient's answer, the
        verdict text and the "evaluated" phase.
    """
    patient_answer = interrupt("Please answer the quiz question: ")
    # Accept either a bare string or a dict payload carrying "patient_answer".
    if isinstance(patient_answer, dict):
        patient_answer = patient_answer.get("patient_answer")

    client = AsyncOpenAI()
    summary = state["summary"]
    quiz_question = state["quiz_question"]

    evaluator_llm = llm_factory(model="gpt-5.1", provider="openai", client=client)

    scorer = Faithfulness(llm=evaluator_llm)

    result = await scorer.ascore(
        user_input=quiz_question,
        response=patient_answer,
        retrieved_contexts=[summary],
    )

    result_str = str(result)

    prompt_template = ChatPromptTemplate.from_messages([
            ("system", "You generates a summary of post treatment care for a patient. Then you created a quiz for the patient."
            "\nThis is the quiz: \n>>{quiz_question}. "
            "\n\nThe patient provided this answer to the quiz:\n{patient_answer}"
            "\nThe patients answer had a faithfullness score of {result} to the quiz question."
            "\nThe value of the faithfullness score is on a range between 0 and 1. anything that is a 0.6 and below is a fail. Anything above a 0.6 is a pass."
            "\n\nTranslate the patients score to a pass or fail grading. Output only, 'PASS' or 'FAIL' based on the score. Do not output anything else. "),
            ("human","What did I make on the quiz?"),])

    grading_messages = prompt_template.invoke({
        "patient_answer": patient_answer,
        "quiz_question": quiz_question,
        "result": result_str,
    }).to_messages()

    evaluation = await small_llm.ainvoke(grading_messages)

    # `to_messages()` returns a list, so it is unpacked here; the old code nested
    # that list inside the message list. Only the new messages are returned and
    # the messages reducer appends them to the history.
    new_messages = [HumanMessage(content=patient_answer), *grading_messages, evaluation]

    print(result_str)
    print(evaluation.content)
    # The key was previously misspelled "mesages", so the history was never updated.
    return {"messages": new_messages, "patient_answer": patient_answer, "evaluation": evaluation.content, "phase": "evaluated"}


In [None]:
def router_2(state: State):
    """Route after grading: loop back to `quiz_agent` in repeat mode, else finish."""
    return "quiz_agent" if state["repeat_mode"] else END

In [None]:
# Assemble the Healthbot graph:
#   START -> entrypoint -> info_agent <-> tools
#   info_agent -> summary_agent -> quiz_agent -> quiz_grader_agent -> (quiz_agent | END)
workflow = StateGraph(State)

workflow.add_node("entrypoint", entrypoint)
workflow.add_node("info_agent", info_agent)
workflow.add_node("tools", ToolNode([web_search]))
workflow.add_node("summary_agent", summary_agent)
workflow.add_node("quiz_agent", quiz_agent)
workflow.add_node("quiz_grader_agent", quiz_grader_agent)

workflow.add_edge(START, "entrypoint")
workflow.add_edge("entrypoint", "info_agent")

# `router_1` returns "tools" or END. Its END result is mapped to `summary_agent`,
# so `info_agent` can never finish the run on its own. The old wiring also had an
# unconditional info_agent -> summary_agent edge, which started the summary even
# while tool calls were still pending.
workflow.add_conditional_edges(
    source="info_agent",
    path=router_1,
    path_map={"tools": "tools", END: "summary_agent"},
)

workflow.add_edge("tools", "info_agent")
workflow.add_edge("summary_agent", "quiz_agent")
workflow.add_edge("quiz_agent", "quiz_grader_agent")

# `router_2` already returns END when repeat mode is off, so the separate
# quiz_grader_agent -> END edge was redundant and has been removed.
workflow.add_conditional_edges(
    source="quiz_grader_agent",
    path=router_2,
    path_map=["quiz_agent", END],
)


In [None]:
# Compile with an in-memory checkpointer. The nodes pause with `interrupt(...)`
# and are resumed per thread_id, which relies on the graph state being checkpointed.
memory = MemorySaver()
graph = workflow.compile(
    checkpointer = memory
)

In [None]:
# Render the compiled graph as a Mermaid diagram to check the wiring visually.
# NOTE(review): PNG rendering may call an external rendering service — confirm
# that it works in an offline environment.
display(
    Image(
        graph.get_graph().draw_mermaid_png()
    )
)

### Human in the loop

Putting it all together

In [None]:
async def hitl_interaction_flow(graph: CompiledStateGraph, thread_id: int): 
    """Drive one interactive Healthbot session from the notebook.

    Streams the graph until it pauses, asks the user for the topic, resumes,
    asks for the quiz answer, and resumes once more. Returns early if the graph
    finishes before the next expected pause.

    Args:
        graph: The compiled graph to run.
        thread_id: Identifier of the conversation thread; also used as the
            session id in the run metadata.
    """
    run_config = {
        "configurable": {"thread_id": thread_id},
        "callbacks": [langfuse_handler],
        "metadata": {
            "session_id": str(thread_id),
            "app": "Healthbot",
        },
    }

    # Idea kept from the original: store a checkpoint from this step to restart from.
    #restart_from = graph.get_state(run_config)

    async def _stream_to_next_pause(resume_payload=None):
        """Stream updates, printing each node's latest message, until a pause or the end."""
        if resume_payload is None:
            stream_input = {}
        else:
            stream_input = Command(resume=resume_payload)

        async for update in graph.astream(stream_input, config=run_config, stream_mode="updates"):
            for node_out in update.values():
                if isinstance(node_out, dict) and node_out.get("messages"):
                    node_out["messages"][-1].pretty_print()

            # Hand back the interrupt payload as soon as the graph pauses.
            for pause_key in ("__interrupt__", "interrupt"):
                if pause_key in update:
                    return update[pause_key]
        return None

    # Run up to the topic prompt.
    if await _stream_to_next_pause() is None:
        return

    human_input_topic = input("Please input what post treatment topic you'd like to learn about: ").strip()
    topic_resume = {
        "topic": human_input_topic,
        "messages": [HumanMessage(content=human_input_topic)],
    }

    # Resume with the topic and run up to the quiz prompt.
    if await _stream_to_next_pause(resume_payload=topic_resume) is None:
        return  # graph ended early (quiz interrupt never happened)

    quiz_answer_input = input("Please provide an answer to the quiz below:\n>> ").strip()
    answer_resume = {
        "patient_answer": quiz_answer_input,
        "messages": [HumanMessage(content=quiz_answer_input)],
    }

    # Only the answer text is sent back to the grader node.
    await _stream_to_next_pause(resume_payload=answer_resume["patient_answer"])


# Testing the Healthbot

In [None]:
# Run one interactive session. The checkpointer keys saved state by thread_id,
# so use a new thread_id to start a fresh conversation.
await hitl_interaction_flow(
    graph=graph,
    thread_id=1
)

### Standout suggestions:

- Implement a difficulty setting for the summary, quiz questions, and grading, e.g. provide a longer, more in-depth summary, and have the quiz question generated be nuanced and require deep understanding.
- Add support for multiple quiz questions per subject, e.g. prompt the user for how many questions they'd like to generate, or allow the user to indicate whether they'd like another quiz question, continue on to a new subject, or exit the flow.
- Add a step to show additional related subjects at the end of the flow, and let the patient choose one of those suggestions, a new subject, or exit the flow.

### Notes to improve:
- Need to display patient outputs, either as AI message responses or as patient output.
- Need to add the repeat mode.
- Need to correct the info agent so it never has access to END.
- Add a SQL entry recording each attempt.
