In [1]:
import getpass
import os

def _set_env(key: str):
    if key not in os.environ:
        os.environ[key] = getpass.getpass(f"{key}:")

_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")

In [2]:
from langsmith import utils

os.environ['LANGCHAIN_TRACING_V2']='true'
os.environ['LANGCHAIN_API_KEY']="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"
os.environ['LANGCHAIN_PROJECT']="sql query agent"
utils.tracing_is_enabled()

True

In [3]:
#Use sqlite3 if database modification required
import sqlite3
# from langchain_community.utilities import SQLDatabase

db = "db.sqlite3"
# db = SQLDatabase.from_uri(f"sqlite:///db.sqlite3")

In [4]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(base_url="https://api.deepseek.com", model="deepseek-chat")

In [5]:
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda

from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    error = state.get("error")
    tool_calls = state["messages"][-1].tool_calls
    return {
        "messages": [
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=tc["id"],
            )
            for tc in tool_calls
        ]
    }


def create_tool_node_with_fallback(tools: list) -> dict:
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)], exception_key="error"
    )


def _print_event(event: dict, _printed: set, max_length=1500):
    current_state = event.get("dialog_state")
    if current_state:
        print("Currently in: ", current_state[-1])
    message = event.get("messages")
    if message:
        if isinstance(message, list):
            message = message[-1]
        if message.id not in _printed:
            msg_repr = message.pretty_repr(html=True)
            if len(msg_repr) > max_length:
                msg_repr = msg_repr[:max_length] + " ... (truncated)"
            print(msg_repr)
            _printed.add(message.id)

In [6]:
from typing import Annotated, Literal, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages


def update_dialog_stack(left: list[str], right: Optional[str]) -> list[str]:
    """Push or pop the state."""
    if right is None:
        return left
    if right == "pop":
        return left[:-1]
    return left + [right]


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    #If initial information about the user is needed
    #user_info: str
    dialog_state: Annotated[
        list[
            Literal[
                "assistant",
                "update_budget",
            ]
        ],
        update_dialog_stack,
    ]

    

In [7]:
#Budget assistant
import sqlite3
from langchain_core.tools import tool
from typing import Optional
from langchain_core.runnables import RunnableConfig
from datetime import date
from decimal import Decimal


@tool
def get_account(config: RunnableConfig) -> list[dict]:
    """Fetch all budget accounts for the user.

    Returns:
        A list of dictionaries where each dictionary contains the budget account details
    """

    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = """
    SELECT *
    FROM 
        budget_account
    """
    cursor.execute(query)
    rows = cursor.fetchall()
    column_names = [column[0] for column in cursor.description]
    results = [dict(zip(column_names, row)) for row in rows]

    cursor.close()
    conn.close()

    return results


@tool
def search_transactions(
    account_id: Optional[int] = None,
    transaction_date: Optional[date] = None,
    description: Optional[str] = None,
    type: Optional[str] = None,
    amount: Optional[Decimal] = None,
) -> list[dict]:
    """Search for transactions based on the account, transaction date, description, type and amount"""
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = "SELECT * FROM budget_transaction WHERE 1=1"
    params = []

    if account_id:
        query += " AND account_id LIKE ?"
        params.append(account_id)
    if transaction_date:
        query += " AND transaction_date LIKE ?"
        params.append(transaction_date)
    if description:
        query += " AND description LIKE ?"
        params.append(f"%{description}%")
    if type:
        query += " AND type LIKE ?"
        params.append(type)
    if amount:
        query += " AND amount LIKE ?"
        params.append(f"%{amount}%")
    
    cursor.execute(query, params)
    results = cursor.fetchall()

    conn.close()

    return [
        dict(zip([column[0] for column in cursor.description], row)) for row in results
    ]


@tool
def create_new_account(
    name: str,
    type: str,
    balance: Decimal
) -> list[dict]:
    """Create a new account"""
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    query = f"""
        INSERT INTO 
            budget_account (name, type, balance)
        VALUES 
            ({name}, 
            {type}, 
            {balance})
    """
    
    cursor.execute(query)
    
    if cursor.rowcount > 0:
        conn.close()
        return f"Account {name} created successfully."
    else:
        conn.close()
        return f"Account creation failed."

