# Beeai Example

Things to add:
- Add a ReAct agent and move the web search from a static workflow step to a tool the agent can call
- Summarize past searches and store the summaries in memory
- Add voice input and output
- Add an API call as a tool

In [15]:
from langchain_community.utilities import SearxSearchWrapper
from pydantic import Field
import requests

from beeai_framework.backend.chat import ChatModel, ChatModelOutput, ChatModelStructureOutput
from beeai_framework.backend.message import UserMessage
from beeai_framework.template import PromptTemplate, PromptTemplateInput
import traceback
from pydantic import BaseModel, ValidationError
from beeai_framework.workflows.workflow import Workflow, WorkflowError
from pydantic import InstanceOf
from beeai_framework.backend.message import AssistantMessage, SystemMessage
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory
from beeai_framework.agents.react.agent import ReActAgent
from beeai_framework.tools import Tool, tool
from dotenv import load_dotenv
import os
from langchain_community.tools import TavilySearchResults  # Import LangChain Tavily tool
from typing import Any

from beeai_framework.agents.react.agent import ReActAgent
from beeai_framework.agents.react.types import ReActAgentRunOutput
from beeai_framework.backend.chat import ChatModel
from beeai_framework.emitter.emitter import Emitter, EventMeta
from beeai_framework.emitter.types import EmitterOptions
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory


In [None]:
# Show the kernel's current working directory; the .env lookup in the following cell
# is built from this same directory.
os.getcwd()

In [16]:
# Load the Tavily API key from a .env file in the current working directory.

# Report whether the file exists so a missing .env is obvious in the cell output.
env_path = os.path.join(os.getcwd(), ".env")
print(f"Does .env exist? {'Yes' if os.path.exists(env_path) else 'No'}")

# Read the variables defined in the .env file into the process environment.
load_dotenv(dotenv_path=env_path)

tavily_api_key = os.getenv("TAVILY_API_KEY")

# Surface a missing key here instead of letting it show up later as a failure inside
# the search tool. This only warns (does not raise) so the notebook can still be used
# with a different search backend.
if not tavily_api_key:
    print("Warning: TAVILY_API_KEY is not set; the Tavily search tool needs it to run.")


Does .env exist? Yes


In [17]:
# Chat model used by the workflow steps defined below (generate_web_search_terms and
# generate_answer both reference this module-level `model`).
model = ChatModel.from_name("ollama:granite3.1-dense:2b")

In [18]:
# Workflow state: one instance is passed to every workflow step, and the steps
# communicate by reading and writing its fields.
class SearchAgentState(BaseModel):
    # The user's original question (required).
    question: str
    # Search query derived from the question; written by generate_web_search_terms.
    websearch_query: str | None = None
    # Raw web search results; only written by the commented-out execute_web_search step.
    search_results: str | None = None
    # Final answer text; written by generate_answer.
    answer: str | None = None
    # Conversation memory (required); generate_answer appends the assistant's answer to it.
    memory: InstanceOf[UnconstrainedMemory]

In [19]:
#SearchRAGInput only needed if you are running web search as a structured step in the workflow

# #structured input for RAG Search Agent
# class SearchRAGInput(BaseModel):
#     question: str
#     search_results: str
# #prompt template for question + search results to generate final response
# search_rag_template = PromptTemplate(
#     PromptTemplateInput(
#         schema=SearchRAGInput,
#         template="""Search results:
# Question: {{question}}
# Search Results: {{search_results}}
# Provide a concise answer based on the search results provided. If the results are irrelevant or insufficient, say 'I don't know.' Avoid phrases such as 'According to the results...'.""",
#     )
# )

# Structured-output schema for the search-term step: generate_web_search_terms passes
# this class as `schema` to model.create_structure and reads the "search_query" field
# of the returned object.
class WebSearchQuery(BaseModel):
    search_query: str = Field(description="The web search query.")

# class WebSearchResults(BaseModel):
#     results: str = Field(description="The web search results from the tool call.")

# Input schema for search_query_template: carries the question that fills the
# {{question}} placeholder when the template is rendered.
class QuestionInput(BaseModel):
    question: str

# Prompt template that asks the model to rewrite a user question as a web search query.
# Rendered with a QuestionInput instance.
search_query_template = PromptTemplate(
    PromptTemplateInput(
        schema=QuestionInput,
        template=(
            "Convert the following question into a concise, effective web search query "
            "using keywords and operators for accuracy.\n"
            "Question: {{question}}"
        ),
    )
)


In [20]:
#Functions for the Re-ACT Agent Process

# Hook the event logger up to an agent run
async def observer(emitter: Emitter) -> None:
    """Register process_agent_events on the emitter for the "*.*" pattern, with nested matching enabled."""
    listener_options = EmitterOptions(match_nested=True)
    emitter.on("*.*", process_agent_events, listener_options)

