# Simulartcra

In [1]:
import logging

logging.basicConfig(level=logging.ERROR)

from datetime import datetime, timedelta
from typing import List
import collections
import inspect
import tenacity
from termcolor import colored
import os
import io
import warnings
from PIL import Image
from stability_sdk import client
import stability_sdk.interfaces.gooseai.generation.generation_pb2 as generation

from langchain.chat_models import ChatOpenAI
from langchain.docstore import InMemoryDocstore
from langchain.embeddings import OpenAIEmbeddings
from langchain.retrievers import TimeWeightedVectorStoreRetriever
from langchain.vectorstores import FAISS
from langchain.schema import (
    HumanMessage,
    SystemMessage,
)
from langchain.output_parsers import RegexParser
from langchain_experimental.generative_agents import (
    GenerativeAgent,
    GenerativeAgentMemory,
)

from dotenv import load_dotenv

# Read settings from a local .env file into the process environment
# (presumably where the API keys used below are kept — confirm).
load_dotenv()
# Endpoint the Stability client connects to; set before the client is created.
os.environ['STABILITY_HOST'] = 'grpc.stability.ai:443'

In [3]:
# Notebook-wide configuration: interviewer name, shared chat model and the
# Stability image-generation client used by generate_art().
USER_NAME = "schoi"  # The name you want to use when interviewing the agent.
LLM = ChatOpenAI(max_tokens=1500)  # Can be any LLM you want.
# Set up our connection to the API.
# NOTE: os.environ['STABILITY_KEY'] raises KeyError if the variable is not set,
# so the key must be exported (or present in .env) before this cell runs.
stability_api = client.StabilityInference(
    key=os.environ['STABILITY_KEY'], # API Key reference.
    verbose=True, # Print debug messages.
    engine="stable-diffusion-xl-1024-v1-0", # Set the engine to use for generation.
    # Check out the following link for a list of available engines: https://platform.stability.ai/docs/features/api-parameters#engine
)

INFO:stability_sdk.client:Opening channel to grpc.stability.ai:443
INFO:stability_sdk.client:Channel opened to grpc.stability.ai:443


In [4]:

class GymnasiumGenerativeAgent:
    """Drive an environment by asking a chat model for the next action.

    The wrapper keeps a running message history (environment docs, fixed
    instructions, then alternating observations and model replies), parses an
    integer action out of each reply and falls back to a random action when
    the reply cannot be parsed.

    Attributes:
        agent: The object passed in by the caller (kept for reference).
        model: The callable chat model actually invoked in ``_act``.
        env: The wrapped environment.
        docs: Environment documentation sent as the first system message.
        message_history: Messages exchanged so far in the current episode.
        ret: Sum of rewards observed since the last ``reset``.
    """

    @classmethod
    def get_docs(cls, env):
        """Return the docstring of the unwrapped environment (may be ``None``)."""
        return env.unwrapped.__doc__

    @staticmethod
    def _resolve_model(agent):
        """Return the chat model to call for ``agent``.

        Objects that carry their model on an ``llm`` attribute (as the
        generative agents built in this notebook do) are unwrapped; anything
        else is assumed to be the callable model itself.
        """
        return getattr(agent, "llm", agent)

    def __init__(self, agent, env):
        """Bind the wrapper to ``agent`` and ``env`` and prepare the prompt.

        Args:
            agent: Either a callable chat model or an object exposing one as
                ``agent.llm``.
            env: Environment providing ``unwrapped`` and ``action_space``.
        """
        self.agent = agent
        # ``_act`` calls ``self.model``; it was never assigned before, so every
        # call to ``act`` failed with AttributeError.
        self.model = self._resolve_model(agent)
        self.env = env
        # A missing docstring would otherwise put ``None`` into a message body.
        self.docs = self.get_docs(env) or ""

        self.instructions = """
Your goal is to maximize your return, i.e. the sum of the rewards you receive.
I will give you an observation, reward, terminiation flag, truncation flag, and the return so far, formatted as:

Observation: <observation>
Reward: <reward>
Termination: <termination>
Truncation: <truncation>
Return: <sum_of_rewards>

You will respond with an action, formatted as:

Action: <action>

where you replace <action> with your actual action.
Do nothing else but return the action.
"""
        self.action_parser = RegexParser(
            regex=r"Action: (.*)", output_keys=["action"], default_output_key="action"
        )

        self.message_history = []
        self.ret = 0

    def random_action(self):
        """Sample an action from the environment's action space."""
        action = self.env.action_space.sample()
        return action

    def reset(self):
        """Start a fresh episode: new message history and a zeroed return."""
        self.message_history = [
            SystemMessage(content=self.docs),
            SystemMessage(content=self.instructions),
        ]
        # Without this the "Return:" line keeps accumulating across episodes
        # (and across re-runs of the main cell on the same agent objects).
        self.ret = 0

    def observe(self, obs, rew=0, term=False, trunc=False, info=None):
        """Record one environment step and return the message shown to the model.

        Args:
            obs: Observation to report.
            rew: Reward for the step; added to the running return.
            term: Termination flag.
            trunc: Truncation flag.
            info: Accepted for signature compatibility; not used.

        Returns:
            The formatted observation text that was appended to the history.
        """
        self.ret += rew

        obs_message = f"""
Observation: {obs}
Reward: {rew}
Termination: {term}
Truncation: {trunc}
Return: {self.ret}
        """
        self.message_history.append(HumanMessage(content=obs_message))
        return obs_message

    def _act(self):
        """Query the model once and parse its reply into an integer action.

        Raises:
            ValueError: If the parsed action text is not an integer.
        """
        act_message = self.model(self.message_history)
        self.message_history.append(act_message)
        action = int(self.action_parser.parse(act_message.content)["action"])
        return action

    def act(self):
        """Return the model's action, or a random one if parsing keeps failing.

        ``_act`` is attempted at most twice; only ``ValueError`` triggers a
        retry. Any other exception propagates to the caller.
        """
        try:
            for attempt in tenacity.Retrying(
                stop=tenacity.stop_after_attempt(2),
                wait=tenacity.wait_none(),  # No waiting time between retries
                retry=tenacity.retry_if_exception_type(ValueError),
                before_sleep=lambda retry_state: print(
                    f"ValueError occurred: {retry_state.outcome.exception()}, retrying..."
                ),
            ):
                with attempt:
                    action = self._act()
        except tenacity.RetryError:
            action = self.random_action()
        return action

