## Introduction

The goal of the BeeAI project is to make AI agents interoperable, regardless of their underlying implementation. The project consists of two key components:
- **BeeAI Platform**: A platform to easily discover, run, and compose AI agents from any framework.
- **BeeAI Framework**: A production-grade framework for building AI agents in either Python or TypeScript.

Detailed information on BeeAI can be found [here](https://beeai.dev/).

### What's in this notebook?

In this notebook, we will learn about BeeAI Workflows, which are part of the BeeAI Framework. An agent's behavior is defined through workflow steps and the transitions between them, so you can think of a workflow as a graph that outlines an agent's behavior.

You can run this notebook on [**Google Colab**](https://colab.research.google.com/). The notebook uses **Ollama** to run a variety of foundation models locally, inside the notebook runtime. The notebook runs faster on Colab if you select the free *T4 GPU* option under *Runtime / Change runtime type* in the Colab menu.

Run the next cell to wrap long lines in the notebook output.

In [None]:
from IPython.display import HTML, display


def set_css():
    display(HTML("\n<style>\n pre{\n white-space: pre-wrap;\n}\n</style>\n"))


get_ipython().events.register("pre_run_cell", set_css)

### Install Libraries
We start by installing the required dependencies, starting the Ollama server, and pulling the Granite 3.3 8B model.

In [None]:
%pip install -q langchain_community wikipedia requests==2.32.4 beeai-framework

!curl -fsSL https://ollama.com/install.sh | sh > /dev/null
!nohup ollama serve >/dev/null 2>&1 &
# Give the background server a few seconds to start before pulling the model
!sleep 5
!ollama pull granite3.3:8b

### Import Libraries

In [None]:
# Imports

import traceback
import warnings

import wikipedia
from bs4 import GuessedAtParserWarning
from langchain_community.utilities import SearxSearchWrapper
from pydantic import BaseModel, Field, InstanceOf, ValidationError

from beeai_framework.backend.chat import ChatModel, ChatModelOutput, ChatModelStructureOutput
from beeai_framework.backend.message import AssistantMessage, SystemMessage, UserMessage
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory
from beeai_framework.template import PromptTemplate, PromptTemplateInput
from beeai_framework.workflows.workflow import Workflow, WorkflowError

# Suppress parser warnings
warnings.filterwarnings("ignore", category=GuessedAtParserWarning)


def object_on_screen(obj):
    display(obj)


print("Imports completed")

## Basics of BeeAI Workflows

The main components of a BeeAI Workflow are state, defined as a Pydantic model, and steps, which are Python functions.

- State: Think of state as structured memory that the workflow can read from and write to during execution. It holds the data that flows through the workflow.
- Steps: These are the functional components of the workflow, connecting together to perform an agent’s actions.

The following simple workflow example highlights these key features:

- The state definition includes a required message field.
- The step (my_first_step) is defined as an async function that takes the state instance as a parameter.
- The state can be modified within a step, and changes to the state are preserved across steps and workflow executions.
- The step function returns a string (Workflow.END), indicating the name of the next step (this is how step transitions are handled).
- Workflow.END signifies the end of the workflow.
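The transition mechanism can be illustrated with a plain-Python sketch of the loop such a workflow engine runs. This is not BeeAI's implementation: the END sentinel, the run_workflow helper, and the dataclass ToyState are hypothetical stand-ins for Workflow.END, Workflow.run, and the Pydantic state, and the steps are synchronous for simplicity, whereas BeeAI steps are async.

```python
from dataclasses import dataclass
from typing import Callable, Dict

# Hypothetical sentinel standing in for Workflow.END
END = "__end__"


@dataclass
class ToyState:
    message: str


def run_workflow(steps: Dict[str, Callable[[ToyState], str]], state: ToyState, start: str) -> ToyState:
    """Toy engine: call steps by name until one returns END (not the BeeAI API)."""
    current = start
    while current != END:
        if current not in steps:
            raise KeyError(f"Unknown step: {current}")
        # Each step mutates the shared state and returns the name of the next step
        current = steps[current](state)
    return state


def add_world(state: ToyState) -> str:
    state.message += " World"
    return "add_bang"  # transition to the next step by name


def add_bang(state: ToyState) -> str:
    state.message += "!"
    return END  # stop the workflow


final_state = run_workflow({"add_world": add_world, "add_bang": add_bang}, ToyState("Hello"), start="add_world")
print(final_state.message)  # Hello World!
```

Because each step names its successor at run time, a step can branch simply by returning different step names depending on the state.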

In [None]:
print("We will build a MessageState workflow.")


# Define global state that is accessible to each step in the workflow graph
# The message field is required when instantiating the state object
class MessageState(BaseModel):
    message: str


# Each step in the workflow is defined as a python function
async def my_first_step(state: MessageState) -> str:
    state.message += " World"  # Modify the state
    print("\nRunning Step 1: adding 'World' to state.")
    return Workflow.END


try:
    # Define the structure of the workflow graph
    basic_workflow = Workflow(schema=MessageState, name="MyWorkflow")

    # Add a step, each step has a name and a function that implements the step
    basic_workflow.add_step("my_first_step", my_first_step)
    print("Setting initial workflow state to 'Hello'.")
    print("Each step in a workflow is implemented as a Python function.")

    # Execute the workflow
    basic_response = await basic_workflow.run(MessageState(message="Hello"))
    print("State after workflow run: " + basic_response.state.message)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

## A Multi-Step Workflow with Tools

Now that you understand the basic components of a BeeAI Workflow, let’s build something more practical: a simple web search workflow.

This workflow creates a search query based on an input question, runs the query to retrieve search results, and then generates an answer to the question based on the results.

Let’s begin by defining our workflow State.

In this case, the question field is required when instantiating the State. The other fields, search_results and answer, are optional during construction (defaulting to None), but they will be populated by the workflow steps as the execution progresses.

In [None]:
print("We will build a SearchAgentState workflow.")


# Workflow State
class SearchAgentState(BaseModel):
    question: str
    search_results: str | None = None
    answer: str | None = None

Next, we define the ChatModel instance that will handle interaction with our LLM. For this example, we'll use IBM Granite 3.3 8B, served locally by Ollama. This model will generate the search query and then answer the question based on the retrieved results.
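The string passed to ChatModel.from_name combines a provider name and a model name. The sketch below shows one way to read such an identifier, assuming it is split at the first colon, which lets the Ollama model tag granite3.3:8b keep its own colon. The split_model_id helper is hypothetical and only illustrates the format; it is not part of BeeAI.

```python
from typing import Tuple


def split_model_id(model_id: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'provider:model' at the first colon only."""
    provider, sep, model_name = model_id.partition(":")
    if not sep or not provider or not model_name:
        raise ValueError(f"Expected 'provider:model', got {model_id!r}")
    return provider, model_name


print(split_model_id("ollama:granite3.3:8b"))  # ('ollama', 'granite3.3:8b')
```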

In [None]:
# Construct ChatModel
model = ChatModel.from_name("ollama:granite3.3:8b")

Since this is a web search workflow, we need a way to run web searches.

- Option 1 is the SearxSearchWrapper from the LangChain community tools project. It requires a local SearXNG service; follow the instructions [here](https://github.com/i-am-bee/beeai-framework/blob/411a76558c1ddf42115e601baf8d5d1922a04695/python/examples/notebooks/searXNG.md) to configure your SearXNG instance before proceeding.

- Option 2 is Wikipedia search.

Option 1 does not currently work in this implementation, so we use option 2.
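The notebook switches between the two options with an if/else on search_option. A dictionary that maps option names to search functions is one alternative that rejects unknown names early. The sketch below uses hypothetical stub backends in place of the real SearXNG and Wikipedia calls.

```python
from typing import Callable, Dict


# Hypothetical stub backends standing in for SearXNG and Wikipedia
def searx_stub(query: str) -> str:
    return f"SearXNG results for {query!r}"


def wikipedia_stub(query: str) -> str:
    return f"Wikipedia summaries for {query!r}"


SEARCH_BACKENDS: Dict[str, Callable[[str], str]] = {
    "Searx": searx_stub,
    "Wikipedia": wikipedia_stub,
}


def get_search_function(option: str) -> Callable[[str], str]:
    """Return the search function for an option name, failing fast on typos."""
    try:
        return SEARCH_BACKENDS[option]
    except KeyError:
        raise ValueError(f"Unknown search option {option!r}; choose from {sorted(SEARCH_BACKENDS)}") from None


search = get_search_function("Wikipedia")
print(search("hedgehog"))  # Wikipedia summaries for 'hedgehog'
```

With the if/else approach, a misspelled option such as "searx" silently falls through to Wikipedia; the lookup above raises an error instead.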

In [None]:
search_option = "Wikipedia"
# search_option = "Searx"

# Search tool option 1 (currently doesn't work in this implementation)
# The SearXNG wrapper is only constructed when option 1 is selected
search_tool = SearxSearchWrapper(searx_host="http://127.0.0.1:8888") if search_option == "Searx" else None

# Search tool option 2


def search_wikipedia(question, sentences_per_page=10):
    """Search Wikipedia and return the collected page summaries as one string."""
    try:
        # Search for relevant pages
        results = wikipedia.search(question)
        if not results:
            return "Page not found."

        # Loop through the search results and collect a summary for each page
        summaries = []
        for idx, result in enumerate(results):
            print("\nResult " + str(idx + 1) + ": " + result)
            try:
                page = wikipedia.page(result)
                summary = wikipedia.summary(page.title, sentences=sentences_per_page)
                print("Summary: " + summary)
                summaries.append(page.title + ": " + summary)
            except wikipedia.exceptions.PageError:
                print("No summary found")
            except wikipedia.exceptions.DisambiguationError:
                print("Too many results")
            except Exception:
                print("An error occurred")

        # Return the summaries so the workflow can pass them to the LLM
        return "\n\n".join(summaries) if summaries else "Page not found."

    except wikipedia.exceptions.DisambiguationError as e:
        return f"Too many results. Try to be more specific: {e.options[:5]}"
    except wikipedia.exceptions.PageError:
        return "Page not found."
    except Exception as e:
        return f"An error occurred: {e!s}"

In this workflow, we make extensive use of PromptTemplates and structured outputs.

Here, we define the prompt templates, the input schemas that list the variables each template expects, and the structured output schema for the generated search query. The first template turns the question into a search query, and the second asks the LLM to answer the question from the search results.
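The templates use {{variable}} placeholders. As a rough, stdlib-only illustration of what rendering does, the sketch below substitutes the fields of a typed input object into the template. The render function and ToyRAGInput class are hypothetical stand-ins for PromptTemplate.render and SearchRAGInput, not their implementation, and the sketch ignores mustache features such as sections and escaping.

```python
import re
from dataclasses import asdict, dataclass


@dataclass
class ToyRAGInput:
    question: str
    search_results: str


TEMPLATE = """Search results:
{{search_results}}

Question: {{question}}"""


def render(template: str, data: ToyRAGInput) -> str:
    """Replace each {{name}} placeholder with the matching field of `data`."""
    values = asdict(data)

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in values:
            raise KeyError(f"Template variable {name!r} has no matching input field")
        return str(values[name])

    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


print(render(TEMPLATE, ToyRAGInput(question="What is a hedgehog?", search_results="Hedgehog: a spiny mammal.")))
```

Typing the input as a schema means a missing variable fails when the input object is constructed rather than silently producing a prompt with a gap.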

In [None]:
# PromptTemplate Input Schemas
class QuestionInput(BaseModel):
    question: str


class SearchRAGInput(BaseModel):
    question: str
    search_results: str


# Prompt Templates
search_query_template = PromptTemplate(
    PromptTemplateInput(
        schema=QuestionInput,
        template="""Convert the following question into a concise, effective web search query using keywords and operators for accuracy.
Question: {{question}}""",
    )
)

search_rag_template = PromptTemplate(
    PromptTemplateInput(
        schema=SearchRAGInput,
        template="""Search results:
{{search_results}}

Question: {{question}}
Provide a concise answer based on the search results provided. If the results are irrelevant or insufficient, say 'I don't know.' Avoid phrases such as 'According to the results...'.""",
    )
)


# Structured output Schemas
class WebSearchQuery(BaseModel):
    query: str = Field(description="The web search query.")

Now, we can define the first step of the workflow, named web_search.

In this step:

- The LLM is prompted to generate an effective search query using the search_query_template.
- The generated search query is then used to run a web search via the web search tool.
- The search results are stored in the search_results field of the workflow state.
- Finally, the step returns generate_answer, passing control to the step registered under that name.
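The control flow of web_search can be exercised without an LLM or network access. In this sketch, the generate_query and search arguments are hypothetical stubs for the structured-output LLM call and the search tool, the step is synchronous for simplicity, and ToySearchState mirrors SearchAgentState using a dataclass.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ToySearchState:
    question: str
    search_results: Optional[str] = None
    answer: Optional[str] = None


def make_web_search_step(generate_query: Callable[[str], str], search: Callable[[str], str]):
    """Build a web_search-style step from injected query and search functions (hypothetical)."""

    def web_search_sketch(state: ToySearchState) -> str:
        query = generate_query(state.question)
        try:
            state.search_results = str(search(query))
        except Exception:
            # Same fallback as the notebook: continue without search results
            state.search_results = "No search results available."
        return "generate_answer"  # name of the next step

    return web_search_sketch


def failing_search(query: str) -> str:
    raise ConnectionError("search service unreachable")


state = ToySearchState(question="What is a hedgehog?")
step = make_web_search_step(lambda q: "hedgehog", lambda q: f"results for {q}")
next_step = step(state)
print(next_step, "|", state.search_results)  # generate_answer | results for hedgehog
```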

In [None]:
async def web_search(state: SearchAgentState) -> str:
    print("\nRunning Step 1: web_search")

    # Generate a search query
    prompt = search_query_template.render(QuestionInput(question=state.question))
    response: ChatModelStructureOutput = await model.create_structure(
        schema=WebSearchQuery, messages=[UserMessage(prompt)]
    )
    generated_search_query = response.object["query"]
    print("Search query: " + generated_search_query)

    # Run search and store results in state
    try:
        if search_option == "Searx":
            state.search_results = str(search_tool.run(generated_search_query))
        else:
            state.search_results = str(search_wikipedia(generated_search_query))

    except Exception:
        print("Search tool failed! Agent will answer from memory.")
        state.search_results = "No search results available."

    return "generate_answer"

The next step in the workflow is generate_answer.

This step:

- Takes the question and search_results from the workflow state.
- Uses the search_rag_template to generate an answer based on the provided data.
- The generated answer is stored in the workflow state.
- Finally, the workflow ends by returning Workflow.END.
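The same approach works for generate_answer. The sketch below builds the RAG prompt with the notebook's fallback text for missing results, calls a hypothetical stub_llm in place of model.create, and stores the reply in the state. END is a hypothetical stand-in for Workflow.END.

```python
from dataclasses import dataclass
from typing import Callable, Optional

END = "__end__"  # hypothetical stand-in for Workflow.END


@dataclass
class ToySearchState:
    question: str
    search_results: Optional[str] = None
    answer: Optional[str] = None


def build_rag_prompt(question: str, search_results: Optional[str]) -> str:
    # Mirror the notebook's fallback when the previous step produced no results
    results = search_results or "No results available."
    return (
        f"Search results:\n{results}\n\n"
        f"Question: {question}\n"
        "Provide a concise answer based on the search results provided. "
        "If the results are irrelevant or insufficient, say 'I don't know.'"
    )


def generate_answer_sketch(state: ToySearchState, llm: Callable[[str], str]) -> str:
    prompt = build_rag_prompt(state.question, state.search_results)
    state.answer = llm(prompt)  # store the model's reply in the shared state
    return END


def stub_llm(prompt: str) -> str:
    # Hypothetical model: answers only when the prompt contains real results
    return "I don't know." if "No results available." in prompt else "A small spiny mammal."


with_results = ToySearchState("What is a hedgehog?", search_results="Hedgehog: a spiny mammal.")
generate_answer_sketch(with_results, stub_llm)
print(with_results.answer)  # A small spiny mammal.
```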

In [None]:
async def generate_answer(state: SearchAgentState) -> str:
    print("\nRunning Step 2: generate_answer")

    # Generate answer based on question and search results from previous step.
    prompt = search_rag_template.render(
        SearchRAGInput(question=state.question, search_results=state.search_results or "No results available.")
    )
    output: ChatModelOutput = await model.create(messages=[UserMessage(prompt)])

    # Store answer in state
    state.answer = output.get_text_content()
    return Workflow.END

Finally, we assemble and run the workflow.

This cell:

- Creates a Workflow that uses SearchAgentState as its state schema.
- Registers the web_search and generate_answer steps; execution starts at web_search, the first step added.
- Runs the workflow with an initial question and prints the final answer stored in the workflow state.

In [None]:
try:
    # Define the structure of the workflow graph
    search_agent_workflow = Workflow(schema=SearchAgentState, name="WebSearchAgent")
    search_agent_workflow.add_step("web_search", web_search)
    search_agent_workflow.add_step("generate_answer", generate_answer)

    # Execute the workflow
    message = "What is a hedgehog?"
    print("Original Question: " + message)
    search_response = await search_agent_workflow.run(SearchAgentState(question=message))
    print("\nOriginal Question: " + search_response.state.question)
    print("\nFinal Answer: " + search_response.state.answer)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

## Adding Memory to a Workflow

The web search workflow from the previous example can answer questions, but it cannot engage in a conversation because it doesn't maintain message history.

In the next example, we'll show you how to add memory to your workflow, allowing it to interactively chat while keeping track of the conversation history. This will enable the workflow to remember previous interactions and provide more context-aware responses.
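Conceptually, memory is an ordered message history that is sent to the model in full on every turn, with each reply appended afterwards. The sketch below shows that loop with hypothetical ToyMessage and ToyMemory classes and a stub model; it is not the UnconstrainedMemory API, which is asynchronous and uses BeeAI message classes.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ToyMessage:
    role: str  # "system", "user", or "assistant"
    content: str


@dataclass
class ToyMemory:
    messages: List[ToyMessage] = field(default_factory=list)

    def add(self, message: ToyMessage) -> None:
        self.messages.append(message)


def chat_turn(memory: ToyMemory, user_input: str, model: Callable[[List[ToyMessage]], str]) -> str:
    """Record the user message, query the model with the full history, record the reply."""
    memory.add(ToyMessage("user", user_input))
    reply = model(memory.messages)
    memory.add(ToyMessage("assistant", reply))
    return reply


def counting_model(history: List[ToyMessage]) -> str:
    # Stub model that shows it receives the whole history
    user_turns = sum(1 for m in history if m.role == "user")
    return f"This is user message number {user_turns}."


memory_sketch = ToyMemory([ToyMessage("system", "You are a helpful and friendly AI assistant.")])
print(chat_turn(memory_sketch, "Hi!", counting_model))  # This is user message number 1.
print(chat_turn(memory_sketch, "What did I just say?", counting_model))  # This is user message number 2.
```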

In [None]:
# Workflow State


class ChatState(BaseModel):
    memory: InstanceOf[UnconstrainedMemory]
    output: str | None = None


async def chat(state: ChatState) -> str:
    output: ChatModelOutput = await model.create(messages=state.memory.messages)
    state.output = output.get_text_content()
    return Workflow.END


memory = UnconstrainedMemory()
await memory.add(SystemMessage(content="You are a helpful and friendly AI assistant."))

try:
    # Define the structure of the workflow graph
    chat_workflow = Workflow(schema=ChatState, name="ChatWorkflow")
    chat_workflow.add_step("chat", chat)

    while True:
        user_input = input("Type a message to the Assistant (type exit to stop): ")
        if user_input == "exit":
            break
        print(user_input)
        # Add user message to memory
        await memory.add(UserMessage(content=user_input))
        # Run workflow with memory
        response = await chat_workflow.run(ChatState(memory=memory))
        # Add assistant response to memory
        await memory.add(AssistantMessage(content=response.state.output))
        print("Assistant: " + response.state.output)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

print("Demo complete")

## Learn More

Detailed information on BeeAI can be found [here](https://beeai.dev/).

In this notebook, you learned how to build BeeAI Workflows in the BeeAI Framework.
