<a href="https://colab.research.google.com/github/mcleods777/LLM-Notebook-Experiments/blob/main/babyagi_generativeagents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title Import and Install Dependencies

# Install packages
!pip install langchain > /dev/null
!pip install faiss-gpu > /dev/null
!pip install tiktoken > /dev/null
!pip install openai > /dev/null
!pip install termcolor > /dev/null
!pip install -q google-search-results

# Import libraries
import os
import re
import math
import faiss
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Dict, Any
from termcolor import colored
from pydantic import BaseModel, Field
from collections import deque

# Import langchain components
from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.prompts import PromptTemplate
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain.schema import Document
from langchain.vectorstores import FAISS
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from langchain.chains.base import Chain
from langchain.experimental import AutoGPT, BabyAGI

#Deprecated
#from langchain.schema import BaseLanguageModel, Document


  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for google-search-results (setup.py) ... [?25l[?25hdone


In [None]:
#@title Hide
from langchain import OpenAI, ConversationChain, LLMChain, PromptTemplate
from langchain.memory import ConversationBufferWindowMemory


template = """Assistant is a large language model trained by OpenAI.

Assistant is designed to be able to assist with a wide range of tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics. As a language model, Assistant is able to generate human-like text based on the input it receives, allowing it to engage in natural-sounding conversations and provide responses that are coherent and relevant to the topic at hand.

Assistant is constantly learning and improving, and its capabilities are constantly evolving. It is able to process and understand large amounts of text, and can use this knowledge to provide accurate and informative responses to a wide range of questions. Additionally, Assistant is able to generate its own text based on the input it receives, allowing it to engage in discussions and provide explanations and descriptions on a wide range of topics.

Overall, Assistant is a powerful tool that can help with a wide range of tasks and provide valuable insights and information on a wide range of topics. Whether you need help with a specific question or just want to have a conversation about a particular topic, Assistant is here to assist.

{history}
Human: {human_input}
Assistant:"""

prompt = PromptTemplate(
    input_variables=["history", "human_input"],
    template=template
)


chatgpt_chain = LLMChain(
    llm=OpenAI(temperature=0,openai_api_key=openai_api_key),
    prompt=prompt,
    verbose=True,
    memory=ConversationBufferWindowMemory(k=2),
)

output = chatgpt_chain.predict(human_input="Hi there!")
print(output)

In [None]:
#@title User Name and API Key
openai_api_key='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'
serpapi_api_key='c4f2536cb9f4906c2ba4c1b792c19c1745c6f3410de473e19dddd23a75687082'
USER_NAME = "Person A" # The name you want to use for the agent.
LLM = ChatOpenAI(max_tokens=1500,openai_api_key=openai_api_key) # Can be any LLM you want.

In [None]:
#@title Set Up Tools

from langchain.utilities import SerpAPIWrapper
from langchain.agents import Tool
from langchain.tools.file_management.write import WriteFileTool
from langchain.tools.file_management.read import ReadFileTool

search = SerpAPIWrapper(serpapi_api_key=serpapi_api_key)
tools = [
    Tool(
        name = "search",
        func=search.run,
        description="useful for when you need to answer questions about current events. You should ask targeted questions"
    ),
    WriteFileTool(),
    ReadFileTool(),
]

In [None]:
# Define your embedding model
embeddings_model = OpenAIEmbeddings(openai_api_key=openai_api_key)
# Initialize the vectorstore as empty
import faiss

embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})

In [None]:
#@title Set Up AutoGPT Agent

agent_autogpt = AutoGPT.from_llm_and_tools(
    ai_name="Tom",
    ai_role="Assistant",
    tools=tools,
    llm=ChatOpenAI(temperature=0,openai_api_key=openai_api_key),
    memory=vectorstore.as_retriever()
)
# Set verbose to be true
agent_autogpt.chain.verbose = True

agent_autogpt.run(["write a weather report for SF today"])

In [None]:
#@title GenerativeAI Class

import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field

from langchain import LLMChain
from langchain.base_language import BaseLanguageModel
from langchain.experimental.generative_agents.memory import GenerativeAgentMemory
from langchain.prompts import PromptTemplate