In [5]:
class PettingZooGenerativeAgent(GymnasiumGenerativeAgent):
    """Variant of the base wrapper for environments with per-agent action spaces.

    Two things differ from the base class: the environment docs are taken from
    the module that defines the unwrapped environment, and the action space is
    looked up by this agent's name.
    """

    @classmethod
    def get_docs(cls, env):
        """Return the docstring of the module defining the unwrapped environment."""
        defining_module = inspect.getmodule(env.unwrapped)
        return defining_module.__doc__

    def __init__(self, name, agent, env):
        """Run the base initialisation, then remember this agent's ``name``."""
        super().__init__(agent, env)
        self.name = name

    def random_action(self):
        """Sample an action from the action space belonging to ``self.name``."""
        own_space = self.env.action_space(self.name)
        return own_space.sample()

In [6]:
class ActionMaskGenerativeAgent(PettingZooGenerativeAgent):
    """Per-agent wrapper that respects the ``action_mask`` in observations.

    The most recent observation is buffered so that the random fallback can
    sample only from actions the mask allows, and every model query is
    preceded by a reminder to pick an unmasked action.
    """

    def __init__(self, name, agent, env):
        """Initialise the parent wrapper and an observation buffer of size one."""
        super().__init__(name, agent, env)
        self.obs_buffer = collections.deque(maxlen=1)

    def random_action(self):
        """Sample a random action, masked by the latest observation if there is one.

        When nothing has been observed yet there is no mask to apply, so the
        parent's unmasked sampling is used instead of raising ``IndexError``.
        """
        if not self.obs_buffer:
            return super().random_action()
        obs = self.obs_buffer[-1]
        action = self.env.action_space(self.name).sample(obs["action_mask"])
        return action

    def reset(self):
        """Reset the message history via the parent and drop the stale observation."""
        super().reset()
        # The buffered mask belongs to the previous episode.
        self.obs_buffer.clear()

    def observe(self, obs, rew=0, term=False, trunc=False, info=None):
        """Buffer ``obs`` for masked sampling, then record it as the parent does."""
        self.obs_buffer.append(obs)
        return super().observe(obs, rew, term, trunc, info)

    def _act(self):
        """Append the valid-action reminder and query the model once.

        The reminder is appended on every call, including retries.
        """
        valid_action_instruction = "Generate a valid action given by the indices of the `action_mask` that are not 0, according to the action formatting rules."
        self.message_history.append(HumanMessage(content=valid_action_instruction))
        return super()._act()

