# BabyAGI with Tools and Beautiful Soup Integration

This notebook builds on [BabyAGI](baby_agi.ipynb) and shows how you can swap out the execution chain. The previous execution chain was just an LLM, which made things up. By swapping it out for an agent that has access to tools, we can hopefully get more reliable information.

Search alone is not always enough: some resources require a deeper dive into a web page. For that scraping step we will use Beautiful Soup.

## Install and Import Required Modules

In [1]:
import os
from collections import deque
from typing import Dict, List, Optional, Any, Type

from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.question_answering import load_qa_chain
from langchain.chains import AnalyzeDocumentChain
from langchain.prompts.base import BasePromptTemplate
from langchain.agents.agent_toolkits import FileManagementToolkit
from langchain.tools.file_management.write import WriteFileTool
import json
from json import JSONDecodeError
from tempfile import TemporaryDirectory
from kor import create_extraction_chain, Object, Text, Number
from pydantic import BaseModel, Field
from langchain.chains.base import Chain
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md

## Connect to the Vector Store

Depending on what vectorstore you use, this step may look different.

In [2]:
%pip install faiss-cpu > /dev/null
%pip install google-search-results > /dev/null
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore

Note: you may need to restart the kernel to use updated packages.


In [3]:
# Define your embedding model
embeddings_model = OpenAIEmbeddings()
# Initialize the vectorstore as empty
import faiss

embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
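
To make the role of the flat L2 index more concrete, here is a conceptual sketch in plain Python of exact nearest-neighbour search by squared Euclidean distance. It is not FAISS code: `squared_l2` and `flat_search` are hypothetical helpers, and the two-dimensional vectors stand in for the 1536-dimensional embeddings used above.

```python
from typing import List, Tuple


def squared_l2(a: List[float], b: List[float]) -> float:
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def flat_search(
    stored: List[List[float]], query: List[float], k: int
) -> List[Tuple[int, float]]:
    """Compare the query against every stored vector and keep the k closest."""
    scored = [(i, squared_l2(vec, query)) for i, vec in enumerate(stored)]
    # Smaller distance means more similar, so sort ascending
    return sorted(scored, key=lambda pair: pair[1])[:k]


# Toy 2-dimensional "embeddings" (assumed values, for illustration only)
stored = [[0.0, 0.0], [1.0, 1.0], [3.0, 4.0]]
hits = flat_search(stored, [2.0, 2.0], k=2)
print(hits)
```

The point to carry forward is that with a distance-based index a lower score means a closer match.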

## Configure BeautifulSoup

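Beautiful Soup is a Python package for parsing HTML and XML documents. The `Scraper` class below fetches a page, extracts its visible text with Beautiful Soup, and passes that text to a Kor extraction chain to pull out product names.
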
In [4]:
headers = { 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0' }
session = requests.Session()
session.headers.update(headers)
# The timeout for the HTTP request
timeout = 10

class Scraper: 
    def __init__(self, llm):
        self.llm = llm
    
    # Fetch a page, extract its text, and return the products found in it
    def run(self, url: str) -> str:
        # Get the response from a URL
        # NOTE: the URL is hardcoded for better demo results, so the `url`
        # argument is ignored; pass `url=url` to scrape the requested page
        response = session.get(url="https://incidecoder.com/ingredients/eucalyptol", timeout=timeout)
        # Scrape text from a webpage
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        texts = "\n".join(chunk for chunk in chunks if chunk)   

        # https://www.youtube.com/watch?v=xZzvwR9jdPA
        texts = md(texts)
        print("\033[92m\033[1m" + "\n*****MARKDOWN*****\n" + "\033[0m\033[0m")
        print(texts)

        # kor object
        schema = Object(
            id="products",
            description="""
                A list of products that contain a particular metabolite or ingredient    
            """,
            attributes=[
                Text(
                    id="product",
                    description="Products containing a particular metabolite or ingredient"
                )
            ],   
            examples=[
                (
                    """
                        Products with Eucalyptol
                        listerine Gum Therapy
                        Davines Naturaltech Energizing Seasonal Superactive
                        Davines Naturaltech Energizing Superactive                    
                    """,
                    [
                        {"product": "listerine Gum Therapy"},
                        {"product": "Davines Naturaltech Energizing Seasonal Superactive"},
                        {"product": "Davines Naturaltech Energizing Superactive"},
                    ],
                ), 
            ],                             
        )

        chain = create_extraction_chain(self.llm, schema)

        result = chain.predict_and_parse(text=(texts))["data"]

        print("\033[92m\033[1m" + "\n*****RESULT OF SCRAPE*****\n" + "\033[0m\033[0m")
        print(result)        
        return result
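
The whitespace clean-up inside `Scraper.run` is easier to follow in isolation. The sketch below applies the same three steps (strip each line, split on double spaces, drop empty chunks) to a made-up string; `clean_text` and the sample input are illustrative names, not part of the notebook.

```python
def clean_text(raw: str) -> str:
    """Collapse scraped page text into one non-empty chunk per line."""
    # Strip leading and trailing whitespace from every line
    lines = (line.strip() for line in raw.splitlines())
    # Break lines on double spaces, which often separate unrelated fragments
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Keep only chunks that still contain text
    return "\n".join(chunk for chunk in chunks if chunk)


# Hypothetical page text with the kind of padding get_text() tends to leave
sample = "  Products with Eucalyptol  \n\n   Listerine Gum Therapy    Davines Naturaltech  \n"
cleaned = clean_text(sample)
print(cleaned)
```

Each surviving chunk ends up on its own line, which keeps the text passed to the extraction chain compact.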

## Write File Tool

In [5]:
class FixedWriteFileTool(WriteFileTool):
    description = (
        "Write file to disk. "
        "Takes the file path and the text content as two string arguments. "
        "An example of this input: \"file_path\": \"test.txt\", \"text\": \"This is some test text.\" "
        "Follow this example strictly; make sure to always use the double quotes just like in the example when working with the real inputs."
    )
    args_schema: Optional[Type[BaseModel]] = None

    @staticmethod
    def is_valid_json(input_string):
        try:
            json.loads(input_string)
            return True
        except JSONDecodeError:
            return False


    def _run(self, file_path_and_text: str) -> str:
        # Log the raw input received from the agent
        print()
        print(file_path_and_text)
        # The input arrives without its outermost double quotes (this wrapper
        # assumes the agent strips them), so restore them and add braces to
        # form a JSON object
        file_path_and_text = "{\"" + file_path_and_text + "\"}"
        print()
        print(file_path_and_text)
        if FixedWriteFileTool.is_valid_json(file_path_and_text):
            file_path_and_text_json = json.loads(file_path_and_text)
            return super()._run(file_path_and_text_json["file_path"], file_path_and_text_json["text"])
        else:
            return "Failed to write file."
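
The string handling in `_run` can be tried out without LangChain. The sketch below assumes, as the wrapper above does, that the tool input reaches it with the outermost double quotes already removed; `parse_tool_input` is a hypothetical helper written for this illustration.

```python
import json
from json import JSONDecodeError
from typing import Optional


def parse_tool_input(stripped: str) -> Optional[dict]:
    """Restore the outer quotes and braces, then try to parse as JSON."""
    candidate = '{"' + stripped + '"}'
    try:
        return json.loads(candidate)
    except JSONDecodeError:
        # Mirrors the "Failed to write file." branch of the tool
        return None


# Input shaped like the example in the tool description, minus the outer quotes
agent_input = 'file_path": "test.txt", "text": "This is some test text.'
parsed = parse_tool_input(agent_input)
print(parsed)

# Input that ignores the quoting rules cannot be repaired this way
bad = parse_tool_input("file_path: test.txt")
print(bad)
```

This is why the tool description insists on double quotes around both keys and values: anything else fails the JSON check and the write is skipped.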

## Define the Chains

BabyAGI relies on three LLM chains:
- Task creation chain to select new tasks to add to the list
- Task prioritization chain to re-prioritize tasks
- Execution Chain to execute the tasks


NOTE: in this notebook, the Execution chain will now be an agent.

In [6]:
class TaskCreationChain(LLMChain):
    """Chain to generate tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the task creation chain from an LLM."""
        task_creation_template = (
            "You are a task creation AI that uses the result of an execution agent"
            " to create new tasks with the following objective: {objective},"
            " The last completed task has the result: {result}."
            " This result was based on this task description: {task_description}."
            " These are incomplete tasks: {incomplete_tasks}."
            " Based on the result, create new tasks to be completed"
            " by the AI system that do not overlap with incomplete tasks."
            " Return the tasks as an array."
        )
        prompt = PromptTemplate(
            template=task_creation_template,
            input_variables=[
                "result",
                "task_description",
                "incomplete_tasks",
                "objective",
            ],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

In [7]:
class TaskPrioritizationChain(LLMChain):
    """Chain to prioritize tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the task prioritization chain from an LLM."""
        task_prioritization_template = (
            "You are a task prioritization AI tasked with cleaning the formatting of and reprioritizing"
            " the following tasks: {task_names}."
            " Consider the ultimate objective of your team: {objective}."
            " Do not remove any tasks. Return the result as a numbered list, like:"
            " #. First task"
            " #. Second task"
            " Start the task list with number {next_task_id}."
        )
        prompt = PromptTemplate(
            template=task_prioritization_template,
            input_variables=["task_names", "next_task_id", "objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

In [8]:
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import SerpAPIWrapper

todo_prompt = PromptTemplate.from_template(
    "You are a planner who is an expert at coming up with a todo list for a given objective. Come up with a todo list for this objective: {objective}"
)
todo_chain = LLMChain(llm=OpenAI(temperature=0), prompt=todo_prompt)
search = SerpAPIWrapper()
scraper = Scraper(llm=OpenAI(temperature=0))

# We'll make a temporary directory to avoid clutter
# TemporaryDirectory requires the parent directory to exist
os.makedirs("./temp", exist_ok=True)
working_directory = TemporaryDirectory(dir="./temp")
# tools = FileManagementToolkit(root_dir=str(working_directory.name), selected_tools=["read_file", "write_file", "list_directory"]).get_tools()
# tools
# read_tool, write_tool, list_tool = tools
fixed_write_tool = FixedWriteFileTool(root_dir=str(working_directory.name))

# url_pattern = r'https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
tools = [
    Tool(
        name="Search",
        func=search.run,
        description="useful for when you need to answer questions about current events",
    ),
    Tool(
        name="Scrape",
        func=scraper.run,
        description="useful for browsing a webpage and extracting product names from it using the LLM. Input: an HTTP webpage URL as returned by Search. Output: a list of products or brands",
    ),
    Tool(
        name="TODO",
        func=todo_chain.run,
        description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!",
    ),
    Tool(
        name="Write File",
        func=fixed_write_tool.run,
        description=(
            "useful for when you need to write file to disk. "
            "Takes the file path and the text content as two string arguments. "
            "Truncate text content to a length of 500 characters. "
            "An example of this input: \"file_path\": \"test.txt\", \"text\": \"This is some test text.\" "
            "Follow this example strictly; make sure to always use the double quotes, on both key and value, just like in the example when working with the real inputs."
        ),
    ),
]


prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}."""
suffix = """Question: {task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
    tools,
    prefix=prefix,
    suffix=suffix,
    input_variables=["objective", "task", "context", "agent_scratchpad"],
)

### Define the BabyAGI Controller

BabyAGI composes the chains defined above in a (potentially-)infinite loop.
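
Before reading the real controller, it may help to see the shape of the loop with the LLM calls replaced by stubs. The sketch below is a simplification under stated assumptions: `execute`, `create_tasks`, and `prioritize` are hypothetical stand-ins for the execution agent and the two chains, and nothing is written to a vector store.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


def execute(task_name: str) -> str:
    """Stub for the execution agent."""
    return f"done: {task_name}"


def create_tasks(result: str, iteration: int) -> List[str]:
    """Stub for the task creation chain: always proposes one follow-up."""
    return [f"follow-up {iteration}"]


def prioritize(tasks: List[Dict], next_id: int) -> List[Dict]:
    """Stub for the prioritization chain: keeps the order, renumbers tasks."""
    return [
        {"task_id": next_id + i, "task_name": t["task_name"]}
        for i, t in enumerate(tasks)
    ]


def run_loop(first_task: str, max_iterations: Optional[int] = 3) -> List[str]:
    task_list: Deque[Dict] = deque([{"task_id": 1, "task_name": first_task}])
    task_id_counter = 1
    results: List[str] = []
    num_iters = 0
    while True:
        if task_list:
            # Step 1: pull the first task
            task = task_list.popleft()
            # Step 2: execute it
            result = execute(task["task_name"])
            results.append(result)
            # Step 3: create new tasks from the result
            for name in create_tasks(result, num_iters):
                task_id_counter += 1
                task_list.append({"task_id": task_id_counter, "task_name": name})
            # Step 4: reprioritize whatever is left
            task_list = deque(prioritize(list(task_list), int(task["task_id"]) + 1))
        num_iters += 1
        # With max_iterations=None the loop never terminates
        if max_iterations is not None and num_iters == max_iterations:
            break
    return results


results = run_loop("Make a todo list")
print(results)
```

Because every executed task can spawn new ones, `max_iterations` is the only thing that ends the loop in this sketch.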

In [9]:
def get_next_task(
    task_creation_chain: LLMChain,
    result: Dict,
    task_description: str,
    task_list: List[str],
    objective: str,
) -> List[Dict]:
    """Get the next task."""
    incomplete_tasks = ", ".join(task_list)
    response = task_creation_chain.run(
        result=result,
        task_description=task_description,
        incomplete_tasks=incomplete_tasks,
        objective=objective,
    )
    new_tasks = response.split("\n")
    return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]

In [10]:
def prioritize_tasks(
    task_prioritization_chain: LLMChain,
    this_task_id: int,
    task_list: List[Dict],
    objective: str,
) -> List[Dict]:
    """Prioritize tasks."""
    task_names = [t["task_name"] for t in task_list]
    next_task_id = int(this_task_id) + 1
    response = task_prioritization_chain.run(
        task_names=task_names, next_task_id=next_task_id, objective=objective
    )
    new_tasks = response.split("\n")
    prioritized_task_list = []
    for task_string in new_tasks:
        if not task_string.strip():
            continue
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = task_parts[0].strip()
            task_name = task_parts[1].strip()
            prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
    return prioritized_task_list
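
To see what the parsing in `prioritize_tasks` does with a typical response, the sketch below runs the same splitting logic on a made-up string. `parse_numbered_list` and `sample_response` are illustrative; real model output may be formatted differently.

```python
from typing import Dict, List


def parse_numbered_list(response: str) -> List[Dict[str, str]]:
    """Turn lines like '2. Do something' into task dictionaries."""
    tasks = []
    for line in response.split("\n"):
        if not line.strip():
            continue
        # Split on the first period only, so periods in the task name survive
        parts = line.strip().split(".", 1)
        if len(parts) == 2:
            tasks.append({"task_id": parts[0].strip(), "task_name": parts[1].strip()})
    return tasks


# Hypothetical response: two numbered tasks, a blank line, and a stray line
sample_response = "2. Search for products\n\n3. Write the list to disk\nno number here"
parsed = parse_numbered_list(sample_response)
print(parsed)
```

Lines without a period are silently dropped, and `task_id` stays a string, which is why the controller converts it with `int(...)` before using it.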

In [11]:
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
    """Get the top k tasks based on the query."""
    results = vectorstore.similarity_search_with_score(query, k=k)
    if not results:
        return []
    sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
    return [str(item.metadata["task"]) for item in sorted_results]


def execute_task(
    vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5
) -> str:
    """Execute a task."""
    context = _get_top_tasks(vectorstore, query=objective, k=k)
    return execution_chain.run(objective=objective, context=context, task=task)
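
The `zip(*sorted(...))` idiom in `_get_top_tasks` can be checked with stand-in data. In the sketch below, `FakeDoc` is a hypothetical substitute for the documents a vector store returns, and the scores are invented.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class FakeDoc:
    """Minimal stand-in for a stored document with metadata."""
    metadata: Dict[str, str] = field(default_factory=dict)


def top_tasks(results: List[Tuple[FakeDoc, float]]) -> List[str]:
    """Order (document, score) pairs by score, highest first."""
    if not results:
        return []
    # sorted() orders the pairs; zip(*...) separates documents from scores
    sorted_docs, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
    return [str(doc.metadata["task"]) for doc in sorted_docs]


results = [
    (FakeDoc({"task": "A"}), 0.2),
    (FakeDoc({"task": "B"}), 0.9),
    (FakeDoc({"task": "C"}), 0.5),
]
ordered = top_tasks(results)
print(ordered)
```

Note that `reverse=True` puts the highest score first. If the store reports a distance, as an L2 index typically does, that places the least similar task at the front of the context, so the sort direction is worth revisiting when switching stores.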

In [12]:
class BabyAGI(Chain, BaseModel):
    """Controller model for the BabyAGI agent."""

    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    execution_chain: AgentExecutor = Field(...)
    task_id_counter: int = Field(1)
    vectorstore: VectorStore = Field(init=False)
    max_iterations: Optional[int] = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def add_task(self, task: Dict):
        self.task_list.append(task)

    def print_task_list(self):
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
        for t in self.task_list:
            print(str(t["task_id"]) + ": " + t["task_name"])

    def print_next_task(self, task: Dict):
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
        print(str(task["task_id"]) + ": " + task["task_name"])

    def print_task_result(self, result: str):
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
        print(result)

    @property
    def input_keys(self) -> List[str]:
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent."""
        objective = inputs["objective"]
        first_task = inputs.get("first_task", "Make a todo list")
        self.add_task({"task_id": 1, "task_name": first_task})
        num_iters = 0
        while True:
            if self.task_list:
                self.print_task_list()

                # Step 1: Pull the first task
                task = self.task_list.popleft()
                self.print_next_task(task)

                # Step 2: Execute the task
                result = execute_task(
                    self.vectorstore, self.execution_chain, objective, task["task_name"]
                )
                this_task_id = int(task["task_id"])
                self.print_task_result(result)

                # Step 3: Store the result in the vector store
                result_id = f"result_{task['task_id']}"
                self.vectorstore.add_texts(
                    texts=[result],
                    metadatas=[{"task": task["task_name"]}],
                    ids=[result_id],
                )

                # Step 4: Create new tasks and reprioritize task list
                new_tasks = get_next_task(
                    self.task_creation_chain,
                    result,
                    task["task_name"],
                    [t["task_name"] for t in self.task_list],
                    objective,
                )
                for new_task in new_tasks:
                    self.task_id_counter += 1
                    new_task.update({"task_id": self.task_id_counter})
                    self.add_task(new_task)
                self.task_list = deque(
                    prioritize_tasks(
                        self.task_prioritization_chain,
                        this_task_id,
                        list(self.task_list),
                        objective,
                    )
                )
            num_iters += 1
            if self.max_iterations is not None and num_iters == self.max_iterations:
                print(
                    "\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m"
                )
                break
        return {}

    @classmethod
    def from_llm(
        cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = False, **kwargs
    ) -> "BabyAGI":
        """Initialize the BabyAGI Controller."""
        task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, verbose=verbose
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        tool_names = [tool.name for tool in tools]
        agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
        agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=True
        )
        return cls(
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            execution_chain=agent_executor,
            vectorstore=vectorstore,
            **kwargs,
        )

### Run the BabyAGI

Now it's time to create the BabyAGI controller and watch it try to accomplish your objective.

In [13]:
# 1. Research cosmetic products containing Eucalyptol.
# 2. Create a list of cosmetic products containing Eucalyptol.
# 3. Format the list into a JSON format.
# 4. Check the accuracy of the list.
# 5. Output the formatted JSON list of cosmetic products containing Eucalyptol.
OBJECTIVE = """
    Write a list of all cosmetic products containing Eucalyptol to disk.
    Validate the accuracy of the list using web search and scraping, and amend as needed.
    Only the brand or product name is required. 
    Pricing, ethical considerations, source, side effects, reviews, comparisons or any other information is not required.   
    Once file is written, exit.
"""

In [14]:
llm = OpenAI(temperature=0)

In [15]:
# Logging of LLMChains
verbose = True
# If None, will keep on going forever
max_iterations: Optional[int] = 3
baby_agi = BabyAGI.from_llm(
    llm=llm, vectorstore=vectorstore, verbose=verbose, max_iterations=max_iterations
)

In [16]:
baby_agi({"objective": OBJECTIVE})

[95m[1m
*****TASK LIST*****
[0m[0m
1: Make a todo list
[92m[1m
*****NEXT TASK*****
[0m[0m
1: Make a todo list


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mThought: I need to create a todo list
Action: TODO
Action Input: Write a list of all cosmetic products containing Eucalyptol to disk. Validate the accuracy of the list using web search and scraping, and amend as needed. Only the brand or product name is required.[0m
Observation: [38;5;200m[1;3m

1. Research and compile a list of cosmetic products containing Eucalyptol.
2. Validate the accuracy of the list using web search and scraping.
3. Amend the list as needed.
4. Write the list of cosmetic products containing Eucalyptol to disk.
5. Double-check the accuracy of the list and make any necessary amendments.
6. Test the list to ensure accuracy.
7. Publish the list of cosmetic products containing Eucalyptol to disk.[0m
Thought:[32;1m[1;3m I now know the final answer
Final Answer: The final answer is a list

{'objective': '\n    Write a list of all cosmetic products containing Eucalyptol to disk.\n    Validate the accuracy of the list using web search and scraping, and amend as needed.\n    Only the brand or product name is required. \n    Pricing, ethical considerations, source, side effects, reviews, comparisons or any other information is not required.   \n    Once file is written, exit.\n'}