class GenerativeAgent(BaseModel):
    name: str
    age: int
    traits: str
    status: str
    role: str  # Added the role variable here
    llm: BaseLanguageModel
    memory_retriever: TimeWeightedVectorStoreRetriever
    verbose: bool = False
    reflection_threshold: Optional[float] = None
    current_plan: List[str] = []
    summary: str = ""
    summary_refresh_seconds: int = 3600
    last_refreshed: datetime = Field(default_factory=datetime.now)
    daily_summaries: List[str]
    memory_importance: float = 0.0
    max_tokens_limit: int = 1200

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @staticmethod
    def _parse_list(text: str) -> List[str]:
        """Parse a newline-separated string into a list of strings."""
        lines = re.split(r'\n', text.strip())
        return [re.sub(r'^\s*\d+\.\s*', '', line).strip() for line in lines]


    def _compute_agent_summary(self):
        """"""
        prompt = PromptTemplate.from_template(
            "How would you summarize {name}'s core characteristics given the"
            +" following statements:\n"
            +"{related_memories}"
            + "Do not embellish."
            +"\n\nSummary: "
        )
        # The agent seeks to think about their core characteristics.
        relevant_memories = self.fetch_memories(f"{self.name}'s core characteristics")
        relevant_memories_str = "\n".join([f"{mem.page_content}" for mem in relevant_memories])
        chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        return chain.run(name=self.name, related_memories=relevant_memories_str).strip()

    def _get_topics_of_reflection(self, last_k: int = 50) -> Tuple[str, str, str]:
        """Return the 3 most salient high-level questions about recent observations."""
        prompt = PromptTemplate.from_template(
            "{observations}\n\n"
            + "Given only the information above, what are the 3 most salient"
            + " high-level questions we can answer about the subjects in the statements?"
            + " Provide each question on a new line.\n\n"
        )
        reflection_chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        observations = self.memory_retriever.memory_stream[-last_k:]
        observation_str = "\n".join([o.page_content for o in observations])
        result = reflection_chain.run(observations=observation_str)
        return self._parse_list(result)

    def _get_insights_on_topic(self, topic: str) -> List[str]:
        """Generate 'insights' on a topic of reflection, based on pertinent memories."""
        prompt = PromptTemplate.from_template(
            "Statements about {topic}\n"
            +"{related_statements}\n\n"
            + "What 5 high-level insights can you infer from the above statements?"
            + " (example format: insight (because of 1, 5, 3))"
        )
        related_memories = self.fetch_memories(topic)
        related_statements = "\n".join([f"{i+1}. {memory.page_content}"
                                        for i, memory in
                                        enumerate(related_memories)])
        reflection_chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        result = reflection_chain.run(topic=topic, related_statements=related_statements)
        # TODO: Parse the connections between memories and insights
        return self._parse_list(result)

    def pause_to_reflect(self) -> List[str]:
        """Reflect on recent observations and generate 'insights'."""
        print(colored(f"Character {self.name} is reflecting", "blue"))
        new_insights = []
        topics = self._get_topics_of_reflection()
        for topic in topics:
            insights = self._get_insights_on_topic( topic)
            for insight in insights:
                self.add_memory(insight)
            new_insights.extend(insights)
        return new_insights

    def _score_memory_importance(self, memory_content: str, weight: float = 0.15) -> float:
        """Score the absolute importance of the given memory."""
        # A weight of 0.25 makes this less important than it
        # would be otherwise, relative to salience and time
        prompt = PromptTemplate.from_template(
         "On the scale of 1 to 10, where 1 is purely mundane"
         +" (e.g., brushing teeth, making bed) and 10 is"
         + " extremely poignant (e.g., a break up, college"
         + " acceptance), rate the likely poignancy of the"
         + " following piece of memory. Respond with a single integer."
         + "\nMemory: {memory_content}"
         + "\nRating: "
        )
        chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        score = chain.run(memory_content=memory_content).strip()
        match = re.search(r"^\D*(\d+)", score)
        if match:
            return (float(score[0]) / 10) * weight
        else:
            return 0.0


    def add_memory(self, memory_content: str) -> List[str]:
        """Add an observation or memory to the agent's memory."""
        importance_score = self._score_memory_importance(memory_content)
        self.memory_importance += importance_score
        document = Document(page_content=memory_content, metadata={"importance": importance_score})
        result = self.memory_retriever.add_documents([document])

        # After an agent has processed a certain amount of memories (as measured by
        # aggregate importance), it is time to reflect on recent events to add
        # more synthesized memories to the agent's memory stream.
        if (self.reflection_threshold is not None
            and self.memory_importance > self.reflection_threshold
            and self.status != "Reflecting"):
            old_status = self.status
            self.status = "Reflecting"
            self.pause_to_reflect()
            # Hack to clear the importance from reflection
            self.memory_importance = 0.0
            self.status = old_status
        return result

    def fetch_memories(self, observation: str) -> List[Document]:
        """Fetch related memories."""
        return self.memory_retriever.get_relevant_documents(observation)


    def get_summary(self, force_refresh: bool = False) -> str:
        """Return a descriptive summary of the agent."""
        current_time = datetime.now()
        since_refresh = (current_time - self.last_refreshed).seconds
        if not self.summary or since_refresh >= self.summary_refresh_seconds or force_refresh:
            self.summary = self._compute_agent_summary()
            self.last_refreshed = current_time
        return (
            f"Name: {self.name} (age: {self.age})"
            +f"\nInnate traits: {self.traits}"
            +f"\n{self.summary}"
        )

    def get_full_header(self, force_refresh: bool = False) -> str:
        """Return a full header of the agent's status, summary, and current time."""
        summary = self.get_summary(force_refresh=force_refresh)
        current_time_str =  datetime.now().strftime("%B %d, %Y, %I:%M %p")
        return f"{summary}\nIt is {current_time_str}.\n{self.name}'s status: {self.status}"



    def _get_entity_from_observation(self, observation: str) -> str:
        prompt = PromptTemplate.from_template(
            "What is the observed entity in the following observation? {observation}"
            +"\nEntity="
        )
        chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        return chain.run(observation=observation).strip()

    def _get_entity_action(self, observation: str, entity_name: str) -> str:
        prompt = PromptTemplate.from_template(
            "What is the {entity} doing in the following observation? {observation}"
            +"\nThe {entity} is"
        )
        chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        return chain.run(entity=entity_name, observation=observation).strip()

    def _format_memories_to_summarize(self, relevant_memories: List[Document]) -> str:
        content_strs = set()
        content = []
        for mem in relevant_memories:
            if mem.page_content in content_strs:
                continue
            content_strs.add(mem.page_content)
            created_time = mem.metadata["created_at"].strftime("%B %d, %Y, %I:%M %p")
            content.append(f"- {created_time}: {mem.page_content.strip()}")
        return "\n".join([f"{mem}" for mem in content])

    def summarize_related_memories(self, observation: str) -> str:
        """Summarize memories that are most relevant to an observation."""
        entity_name = self._get_entity_from_observation(observation)
        entity_action = self._get_entity_action(observation, entity_name)
        q1 = f"What is the relationship between {self.name} and {entity_name}"
        relevant_memories = self.fetch_memories(q1) # Fetch memories related to the agent's relationship with the entity
        q2 = f"{entity_name} is {entity_action}"
        relevant_memories += self.fetch_memories(q2) # Fetch things related to the entity-action pair
        context_str = self._format_memories_to_summarize(relevant_memories)
        prompt = PromptTemplate.from_template(
            "{q1}?\nContext from memory:\n{context_str}\nRelevant context: "
        )
        chain = LLMChain(llm=self.llm, prompt=prompt, verbose=self.verbose)
        return chain.run(q1=q1, context_str=context_str.strip()).strip()

    def _get_memories_until_limit(self, consumed_tokens: int) -> str:
        """Reduce the number of tokens in the documents."""
        result = []
        for doc in self.memory_retriever.memory_stream[::-1]:
            if consumed_tokens >= self.max_tokens_limit:
                break
            consumed_tokens += self.llm.get_num_tokens(doc.page_content)
            if consumed_tokens < self.max_tokens_limit:
                result.append(doc.page_content)
        return "; ".join(result[::-1])

    def _generate_reaction(
        self,
        observation: str,
        suffix: str
    ) -> str:
        """React to a given observation."""
        prompt = PromptTemplate.from_template(
                "{agent_summary_description}"
                +"\nIt is {current_time}."
                +"\n{agent_name}'s status: {agent_status}"
                + "\nSummary of relevant context from {agent_name}'s memory:"
                +"\n{relevant_memories}"
                +"\nMost recent observations: {recent_observations}"
                + "\nObservation: {observation}"
                + "\n\n" + suffix
        )
        agent_summary_description = self.get_summary()
        relevant_memories_str = self.summarize_related_memories(observation)
        current_time_str = datetime.now().strftime("%B %d, %Y, %I:%M %p")
        kwargs = dict(agent_summary_description=agent_summary_description,
                      current_time=current_time_str,
                      relevant_memories=relevant_memories_str,
                      agent_name=self.name,
                      observation=observation,
                     agent_status=self.status)
        consumed_tokens = self.llm.get_num_tokens(prompt.format(recent_observations="", **kwargs))
        kwargs["recent_observations"] = self._get_memories_until_limit(consumed_tokens)
        action_prediction_chain = LLMChain(llm=self.llm, prompt=prompt)
        result = action_prediction_chain.run(**kwargs)
        return result.strip()

    def generate_dialogue_response(self, observation: str) -> Tuple[bool, str]:

     prompt = (
        "What would {agent_name} say to kindly help and respond to the following observation?\n\n"
    )

    # Format the prompt
    formatted_prompt = prompt.format(agent_name=self.name)

    # Generate a response using the language model
    full_result = self.llm.generate_text(
        formatted_prompt + observation, max_length=150
    )

    # Extract the response
    result = full_result.strip().split("\n")[0]

    response_text = self._clean_response(result)
    self.memory.save_context(
        {},
        {
            self.memory.add_memory_key: f"{self.name} observed "
            f"{observation} and said {response_text}"
        },
    )
     return True, f"{self.name} said {response_text}"

