<a href="https://colab.research.google.com/github/mgfrantz/CTME-llm-lecture-resources/blob/main/labs/rag_and_agents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Agents and RAG

In this lab, we're going to create a customer service bot.
We will build it step by step from LLM fundamentals, one module at a time, and by the end of the day we'll have a basic customer service chatbot.

The chatbot will have several capabilities - it will be able to look up and update customer info, look up and update customer orders, and answer questions about product offerings.

Tomorrow, we're going to train a *much* smaller LLM on the same task.
But as of now, we don't have any training data!
To do this, we're going to - you guessed it - use LLMs as fake customers to generate data that we can use to fine-tune an LLM tomorrow.

# Setup



In [None]:
# Installs
!pip install -Uqqqq \
    "llama-index>=0.11.17" \
    "llama-index-core>=0.10.43" \
    "openinference-instrumentation-llama-index>=2" \
    "opentelemetry-proto>=1.12.0" \
    arize-phoenix-otel \
    fastembed \
    nest-asyncio \
    llama-index-callbacks-arize-phoenix \
    llama-index-readers-database \
    llama-index-llms-openai \
    google-generativeai \
    llama-index-embeddings-fastembed \
    llama-index-agent-openai \
    --progress-bar off

In [None]:
# Environment variables
from google.colab import userdata
import os
os.environ['GEMINI_API_KEY'] = userdata.get('GEMINI_API_KEY')
os.environ['GOOGLE_API_KEY'] = userdata.get('GEMINI_API_KEY')
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
os.environ['HF_TOKEN'] = userdata.get('HF_TOKEN')

In [None]:
# If you did not run the previous notebook, use the data from the repo
!git clone https://github.com/mgfrantz/CTME-llm-lecture-resources
!cp CTME-llm-lecture-resources/data/ecommerce.db .

In [None]:
# Copy the data we generated yesterday from Google Drive to the current working directory
# (requires Google Drive to be mounted)
import shutil
db_path = 'drive/MyDrive/CTME-LLM-labs/ecommerce.db'
if os.path.exists(db_path):
    shutil.copy(db_path, '.')
else:
    print("The ecommerce.db database does not exist. Please make sure you're connected to Google Drive, upload it to the Colab notebook, or re-run lesson 1.")

In [None]:
# Imports
import sqlite3
import pandas as pd
from IPython.display import display
from rich import print
from typing import Literal, List, Dict
from pydantic import BaseModel, Field
from time import sleep
from sqlalchemy import create_engine
import json
import phoenix as px
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from openinference.instrumentation import using_metadata
from phoenix.otel import register
from enum import Enum
from tqdm.auto import tqdm
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.tools import FunctionTool, QueryEngineTool, RetrieverTool
from llama_index.core.agent import ReActAgent
from llama_index.core.llms import ChatMessage
from llama_index.core import PromptTemplate
from llama_index.readers.database import DatabaseReader
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.llms.openai import OpenAI
import asyncio
import nest_asyncio
# nest_asyncio.apply()

# Utility functions

Here we define some utility functions - `query` and `execute` - that help us interact with our SQL database.

In [None]:
def query(q:str, db:str='ecommerce.db') -> pd.DataFrame:
    """
    Executes a SQL query against the SQLite database and returns the result as a pandas DataFrame.
    Use this function when you want to query a database and return results.

    Args:
        q (str): The SQL query to execute.
        db (str, optional): The path to the SQLite database file. Defaults to 'ecommerce.db'.

    Returns:
        pd.DataFrame: The result of the SQL query as a pandas DataFrame.
    """
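    connection = sqlite3.connect(db)
    try:
        cursor = connection.cursor()
        cursor.execute(q)
        result = cursor.fetchall()
        # Pass the column names to the constructor so that empty results
        # still produce a DataFrame with the right columns (instead of raising)
        df = pd.DataFrame(result, columns=[i[0] for i in cursor.description])
    finally:
        connection.close()
    return df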

def execute(q:str, db:str='ecommerce.db') -> None:
    """
    Executes an SQL query against the SQLite database.
    Use this when you want to run commands like updates, inserts, or deletes that don't return results.

    Args:
        q (str): The SQL query to execute.
        db (str, optional): The path to the SQLite database file. Defaults to 'ecommerce.db'.

    Returns:
        None
    """
    connection = sqlite3.connect(db)
    cursor = connection.cursor()
    cursor.execute(q)
    connection.commit()
    connection.close()

In [None]:
for table in ['customers', 'items', 'orders']:
    print(f'Table: {table}')
    display(query(f'SELECT * FROM {table} ORDER BY random() LIMIT 3;'))

In [None]:
def instrument():
    """
    Starts a Phoenix session and instruments llama-index so its LLM calls, tool calls, and retrievals are traced.

    Returns:
        session: The phoenix session object.
    """
    session = px.launch_app()
    tracer_provider = register(endpoint="http://127.0.0.1:6006/v1/traces")
    LlamaIndexInstrumentor().instrument(skip_dep_check=True, tracer_provider=tracer_provider)
    return session

def end_session(session):
    """
    Ends a phoenix session.

    Args:
        session: The phoenix session object.

    Returns:
        None
    """
    !rm {session.database_url.replace('sqlite:///', '')}
    session.end()

# Problem statement

We are building a chatbot to automate simple customer interactions.
We can assume that the customers are already logged in, so we can provide the customer's identity to the LLM to be able to retrieve relevant information.
Here are the actions we will support:

- Update customer info
  - Update email
  - Update phone number
  - Update address
  - Change PIN
  - Close account
- Order information
  - Check order information
  - Cancel order
  - Update order address
- Ask a question about a product

# Getting started with ReAct: Customer information

In today's lab, we'll be using the ReAct (Reasoning and Acting) method.
In this method, we run an LLM in a loop. At each step, it writes a *Thought* about what to do next and then either takes an *Action*, such as a tool call whose result comes back as an *Observation*, or gives a final *Answer* once the instruction can be answered without any additional steps.
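To make the loop concrete, here is a minimal, framework-free sketch of a ReAct loop. It is an illustration, not how `llama-index` implements `ReActAgent` internally. `scripted_llm` is a hypothetical stand-in for a real model that returns canned text in the Thought/Action/Action Input/Answer format our system prompt asks for, and `demo_tools` holds a hypothetical tool with a fixed address.

```python
import json
import re
from typing import Callable, Dict


def run_react(llm: Callable[[str], str], tools: Dict[str, Callable[..., str]],
              question: str, max_steps: int = 5) -> str:
    """Minimal ReAct loop: ask the LLM, run the requested tool, feed back the observation."""
    transcript = f"Question: {question}\n"
    for _ in range(max_steps):
        step = llm(transcript)
        transcript += step + "\n"
        # A final answer ends the loop.
        answer = re.search(r"Answer:\s*(.*)", step, re.DOTALL)
        if answer:
            return answer.group(1).strip()
        # Otherwise, parse the tool name and its JSON keyword arguments.
        action = re.search(r"Action:\s*(\S+)", step)
        if action is None:
            raise ValueError(f"Could not parse step: {step!r}")
        action_input = re.search(r"Action Input:\s*(\{.*\})", step, re.DOTALL)
        kwargs = json.loads(action_input.group(1)) if action_input else {}
        # Run the tool and append its result as an Observation for the next step.
        observation = tools[action.group(1)](**kwargs)
        transcript += f"Observation: {observation}\n"
    return "Stopped after reaching max_steps without an answer."


# Hypothetical scripted "LLM": calls a tool first, then answers.
def scripted_llm(transcript: str) -> str:
    if "Observation:" not in transcript:
        return ("Thought: I need to use a tool to help me answer the question.\n"
                "Action: get_customer_information\n"
                "Action Input: {}")
    return ("Thought: I can answer without using any more tools.\n"
            "Answer: Your address on file is 1 Main St.")


# Hypothetical tool returning a fixed address.
demo_tools = {"get_customer_information": lambda: "street_address: 1 Main St"}
print(run_react(scripted_llm, demo_tools, "What is the address you have on file for me?"))
```

The real agent adds memory, retries, and output parsing on top of this, but the control flow (think, act, observe, repeat) is the same idea.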

Let's start with a simple use case - we want the LLM to be able to look up information about the customer given the customer's ID.
Below, we define and document a function that queries our SQLite DB for all the information about a given customer.

We then use `llama-index`'s `FunctionTool` class to turn this into a tool that we can provide to the agent later.

In [None]:
def get_customer_information_tool(customer_id:int):
    def get_customer_information(*args, **kwargs) -> dict:
        """Use when you want to look up information about the customer.

        Does not require any arguments.

        Returns:
            dict: A dictionary containing the customer's information.
        """
        data = query(f"SELECT name, email, phone, street_address, city, state, zip_code, country FROM customers WHERE customer_id = {customer_id}")
        if len(data) == 0:
            return {'error': 'Customer information not found on file.'}
        else:
            return data.to_dict(orient='records')[0]

    return FunctionTool.from_defaults(fn=get_customer_information)

During testing, it's useful to just grab a random customer ID.
We shouldn't use this function as an LLM tool, but it can help us test our functions.
Remember, these are all synthetic customers so we're not actually revealing any sensitive customer information.

In [None]:
def get_random_id():
    return query("SELECT customer_id FROM customers ORDER BY random() LIMIT 1;").iloc[0,0]

Before we create our ReAct agent, let's actually take a look at the default system prompts.
One caveat with high-level frameworks is that their functionality often comes with pre-packaged prompts that work decently out of the box but work much better with some slight tweaking.

In [None]:
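llm = OpenAI(model='gpt-4o-mini')
def get_base_tools(customer_id=None):
    # Default arguments are evaluated only once, at definition time,
    # so draw a fresh random ID on each call instead
    if customer_id is None:
        customer_id = get_random_id()
    return [get_customer_information_tool(customer_id)]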

In [None]:
# Show the default system prompt
print(ReActAgent.from_tools(tools=get_base_tools(), llm=llm).get_prompts()['agent_worker:system_prompt'].template)

After reading the prompt, we can make some tweaks.
Here are some tweaks I made that helped with the agent's behavior:

- Give the agent more context about how to behave. Out of the box, it knows nothing about the context it operates in, the types of problems it should solve, or what to do in edge cases.
- Give the agent access to the customer's ID. We assume the customer has already logged in, so the ID is known up front. Rather than putting it in the prompt, we bind it into each tool with a closure (as in `get_customer_information_tool`), so the LLM never has to supply it in function calls.

Feel free to make any additional adjustments you want to the prompt.
This is the default prompt we will use for all agent behavior in the rest of the lesson.

In [None]:
AGENT_SYSTEM_PROMPT = PromptTemplate("""\
You are a helpful customer service assistant for MikeCorp, an ecommerce company selling electronics. \
You are designed to help with a variety of problems a customer may have, including account management, order management, and product-related queries. \
If you are ever unsure what to do, please escalate.

## Tools

You have access to a wide variety of tools. You are responsible for using the tools in any sequence you deem
appropriate to complete the task at hand.
This may require breaking the task into subtasks and using different tools to complete each subtask.

You have access to the following tools:
{tool_desc}


## Output Format

Please answer in English using the following format:

```
Thought: I need to use a tool to help me answer the question.
Action: tool name (one of {tool_names}) if using a tool.
Action Input: the input to the tool, in a JSON format representing the kwargs (e.g. {{"input": "hello world", "num_beams": 5}})
```

Please ALWAYS start with a Thought.

NEVER surround your response with markdown code markers. \
You may use code markers within your response if you need to.

Please use a valid JSON format for the Action Input. Do NOT do this {{'input': 'hello world', 'num_beams': 5}}.

If this format is used, the user will respond in the following format:

```
Observation: tool response
```

You should keep repeating the above format till you have enough information \
to answer the question without using any more tools. \
At that point, you MUST respond in one of the following two formats:

```
Thought: I can answer without using any more tools. I'll use the user's language to answer
Answer:
```

```
Thought: I cannot answer the question with the provided tools.
Answer:
```

## Current Conversation

Below is the current conversation consisting of interleaving human and assistant messages. \
Conversation:
""")

Finally, let's create a function that builds a ReAct agent from the given tools, replaces its default system prompt with ours, and returns the agent.
This is how we will create agents for the rest of the lab.

In [None]:
def create_agent(llm, tools, system_prompt=AGENT_SYSTEM_PROMPT, verbose=False):
    agent = ReActAgent.from_tools(tools, llm=llm, verbose=verbose)
    prompt_dict = agent.get_prompts()
    prompt_dict['agent_worker:system_prompt'] = system_prompt
    agent.update_prompts(prompt_dict)
    return agent

## Tracing

It's always a good idea to keep track of the inputs and outputs for your LLM calls.
In this case, we need it for training data.
It can also really help us debug when we're not getting what we expect from our LLM apps.

Here, we will set up `phoenix`, an open-source UI that ***traces*** all of our retrievals, function calls, and LLM calls.
It also allows us to create and annotate datasets and download the inputs and outputs in a format we can adapt for fine-tuning tomorrow.

We've created two utility functions for `phoenix`: `instrument` and `end_session`.
For now, we will use it for observability; later on, we will use it to export fine-tuning data for tomorrow's lab.
The usage is demonstrated below:

```python
session = instrument()
# ... your LLM/agent calls ...
end_session(session) # clears all traces
```

We added the ability to end the session and clear all traces because this makes it easier to start over when we want to log LLM traces for fine-tuning without including any of our development work.

Now let's create our first agent and ask it a question!
Because we set `verbose=True`, we can watch the agent decide to call the tool we provided and use its result in the answer.

In [None]:
tools = get_base_tools()
agent = create_agent(llm, tools, AGENT_SYSTEM_PROMPT, verbose=True)

In [None]:
session = instrument()

In [None]:
agent.chat("What is the address you have on file for me?")

# Exercise 1: User management tools

Now that we've gone over the basics of how to create a working agent, the bulk of our work is to create the tools.
In this section, your task is to create several user management tools.
Make sure to create detailed docstrings, use type hints, and return text that describes the outcome of the action.

In [None]:
def get_update_pin(customer_id:int):
    raise NotImplementedError()

In [None]:
def get_update_address(customer_id:int):
    raise NotImplementedError()

In [None]:
def get_update_phone_number(customer_id:int):
    raise NotImplementedError()

In [None]:
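def get_user_management_tools(customer_id=None):
    if customer_id is None:
        customer_id = get_random_id()  # draw a fresh ID on each call, not once at definition time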
    return [
        get_update_pin(customer_id),
        get_update_address(customer_id),
        get_update_phone_number(customer_id)
    ]

# Exercise 2: Order Management

In this section, your task is to create several order management tools.
Make sure to create detailed docstrings, use type hints, and return text that describes the outcome of the action or returns appropriate information for the LLM.
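Because each tool is bound to a single customer, every order tool should first confirm that the order the LLM mentions actually belongs to that customer; otherwise a customer could cancel someone else's order just by naming its ID. Below is a minimal sketch of such a guard. The `orders` columns it uses (`order_id`, `customer_id`, `status`) are assumptions, so check them against the table printed earlier and adjust; `find_customer_order` is a hypothetical helper name.

```python
import sqlite3
from typing import Optional


def find_customer_order(connection: sqlite3.Connection, order_id: int,
                        customer_id: int) -> Optional[dict]:
    """Return the order as a dict if it exists AND belongs to the customer, else None.

    Assumes an `orders` table with `order_id` and `customer_id` columns.
    """
    cursor = connection.execute(
        "SELECT * FROM orders WHERE order_id = ? AND customer_id = ?",
        (int(order_id), int(customer_id)),  # int() also handles NumPy integer IDs
    )
    row = cursor.fetchone()
    if row is None:
        return None
    # Pair each column name with its value.
    return {col[0]: value for col, value in zip(cursor.description, row)}


# Demo with an in-memory database and a hypothetical orders schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(10, 1, "processing"), (11, 2, "shipped")])
print(find_customer_order(conn, 10, customer_id=1))  # the customer's own order
print(find_customer_order(conn, 11, customer_id=1))  # someone else's order, so None
```

A cancel or update tool could call this first and, on `None`, return text such as "That order was not found on your account" instead of touching the database.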

In [None]:
def get_list_orders(customer_id:int):
    raise NotImplementedError()

In [None]:
def get_cancel_order(customer_id:int):
    raise NotImplementedError()

In [None]:
def get_update_order_address(customer_id:int):
    raise NotImplementedError()

In [None]:
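def get_order_tools(customer_id=None):
    if customer_id is None:
        customer_id = get_random_id()  # draw a fresh ID on each call, not once at definition time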
    return [
        get_list_orders(customer_id),
        get_cancel_order(customer_id),
        get_update_order_address(customer_id)
    ]

# Inventory tools with RAG

For other operations, we saw that simple function calling gives most of the context we need.
But for inventory, we can't load all the items into the LLM context at once.
So we're going to use RAG to solve this.

`llama-index` has a really great out-of-the-box RAG framework.
It centers around vector store indices, which can be used as retrievers or as query engines (`QueryEngine`s) that answer questions about the data.
In this case, let's create one or more retrievers that the agent can use to look up items when answering customer questions.
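Conceptually, a retriever such as the one we build below with `index.as_retriever(similarity_top_k=5)` embeds the query and returns the `top_k` stored items whose embeddings are most similar to it. The sketch below shows that idea in plain Python. The bag-of-words `embed` function is a toy stand-in made up for illustration (the real index uses dense vectors from the FastEmbed model), and the item texts are hypothetical.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Dict[str, float]:
    """Toy embedding: lowercase word counts (a stand-in for a real embedding model)."""
    return dict(Counter(re.findall(r"[a-z0-9]+", text.lower())))


def cosine(a: Dict[str, float], b: Dict[str, float]) -> float:
    """Cosine similarity between two sparse vectors stored as dicts."""
    dot = sum(v * b.get(k, 0.0) for k, v in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve(query: str, docs: List[str], top_k: int = 2) -> List[Tuple[float, str]]:
    """Score every document against the query and keep the top_k most similar."""
    q = embed(query)
    scored = [(cosine(q, embed(doc)), doc) for doc in docs]
    return sorted(scored, key=lambda pair: pair[0], reverse=True)[:top_k]


items = [  # hypothetical item texts
    "name: Wireless Headphones, description: Noise-cancelling over-ear headphones",
    "name: USB-C Cable, description: 2 m braided charging cable",
    "name: Earbuds, description: Compact wireless earbuds with charging case",
]
for score, doc in retrieve("wireless headphones", items):
    print(f"{score:.2f}  {doc}")
```

Only the few best-matching items reach the LLM's context, which is why this scales to inventories far too large to paste into a prompt.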

In [None]:
# Load documents
engine = create_engine('sqlite:///ecommerce.db')
docs = DatabaseReader(engine=engine).load_data(query="SELECT item_id, description, price, quantity, name AS text FROM items")

In [None]:
embed_model = FastEmbedEmbedding(model_name='mixedbread-ai/mxbai-embed-large-v1')

In [None]:
index = VectorStoreIndex.from_documents(docs, embed_model=embed_model, show_progress=True)

In [None]:
retriever = index.as_retriever(similarity_top_k=5)
print(retriever.retrieve("headphones"))

Now that we have our vector store index, we can use its retriever as a tool.
When a customer asks a question that can be answered from product names or descriptions, the agent can call this `search_items` tool to look up the most relevant items.

In [None]:
inventory_tools = [
    RetrieverTool.from_defaults(index.as_retriever(), description="Useful when you need to answer a question by searching items.", name='search_items'),
]

# Exercise 3: Simulating and evaluating conversations

Now we need to generate some data.
One option would be to spend hours or days chatting with our agent.
If we had actual chats from support representatives, we could test how the LLM responds to those same user queries.
But since our data is synthetic, we don't have that.

What we *can* do is simulate customers with particular goals and see if they can chat with our agent.
Your goal is to create a function that interacts with our agent to accomplish a particular goal like updating their phone number or canceling an order.

We also designed this agent to handle only a narrow set of requests and to escalate anything else.
That means we can measure whether a task was completed by checking whether the correct function was called.
Let's return a score of 1 for each trace where the correct function was called, and 0 otherwise.
This way, we can 1) evaluate how well our model works for each function type, and 2) filter out bad traces from our fine tuning data.

To record metadata for an LLM or agent call, use the context manager as shown here:
```python
with using_metadata(metadata={'attribute_1':'value_1', ..., 'attribute_n': 'value_n'}):
    response = await agent.achat(...)
```
You can use this context manager to track things like whether the LLM call was the customer or agent, the tool call we expect, or any other information useful for evaluation.

Hints:

- have the "customer LLM" output a special word (I used `'<|DONE|>'`) when their task is complete
- use a loop and alternate between the customer LLM and the agent LLM
- set a maximum number of messages per test; tests that run past 3 or 4 turns are probably looping into some bad behavior
- try to be extremely procedural in your instructions to the LLM
- you will be making a lot of LLM calls - use `achat` or other asynchronous functionality to speed things up a bit.
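Following these hints, the control flow of one simulated conversation might look like the sketch below. It is deliberately framework-free: `customer_turn`, `agent_turn`, and `called_tools` are hypothetical callables you would back with a customer LLM, `agent.achat`, and `get_called_tool_names(agent)`; the opening greeting and the tool name `update_phone_number` are made up for the demo. The score is 1 if the expected tool was called and 0 otherwise, matching the scoring scheme above.

```python
import asyncio
from typing import Awaitable, Callable, List

DONE = "<|DONE|>"  # stop word the customer LLM emits when its goal is met


async def simulate_task(
    customer_turn: Callable[[str], Awaitable[str]],
    agent_turn: Callable[[str], Awaitable[str]],
    called_tools: Callable[[], List[str]],
    target_fn: str,
    max_iter: int = 4,
) -> int:
    """Alternate customer and agent turns, then score whether target_fn was called."""
    # Assumed opening line from the agent; the customer LLM replies to it first.
    agent_message = "Hi! How can I help you today?"
    for _ in range(max_iter):  # cap the turns to avoid runaway loops
        customer_message = await customer_turn(agent_message)
        if DONE in customer_message:
            break
        agent_message = await agent_turn(customer_message)
    # Score 1 if the expected tool was called at any point, 0 otherwise.
    return int(target_fn in called_tools())


# Hypothetical stand-ins for the two LLMs, only to exercise the control flow.
tool_log: List[str] = []

async def fake_customer(agent_message: str) -> str:
    if "updated" in agent_message:
        return DONE
    return "Please change my phone number to 555-0100."

async def fake_agent(customer_message: str) -> str:
    tool_log.append("update_phone_number")  # pretend the agent called this tool
    return "Your phone number has been updated."

score = asyncio.run(
    simulate_task(fake_customer, fake_agent, lambda: tool_log, "update_phone_number")
)
print(score)
```

In Colab an event loop is already running, so use `await simulate_task(...)` there instead of `asyncio.run(...)`. You could also wrap each turn in `using_metadata(...)` to tag which side produced a call and which tool was expected.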


In [None]:
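def get_all_tools(customer_id=None):
    if customer_id is None:
        customer_id = get_random_id()  # draw a fresh ID on each call, not once at definition time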
    return (
        get_base_tools(customer_id)
        + get_user_management_tools(customer_id)
        + get_order_tools(customer_id)
        + inventory_tools
    )

In [None]:
for tool in get_all_tools():
    print(tool.metadata.name)

In [None]:
def get_called_tool_names(agent):
    """
    Returns a list of all the tool names called by the agent.
    """
    tool_calls = []
    for v in agent.state.task_dict.values():
        for source in v.task.extra_state.get('sources', []):
            tool_calls.append(source.tool_name)
    return tool_calls

async def complete_one_task(task, target_fn, customer_id, max_iter=4):
    raise NotImplementedError()

async def complete_all_tasks(customer_id, session=None, verbose=False, max_iter=4, pbar=None):
    raise NotImplementedError()

# Simulate customer interactions

Now that our LLM calls are being traced, let's run our simulation.
Let's select several customers at random and run our customer LLM against our agent.
This will generate lots of traces - one input and one output for every LLM call!
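The cells below run customers one at a time and `sleep(60)` between them to stay under a token rate limit. If your limit allows it, one alternative is to run several tasks concurrently while capping how many are in flight with an `asyncio.Semaphore`. This is a minimal sketch: `gather_limited` is a hypothetical helper, `run_one` is a stand-in for an expensive async call such as `complete_all_tasks`, and the limit of 2 is an arbitrary example value.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


async def gather_limited(
    fn: Callable[[int], Awaitable[T]], inputs: List[int], max_concurrency: int = 2
) -> List[T]:
    """Run fn over inputs concurrently, with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(x: int) -> T:
        async with semaphore:  # waits here while max_concurrency calls are running
            return await fn(x)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(guarded(x) for x in inputs))


# Hypothetical stand-in that records how many calls overlap.
in_flight = 0
peak = 0

async def run_one(customer_id: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulate waiting on an LLM API
    in_flight -= 1
    return customer_id * 10

results = asyncio.run(gather_limited(run_one, [1, 2, 3, 4, 5]))
print(results, peak)
```

Rate limits are usually measured in tokens per minute, so capping concurrency reduces bursts but doesn't remove the need to throttle; as above, use `await` instead of `asyncio.run` inside Colab.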

In [None]:
n_train_customers = 8
n_eval_customers = 2
customer_ids = query(f"SELECT customer_id FROM customers ORDER BY random() LIMIT {n_train_customers + n_eval_customers};").customer_id.to_list()
train_customers = customer_ids[:n_train_customers]
eval_customers = customer_ids[n_train_customers:]
assert len(set(train_customers).intersection(set(eval_customers))) == 0
assert len(train_customers) == n_train_customers
assert len(eval_customers) == n_eval_customers

In [None]:
train_results = []
for customer_id in tqdm(train_customers):
    _df = await complete_all_tasks(customer_id)
    train_results.append(_df)
    sleep(60) # add this because my token rate limit is low and I need to slow request rate
train_df = pd.concat(train_results)

In [None]:
eval_results = []
for customer_id in tqdm(eval_customers):
    _df = await complete_all_tasks(customer_id)
    eval_results.append(_df)
    sleep(60) # add this because my token rate limit is low and I need to slow request rate
eval_df = pd.concat(eval_results)

In [None]:
train_df.head()

In [None]:
eval_df.head()

# Export spans for fine tuning

Next, we convert the traced spans into ShareGPT-style conversations and export them to Google Drive.
Tomorrow, we will tokenize and fine tune on this dataset.

In [None]:
# Map Phoenix message keys to the keys used by the ShareGPT conversation format
key_mapping = {
    'message.content': 'value',
    'message.role': 'from'
}
def process_keys(l):
    return [{key_mapping[k]: v for k, v in d.items()} for d in l]

from_value_mapping = {
    'user': 'human',
    'assistant': 'gpt',
    'system': 'system'
}

# Rename roles to their ShareGPT names (user -> human, assistant -> gpt); the `role` argument is currently unused
def apply_value_mapping(l, role):
    to_return = []
    for d in l:
        d['from'] = from_value_mapping[d['from']]
        to_return.append(d)
    return to_return

def to_conversations(row):
    messages = row.input + row.output

    return {'conversations': messages}


def process_span_df(df, path, remove_failed_tasks_from_json=True):
    input_output = df[['attributes.llm.input_messages', 'attributes.llm.output_messages', 'customer_id', 'target_fn', 'task_score']]
    input_output = input_output.rename(columns={'attributes.llm.input_messages': 'input', 'attributes.llm.output_messages': 'output'})
    input_output.loc[:, ['input', 'output']] = input_output.loc[:, ['input', 'output']].map(process_keys)
    input_output.input = input_output.input.map(lambda x: apply_value_mapping(x, 'input'))
    input_output.output = input_output.output.map(lambda x: apply_value_mapping(x, 'output'))
    input_output['conversations'] = input_output.apply(to_conversations, axis=1)
    with open(path, 'w') as f:
        _input_output = input_output.copy()
        if remove_failed_tasks_from_json:
            _input_output = _input_output[_input_output.task_score == 1]
        f.write(_input_output.conversations.to_json(orient='records', lines=True))
    return input_output
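To see what a single exported record looks like, here is the same key and role mapping applied by hand to a toy span in plain Python, without pandas. `to_sharegpt` is a hypothetical helper and the message contents are made up; the `message.role` and `message.content` keys mirror the ones `process_keys` expects from the `attributes.llm.input_messages` and `attributes.llm.output_messages` columns.

```python
import json
from typing import Dict, List

KEY_MAPPING = {"message.content": "value", "message.role": "from"}
ROLE_MAPPING = {"user": "human", "assistant": "gpt", "system": "system"}


def to_sharegpt(input_messages: List[Dict[str, str]],
                output_messages: List[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Rename keys and roles, then concatenate the prompt messages and the completion."""
    converted = []
    for message in input_messages + output_messages:
        renamed = {KEY_MAPPING[k]: v for k, v in message.items() if k in KEY_MAPPING}
        renamed["from"] = ROLE_MAPPING[renamed["from"]]
        converted.append(renamed)
    return {"conversations": converted}


span_input = [  # made-up example span
    {"message.role": "system", "message.content": "You are a helpful customer service assistant..."},
    {"message.role": "user", "message.content": "What is the address you have on file for me?"},
]
span_output = [
    {"message.role": "assistant", "message.content": "Thought: I need to use a tool..."},
]
print(json.dumps(to_sharegpt(span_input, span_output)))
```

Each line of the exported `.jsonl` files holds one object of this shape, one per traced LLM call.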

In [None]:
t = process_span_df(train_df, 'train.jsonl')
e = process_span_df(eval_df, 'eval.jsonl')

In [None]:
!head -n 1 train.jsonl | python -m json.tool

In [None]:
!rm /content/drive/MyDrive/CTME-LLM-labs/*.jsonl

In [None]:
# Move the data to where we can find it
!cp *.jsonl /content/drive/MyDrive/CTME-LLM-labs/
t.to_parquet('/content/drive/MyDrive/CTME-LLM-labs/train.parquet')
e.to_parquet('/content/drive/MyDrive/CTME-LLM-labs/eval.parquet')

In [None]:
!ls -al /content/drive/MyDrive/CTME-LLM-labs/