In [25]:
# Put the project root on sys.path so absolute imports of project packages work.
import os
import sys

# The project root is taken to be two directory levels above the current
# working directory of the kernel.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir, os.pardir))
if sys.path.count(project_root) == 0:
    # Prepend so the project's packages win over same-named installed ones.
    sys.path.insert(0, project_root)

In [26]:
from dotenv import load_dotenv

# Load environment variables via python-dotenv before any project agent module
# is imported in the cells below.
# NOTE(review): presumably this supplies the API credentials needed by later
# cells (the ingest cell's log shows a request to api.openai.com) — confirm
# which variables the .env file must define.
load_dotenv()

True

In [27]:
# Development aid: uncomment the three lines below to reload agents.rag_ingest
# after editing it, so the import that follows picks up the new code without a
# kernel restart.
# import importlib
# import agents.rag_ingest as rag_ingest
# importlib.reload(rag_ingest)

from agents.rag_ingest import initialize_vectorstore_with_rag_chain

In [28]:
from pathlib import Path
from tempfile import mkdtemp

# --- Notebook configuration -------------------------------------------------
# URL of the PDF that gets ingested.
FILE_PATH = "https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf"
# Forwarded to the ingest helper as its TOP_K argument.
TOP_K = 3
# Vector store file placed in a freshly created temporary directory; every run
# of this cell creates a new directory.
milvus_uri = str(Path(mkdtemp(), "vector.db"))
collection_name = "vectordb"

In [29]:
# Build the object stored in `rag_chain` from the configuration defined in the
# previous cell; it is later wrapped in Context(...) for the retrieval agent.
# The recorded log output of this cell shows a PDF conversion step (about 21 s)
# followed by an OpenAI embeddings request, so expect this cell to be slow and
# to need network access.
rag_chain = initialize_vectorstore_with_rag_chain(
    FILE_PATH=FILE_PATH,
    TOP_K=TOP_K,
    milvus_uri=milvus_uri,
    collection_name=collection_name,
)

2026-01-08 01:28:43,233 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2026-01-08 01:28:43,278 - INFO - Going to convert document batch...
2026-01-08 01:28:43,283 - INFO - Initializing pipeline for StandardPdfPipeline with options hash e15bc6f248154cc62f8db15ef18a8ab7
2026-01-08 01:28:43,296 - INFO - Auto OCR model selected ocrmac.
2026-01-08 01:28:43,300 - INFO - Accelerator device: 'mps'
2026-01-08 01:28:49,178 - INFO - Accelerator device: 'mps'
2026-01-08 01:28:49,983 - INFO - Processing document NIPS-2017-attention-is-all-you-need-Paper.pdf
2026-01-08 01:29:01,150 - INFO - Finished converting document NIPS-2017-attention-is-all-you-need-Paper.pdf in 21.11 sec.
I0000 00:00:1767817741.353588  302144 fork_posix.cc:71] Other threads are currently calling into gRPC, skipping fork() handlers
2026-01-08 01:29:04,531 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"


In [30]:
from agents.retrieval_orchestrator_agent import create_retrieval_orchestrator_agent, Context
from agents.resoning_agent import create_reasoning_agent

In [31]:
# Instantiate the two agents that the graph nodes below call.
retrieval_orchestrator_agent = create_retrieval_orchestrator_agent()
# create_reasoning_agent returns a pair: the agent itself and a prompt template
# that invoke_reasoning fills in with .format(user_question=..., context=...).
reasoning_agent, reasoning_prompt = create_reasoning_agent()

In [32]:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.types import Command
from typing import Annotated, Any

In [33]:
class MainState(MessagesState):
    """State shared by the nodes of the main graph.

    Extends ``MessagesState``, so the inherited ``messages`` key is part of the
    state as well. ``MessagesState`` is a ``TypedDict``: at runtime the state is
    a plain ``dict``, so nodes must read it with ``state["key"]`` or
    ``state.get("key")`` rather than attribute access, and default values
    written in the class body are never applied. Keys a node has not written
    yet are simply absent, which is why the optional fields below carry no
    ``= None`` default.

    The string inside each ``Annotated[...]`` is a human-readable description
    of the field.
    """

    question: Annotated[str, "The user's question"]
    retrieval_results: Annotated[dict, "The retrieval results from the retrieval orchestrator agent"]
    final_answer: Annotated[str, "The final answer generated by the reasoning agent"]
    # Absent until human_review writes it; read with state.get("human_approval").
    human_approval: Annotated[bool | None, "Whether the human approved the final answer"]
    rag_chain: Annotated[Any, "The RAG chain to use for retrieval"]
    # Written by invoke_reasoning (review configs or None) and cleared by human_review.
    pending_review: Annotated[Any | None, "HITL review configs returned on interrupt"]

