[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/tiagodsilva/llm-programs/blob/main/01-react/01-react.ipynb)

In [None]:
#@title Bootstrap (click to expand)

try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    print("Running in Colab â€” installing dependencies...")
    
    # Fetch pyproject.toml
    !curl -L -o pyproject.toml https://raw.githubusercontent.com/tiagodsilva/llm-programs/main/requirements.txt
    
    # Install poetry and dependencies
    %pip install -r requirements.txt

## Connect to the LLM provider

In [None]:
import os
from getpass import getpass


def _set_if_undefined(var: str) -> None:
    if os.environ.get(var):
        return
    os.environ[var] = getpass(var)


_set_if_undefined("OPENAI_API_KEY")

In [26]:
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model_name="gpt-4.1-mini",
    temperature=0.0,
)

## Creating a stock database

We will create an in-memory SQLite database for illustration purposes.  

In [3]:
from sqlalchemy import MetaData

metadata_obj = MetaData()

In [4]:
from sqlalchemy import Column, Integer, String, Table, Date, Float

stocks = Table(
    "stocks",
    metadata_obj,
    Column("obs_id", Integer, primary_key=True),
    Column("stock_ticker", String(4), nullable=False),
    Column("price", Float, nullable=False),
    Column("date", Date, nullable=False),
)

In [11]:
from sqlalchemy import create_engine

database_file = "temp_stocks_db.db"

if os.path.exists(database_file):
    os.remove(database_file)

engine = create_engine(
    f"sqlite:///{database_file}", connect_args={"check_same_thread": False}
)
metadata_obj.create_all(engine)

In [12]:
from datetime import datetime

observations = [
    [1, "ABC", 200, datetime(2023, 1, 1)],
    [2, "ABC", 208, datetime(2023, 1, 2)],
    [3, "ABC", 232, datetime(2023, 1, 3)],
    [4, "ABC", 225, datetime(2023, 1, 4)],
    [5, "ABC", 226, datetime(2023, 1, 5)],
    [6, "XYZ", 810, datetime(2023, 1, 1)],
    [7, "XYZ", 803, datetime(2023, 1, 2)],
    [8, "XYZ", 798, datetime(2023, 1, 3)],
    [9, "XYZ", 795, datetime(2023, 1, 4)],
    [10, "XYZ", 791, datetime(2023, 1, 5)],
]

In [13]:
from sqlalchemy import insert


def insert_obs(obs):
    stmt = insert(stocks).values(
        obs_id=obs[0],
        stock_ticker=obs[1],
        price=obs[2],
        date=obs[3],
    )

    with engine.begin() as conn:
        conn.execute(stmt)


for obs in observations:
    insert_obs(obs)

In [14]:
from langchain_community.utilities import SQLDatabase

db = SQLDatabase(engine=engine)

## Creating the SQL agent

In [15]:
system_prompt = """
You are an agent designed to interact with a SQL database.
Given an input question, create a syntactically correct {dialect} query to run,
then look at the results of the query and return the answer. Unless the user
specifies a specific number of examples they wish to obtain, always limit your
query to at most {top_k} results.

You can order the results by a relevant column to return the most interesting
examples in the database. Never query for all the columns from a specific table,
only ask for the relevant columns given the question.

You MUST double check your query before executing it. If you get an error while
executing a query, rewrite the query and try again.

DO NOT make any DML statements (INSERT, UPDATE, DELETE, DROP etc.) to the
database.

To start you should ALWAYS look at the tables in the database to see what you
can query. Do NOT skip this step.

Then you should query the schema of the most relevant tables.
""".format(
    dialect=db.dialect,
    top_k=5,
)

In [16]:
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain.agents import create_agent

db = SQLDatabase(engine=engine, include_tables=["stocks"])
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

agent = create_agent(
    model=llm,
    tools=toolkit.get_tools(),
    system_prompt=system_prompt,
    # debug=True, Set debug=True to inspect the LLM reasoning.
)

In [17]:
db.get_usable_table_names()

['stocks']

In [18]:
query = (
    "What is the multiplication of the ratio between stock "
    "prices for 'ABC' and 'XYZ' in January 3rd and the ratio "
    "between the same stock prices in January the 4th?"
)

result = agent.invoke(
    {"messages": [{"role": "user", "content": query}]},
)

In [19]:
print(result["messages"][-1].content)

The prices for 'ABC' and 'XYZ' on January 3rd are 232.0 and 798.0 respectively.
The prices for 'ABC' and 'XYZ' on January 4th are 225.0 and 795.0 respectively.

Now, let's calculate the multiplication of the ratio between stock prices for 'ABC' and 'XYZ' on January 3rd and the ratio between the same stock prices on January 4th:

Ratio on January 3rd = 232.0 / 798.0
Ratio on January 4th = 225.0 / 795.0

Multiplication of the two ratios = (232.0 / 798.0) * (225.0 / 795.0) = 0.0823 (approximately).


## Conversational agents