#process agent events
async def process_agent_events(event_data: dict[str, Any], event_meta: EventMeta) -> None:
    """Process agent events and log appropriately"""

    if event_meta.name == "error":
        print("Agent 🤖 : ", event_data["error"])
    elif event_meta.name == "retry":
        print("Agent 🤖 : ", "retrying the action...")
    elif event_meta.name == "update":
        print(f"Agent({event_data['update']['key']}) 🤖 : ", event_data["update"]["parsedValue"])

In [None]:
# Web search tool if not using tavily
# search_tool = SearxSearchWrapper(searx_host="http://127.0.0.1:8888")

In [33]:

# Define the tool using the `tool` decorator.
# NOTE(review): the decorator presumably derives the tool's description and argument
# schema from the signature and docstring below — confirm before rewording the docstring.
# Raises ValueError when `query` is not a non-empty string.
@tool
def tavily_search_tool(query: str) -> str:
    """
    Perform a web search for the latest and most relevant information available online.

    Args:
        query (str): The search query to look up on the web which comes from the websearch_query.

    Returns:
        str: The top search results based on the Tavily API.
    """
    if not isinstance(query, str) or not query.strip():
        raise ValueError("Invalid query input. Expected a non-empty string.")

    # Named `search` rather than `tool` so the local does not shadow the imported decorator.
    search = TavilySearchResults(
        max_results=5,
        search_depth="advanced",
        include_answer=True,
        include_raw_content=True,
        include_images=False,
    )

    results = search.invoke({"query": query})

    # invoke() can return a structured result (e.g. a list of result dicts) rather than
    # text; stringify it so the value actually matches the declared `-> str` return type.
    return results if isinstance(results, str) else str(results)

In [34]:
#STEP IN WORKFLOW
async def generate_web_search_terms(state: SearchAgentState) -> str:
    """Turn the user's question into a web search query and store it on the state.

    Renders search_query_template with state.question, asks the model for a
    WebSearchQuery-shaped structured response, and writes the resulting query to
    state.websearch_query. If the model returns a blank or non-string query, the
    original question is used instead so the next step still has a usable query
    (generate_answer rejects an empty websearch_query).

    Args:
        state: Workflow state; `question` is read and `websearch_query` is written.

    Returns:
        The name of the next workflow step, "generate_answer".
    """
    print("Step: ", "generate_web_search_terms")

    # Generate a search query
    prompt = search_query_template.render(QuestionInput(question=state.question))
    response: ChatModelStructureOutput = await model.create_structure(
        schema=WebSearchQuery, messages=[UserMessage(prompt)]
    )

    # Store the generated query on the state, falling back to the raw question when
    # the model produced nothing usable.
    generated_query = response.object["search_query"]
    if isinstance(generated_query, str) and generated_query.strip():
        state.websearch_query = generated_query.strip()
    else:
        state.websearch_query = state.question

    return "generate_answer"

In [35]:
#add an inital system prompt to memory so that it is not empty when the process kicks off

memory = UnconstrainedMemory()
await memory.add(SystemMessage(content="You are an AI assistant. You have access to a web search tool to help you gather information about the codes and provide the user with accurate, up-to-date information. If you call the tavily_search_tool you use websearch_query from State as your search terms."))

#STEP IN WORKFLOW
async def generate_answer(state: SearchAgentState) -> str:
    """Answer the generated search query with a ReAct agent that can call the Tavily tool.

    Args:
        state: Workflow state. `websearch_query` is read and used as the agent prompt;
            `answer` is written; the answer is also appended to `state.memory` as an
            assistant message.

    Returns:
        Workflow.END, ending the workflow after this step.

    Raises:
        ValueError: If state.websearch_query is not a non-empty string.
    """
    print("Step: ", "generate_answer")

    # Ensure we are passing the correct search query
    if not isinstance(state.websearch_query, str) or not state.websearch_query.strip():
        raise ValueError("websearch_query is invalid. Ensure it is a non-empty string.")

    # Create the agent. It is given a fresh, empty memory, so the conversation held in
    # state.memory is not passed to it.
    agent = ReActAgent(llm=model, tools=[tavily_search_tool], memory=UnconstrainedMemory())

    # Debugging
    print(f"Running agent with query: {state.websearch_query}")

    # Run the agent, logging its events through `observer`
    result: ReActAgentRunOutput = await agent.run(prompt=state.websearch_query).observe(observer)

    # The run output carries the agent's final message in `.result`; take its text.
    # (The run output itself has no get_text_content() method.)
    state.answer = result.result.text

    # Add response to memory
    await state.memory.add(AssistantMessage(content=state.answer))

    return Workflow.END


Testing just the `generate_answer` step on its own

In [36]:
# Hand-built state for exercising generate_answer on its own, without running the
# search-term step first.
# NOTE(review): question and websearch_query name different topics (AI vs. quantum
# computing); generate_answer passes only websearch_query to the agent — confirm the
# mismatch is intended.
state = SearchAgentState(
    question="What are the latest trends in AI?",
    websearch_query="latest trends in quantum computing",
    search_results=None,
    answer=None,
    memory=UnconstrainedMemory()
)

# Print the instance
print(state)