IndentationError: ignored

In [None]:
#@title Define BabyAGI Chains
class TaskCreationChain(LLMChain):
    """Chain to generates tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        task_creation_template = (
            "You are an task creation AI that uses the result of an execution agent"
            " to create new tasks with the following objective: {objective},"
            " The last completed task has the result: {result}."
            " This result was based on this task description: {task_description}."
            " These are incomplete tasks: {incomplete_tasks}."
            " Based on the result, create new tasks to be completed"
            " by the AI system that do not overlap with incomplete tasks."
            " Return the tasks as an array."
        )
        prompt = PromptTemplate(
            template=task_creation_template,
            input_variables=["result", "task_description", "incomplete_tasks", "objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

class TaskPrioritizationChain(LLMChain):
    """Chain to prioritize tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        task_prioritization_template = (
            "You are an task prioritization AI tasked with cleaning the formatting of and reprioritizing"
            " the following tasks: {task_names}."
            " Consider the ultimate objective of your team: {objective}."
            " Do not remove any tasks. Return the result as a numbered list, like:"
            " #. First task"
            " #. Second task"
            " Start the task list with number {next_task_id}."
        )
        prompt = PromptTemplate(
            template=task_prioritization_template,
            input_variables=["task_names", "next_task_id", "objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

class ExecutionChain(LLMChain):
    """Chain to execute tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Get the response parser."""
        execution_template = (
            "You are an AI who performs one task based on the following objective: {objective}."
            " Take into account these previously completed tasks: {context}."
            " Your task: {task}."
            " Response:"
        )
        prompt = PromptTemplate(
            template=execution_template,
            input_variables=["objective", "context", "task"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

In [None]:
#@title Load Up Our Memory
def relevance_score_fn(score: float) -> float:
    """Return a similarity score on a scale [0, 1]."""
    # This will differ depending on a few things:
    # - the distance / similarity metric used by the VectorStore
    # - the scale of your embeddings (OpenAI's are unit norm. Many others are not!)
    # This function converts the euclidean norm of normalized embeddings
    # (0 is most similar, sqrt(2) most dissimilar)
    # to a similarity function (0 to 1)
    return 1.0 - score / math.sqrt(2)

def create_new_memory_retriever():
    """Create a new vector store retriever unique to the agent."""
    # Define your embedding model
    embeddings_model = OpenAIEmbeddings(openai_api_key=openai_api_key)
    # Initialize the vectorstore as empty
    embedding_size = 1536
    index = faiss.IndexFlatL2(embedding_size)
    vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}, relevance_score_fn=relevance_score_fn)
    return TimeWeightedVectorStoreRetriever(vectorstore=vectorstore, other_score_keys=["importance"], k=15)

In [None]:
#@title Create Our Agent Attributes and AI Agent
agents_dict = {}

# Tax Attorney
agents_dict["Samantha"] = GenerativeAgent(name="Samantha Williams",
                                          age=35,
                                          traits="detail-oriented, analytical",
                                          status="working on a high-profile tax case",
                                          role="Tax Attorney",
                                          memory_retriever=create_new_memory_retriever(),
                                          llm=LLM,
                                          daily_summaries=[
                                              "Reviewed tax laws and prepared legal arguments for a client's case."
                                          ],
                                          reflection_threshold=8,
                                          )

# Family Law Attorney
agents_dict["John"] = GenerativeAgent(name="John Thompson",
                                      age=40,
                                      traits="compassionate, empathetic",
                                      status="helping clients with family law matters",
                                      role="Family Law Attorney",
                                      memory_retriever=create_new_memory_retriever(),
                                      llm=LLM,
                                      daily_summaries=[
                                          "Met with clients and negotiated custody arrangements."
                                      ],
                                      reflection_threshold=8,
                                      )

# Tax Advisor
agents_dict["Michelle"] = GenerativeAgent(name="Michelle Chen",
                                          age=32,
                                          traits="patient, good communicator",
                                          status="assisting clients with tax planning",
                                          role="Tax Advisor",
                                          memory_retriever=create_new_memory_retriever(),
                                          llm=LLM,
                                          daily_summaries=[
                                              "Advised clients on tax-saving strategies and prepared tax returns."
                                          ],
                                          reflection_threshold=8,
                                          )

# Financial Planner
agents_dict["Kevin"] = GenerativeAgent(name="Kevin Smith",
                                       age=45,
                                       traits="analytical, trustworthy",
                                       status="developing financial plans for clients",
                                       role="Financial Planner",
                                       memory_retriever=create_new_memory_retriever(),
                                       llm=LLM,
                                       daily_summaries=[
                                           "Assessed clients' financial situations and created customized plans for their financial goals."
                                       ],
                                       reflection_threshold=8,
                                       )


In [None]:
#@title Get Agent Summaries
# The current "Summary" of a character can't be made because the agent hasn't made
# any observations yet.

for agent_name, agent in agents_dict.items():
    print(f"{agent_name}'s summary:")
    print(agent.get_summary())
    print()

In [None]:
#@title Give the agents some memories to start
import random

memory_sets = [
    [
        "recalls their first time volunteering at an animal shelter",
        "cherishes the moments spent playing music",
        "finds joy in helping animals in need",
        "A local charity event leaves a lasting impression",
        "bonds with a rescued dog at the shelter",
        "uses their musical talent to bring people together",
        "is inspired by the kindness of others.",
    ],
    [
        "remembers their first cooking class",
        "recalls the excitement of trying new recipes",
        "finds satisfaction in creating delicious meals",
        "enjoys sharing food with friends and family",
        "discovers a new favorite ingredient at the farmers market",
        "learns about different cuisines from around the world",
        "takes pride in perfecting a challenging dish.",
    ],
    [
        "reminisces about their favorite childhood vacation",
        "appreciates the beauty of nature during a hike",
        "enjoys the thrill of exploring new places",
        "finds peace in watching the sunset on the beach",
        "learns about local culture and history while traveling",
        "connects with fellow travelers and forms lasting friendships",
        "is inspired to continue seeking new adventures.",
    ],
    [
        "looks back fondly on their time spent in college",
        "appreciates the camaraderie formed with classmates",
        "values the lessons learned from challenging coursework",
        "celebrates academic achievements and milestones",
        "discovers new interests and hobbies through extracurricular activities",
        "embraces personal growth and development",
        "feels a sense of accomplishment upon graduation.",
    ],
    [
        "remembers the excitement of starting a new job",
        "appreciates the support and mentorship of colleagues",
        "finds satisfaction in overcoming professional challenges",
        "develops new skills and expertise in their field",
        "enjoys collaborating with a diverse team to achieve common goals",
        "celebrates milestones and successes in their career",
        "looks forward to continued growth and advancement.",
    ],
]

for agent_name, agent in agents_dict.items():
    selected_memory_set = random.choice(memory_sets)
    for memory in selected_memory_set:
        memory_with_name = f"{agent_name} {memory}"
        agent.add_memory(memory_with_name)


In [None]:
#@title Let's have the Agent's start going through a day in the life.
import random

# Sample observations
observations = [
    [
        "Wakes up early to exercise.",
        "Prepares a healthy breakfast.",
        "Meets with a client to discuss their financial goals.",
        "Reviews market trends and investment opportunities.",
        "Attends a networking event to connect with professionals.",
        "Spends the evening relaxing with a good book.",
    ],
    [
        "Starts the day with a morning meditation.",
        "Helps a client understand complex legal concepts.",
        "Attends a court hearing to represent a client.",
        "Works on drafting legal documents for various cases.",
        "Meets with colleagues to discuss ongoing cases.",
        "Unwinds in the evening by watching a movie.",
    ],
    [
        "Goes for a morning jog to clear the mind.",
        "Spends the day designing a new website for a client.",
        "Collaborates with a team to brainstorm creative ideas.",
        "Attends an industry event to learn about the latest design trends.",
        "Meets up with friends for a casual dinner.",
        "Practices drawing and sketching before bed.",
    ],
    [
        "Wakes up and enjoys a cup of coffee while reading the news.",
        "Teaches a class on tax law to university students.",
        "Meets with clients to discuss tax planning strategies.",
        "Researches new tax regulations and updates.",
        "Attends a seminar on international tax laws.",
        "Spends the evening playing board games with family.",
    ],
    [
        "Begins the day with a yoga session.",
        "Counsels clients on various family law issues.",
        "Represents a client in a child custody hearing.",
        "Prepares legal documents for a divorce case.",
        "Attends a workshop on conflict resolution.",
        "Relaxes in the evening with a long walk in the park.",
    ],
]

# Add random observations to each agent's memory
for agent_name, agent in agents_dict.items():
    selected_observations = random.choice(observations)
    for observation in selected_observations:
        memory_with_name = f"{agent_name} {observation}"
        agent.add_memory(memory_with_name)

# Loop through agents_dict and call print_agent_reactions for each agent
def print_agent_reactions(agent, agent_observations):
    for i, observation in enumerate(agent_observations):
        memory_with_name = f"{agent.name} {observation}"
        _, reaction = agent.generate_reaction(memory_with_name)
        print(colored(observation, "green"), reaction)
        if ((i+1) % 20) == 0:
            print('*' * 40)
            print(colored(f"After {i+1} observations, {agent.name}'s summary is:\n{agent.get_summary(force_refresh=True)}", "blue"))
            print('*' * 40)


In [None]:
#@title Have the agents react to each observation we gave them
# Let's send Tommie on their way. We'll check in on their summary every few observations to watch it evolve
from termcolor import colored

def print_agent_reactions(agent, agent_observations):
    for i, observation in enumerate(agent_observations):
        _, reaction = agent.generate_reaction(observation)
        print(colored(observation, "green"), reaction)
        if ((i+1) % 20) == 0:
            print('*' * 40)
            print(colored(f"After {i+1} observations, {agent.name}'s summary is:\n{agent.get_summary(force_refresh=True)}", "blue"))
            print('*' * 40)

# Loop through agents_dict and call print_agent_reactions for each agent
for agent_name, agent in agents_dict.items():
    print(f"\n{agent_name}'s reactions:")
    print_agent_reactions(agent, observations)

In [None]:
#@title Define BabyAGI Controller
def get_next_task(task_creation_chain: LLMChain, result: Dict, task_description: str, task_list: List[str], objective: str) -> List[Dict]:
    """Get the next task."""
    incomplete_tasks = ", ".join(task_list)
    response = task_creation_chain.run(result=result, task_description=task_description, incomplete_tasks=incomplete_tasks, objective=objective)
    new_tasks = response.split('\n')
    return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]

def prioritize_tasks(task_prioritization_chain: LLMChain, this_task_id: int, task_list: List[Dict], objective: str) -> List[Dict]:
    """Prioritize tasks."""
    task_names = [t["task_name"] for t in task_list]
    next_task_id = int(this_task_id) + 1
    response = task_prioritization_chain.run(task_names=task_names, next_task_id=next_task_id, objective=objective)
    new_tasks = response.split('\n')
    prioritized_task_list = []
    for task_string in new_tasks:
        if not task_string.strip():
            continue
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = task_parts[0].strip()
            task_name = task_parts[1].strip()
            prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
    return prioritized_task_list

def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
    """Get the top k tasks based on the query."""
    results = vectorstore.similarity_search_with_score(query, k=k)
    if not results:
        return []
    sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
    return [str(item.metadata['task']) for item in sorted_results]

def execute_task(vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5) -> str:
    """Execute a task."""
    context = _get_top_tasks(vectorstore, query=objective, k=k)
    return execution_chain.run(objective=objective, context=context, task=task)

class BabyAGI(Chain, BaseModel):
    """Controller model for the BabyAGI agent."""

    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    execution_chain: ExecutionChain = Field(...)
    task_id_counter: int = Field(1)
    vectorstore: VectorStore = Field(init=False)
    max_iterations: Optional[int] = None

    class Config:
        """Configuration for this pydantic object."""
        arbitrary_types_allowed = True

    def add_task(self, task: Dict):
        self.task_list.append(task)

    def print_task_list(self):
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
        for t in self.task_list:
            print(str(t["task_id"]) + ": " + t["task_name"])

    def print_next_task(self, task: Dict):
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
        print(str(task["task_id"]) + ": " + task["task_name"])

    def print_task_result(self, result: str):
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
        print(result)

    @property
    def input_keys(self) -> List[str]:
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent."""
        objective = inputs['objective']
        first_task = inputs.get("first_task", "Make a todo list")
        self.add_task({"task_id": 1, "task_name": first_task})
        num_iters = 0
        while True:
            if self.task_list:
                self.print_task_list()

                # Step 1: Pull the first task
                task = self.task_list.popleft()
                self.print_next_task(task)

                # Step 2: Execute the task
                result = execute_task(
                    self.vectorstore, self.execution_chain, objective, task["task_name"]
                )
                this_task_id = int(task["task_id"])
                self.print_task_result(result)

                # Step 3: Store the result in Pinecone
                result_id = f"result_{task['task_id']}"
                self.vectorstore.add_texts(
                    texts=[result],
                    metadatas=[{"task": task["task_name"]}],
                    ids=[result_id],
                )

                # Step 4: Create new tasks and reprioritize task list
                new_tasks = get_next_task(
                    self.task_creation_chain, result, task["task_name"], [t["task_name"] for t in self.task_list], objective
                )
                for new_task in new_tasks:
                    self.task_id_counter += 1
                    new_task.update({"task_id": self.task_id_counter})
                    self.add_task(new_task)
                self.task_list = deque(
                    prioritize_tasks(
                        self.task_prioritization_chain, this_task_id, list(self.task_list), objective
                    )
                )
            num_iters += 1
            if self.max_iterations is not None and num_iters == self.max_iterations:
                print("\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m")
                break
        return {}

    @classmethod
    def from_llm(
        cls,
        llm: BaseLLM,
        vectorstore: VectorStore,
        verbose: bool = False,
        **kwargs
    ) -> "BabyAGI":
        """Initialize the BabyAGI Controller."""
        task_creation_chain = TaskCreationChain.from_llm(
            llm, verbose=verbose
        )
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, verbose=verbose
        )
        execution_chain = ExecutionChain.from_llm(llm, verbose=verbose)
        return cls(
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            execution_chain=execution_chain,
            vectorstore=vectorstore,
            **kwargs
        )



# Define your embedding model
embeddings_model = OpenAIEmbeddings(openai_api_key=openai_api_key)

# Initialize the vectorstore as empty
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}, relevance_score_fn=relevance_score_fn)


# Logging of LLMChains
verbose=False
# If None, will keep on going forever
max_iterations: Optional[int] = 3
baby_agi = BabyAGI.from_llm(
    llm=LLM,
    vectorstore=vectorstore,
    verbose=verbose,
    max_iterations=max_iterations
)

In [None]:
#@title Chat Controller
#@title Chat Controller
from typing import Dict, Optional

class GenerativeAgent:  # Add a placeholder class definition for GenerativeAgent
    def __init__(self):
        self.name = "Agent"

    def get_summary(self):
        return "Summary of the agent."

    def generate_response(self, user_input):
        return f"Response to: {user_input}"


def print_agent_summaries(agents: Dict[str, GenerativeAgent]) -> None:
    """Prints the summaries of all agents."""
    for agent_name, agent in agents.items():
        print(f"{agent_name}'s summary:")
        print(agent.get_summary())
        print()

def select_agent(agents: Dict[str, GenerativeAgent]) -> Optional[GenerativeAgent]:
    """Lets the user choose an agent from the list or go back to the main menu."""
    print("Select an agent to chat with (or type 'menu' to go back to the main menu):")
    for i, (agent_name, agent) in enumerate(agents.items(), start=1):
        print(f"{i}. {agent_name}")

    while True:
        choice = input("> ")
        if choice.lower() == "menu":
            return None
        elif choice.isdigit() and 1 <= int(choice) <= len(agents):
            return list(agents.values())[int(choice) - 1]
        else:
            print("Invalid selection. Please enter a number corresponding to the agent or type 'menu' to go back to the main menu.")


def chat_loop(agents: Dict[str, GenerativeAgent]) -> None:
    while True:
        selected_agent = select_agent(agents)

        if selected_agent is None:
            break

        print(f"\nYou are now chatting with {selected_agent.name}. Use the following commands:\n"
              f"exitnow - Go back to the agent selection\n"
              f"/command main menu - Go back to the main menu\n"
              f"exitall - Quit the application\n")

        while True:
            user_input = input("> ")
            if user_input.lower() == "exitnow":
                break
            elif user_input.lower() == "/command main menu":
                return
            elif user_input.lower() == "exitall":
                return

            response = selected_agent.generate_response(user_input)  # Changed from 'interview_agent' to the 'generate_response' method
            print(f"{selected_agent.name} says: {response}\n")

def run_app(agents: Dict[str, GenerativeAgent]) -> None:
    while True:
        print("Welcome to the chat application!")
        print("1. Start chatting")
        print("2. Agent summaries")
        print("3. Exit")

        choice = input("Enter your choice: ")

        if choice == "1":
            chat_loop(agents)
        elif choice == "2":
            print_agent_summaries(agents)
        elif choice == "3":
            print("Goodbye!")
            break
        else:
            print("Invalid choice. Please try again.\n")


run_app(agents_dict)


Welcome to the chat application!
1. Start chatting
2. Agent summaries
3. Exit
Enter your choice: 2
Samantha's summary:
Name: Samantha Williams (age: 35)
Innate traits: detail-oriented, analytical
No statements or information provided to summarize Samantha Williams's core characteristics.

John's summary:
Name: John Thompson (age: 40)
Innate traits: compassionate, empathetic
John Thompson's core characteristics are not specified in the given statements.

Michelle's summary:
Name: Michelle Chen (age: 32)
Innate traits: patient, good communicator
It is not possible to provide a summary as there are no statements provided about Michelle Chen's characteristics.

Kevin's summary:
Name: Kevin Smith (age: 45)
Innate traits: analytical, trustworthy
Kevin Smith is a filmmaker known for his comedic and irreverent style. He often incorporates pop culture references and uses dialogue-driven storytelling. He has a strong fan base and is active on social media, where he engages with his followers. He

AttributeError: ignored

In [None]:
#@title Simulate Conversation Between Different Agents
def run_conversation(agents: List[GenerativeAgent], initial_observation: str) -> None:
    """Runs a conversation between agents."""
    _, observation = agents[1].generate_reaction(initial_observation)
    print(observation)
    turns = 0
    while True:
        break_dialogue = False
        for agent in agents:
            stay_in_dialogue, observation = agent.generate_dialogue_response(observation)
            print(observation)
            # observation = f"{agent.name} said {reaction}"
            if not stay_in_dialogue:
                break_dialogue = True
        if break_dialogue:
            break
        turns += 1


In [None]:
#@title Print all of the agent summaries
def print_agent_summaries(agents: List[GenerativeAgent]) -> None:
    """Prints the summaries of all agents."""
    for agent in agents:
        print(f"{agent.name}'s summary:")
        print(agent.get_summary())
        print()

In [None]:
agents = [jordan, tommie]
run_conversation(agents, "Jordan said: Have you seen the rabid racoon killing people next door!")

In [None]:
#@title Tommy Summary
#Now that Tommie has 'memories', their self-summary is more descriptive, though still rudimentary.
# We will see how this summary updates after more observations to create a more rich description.
print(tommie.get_summary(force_refresh=True))

In [None]:
def interview_agent(agent: GenerativeAgent, message: str) -> str:
    """Help the notebook user interact with the agent."""
    new_message = f"{USER_NAME} says {message}"
    return agent.generate_dialogue_response(new_message)[1]

In [None]:
interview_agent(tommie, "'/get_task_list'")

In [None]:
OBJECTIVE = "List what we have talked about so far"

In [None]:
llm = OpenAI(temperature=0,openai_api_key=openai_api_key)

In [None]:
# Logging of LLMChains
verbose=False
# If None, will keep on going forever
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)
embeddings_model = OpenAIEmbeddings(openai_api_key=openai_api_key)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {}, relevance_score_fn=relevance_score_fn)
max_iterations: Optional[int] = 3
baby_agi = BabyAGI.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    verbose=verbose,
    max_iterations=max_iterations
)

In [None]:
baby_agi({"objective": OBJECTIVE})