In [34]:
def invoke_retrieval_orchestration(state: MainState):
    """Run the retrieval orchestrator agent on the user's question.

    Args:
        state: Graph state; only ``state["question"]`` is read.

    Returns:
        A partial state update containing the agent's ``messages`` and, under
        ``retrieval_results``, the ``results`` entry of the agent's structured
        response.

    Raises:
        KeyError: If the graph was invoked without a ``question``.
    """
    # MainState derives from MessagesState (a TypedDict), so the state arrives
    # as a plain dict; attribute access (state.question) raises AttributeError.
    question = state["question"]

    result = retrieval_orchestrator_agent.invoke(
        {
            "messages": [
                {"role": "user", "content": question}
            ],
        },
        # Uses the notebook-level rag_chain, not a value carried in the state.
        context=Context(rag_chain)
    )
    res_messages = result["messages"]
    retrieval_results = result["structured_response"]["results"]
    return {"messages": res_messages, "retrieval_results": retrieval_results}

In [35]:
def invoke_reasoning(state: MainState):
    """Ask the reasoning agent to answer the question from the retrieved context.

    Args:
        state: Graph state; ``state["question"]`` and
            ``state["retrieval_results"]`` are read.

    Returns:
        A partial state update. If the agent's result contains an
        ``__interrupt__`` entry, the update carries the agent's ``messages``
        and the ``review_configs`` of the last interrupt under
        ``pending_review``. Otherwise it carries ``messages``, the
        ``final_answer`` from the structured response, and
        ``pending_review=None``.

    Raises:
        KeyError: If ``question`` or ``retrieval_results`` is missing.
    """
    # MainState derives from MessagesState (a TypedDict), so the state arrives
    # as a plain dict; attribute access would raise AttributeError.
    system_prompt = reasoning_prompt.format(
        user_question=state["question"],
        context=state["retrieval_results"],
    )
    messages = [("system", system_prompt)]
    # human_review resumes the agent with this same thread id.
    cfg = {"configurable": {"thread_id": "reasoning-thread"}}

    result = reasoning_agent.invoke({"messages": messages}, config=cfg)
    res_messages = result["messages"]

    # If the agent interrupted for HITL, store review configs; graph edges decide routing
    if "__interrupt__" in result:
        review_configs = result["__interrupt__"][-1].value["review_configs"]
        return {"messages": res_messages, "pending_review": review_configs}

    final_answer = result["structured_response"]["final_answer"]
    return {"messages": res_messages, "final_answer": final_answer, "pending_review": None}

In [36]:
def human_review(state: MainState):
    """Resume the interrupted reasoning agent with a review decision.

    The decision is currently hard-coded to "approve"; a real implementation
    would show the pending review to a person and use their answer instead.
    The ``state`` argument is not read.

    Returns:
        A partial state update with the resumed agent's ``messages``, the
        ``final_answer`` from its structured response, ``human_approval=True``
        and ``pending_review=None``.
    """
    # Stand-in for a human decision: always approve.
    approval = {"type": "approve"}
    # Same thread id that invoke_reasoning uses for the agent call being resumed.
    thread_cfg = {"configurable": {"thread_id": "reasoning-thread"}}
    resume_cmd = Command(resume={"decisions": [approval]})

    outcome = reasoning_agent.invoke(resume_cmd, config=thread_cfg)

    # Only a state update is returned; the graph's edges decide what runs next.
    update = {
        "messages": outcome["messages"],
        "final_answer": outcome["structured_response"]["final_answer"],
        "human_approval": True,
        "pending_review": None,
    }
    return update

In [None]:
def _route_after_reasoning(state: MainState) -> str:
    """Choose the node that runs after ``invoke_reasoning``.

    Returns ``"human_review"`` when ``invoke_reasoning`` stored review configs
    under ``pending_review``, and ``END`` otherwise. The state is a plain dict,
    so the key is read with ``.get`` and a missing key counts as no review.
    """
    if state.get("pending_review") is not None:
        return "human_review"
    return END


# Assemble the graph: retrieval -> reasoning -> (optional human review) -> END.
builder = StateGraph(MainState)

builder.add_node("invoke_retrieval_orchestration", invoke_retrieval_orchestration)
builder.add_node("invoke_reasoning", invoke_reasoning)
builder.add_node("human_review", human_review)

builder.add_edge(START, "invoke_retrieval_orchestration")
builder.add_edge("invoke_retrieval_orchestration", "invoke_reasoning")

# Route based on presence of pending_review set by invoke_reasoning.
# add_conditional_edges takes a single routing callable as its second argument;
# the mapping lists the values that callable may return and their target nodes.
builder.add_conditional_edges(
    "invoke_reasoning",
    _route_after_reasoning,
    {"human_review": "human_review", END: END},
)

builder.add_edge("human_review", END)

graph = builder.compile()