@tool
def create_new_transaction(
    account_id: int,
    transaction_date: date,  # date object from datetime
    description: str,
    type: str,
    amount: float
) -> str:
    """Create a new transaction and update the account balance."""
    conn = sqlite3.connect(db)
    cursor = conn.cursor()

    # Convert date to string format SQLite understands
    date_str = transaction_date.isoformat()

    # Insert transaction
    insert_query = """
        INSERT INTO budget_transaction 
        (account_id, transaction_date, description, type, amount)
        VALUES (?, ?, ?, ?, ?)
    """
    cursor.execute(insert_query, 
                    (account_id, date_str, description, type, amount))

    # Update account balance
    update_amount = -amount if type.upper() == 'CREDIT' else amount
    update_query = """
        UPDATE budget_account
        SET balance = balance + ?
        WHERE id = ?
    """
    # Use tuple instead of set for parameters
    cursor.execute(update_query, (update_amount, account_id))

    conn.commit()
    return "Transaction created successfully."

In [8]:

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable, RunnableConfig

from pydantic import BaseModel, Field


class Assistant:
    def __init__(self, runnable: Runnable):
        self.runnable = runnable

    def __call__(self, state: State, config: RunnableConfig):
        while True:
            result = self.runnable.invoke(state)

            if not result.tool_calls and (
                not result.content
                or isinstance(result.content, list)
                and not result.content[0].get("text")
            ):
                messages = state["messages"] + [("user", "Respond with a real output.")]
                state = {**state, "messages": messages}
            else:
                break
        return {"messages": result}
    
class CompleteOrEscalate(BaseModel):
    """A tool to mark the current task as completed and/or to escalate control of the dialog to the main assistant,
    who can re-route the dialog based on the user's needs."""

    cancel: bool = True
    reason: str

    class Config:
        json_schema_extra = {
            "example": {
                "cancel": True,
                "reason": "User changed their mind about the current task.",
            },
            "example 2": {
                "cancel": True,
                "reason": "I have fully completed the task.",
            },
            "example 3": {
                "cancel": False,
                "reason": "I need to search the user's emails or calendar for more information.",
            },
        }

budget_management_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a specialized assistant for handling budget updates. "
            " The primary assistant delegates work to you whenever the user needs help updating their budget accounts and transactions. "
            "Confirm the updated account and transaction details with the user and inform them of the resulting balance changes of the accounts if any. "
            " When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If you need more information or the user changes their mind, escalate the task back to the main assistant."
            " Remember that a transaction isn't completed until after the relevant tool has successfully been used."
            "\n\nIf the user needs help, and none of your tools are appropriate for it, then"
            ' "CompleteOrEscalate" the dialog to the host assistant. Do not waste the user\'s time. Do not make up invalid tools or functions.',
        ),
        ("placeholder", "{messages}"),
    ]
)

update_budget_safe_tools = [get_account, search_transactions]
update_budget_sensitive_tools = [create_new_account, create_new_transaction]
update_budget_tools = update_budget_safe_tools + update_budget_sensitive_tools
update_budget_runnable = budget_management_prompt | llm.bind_tools(
    update_budget_tools + [CompleteOrEscalate]
)

class ToUpdateBudgetAssistant(BaseModel):
    """Transfers work to a specialized assistant to handle car budget updates."""

    account_id: int = Field(description="The id of the budget account that the user is refering to.")
    account_name: str = Field(description="The name of the budget account that the user is refering to")
    transaction_date: date = Field(description="The date of the budget transaction.")
    description: str = Field(description="Additional information from the user regarding the transaction.")
    type: str = Field(description="The credit nature of the transaction (can be either debit or credit).")
    amount: Decimal = Field(description="The numerical amount which the budget account has changed due to the transaction")

    class Config:
        json_schema_extra = {
            "example": {
                "account_id": "1",
                "account_name": "Food",
                "transaction_date": "2023-07-05",
                "description": "Food",
                "amount": "30.00",
            }
        }

primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful customer support assistant for a Financial Tracking application. "
            "Your primary role is to help the user to keep track of transactions in their budget accounts. "
            "If a customer requests to insert a new transaction, delegate the task to the appropriate specialized assistant by invoking the corresponding tool. "
            "You are not able to make these types of changes yourself. "
            "Only the specialized assistants are given permission to do this for the user. "
            "The user is not aware of the different specialized assistants, so do not mention them; just quietly delegate through function calls. "
            "Provide detailed information to the customer, and always double-check the database before concluding that information is unavailable. "
            "When searching, be persistent. Expand your query bounds if the first search returns no results. "
            "If a search comes up empty, expand your search before giving up. ",
        ),
        ("placeholder", "{messages}"),
    ]
)
primary_assistant_tools = [
    TavilySearchResults(max_results=1),
    search_transactions,
    get_account
]
assistant_runnable = primary_assistant_prompt | llm.bind_tools(
    primary_assistant_tools
    + [
        ToUpdateBudgetAssistant,
    ]
)

In [9]:
from typing import Callable

from langchain_core.messages import ToolMessage


def create_entry_node(assistant_name: str, new_dialog_state: str) -> Callable:
    def entry_node(state: State) -> dict:
        tool_call_id = state["messages"][-1].tool_calls[0]["id"]
        return {
            "messages": [
                ToolMessage(
                    content=f"The assistant is now the {assistant_name}. Reflect on the above conversation between the host assistant and the user."
                    f" The user's intent is unsatisfied. Use the provided tools to assist the user. Remember, you are {assistant_name},"
                    " and the creation, update, and other action is not complete until after you have successfully invoked the appropriate tool."
                    " If the user changes their mind or needs help for other tasks, call the CompleteOrEscalate function to let the primary host assistant take control."
                    " Do not mention who you are - just act as the proxy for the assistant.",
                    tool_call_id=tool_call_id,
                )
            ],
            "dialog_state": new_dialog_state,
        }

    return entry_node

# This node will be shared for exiting all specialized assistants
def pop_dialog_state(state: State) -> dict:
    """Pop the dialog stack and return to the main assistant.

    This lets the full graph explicitly track the dialog flow and delegate control
    to specific sub-graphs.
    """
    messages = []
    if state["messages"][-1].tool_calls:
        # Note: Doesn't currently handle the edge case where the llm performs parallel tool calls
        messages.append(
            ToolMessage(
                content="Resuming dialog with the host assistant. Please reflect on the past conversation and assist the user as needed.",
                tool_call_id=state["messages"][-1].tool_calls[0]["id"],
            )
        )
    return {
        "dialog_state": "pop",
        "messages": messages,
    }

In [10]:
from typing import Literal

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.prebuilt import tools_condition
from langgraph.graph import END, START

builder = StateGraph(State)

# If initial user information is required
# def user_info(state: State):
#     return {"user_info": fetch_user_flight_information.invoke({})}

def route_update_budget(
    state: State,
):
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    did_cancel = any(tc["name"] == CompleteOrEscalate.__name__ for tc in tool_calls)
    if did_cancel:
        return "leave_skill"
    safe_toolnames = [t.name for t in update_budget_safe_tools]
    if all(tc["name"] in safe_toolnames for tc in tool_calls):
        return "update_budget_safe_tools"
    return "update_budget_sensitive_tools"

builder.add_node("primary_assistant", Assistant(assistant_runnable))
builder.add_node("leave_skill", pop_dialog_state)
builder.add_node("enter_update_budget", create_entry_node("Budget Updates & Creation Assistant", "update_budget"))
builder.add_node("update_budget", Assistant(update_budget_runnable))
builder.add_node(
    "update_budget_sensitive_tools",
    create_tool_node_with_fallback(update_budget_sensitive_tools),
)
builder.add_node(
    "update_budget_safe_tools",
    create_tool_node_with_fallback(update_budget_safe_tools),
)