In [20]:
from langchain_core.tools import tool
import math
import numexpr


@tool  # Docstrings are REQUIRED for tools
def calculator(expression: str) -> str:
    """Calculate expression using Python's numexpr library.

    Examples:
        "37593 * 67" for "37593 times 67"
        "37593**(1/5)" for "37593^(1/5)"
    """
    local_dict = {"pi": math.pi, "e": math.e}
    return str(
        numexpr.evaluate(
            expression.strip(),
            global_dict={},  # restrict access to globals
            local_dict=local_dict,  # add common mathematical functions
        )
    )

In [21]:
@tool
def final_answer(answer: str, tools_used: list[str]) -> str:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`.
    """
    return {"answer": answer, "tools_used": tools_used}


# Add tools
tools = [final_answer, calculator]

In [22]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.base import RunnableSerializable

# First, create a prompt that forces the LLM to analyze the history
history_analysis_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "Analyze the conversation history below and identify any calculations that have already been performed. "
                "Extract the results of these calculations so they can be reused instead of recalculating."
            ),
        ),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "What calculations have been done and what are their results?"),
    ]
)

# Then the main agent prompt
agent_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            (
                "You're a helpful assistant. When answering a user's question "
                "you should first use one of the tools provided. After using a "
                "tool the tool output will be provided in the "
                "'scratchpad' below. If you have an answer in the "
                "scratchpad you should not use any more tools and "
                "instead answer directly to the user. "
                "IMPORTANT: Use the analysis of previous calculations to avoid recalculating."
            ),
        ),
        ("human", "Previous calculations analysis: {history_analysis}"),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# define the agent runnable with history analysis first
agent: RunnableSerializable = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", []),
    }
    | {
        "history_analysis": lambda x: llm.invoke(
            history_analysis_prompt.format_messages(chat_history=x["chat_history"])
        ).content,
        "input": lambda x: x["input"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", []),
    }
    | agent_prompt
    | llm.bind_tools(tools, tool_choice="auto")
)

# create tool name to function mapping as per guide
name2tool = {tool.name: tool.func for tool in tools}

In [None]:
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.chat_history import InMemoryChatMessageHistory


class CustomAgentExecutor:
    def __init__(self, max_iterations: int = 5):
        self.max_iterations = max_iterations
        self.memory = (
            InMemoryChatMessageHistory()
        )  # Simple in-memory structure to store conversation history
        self.agent = agent

    def invoke(self, input: str) -> dict:
        # invoke the agent but we do this iteratively in a loop until
        # reaching a final answer
        count = 0
        agent_scratchpad = []

        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            response = self.agent.invoke(
                {
                    "input": input,
                    "chat_history": self.memory.messages,
                    "agent_scratchpad": agent_scratchpad,
                }
            )

            # add initial tool call to scratchpad
            agent_scratchpad.append(response)

            # Handle ALL tool calls, not just the first one
            if response.tool_calls:
                for tool_call_obj in response.tool_calls:
                    tool_name = tool_call_obj["name"]
                    tool_args = tool_call_obj["args"]
                    tool_call_id = tool_call_obj["id"]

                    # execute the tool
                    tool_out = name2tool[tool_name](**tool_args)

                    # add the tool output to the agent scratchpad
                    tool_exec = ToolMessage(
                        content=f"{tool_out}", tool_call_id=tool_call_id
                    )
                    agent_scratchpad.append(tool_exec)

                    # add a print so we can see intermediate steps
                    print(f"{count}: {tool_name}({tool_args}) -> {tool_out}")

                count += 1

                # Check if any tool call is the final answer tool
                if any(tc["name"] == "final_answer" for tc in response.tool_calls):
                    # Get the final answer from the final_answer tool
                    final_tool_call = next(
                        tc for tc in response.tool_calls if tc["name"] == "final_answer"
                    )
                    final_answer = final_tool_call["args"]["answer"]
                    break
            else:
                # no tool call, we have a final answer
                final_answer = response.content
                break

        # Add to conversation history ONLY the human input and final AI response
        # This preserves memory without corrupting it with tool calls
        self.memory.add_messages(
            [HumanMessage(content=input), AIMessage(content=final_answer)]
        )

        # return the final answer in dict form
        return {"output": final_answer}


# Initialize the custom agent executor
conversational_agent = CustomAgentExecutor()

In [24]:
# First question
result = conversational_agent.invoke("What is 10000 * (1 + 0.08)**5?")
print(f"Result: {result['output']}")

0: calculator({'expression': '10000 * (1 + 0.08)**5'}) -> 14693.280768000006
1: final_answer({'answer': 'The value of 10000 * (1 + 0.08)**5 is approximately 14693.28.', 'tools_used': ['functions.calculator']}) -> {'answer': 'The value of 10000 * (1 + 0.08)**5 is approximately 14693.28.', 'tools_used': ['functions.calculator']}
Result: The value of 10000 * (1 + 0.08)**5 is approximately 14693.28.
