In [None]:
# create a modified version of the Agent's state to include updated accessible info
from typing import Type, Optional, List
from genworlds.agents.abstracts.agent_state import AbstractAgentState

class QnAAgentState(AbstractAgentState):
    # Agent state extended with the two fields the Q&A actions read and write.

    # Question currently being handled: set by the retrieval action from the
    # incoming event and reset to None once the answer has been sent.
    users_question: Optional[str]

    # Chunks collected from vector-store replies; emptied after answering.
    retrieved_information_from_data_structures: List[str]

In [None]:
# create the action that retrieves info from sources
from genworlds.events.abstracts.action import AbstractAction
from genworlds.events.abstracts.event import AbstractEvent
from genworlds.objects.abstracts.object import AbstractObject
from objects.qdrant_bucket import VectorStoreCollectionRetrieveQuery, VectorStoreCollectionSimilarChunks

class AgentRetrievesFromSourcesEvent(AbstractEvent):
    # Trigger event for the retrieval action; it carries the user's query.
    event_type = "agent_retrieves_from_sources_event"
    description = (
        "The agent triggers events to the different data structures to retrieve "
        "information before answering questions as an expert."
    )

    # Text of the question that will be forwarded to the data structures.
    users_query: str

class AgentRetrievesFromSources(AbstractAction):
    """Action that forwards the user's query to the vector-store collections."""

    trigger_event_class = AgentRetrievesFromSourcesEvent
    description = (
        "The agent triggers events to the different data structures to retrieve "
        "information before answering questions as an expert."
    )

    def __init__(self, host_object: AbstractObject):
        super().__init__(host_object=host_object)

    def __call__(self, event: AgentRetrievesFromSourcesEvent):
        """Record the question in the agent state and query each collection.

        One ``VectorStoreCollectionRetrieveQuery`` is sent per collection, in
        the order listed below, each addressed to ``event.target_id``.
        """
        host = self.host_object
        question = event.users_query

        # Keep the question in the state so later steps can read it.
        host.state_manager.state.users_question = question

        for collection in ("genworlds-text-chunks", "genworlds-ner"):
            host.send_event(
                VectorStoreCollectionRetrieveQuery(
                    sender_id=host.id,
                    target_id=event.target_id,
                    collection_name=collection,
                    query=question,
                )
            )

        # Block for 5 seconds once both queries are out.
        # NOTE(review): presumably this leaves time for the vector store to
        # reply before the next action runs — confirm.
        sleep(5)

class AddRelevantInfoToState(AbstractAction):
    """Action that copies retrieved chunks from a vector-store reply into the agent state."""

    trigger_event_class = VectorStoreCollectionSimilarChunks
    description = "The agent modifies its state based on the relevant information retrieved from a Vectorstore."

    def __init__(self, host_object: AbstractObject):
        super().__init__(host_object=host_object)

    def __call__(self, event: VectorStoreCollectionSimilarChunks):
        """Append every chunk in ``event.similar_chunks`` to the agent state.

        Args:
            event: The triggering event. It is annotated with the class
                declared in ``trigger_event_class``, which is the one that
                provides the ``similar_chunks`` attribute read here.
        """
        state = self.host_object.state_manager.state
        # Extend in place so chunks from several replies accumulate in the
        # same list, in arrival order.
        state.retrieved_information_from_data_structures.extend(event.similar_chunks)

In [None]:
# Thought that provides a justified answer based on the retrieved sources
from pydantic import BaseModel, Field


from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains.openai_functions import (
    create_structured_output_chain,
)


from genworlds.agents.abstracts.agent_state import AbstractAgentState
from genworlds.agents.abstracts.thought import AbstractThought


class JustifiedAnswer(BaseModel):
    """Answers the user's question based on data extracted from different data structures."""

    # Every field is declared with `...` as its default, i.e. no default value
    # is provided. The descriptions are part of the generated schema, so they
    # are kept verbatim.
    answer: str = Field(default=..., description="The final answer to the user.")
    rationale: str = Field(
        default=...,
        description="The rationale of why this is the correct answer to user's question.",
    )
    references: Optional[List[str]] = Field(
        default=...,
        description="References to documents extracted from metadata.",
    )

class AnswerFromSourcesThought(AbstractThought):
    """Thought that asks the chat model for a ``JustifiedAnswer`` built from the agent state."""

    def __init__(
        self,
        agent_state: AbstractAgentState,  # other thoughts unique init arg, so everything goes through state
    ):
        self.agent_state = agent_state
        # `openai_api_key` is not defined in this class; it is read from the
        # enclosing (notebook) scope.
        self.llm = ChatOpenAI(
            model="gpt-4", openai_api_key=openai_api_key, temperature=0.1
        )

    def run(self):
        """Run the structured-output chain and return the parsed ``JustifiedAnswer``."""
        state = self.agent_state

        # System messages, in the order they appear in the prompt.
        system_templates = [
            "You are {agent_name}, {agent_description}.",
            "You are embedded in a simulated world with those properties {agent_world_state}",
            "Those are your goals: \n{goals}",
            "And this is your current plan to achieve the goals: \n{plan}",
            "Here is your memories of all the events that you remember from being in this simulation: \n{memory}",
            "This is the last user's question: \n{user_question}",
            "This is relevant information related to the user's question: \n{relevant_info}",
        ]
        messages = [("system", template) for template in system_templates]
        messages.append(("human", "{footer}"))
        prompt = ChatPromptTemplate.from_messages(messages)

        chain = create_structured_output_chain(
            output_schema=JustifiedAnswer.schema(),
            llm=self.llm,
            prompt=prompt,
            verbose=True,
        )

        # The footer ends with a newline followed by 12 spaces; that trailing
        # whitespace is part of the text sent to the model.
        footer = (
            "Provide a justified answer to user's question based on the different data structures, the rationale, and references when possible.\n"
            "            "
        )
        chain_inputs = {
            "agent_name": state.name,
            "agent_description": state.description,
            "agent_world_state": state.host_world_prompt,
            "goals": state.goals,
            "plan": state.plan,
            "memory": state.last_retrieved_memory,
            "user_question": state.users_question,
            "relevant_info": state.retrieved_information_from_data_structures,
            "footer": footer,
        }
        raw_answer = chain.run(**chain_inputs)
        return JustifiedAnswer.parse_obj(raw_answer)