In [7]:
import math
import faiss


def relevance_score_fn(score: float) -> float:
    """Convert a vector-store distance into a similarity value.

    A distance of 0 maps to 1.0 and a distance of sqrt(2) maps to 0.0; the
    mapping is linear in between. Distances above sqrt(2) yield negative
    values, as no clamping is applied.
    """
    # The right conversion depends on:
    # - the distance / similarity metric used by the VectorStore
    # - the scale of the embeddings (OpenAI's are unit norm. Many others are not!)
    # Here the input is treated as the euclidean norm of normalized embeddings,
    # where 0 is most similar and sqrt(2) most dissimilar.
    worst_distance = math.sqrt(2)
    return 1.0 - score / worst_distance


def create_new_memory_retriever():
    """Build a fresh, empty time-weighted retriever for a single agent.

    Every call constructs its own embedding model, FAISS index and in-memory
    docstore, so retrievers returned by separate calls share no state.
    """
    embedder = OpenAIEmbeddings()
    # Index width; presumably the output size of the default OpenAI embedding
    # model — confirm if the embedding model is ever changed.
    vector_dim = 1536
    empty_store = FAISS(
        embedder.embed_query,
        faiss.IndexFlatL2(vector_dim),
        InMemoryDocstore({}),
        {},
        relevance_score_fn=relevance_score_fn,
    )
    retriever = TimeWeightedVectorStoreRetriever(
        vectorstore=empty_store,
        other_score_keys=["importance"],
        k=15,
    )
    return retriever

In [8]:
## Stability AI art generation
def generate_art(
    prompt,
    seed=4253978046,
    init_img=None,
    mask=None,
    samples=1,
    steps=50,
    cfg_scale=8.0,
    width=1024,
    height=1024,
    sampler=generation.SAMPLER_K_DPMPP_2M,
):
    """Generate image(s) for ``prompt`` and save each one as ``<seed>.png``.

    The request goes through the module-level ``stability_api`` client. Every
    image artifact in the response is written to the current working
    directory, named after the artifact's seed. Nothing is returned.

    Args:
        prompt: Text prompt passed to the generation API.
        seed: Generation seed; with all other parameters equal, the same seed
            reproduces the same image.
        init_img: Optional initial image to transform.
        mask: Optional mask image.
        samples: Number of images to request.
        steps: Number of inference steps.
        cfg_scale: How strongly the generation is guided to match the prompt.
        width: Output width in pixels.
        height: Output height in pixels.
        sampler: Sampler constant from the ``generation`` protobuf module.

    Warns:
        UserWarning: When an artifact reports that the safety filter fired.
    """
    answers = stability_api.generate(
        prompt=prompt,
        init_image=init_img,  # Initial image for image-to-image transformation.
        mask_image=mask,
        seed=seed,  # Same seed + same parameters => same image.
        steps=steps,
        cfg_scale=cfg_scale,
        width=width,
        height=height,
        samples=samples,
        sampler=sampler,
    )
    for resp in answers:
        for artifact in resp.artifacts:
            if artifact.finish_reason == generation.FILTER:
                # Adjacent literals are concatenated as-is, so the separating
                # space has to be part of the first literal.
                warnings.warn(
                    "Your request activated the API's safety filters and could not be processed. "
                    "Please modify the prompt and try again.")
            if artifact.type == generation.ARTIFACT_IMAGE:
                img = Image.open(io.BytesIO(artifact.binary))
                # Save our generated images with their seed number as the filename.
                img.save(f"{artifact.seed}.png")

## Creating the agents

In [None]:
# Build the two collaborating agents. Each gets its own memory object backed by
# its own retriever, and both share the module-level LLM.
alices_memory = GenerativeAgentMemory(
    llm=LLM,
    memory_retriever=create_new_memory_retriever(),
    verbose=False,
    reflection_threshold=8,  # we will give this a relatively low number to show how reflection works
)