question='What are the latest trends in AI?' websearch_query='latest trends in quantum computing' search_results=None answer=None memory=<beeai_framework.memory.unconstrained_memory.UnconstrainedMemory object at 0x11c2fe0f0>


In [37]:
# Run the answer step directly on the hand-built state (top-level await in the notebook).
await generate_answer(state=state)

Step:  generate_answer
Running agent with query: latest trends in quantum computing
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  retrying the action...
Agent(thought) 🤖 :  I need to perform a web search for the latest trends in quantum computing using the Tavily Search Tool.

Agent(tool_name) 🤖 :  tavily_search_tool
Agent 🤖 :  The generated output does not adhere to the schema.
Value for 'tool_input' cannot be retrieved because its value does not adhere to the appropriate schema.
Agent 🤖 :  Maximal amount of global retries (1) has been reached.


AgentError: Maximal amount of global retries (1) has been reached.

In [None]:
#Only use this step if you want to make the websearch a structured part of the workflow, not if you want to give the option for the agent to call websearch as a tool

# async def execute_web_search(state: SearchAgentState) -> str:
#     print("Step: ", "execute_web_search")

#     if not state.websearch_query:
#         print("No web search query generated.")
#         state.search_results = "No search results available."
#         return "generate_answer"

#     try:
#         # Perform web search
#         state.search_results = str(search_tool.run(state.websearch_query))
#     except Exception as e:
#         print("Search tool failed! Agent will answer from memory.")
#         state.search_results = "No search results available."

#     return "generate_answer"

In [None]:
#OLD GENERATE ANSWER STEP FOR THE STRUCTURED WORKFLOW

# async def generate_answer(state: SearchAgentState) -> str:
#     print("Step: ", "generate_answer")

#     # Generate response based on search results
#     prompt = search_rag_template.render(
#         SearchRAGInput(question=state.question, search_results=state.search_results or "No results available.")
#     )
#     # Add prompt to memory
#     await state.memory.add(UserMessage(content=prompt))
#     # Generate model output
#     output: ChatModelOutput = await model.create(messages=state.memory.messages)
#     # Store answer in state
#     state.answer = output.get_text_content()
#     # Add response to memory
#     await state.memory.add(AssistantMessage(content=state.answer))
#     return Workflow.END




In [10]:
#add an inital system prompt to memory so that it is not empty when the process kicks off

memory = UnconstrainedMemory()
await memory.add(SystemMessage(content="You are an AI assistant that helps people understand their medical bills. You have access to a web search tool to help you gather information about the codes and provide the user with accurate, up-to-date information."))

In [11]:
# UNSTRUCTURED WORKFLOW
try:
    search_workflow = Workflow(SearchAgentState)
    
    # Define workflow steps
    search_workflow.add_step("generate_web_search_terms", generate_web_search_terms)
    search_workflow.add_step("generate_answer", generate_answer)

    while True:
        user_input = input("User (type 'exit' to stop): ")
        if user_input.lower() == "exit":
            break

        # Add user message to memory
        await memory.add(UserMessage(content=user_input))

        # Run workflow with memory
        response = await search_workflow.run(SearchAgentState(question=user_input, memory=memory))
        # Print assistant response
        print("Assistant:", response.state.answer)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

Step:  generate_web_search_terms
Step:  generate_answer
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  The generated output does not adhere to the schema.
Transition from 'thought' to 'tool_input' does not exist!
Agent 🤖 :  retrying the action...


CancelledError: 

Agent 🤖 :  Context destroyed.
Agent 🤖 :  Context destroyed.
Agent(thought) 🤖 :  I need to find out about India's history, culture, economy, geography, population, major cities, tourist attractions using the Tavily search tool.



In [None]:
# # Workflow Definition
# try:
#     search_workflow = Workflow(SearchAgentState)
    
#     # Define workflow steps
#     search_workflow.add_step("generate_web_search_terms", generate_web_search_terms)
#     search_workflow.add_step("execute_web_search", execute_web_search)
#     search_workflow.add_step("generate_answer", generate_answer)

#     while True:
#         user_input = input("User (type 'exit' to stop): ")
#         if user_input.lower() == "exit":
#             break

#         # Add user message to memory
#         await memory.add(UserMessage(content=user_input))

#         # Run workflow with memory
#         response = await search_workflow.run(SearchAgentState(question=user_input, memory=memory))

#         # Print assistant response
#         print("Assistant:", response.state.answer)

# except WorkflowError:
#     traceback.print_exc()
# except ValidationError:
#     traceback.print_exc()

In [None]:
# SearchAgentState.websearch_query = "latest trends in quantum computing"

In [None]:

# Initialize the agent with a language model and the Tavily tool
chat_model: ChatModel = ChatModel.from_name("ollama:granite3.1-dense:2b")
agent = ReActAgent(llm=chat_model, tools=[tavily_search_tool], memory=UnconstrainedMemory())

# Run a query using the agent
result = await agent.run("What are the latest advancements in quantum computing?").observe(observer)