In [None]:
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


# Prompt for each API key that is not already present in the environment.
# The Anthropic key request is left commented out; only the OpenAI and Tavily
# keys are requested.
# _set_env("ANTHROPIC_API_KEY")
_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig

In [None]:
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Apply a single push/pop instruction to the dialog stack.

    Args:
        left: The current stack.
        right: ``None`` keeps the stack as it is, the literal ``"pop"`` drops
            the top entry, and any other string is pushed on top.

    Returns:
        ``left`` itself when ``right`` is ``None``; otherwise a new list
        (``left`` is never mutated).
    """
    if right == "pop":
        return left[:-1]
    return left if right is None else left + [right]


class State(TypedDict):
    """State schema passed to ``StateGraph`` and received by the graph's nodes."""

    # Conversation history; annotated with the `add_messages` reducer.
    messages: Annotated[list[AnyMessage], add_messages]
    # User information string; the primary assistant prompt interpolates it
    # through its {user_info} placeholder.
    user_info: str
    # Stack of dialog states. Updates go through `update_dialog_stack`, which
    # pushes a name, pops on "pop", and leaves the stack unchanged on None.
    # NOTE(review): "webform_api_call" and "assistant" here differ from the
    # "webform_api" and "primary_assistant" names in route_to_workflow's
    # return annotation. Confirm which spelling is intended.
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "collect_info",
                "obtain_permission",
                "webform_api_call",
                "schedule_call",
            ]
        ],
        update_dialog_stack,
    ]

In [None]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    """Convert a failed tool run into error ``ToolMessage`` replies.

    Reads the exception stored under ``state["error"]`` (``None`` if absent)
    and produces one ``ToolMessage`` per tool call on the last message, each
    carrying the same error text and the id of the call it answers.

    Returns:
        A state update of the form ``{"messages": [...]}``.
    """
    failure = state.get("error")
    pending_calls = state["messages"][-1].tool_calls
    replies = []
    for call in pending_calls:
        replies.append(
            ToolMessage(
                content=f"Error: {repr(failure)}\n please fix your mistakes.",
                tool_call_id=call["id"],
            )
        )
    return {"messages": replies}


def create_tool_node_with_fallback(tools: list) -> "Runnable":
    """Build a ``ToolNode`` for ``tools`` with ``handle_tool_error`` as fallback.

    The fallback is registered with ``exception_key="error"``, which is the
    key ``handle_tool_error`` reads the exception from.

    Args:
        tools: The tools the node should be able to execute.

    Returns:
        The runnable produced by ``with_fallbacks``. The previous ``dict``
        annotation was incorrect; nothing here returns a dict.
    """
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )

In [None]:
from typing import Callable
from langchain_core.messages import ToolMessage


def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    """Create a graph node that hands the conversation to a specialized assistant.

    Args:
        assistant_name: Name inserted into the hand-off instructions.
        new_dialog_state: Value emitted as the ``dialog_state`` update by the
            returned node.

    Returns:
        A node function that takes the graph state and returns a state update
        containing one ``ToolMessage`` and the new dialog state.
    """

    def entry_node(state: State) -> dict:
        # The hand-off message answers the first tool call on the latest message.
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    # Prompt typo fixed: "other other action" -> "or other action".
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    " and the booking, update, or other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node

In [None]:
from typing import Annotated, Dict, Any
from langchain.tools import Tool, tool

In [None]:
@tool
def ask_contact_permission() -> Dict[str, Any]:
    """Asks the user for permission to contact via email or phone"""

    # The message is a plain string. The earlier extra pair of braces built a
    # one-element set, which is not JSON-serializable and not the intended type.
    return {
        "message": "Do you give permission for us to contact via email or phone? Append legal print"
    }

In [None]:
@tool
def ask_credit_pull_permission() -> Dict[str, Any]:
    """Asks the user for permission to pull their credit report."""

    # The message is a plain string. The earlier extra pair of braces built a
    # one-element set, which is not JSON-serializable and not the intended type.
    return {
        "message": "Do you give permission for us to pull your credit information? Append legal message"
    }

### Router Logic

In [None]:
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime

