In [2]:
from langgraph.graph import END, StateGraph
import os
import requests
import json
from uuid import uuid4
from pydantic import ValidationError
from typing import Literal
import logging
from dotenv import find_dotenv, load_dotenv
from datetime import datetime, timezone

from langchain.schema.output_parser import StrOutputParser
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, get_buffer_string
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.tracers.langchain import LangChainTracer
from langsmith.client import Client
from langchain_core.tracers.context import tracing_v2_callback_var



In [None]:
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    chat_history: list[BaseMessage]
    chain: str
    team_members: list[str]
    tool_results: dict[str, dict]


In [None]:
workflow = StateGraph(AgentState)

workflow.add_node("agent", self._call_model)
workflow.add_node("tools", ToolNode(self.tools))

workflow.add_edge(START, "agent")

workflow.add_conditional_edges(
    "agent",
    self._should_continue,
)

workflow.add_edge("tools", "agent")

return await workflow.compile()


In [None]:
def get_workflow(self):
    """Get the workflow for the conversation."""

    workflow = StateGraph(AgentState)

    workflow.add_node("topic_validator", self._topic_validation)

    workflow.add_node("supervisor", self.get_supervisor())

    workflow.add_node("exception_node", self._answer_chain)

    supervisor_conditional_edges = {}
    for agent in self.config.agents.keys():
        workflow.add_node(agent, self._run_agent)
        supervisor_conditional_edges[agent] = agent
    supervisor_conditional_edges["FINISH"] = END

    workflow.add_node("answer_chain", self._answer_chain)

    workflow.set_entry_point("topic_validator")
    workflow.add_conditional_edges(
        "topic_validator",
        self._should_continue_exception
    )

    workflow.add_edge("exception_node", END)

    workflow.add_conditional_edges(
        "supervisor",
        lambda x: x["chain"],
        supervisor_conditional_edges
    )

    for agent in self.config.agents.keys():
        if "rag_call_tool" in self.config.agents[agent].tools.keys():
            workflow.add_edge(agent, END)
        else:
            workflow.add_conditional_edges(
                agent,
                self._should_continue
            )
    workflow.add_edge("answer_chain", END)
    # workflow.add_edge("answer_chain", "supervisor")

    graph = workflow.compile(debug=self.config.debug)

    # Save the graph image if debug_graph is enabled
    if self.config.debug_graph:
        try:
            graph_image = graph.get_graph(xray=True).draw_mermaid_png()
            with open('graph.png', 'wb') as f:
                f.write(graph_image)

        except Exception as ex:
            logging.getLogger("uvicorn").error(
                "Error while displaying the graph: " + str(ex))

    return graph