builder.add_edge("enter_update_budget", "update_budget")
builder.add_edge("update_budget_sensitive_tools", "update_budget")
builder.add_edge("update_budget_safe_tools", "update_budget")
builder.add_conditional_edges(
    "update_budget",
    route_update_budget,
    ["update_budget_sensitive_tools", "update_budget_safe_tools", "leave_skill", END],
)

builder.add_edge("leave_skill", "primary_assistant")

<langgraph.graph.state.StateGraph at 0x2a0c1f08530>

In [11]:
# Primary assistant

def route_primary_assistant(
    state: State,
):
    route = tools_condition(state)
    if route == END:
        return END
    tool_calls = state["messages"][-1].tool_calls
    if tool_calls:
        if tool_calls[0]["name"] == ToUpdateBudgetAssistant.__name__:
            return "enter_update_budget"
        return "primary_assistant_tools"
    raise ValueError("Invalid route")

builder.add_node(
    "primary_assistant_tools", create_tool_node_with_fallback(primary_assistant_tools)
)

# Each delegated workflow can directly respond to the user
# When the user responds, we want to return to the currently active workflow
def route_to_workflow(
    state: State,
) -> Literal[
    "primary_assistant",
    "update_budget",
]:
    """If we are in a delegated state, route directly to the appropriate assistant."""
    dialog_state = state.get("dialog_state")
    if not dialog_state:
        return "primary_assistant"
    return dialog_state[-1]

# The assistant can route to one of the delegated assistants,
# directly use a tool, or directly respond to the user
builder.add_conditional_edges(
    "primary_assistant",
    route_primary_assistant,
    [
        "enter_update_budget",
        "primary_assistant_tools",
        END,
    ],
)
builder.add_edge("primary_assistant_tools", "primary_assistant")
builder.add_conditional_edges(START, route_to_workflow)

# Compile graph
memory = MemorySaver()
app = builder.compile(
    checkpointer=memory,
    # Let the user approve or deny the use of sensitive tools
    interrupt_before=[
        "update_budget_sensitive_tools",
    ],
)

In [12]:
import shutil
import uuid

# Let's create an example conversation a user might have with the assistant
tutorial_questions = [
    "How many accounts do i have?",
]

thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        # Checkpoints are accessed by thread_id
        "thread_id": thread_id,
    }
}

_printed = set()
# We can reuse the tutorial questions from part 1 to see how it does.
for question in tutorial_questions:
    events = app.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)
    snapshot = app.get_state(config)
    while snapshot.next:
        # We have an interrupt! The agent is trying to use a tool, and the user can approve or deny it
        # Note: This code is all outside of your graph. Typically, you would stream the output to a UI.
        # Then, you would have the frontend trigger a new run via an API call when the user has provided input.
        try:
            user_input = input(
                "Do you approve of the above actions? Type 'y' to continue;"
                " otherwise, explain your requested changed.\n\n"
            )
        except:
            user_input = "y"
        if user_input.strip() == "y":
            # Just continue
            result = app.invoke(
                None,
                config,
            )
        else:
            # Satisfy the tool invocation by
            # providing instructions on the requested changes / change of mind
            result = app.invoke(
                {
                    "messages": [
                        ToolMessage(
                            tool_call_id=event["messages"][-1].tool_calls[0]["id"],
                            content=f"API call denied by user. Reasoning: '{user_input}'. Continue assisting, accounting for the user's input.",
                        )
                    ]
                },
                config,
                
            )
        snapshot = app.get_state(config)


How many accounts do i have?
Tool Calls:
  get_account (call_0_2acb2bf5-4e68-49b2-a124-b63970fcffea)
 Call ID: call_0_2acb2bf5-4e68-49b2-a124-b63970fcffea
  Args:
Name: get_account

[]


UnprocessableEntityError: Failed to deserialize the JSON body into the target type: messages[3]: invalid type: sequence, expected a string at line 1 column 1252