In [None]:
class ToSavingsEstimate(BaseModel):
    """Handles savings estimate calculations for a customer based on their debt and program eligibility."""

    # Bound to the primary assistant's LLM as a tool schema; the router matches
    # tool calls against this class's __name__.
    debt: float = Field(
        description="The total debt amount to calculate savings estimate on"
    )
    contact_permission: bool = Field(
        description="Indicates whether the customer has given permission to be contacted"
    )
    ask_credit_pull_permission: bool = Field(
        description="Indicates whether the customer has given permission for a credit pull"
    )

    class Config:
        # Example keys must match the field names above. The example used to
        # say "credit_pull_permission", which is not a field on this model.
        json_schema_extra = {
            "example": {
                "debt": 10000,
                "contact_permission": True,
                "ask_credit_pull_permission": True,
            }
        }


class ToCollectInformation(BaseModel):
    """Takes user input and collects necessary information for the task at hand"""

    # Bound to the primary assistant's LLM as a tool schema; a tool call with
    # this class's __name__ is routed to "enter_collect_info".
    request: str = Field(
        description = "The user's request or question."
    )

class ToWebForm(BaseModel):
    """Handles API requests to pull customer credit information for eligibility checks and to create lead internally in Salesforce."""

    # Bound to the primary assistant's LLM as a tool schema; a tool call with
    # this class's __name__ is routed to "enter_webform_api".
    first_name: str = Field(description="The first name of the customer.")
    debt_amount: float = Field(description ="The total debt amount of the customer.")
    mobile_phone: str = Field(description ="The customer's mobile phone number.")

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "first_name": "Terri",
                "debt_amount": 22030,
                "mobile_phone": "555-555-5555"
            }
        }

class ToScheduler(BaseModel):
    """Handles API requests to make an appointment for a call with representative."""

    # Calendar components are whole numbers, so they are typed as int
    # (previously float). This matches the integer values in the example below
    # and makes the generated schema ask for integers.
    month: int = Field(description="The month of the scheduled call")
    day: int = Field(description="The day of the scheduled call")
    hour: int = Field(description="The hour of the scheduled call")
    minute: int = Field(description="The minute of the scheduled call")

    class Config:
        # Sample payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "month": 9,
                "day": 12,
                "hour": 14,
                "minute": 30
            }
        }

In [None]:
class Assistant:
    """Graph node that invokes a runnable until it yields a usable response."""

    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    @staticmethod
    def _is_empty_response(result) -> bool:
        """Return True when ``result`` has neither tool calls nor usable content."""
        if result.tool_calls:
            return False
        content = result.content
        if not content:
            return True
        # List-style content counts as empty when its first part has no text.
        return isinstance(content, list) and not content[0].get("text")

    def __call__(self, state: State, config: RunnableConfig):
        """Run the runnable on ``state``, re-prompting while the reply is empty.

        Each retry appends a user message asking for a real output to a copy
        of the state; the caller's ``state`` is not mutated. ``config`` is
        accepted but not used.

        Returns:
            ``{"messages": result}`` with the first non-empty result.
        """
        result = self.runnable.invoke(state)
        while self._is_empty_response(result):
            nudged = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": nudged}
            result = self.runnable.invoke(state)
        return {"messages": result}


class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""

    # The entry-node hand-off message tells delegated assistants to call this
    # function by name when the user changes their mind or needs other help.
    cancel: bool = True
    reason: str

    class Config:
        # Sample payloads attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the user's emails or calendar for more information.",
            },
        }


In [None]:
from langchain_openai import ChatOpenAI

# Chat model created with no explicit arguments; tools are bound to it when the
# primary assistant runnable is built.
llm = ChatOpenAI()

In [None]:
# TavilySearchResults is used below but was not imported anywhere in the
# notebook, so this cell raised NameError on a fresh kernel.
from langchain_community.tools.tavily_search import TavilySearchResults

# System prompt for the primary (host) assistant. {user_info} and {messages}
# are filled from the graph state; {time} is supplied through .partial().
# Fixes to the prompt text: missing spaces between adjacent sentences were
# added, and the user-info section no longer calls itself "flight information"
# (a leftover that does not fit a debt consolidation assistant).
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant for a debt consolidation company. "
            "Your primary role is to engage the customer to enrolling in the company's product by building rapport. "
            "If a customer requests to get a savings estimate, book a call with an agent or update his personal information, "
            "delegate the task to the appropriate specialized assistant by invoking the corresponding tool. You are not able to make these types of changes yourself."
            " Only the specialized assistants are given permission to do this for the user. "
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer."
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            " If a search comes up empty, expand your search before giving up."
            "\n\nCurrent user information:\n<UserInfo>\n{user_info}\n</UserInfo>"
            "\nCurrent time: {time}.",
        ),
        ("placeholder", "{messages}"),
    ]
).partial(time=datetime.now)