In [None]:
# action that answers as an expert
from genworlds.agents.abstracts.thought_action import ThoughtAction
from genworlds.events.abstracts.event import AbstractEvent

class AnswerToUserAsExpertTriggerEvent(AbstractEvent):
    # Event that triggers the AnswerToUserAsExpert action.
    event_type = "agent_answers_as_expert_trigger_event"
    description = (
        "An agent answers to the user's question "
        "based on retrieved data structures."
    )

    # Answer text and optional list of references to pass along.
    justified_answer: str
    references: Optional[List[str]]

class AnswerToUserAsExpertEvent(AbstractEvent):
    # Event emitted by the AnswerToUserAsExpert action with the final answer.
    event_type = "agent_answers_as_expert_event"
    description = (
        "An agent answers to the user's question "
        "based on retrieved data structures."
    )

    # Answer text and optional list of references.
    justified_answer: str
    references: Optional[List[str]]

class AnswerToUserAsExpert(ThoughtAction):
    """Action that sends the justified answer and then resets the Q&A state."""

    trigger_event_class = AnswerToUserAsExpertTriggerEvent
    required_thoughts = {"justified_answer": AnswerFromSourcesThought}
    description = (
        "An agent answers to the user's question "
        "based on retrieved data structures."
    )

    def __init__(self, host_object: AbstractObject):
        super().__init__(host_object=host_object)

    def __call__(self, event: AnswerToUserAsExpertTriggerEvent):
        """Emit the answer event, then clear the per-question fields in the state."""
        host = self.host_object

        answer_event = AnswerToUserAsExpertEvent(
            sender_id=host.id,
            target_id=event.target_id,
            justified_answer=event.justified_answer,
            references=event.references,
        )
        host.send_event(answer_event)

        # Reset everything tied to the question that was just answered.
        state = host.state_manager.state
        state.users_question = None
        state.retrieved_information_from_data_structures = []
        state.other_thoughts_filled_parameters["justified_answer"] = ""

In [None]:
# create agent and add it to the world
agent_name = "qna_agent"
description = (
    "An agent that helps users with their requests, always answering after "
    "reviewing information from available data structures such as vector stores."
)
# Single action chain: retrieve from the sources, then answer as an expert.
answer_as_expert_action_chain = [
    "qna_agent:AgentRetrievesFromSources",
    "qna_agent:AnswerToUserAsExpert",
]

initial_agent_state = QnAAgentState(
    # --- identity ---
    id=agent_name,
    name=agent_name,
    description=description,
    # --- behaviour ---
    goals=[
        "Starts waiting and sleeps till the user starts a new question.",
        f"Once {agent_name} receives a user's question, he makes sure to request relevant information from different data structures in order to have all the information before answering as an expert to the user.",
        f"When {agent_name} has all the required information, he answers as an expert based on the information from different data structures.",
        "After sending the response, he waits for the next user question.",
        "If you have been waiting for any object or entity to send you an event for over 30 seconds, you will wait and sleep until you receive a new event.",
    ],
    plan=[],
    action_schema_chains=[answer_as_expert_action_chain],
    current_action_chain=[],
    other_thoughts_filled_parameters={},
    # --- world view (all start empty) ---
    host_world_prompt="",
    available_entities=[],
    available_action_schemas={},
    # --- memory ---
    last_retrieved_memory="",
    simulation_memory_persistent_path="./",
    memory_ignored_event_types=(
        "world_sends_available_action_schemas_event",
        "world_sends_available_entities_event",
        "vector_store_collection_similar_chunks",
    ),
    # --- sleep / wake-up ---
    wakeup_event_types=set(),  # fill
    is_asleep=False,
    # --- Q&A specific fields declared on QnAAgentState ---
    users_question=None,
    retrieved_information_from_data_structures=[],
)

from genworlds.agents.concrete.basic_assistant.utils import generate_basic_assistant
from genworlds.worlds.concrete.base.actions import UserSpeaksWithAgentEvent

# Build the assistant from the initial state and the three custom actions.
qna_agent = generate_basic_assistant(
    agent_name=agent_name,
    description=description,
    openai_api_key=openai_api_key,
    initial_agent_state=initial_agent_state,
    action_classes=[
        AgentRetrievesFromSources,
        AddRelevantInfoToState,
        AnswerToUserAsExpert,
    ],
)

# Register the user-speaks event as a wake-up event for the agent.
qna_agent.add_wakeup_event(event_class=UserSpeaksWithAgentEvent)

# Attach the Q&A agent to the world
RAGWorld.add_agent(qna_agent)

In [None]:
from genworlds.worlds.concrete.base.actions import UserSpeaksWithAgentEvent

# Question used to exercise the agent
test_msg = "Do you know what GenWorlds is?"

# Build the user event addressed to the agent and serialize it to JSON
message_to_send = UserSpeaksWithAgentEvent(
    sender_id=test_user.id,
    target_id="qna_agent",
    message=test_msg,
    created_at=datetime.now(),
).json()

# Wait one second before sending
sleep(1)

# Send the serialized event through the test user's socket client
test_user.socket_client.send_message(message_to_send)