# NOTE(review): a second, separate retriever is created for the agent itself in
# addition to the one inside alices_memory — confirm GenerativeAgent uses it.
alice = GenerativeAgent(
    name="Alice",
    age=25,
    traits="expressive, likes classical art",  # You can add more persistent traits here
    status="wants to create art",  # When connected to a virtual world, we can have the characters update their status
    memory_retriever=create_new_memory_retriever(),
    llm=LLM,
    memory=alices_memory,
)

bobs_memory = GenerativeAgentMemory(
    llm=LLM,
    memory_retriever=create_new_memory_retriever(),
    verbose=False,
    reflection_threshold=8,  # we will give this a relatively low number to show how reflection works
)

# Same construction as Alice; only the name and traits differ.
bob = GenerativeAgent(
    name="Bob",
    age=25,
    traits="bold, likes modern art",  # You can add more persistent traits here
    status="wants to create art",  # When connected to a virtual world, we can have the characters update their status
    memory_retriever=create_new_memory_retriever(),
    llm=LLM,
    memory=bobs_memory,
)

In [None]:
# We can add memories directly to the memory object
# Seed each agent's memory with a few starting observations by writing
# directly to its memory object.
alices_observation = [
    "Alice is going to create art",
    "Alice remembers having to collaborate with Bob",
    "Alice wants to come to a consensus with Bob",
]
for observation in alices_observation:
    alice.memory.add_memory(observation)

bobs_observation = [
    "Bob is going to create art",
    "Bob remembers having to collaborate with Alice",
    "Bob wants to come to a consensus with Alice",
]
for observation in bobs_observation:
    bob.memory.add_memory(observation)

In [None]:
def interview_agent(agent: GenerativeAgent, message: str) -> str:
    """Ask ``agent`` something on behalf of the notebook user.

    The message is attributed to ``USER_NAME`` before being handed to the
    agent; the second element of the agent's dialogue response is returned.
    """
    attributed = f"{USER_NAME} says {message}"
    response = agent.generate_dialogue_response(attributed)
    return response[1]

In [None]:
# Sanity-check Alice's persona by asking her an open question as USER_NAME.
interview_agent(alice, "What do you like to do?")

In [None]:
# Same question for Bob, to compare the two personas.
interview_agent(bob, "What do you like to do?")

In [None]:
# NOTE(review): placeholder value. The cells below call env.unwrapped,
# env.action_space(...), env.reset(), env.agent_iter(), env.last(), env.step()
# and env.close(), none of which a str provides.
# TODO: replace with a real multi-agent environment instance before running on.
env="env"

In [None]:
# Wrap each generative agent for the environment, keyed by the environment's
# own agent id: main() looks wrappers up by the names that env.agent_iter()
# yields, and the wrapper uses the same id to fetch its action space.
# The original comprehension used `name` without ever binding it (NameError).
# Assumes env exposes `possible_agents` (PettingZoo API) with one id per agent
# listed here — confirm; zip() silently drops any unmatched extras.
agents = {
    name: ActionMaskGenerativeAgent(name=name, agent=agent, env=env)
    for name, agent in zip(env.possible_agents, [alice, bob])
}

## Main Event Loop

In [None]:
def main(agents, env):
    """Play one episode, letting each agent act on its turn.

    Args:
        agents: Mapping from environment agent name to a wrapper exposing
            ``reset()``, ``observe(obs, reward, termination, truncation, info)``
            and ``act()``.
        env: Turn-based environment exposing ``reset()``, ``agent_iter()``,
            ``last()``, ``step(action)`` and ``close()``.

    The environment is closed even when an agent or the environment raises
    part-way through the episode; the exception itself still propagates.
    """
    env.reset()
    try:
        for name, agent in agents.items():
            print(name)
            agent.reset()

        for agent_name in env.agent_iter():
            print(agent_name)
            current = agents[agent_name]
            observation, reward, termination, truncation, info = env.last()
            obs_message = current.observe(
                observation, reward, termination, truncation, info
            )
            print(obs_message)
            # A finished agent must step with None instead of choosing an action.
            if termination or truncation:
                action = None
            else:
                action = current.act()
            print(f"Action: {action}")
            env.step(action)
    finally:
        # Previously skipped whenever the loop raised, leaking the environment.
        env.close()

In [None]:
# Run the episode with the wrappers and environment defined in the cells above.
main(agents, env)