# Tools the primary assistant executes itself (run by "primary_assistant_tools").
primary_assistant_tools = [
    TavilySearchResults(max_results=1),
]

# The To* schemas are bound as extra tools so the model can request delegation;
# route_primary_assistant maps those tool calls to the matching entry nodes.
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
    primary_assistant_tools
    + [
        ToSavingsEstimate,
        ToCollectInformation,
        ToWebForm,
        ToScheduler,
    ]
)

In [None]:
from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition

# Graph builder using State as the state schema.
builder = StateGraph(State)

# Entry edge from the graph's start to the primary assistant.
builder.add_edge("__start__", "primary_assistant")

# Primary assistant
builder.add_node("primary_assistant", Assistant(assistant_runnable))
# Node that executes the primary assistant's own tools, wrapped so that tool
# exceptions are turned into error ToolMessages by the fallback.
builder.add_node(
    "primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools)
)


def route_primary_assistant(
    state: State,
):
    """Choose the node that follows the primary assistant.

    Returns ``"__end__"`` when ``tools_condition`` says so. Otherwise the name
    of the first tool call on the last message decides: a delegation schema
    maps to its ``enter_*`` node, and any other tool goes to
    ``"primary_assistant_tools"``.

    Raises:
        ValueError: If the last message carries no tool calls even though
            ``tools_condition`` did not return ``"__end__"``.
    """
    if tools_condition(state) == "__end__":
        return "__end__"
    calls = state["messages"][-1].tool_calls
    if not calls:
        raise ValueError("Invalid route")
    # Only the first tool call is inspected when picking the destination.
    delegation_targets = {
        ToSavingsEstimate.__name__: "enter_savings_estimate",
        ToCollectInformation.__name__: "enter_collect_info",
        ToWebForm.__name__: "enter_webform_api",
        ToScheduler.__name__: "enter_schedule_call",
    }
    return delegation_targets.get(calls[0]["name"], "primary_assistant_tools")

# The destination list must contain the names route_primary_assistant can
# return. It previously listed travel-booking nodes (enter_update_flight,
# enter_book_car_rental, ...) that the router never returns, and omitted the
# ones it does return.
# NOTE(review): the enter_* nodes are not added to `builder` in the visible
# cells. They still need to be registered (e.g. via create_entry_node) before
# the graph is compiled.
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    [
        "enter_savings_estimate",
        "enter_collect_info",
        "enter_webform_api",
        "enter_schedule_call",
        "primary_assistant_tools",
        "__end__",
    ],
)
# After the primary assistant's own tools run, control returns to it.
builder.add_edge("primary_assistant_tools", "primary_assistant")

def route_to_workflow(
    state: State,
) -> Literal[
    "primary_assistant",
    "collect_info",
    "obtain_permission",
    "webform_api",
    "schedule_call",
]:
    """Pick the node to resume at, based on the dialog stack.

    Returns the top entry of ``state["dialog_state"]`` when the stack is
    present and non-empty, and ``"primary_assistant"`` otherwise.
    """
    stack = state.get("dialog_state")
    return stack[-1] if stack else "primary_assistant"


# NOTE(review): no node named "assistant" is added to `builder` in the visible
# cells. Confirm the intended source node for this workflow router.
builder.add_conditional_edges("assistant", route_to_workflow)


# This node will be shared for exiting all specialized assistants
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.

    Returns:
        A state update with ``dialog_state`` set to ``"pop"``. If the last
        message has tool calls, it also carries one ``ToolMessage`` answering
        the first of them; otherwise the message list is empty.
    """
    messages = []
    if state["messages"][-1].tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        messages.append(
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {
        "dialog_state": "pop",
        "messages": messages,
    }


builder.add_node("leave_skill", pop_dialog_state)
builder.add_edge("leave_skill", "primary_assistant")

# Compile graph
# Compilation is done last. Nodes and edges added to the builder after
# compile() are not reflected in the compiled graph, and "leave_skill" used to
# be added only after compiling.
memory = MemorySaver()
part_4_graph = builder.compile(
    checkpointer=memory,
    # Let the user approve or deny the use of sensitive tools
    # NOTE(review): none of these *_sensitive_tools nodes are added to
    # `builder` in the visible cells. They must exist before compiling.
    interrupt_before=[
        "collect_info_sensitive_tools",
        "obtain_permission_sensitive_tools",
        "webform_api_sensitive_tools",
        "schedule_call_sensitive_tools",
    ],
)