# Toy Simulacra 1.3

In this notebook, we use Large Language Models (LLMs) to let agents make decisions about their daily plans.

## Objectives
Our objectives are threefold:

1. **Enable agents to plan their daily activities.** This involves not just deciding on the activities themselves but also scheduling when and where these activities will take place.
2. **Determine the actions and locations necessary for these activities** within their daily schedule, ensuring a structured approach to their day.
3. **Navigate the agents through the maze** to accomplish these tasks, adding a layer of spatial dynamics to their planning.

By leveraging LLMs and the agents' memory structures, we plan to define and utilize planning functions to achieve these goals.

In the next notebook, we will explore how to facilitate interactions among agents, further enriching the simulation environment.

## Setup 

This notebook requires a bit more setup than usual. A key part of this setup involves defining our function for prompting LLMs. For the purposes of this tutorial, we've utilized GPT-3.5 via Azure OpenAI Studio, and thus our prompt functions are tailored to this environment. Should you opt to use different APIs, adjustments to these prompt functions will be necessary.

**Note:** While most of the prompts have been directly adopted from the original Simulacra code, I've modified a few to enhance their reliability and reduce the likelihood of failure.
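
If you want to dry-run the notebook without API access, any callable that takes `(prompt, parameters)` and returns the completion text can stand in for the `prompt_gpt` helper defined in the cells that follow. The sketch below is a hypothetical offline stub (the name `prompt_stub` and its canned reply are illustrative assumptions, not part of the original code); it only demonstrates the expected contract, including honoring the `stop` sequences.

```python
# Hypothetical offline stand-in for an LLM call; not part of the original notebook.
def prompt_stub(prompt, parameters):
    """Return a canned completion, truncated at the first stop sequence."""
    reply = " sleeping\n[text that a stop sequence should cut off]"
    for stop in parameters.get("stop") or []:
        cut = reply.find(stop)
        if cut != -1:
            reply = reply[:cut]
    return reply

params = {"engine": "gpt-3.5-turbo-instruct", "max_tokens": 50, "stop": ["\n"]}
completion = prompt_stub("Isabella is", params)
print(repr(completion))
```

Assigning such a function to `PROMPT_FN` would route every `safe_prompting` call through it.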

In [1]:
import json
import os
import time
import random
import re
import numpy as np
from pathlib import Path
from openai import AzureOpenAI
from datetime import datetime, timedelta

from maze import Maze

BASE_SIM_FOLDER = Path("./generative_agents/environment/frontend_server/storage/base_the_ville_isabella_maria_klaus/").resolve()
PERSONAS_FOLDER = BASE_SIM_FOLDER / "personas"

# Client to prompt LLMs
CLIENT = AzureOpenAI(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
    api_version="2023-12-01-preview",
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
)

# Just a map between the name used for the model and the one recognized by the API
AZURE_MODEL_MAP = {
  'gpt-3.5-turbo-instruct': 'gpt-35-turbo-instruct',
  'gpt-3.5-turbo': 'gpt-35-turbo'
}

# Default parameters
GPT_PARAMS = {
    "engine": "gpt-3.5-turbo-instruct", 
    "max_tokens": 500, 
    "temperature": 1.0, 
    "top_p": 1, 
    "stream": False,
    "frequency_penalty": 0, 
    "presence_penalty": 0, 
    "stop": None
}

TIME_SLEEP_BETWEEN_REQUESTS = 0.1 # seconds

# All prompts are stored in this folder. We need to replace various parts with the inputs.
TEMPLATE_FOLDER = Path('./prompt_templates').resolve()
print("template folder", TEMPLATE_FOLDER)

# Used for hourly planning
HOUR_STR = ["00:00 AM", "01:00 AM", "02:00 AM", "03:00 AM", "04:00 AM", 
              "05:00 AM", "06:00 AM", "07:00 AM", "08:00 AM", "09:00 AM", 
              "10:00 AM", "11:00 AM", "12:00 PM", "01:00 PM", "02:00 PM", 
              "03:00 PM", "04:00 PM", "05:00 PM", "06:00 PM", "07:00 PM",
              "08:00 PM", "09:00 PM", "10:00 PM", "11:00 PM"]

# Log files
LOGFILE = "./prompts_log.txt"
SIM_LOGFILE = "./sim_logs.txt"
PRINT_PROMPTS = True

template folder /Users/gupta/Workspace/tutorials/simulacra/prompt_templates


In [2]:
# Basic prompting helper functions

def get_embedding(text):
    """Returns the embedding for `text` according to the LLM."""
    text = text.replace("\n", " ")
    if not text:
        text = "blank"

    response = CLIENT.embeddings.create(
        input = [text],
        model="text-embedding-ada-002"
    )
    return response.data[0].embedding

def generate_prompt(prompt_inputs, template_file):
    """Returns the prompt after replacing relevant spots with the input. """
    with open(template_file) as f:
        template = f.read()

    prompt = template.split("<prompt_start>###</prompt_start>")[1]
    for count, prompt_input in enumerate(prompt_inputs):  # avoid shadowing the built-in `input`
        replace_str = prompt_input if prompt_input is not None else ""
        prompt = prompt.replace(f"!<INPUT {count}>!", replace_str)

    return prompt.strip()

def prompt_gpt(prompt, parameters):
    """Returns the response from prompting the LLM."""
    try:
        response = CLIENT.completions.create(
            model=AZURE_MODEL_MAP[parameters["engine"]],
            prompt=prompt,
            temperature=parameters["temperature"],
            max_tokens=parameters["max_tokens"],
            top_p=parameters["top_p"],
            frequency_penalty=parameters["frequency_penalty"],
            presence_penalty=parameters["presence_penalty"],
            stream=parameters["stream"],
            stop=parameters["stop"]
        )
        return response.choices[0].text
    except Exception as e:
        print(e)
        return -1

def safe_prompting(prompt, parameters, func_clean_up, func_validate=None, repeat=5):
    """Wrapper around the prompt function that retries when the API call or the clean-up fails."""
    if func_validate is None:
        def func_validate(response):
            # `prompt_gpt` returns -1 when the API call fails, so reject anything that is not text.
            if not isinstance(response, str):
                return False
            try:
                func_clean_up(response)
            except Exception:
                return False
            return True

    for _ in range(repeat):
        curr_response = PROMPT_FN(prompt, parameters)
        if func_validate(curr_response):
            return func_clean_up(curr_response)
        time.sleep(TIME_SLEEP_BETWEEN_REQUESTS)

    print(f"{prompt} failed after {repeat} attempts. Returning None.")
    return None

In [3]:
PROMPT_FN = prompt_gpt # If you use some other function to prompt LLMs, specify here. 

Next, we define some helper functions to log various aspects of the simulation. Here we define such logs for prompts; we will define more in the next notebook.

In [4]:
def print_prompt(fn_name, persona, prompt, response, params, do_not_print=False):
    if do_not_print: return
    curr_time = persona.scratch.curr_time.strftime('%A %B %d %H:%M:%S')
    with open(LOGFILE, mode="a") as f:
        string = "\n\n" + ">"*50 + "<"*50 + "\n\n" + str(params) + "\n\n"
        print(f"{string}{curr_time}\n{fn_name} --- {persona.name}\n\n --- PROMPT: ---\n{prompt}\n\n--- RESPONSE: ---\n{response}", file=f)

def print_to_file(string, logfile):
    with open(logfile, 'a') as f:
        print(string, file=f)

def _normalize(seq):
    _min = min(seq)
    _max = max(seq)
    return [(i - _min) / (1e-6 + _max-_min) for i in seq]

def cos_sim(a,b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))


### Spatial Memory

In [5]:
class SpatialMemoryTree: 
    def __init__(self, f_saved): 
        with open(f_saved) as f:
            self.tree = json.load(f)
    
    def print_tree(self): 
        def _print_tree(tree, depth):
            dash = " >" * depth
            if isinstance(tree, list): 
                if tree:
                    print(dash, tree)
                return 
            
            for key, val in tree.items(): 
                if key: 
                    print(dash, key)
                _print_tree(val, depth+1)
        _print_tree(self.tree, 0)
    
    def get_str_accessible_sectors(self, curr_world):
        return ", ".join(list(self.tree[curr_world].keys()))
    
    def get_str_accessible_sector_arenas(self, sector):
        curr_world, curr_sector = sector.split(":")
        if not curr_sector:
            return ""
        return ", ".join(list(self.tree[curr_world][curr_sector].keys()))

    def get_str_accessible_arena_game_objects(self, arena):
        curr_world, curr_sector, curr_arena = arena.split(":")
        if not curr_arena:
            return ""

        try: 
            x = ", ".join(self.tree[curr_world][curr_sector][curr_arena])
        except KeyError:
            # Retry with the lower-cased arena name
            x = ", ".join(self.tree[curr_world][curr_sector][curr_arena.lower()])

        return x

In [6]:
filename = str(PERSONAS_FOLDER / "Isabella Rodriguez/bootstrap_memory/spatial_memory.json")
s_mem = SpatialMemoryTree(filename)
s_mem.print_tree()

 the Ville
 > Hobbs Cafe
 > > cafe
 > > > ['refrigerator', 'cafe customer seating', 'cooking area', 'kitchen sink', 'behind the cafe counter', 'piano']
 > Isabella Rodriguez's apartment
 > > main room
 > > > ['bed', 'desk', 'refrigerator', 'closet', 'shelf']
 > The Rose and Crown Pub
 > > pub
 > > > ['shelf', 'refrigerator', 'bar customer seating', 'behind the bar counter', 'kitchen sink', 'cooking area', 'microphone']
 > Harvey Oak Supply Store
 > > supply store
 > > > ['supply store product shelf', 'behind the supply store counter', 'supply store counter']
 > The Willows Market and Pharmacy
 > > store
 > > > ['behind the pharmacy counter', 'pharmacy store shelf', 'pharmacy store counter', 'grocery store shelf', 'behind the grocery counter', 'grocery store counter']
 > Dorm for Oak Hill College
 > > garden
 > > > ['dorm garden']
 > > common room
 > > > ['common room sofa', 'pool table', 'common room table']
 > Johnson Park
 > > park
 > > > ['park garden']
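
The tree is a nested dictionary: world, then sector, then arena, with a list of game objects at the leaves. The class methods above take colon-separated addresses such as `world:sector` or `world:sector:arena`. Below is a minimal standalone sketch of that lookup on a hand-copied fragment of the output above; the `lookup` helper is illustrative and not part of the class.

```python
# A small fragment of the spatial memory shown above, copied by hand.
tree = {
    "the Ville": {
        "Hobbs Cafe": {
            "cafe": ["refrigerator", "cafe customer seating", "piano"],
        },
        "Johnson Park": {
            "park": ["park garden"],
        },
    }
}

def lookup(tree, address):
    """Walk a colon-separated address (world[:sector[:arena]]) down the tree."""
    node = tree
    for part in address.split(":"):
        node = node[part]
    # Inner levels are dicts (return their keys); leaves are lists of objects.
    return list(node.keys()) if isinstance(node, dict) else list(node)

print(lookup(tree, "the Ville"))
print(lookup(tree, "the Ville:Hobbs Cafe"))
print(lookup(tree, "the Ville:Hobbs Cafe:cafe"))
```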


### Associative memory (Long term memory)

In [7]:
class ConceptNode:
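    node_id: str # bookkeeping, e.g. "node_1"
    node_count: int # bookkeeping
    node_type: str # thought / event / chat
    type_count: int # bookkeeping
    depth: int # 0 for events and chats; 1 + the deepest supporting node for thoughts

    created: datetime # creation time
    expiration: datetime # expiry time (None if the node never expires)

    subject: str # subject, usually the agent itself
    predicate: str # verb linking subject and object
    object: str # object of this event

    description: str # A full description of the event (usually obtained from LLM)
    embedding_key: str # a key to reference while accessing the embeddings of the event
    embedding: list # the embedding vector itself
    poignancy: int # importance score of the node
    keywords: set # keywords to retrieve this node
    filling: list # supporting node ids (thoughts) or conversation rows (chats)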
    
    def __init__(self, **kwargs):
        for key in self.__annotations__:
            setattr(self, key, kwargs.get(key, None)) 
        self.last_accessed = self.created

    def spo_summary(self):
        return (self.subject, self.predicate, self.object)


class AssociativeMemory:
    def __init__(self, folder_name):
        self.id_to_node = dict()

        self.seq_event = []
        self.seq_thought = []
        self.seq_chat = []

        self.kw_to_event = dict()
        self.kw_to_thought = dict()
        self.kw_to_chat = dict()

        x = json.load(open(folder_name + '/kw_strength.json'))
        self.kw_strength_event = x.get('kw_strength_event', dict())
        self.kw_strength_thought = x.get('kw_strength_thought', dict())

        self.embeddings = json.load(open(folder_name + "/embeddings.json"))
        nodes = json.load(open(folder_name + "/nodes.json"))
        for count in range(len(nodes)):
            node_id = f"node_{str(count+1)}"
            node_details = nodes[node_id]

            node_type = node_details['type'] # assumes nodes.json stores the node type under 'type'
            created = datetime.strptime(node_details['created'], '%Y-%m-%d %H:%M:%S')

            expiration = None
            if node_details['expiration']:
                expiration = datetime.strptime(node_details['expiration'], '%Y-%m-%d %H:%M:%S')

            subject, predicate, object = node_details['subject'], node_details['predicate'], node_details['object']
            description = node_details['description']
            embedding_pair = (node_details['embedding_key'], self.embeddings[node_details['embedding_key']])
            poignancy = node_details['poignancy']
            keywords = set(node_details['keywords'])
            filling = node_details['filling']

            self.add_node(node_type, created, expiration, subject, predicate,
                 object, description, keywords, poignancy, embedding_pair, filling)
        
    def add_node(self, node_type, created, expiration, subject, predicate,
                 object, description, keywords, poignancy, embedding_pair, filling):

        node_count = len(self.id_to_node.keys()) + 1
        node_id = f'node_{node_count}'

            
        if node_type == 'chat':
            type_count = len(self.seq_chat) + 1
            depth = 0
        elif node_type == 'event':
            type_count = len(self.seq_event) + 1
            depth = 0  
        elif node_type == 'thought':
            type_count = len(self.seq_thought) + 1
            depth = 1
            if filling:
                depth += max([self.id_to_node[i].depth for i in filling])
                
        node = ConceptNode(node_id=node_id, node_count=node_count, type_count=type_count, node_type=node_type, depth=depth,
                           created=created, expiration=expiration, subject=subject, predicate=predicate,
                           object=object, description=description, embedding_key=embedding_pair[0], poignancy=poignancy,
                           keywords=keywords, filling=filling, embedding=embedding_pair[1])

        if node_type == 'chat':
            self.seq_chat[0:0] = [node]
            cache, cache_strength = self.kw_to_chat, None
        elif node_type == 'event':
            self.seq_event[0:0] = [node]
            cache, cache_strength = self.kw_to_event, self.kw_strength_event
        elif node_type == 'thought':
            self.seq_thought[0:0] = [node]
            cache, cache_strength = self.kw_to_thought, self.kw_strength_thought

        keywords = [i.lower() for i in keywords]
        for kw in keywords:
            if kw in cache:
                cache[kw][0:0] = [node]
            else:
                cache[kw] = [node]

            if cache_strength is not None and f"{predicate} {object}" != "is idle":
                if kw in cache_strength:
                    cache_strength[kw] += 1
                else:
                    cache_strength[kw] = 1

        self.id_to_node[node_id] = node # register the node so later ids and depths resolve
        self.embeddings[embedding_pair[0]] = embedding_pair[1]

        return node
            
        
    def get_summarized_latest_events(self, retention):
        ret_set = set()
        for e_node in self.seq_event[:retention]:
            ret_set.add(e_node.spo_summary())
        return ret_set

    def get_str_seq_events(self):
        ret_str = ""
        for count, event in enumerate(self.seq_event):
            ret_str += f'{"Event", len(self.seq_event) - count, ": ", event.spo_summary(), " -- ", event.description}\n' # returns a string of tuple
        return ret_str    

    def get_str_seq_thoughts(self):
        ret_str = ""
        for count, event in enumerate(self.seq_thought):
            ret_str += f'{"Thought", len(self.seq_thought) - count, ": ", event.spo_summary(), " -- ", event.description}\n' # returns a string of tuple
        return ret_str  

    def get_str_seq_chats(self):
        ret_str = ""
        for event in self.seq_chat:
            ret_str += f"with {event.object} ({event.description})\n" # `object` is stored as a plain string
            ret_str += f"{event.created.strftime('%B %d, %Y, %H:%M:%S')}\n"
            for row in event.filling:
                ret_str += f"{row[0]}: {row[1]}\n"
        return ret_str
            

    def retrieve_relevant_thoughts(self, s, p, o):
        contents = [s, p, o]
        ret = []
        for i in contents:
            # Keywords are stored lower-cased, so look them up the same way.
            if i and i.lower() in self.kw_to_thought:
                ret += self.kw_to_thought[i.lower()]
        return set(ret)

    def retrieve_relevant_events(self, s, p, o):
        contents = [s, p, o]
        ret = []
        for i in contents:
            if i and i.lower() in self.kw_to_event:
                ret += self.kw_to_event[i.lower()]
        return set(ret)
    

### Short term memory (scratch)

In [8]:
class Scratch:
    def __init__(self, filename):
        scratch = json.load(open(filename))

        # PERSONA HYPERPARAMETERS
        self.vision_r = scratch.get('vision_r', 4) # Radius of visible boundaries
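        self.att_bandwidth = scratch.get('att_bandwidth', 3) # Number of nearby events the persona can attend to at once
        self.retention = scratch.get('retention', 5) # How many of the most recent events are kept in view (cf. get_summarized_latest_events)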

        # CORE IDENTITY 
        self.name = scratch.get('name', None)
        self.first_name = scratch.get('first_name', None)
        self.last_name = scratch.get('last_name', None)
        self.age = scratch.get('age', None)
        self.innate = scratch.get('innate', None) # Nature of the person
        self.learned = scratch.get('learned', None) # Learned traits of the person
        self.currently = scratch.get('currently', None) # Any current plans?
        self.lifestyle = scratch.get('lifestyle', None) # Lifestyle of the person
        self.living_area = scratch.get('living_area', None) # General living area; where they spend most of their time outside of work

        # RETRIEVAL RELATED VARIABLES
        self.recency_w = scratch.get('recency_w', 1)
        self.relevance_w = scratch.get('relevance_w', 1)
        self.importance_w = scratch.get('importance_w', 1)
        self.recency_decay = scratch.get('recency_decay', 0.99)

        # WORLD INFORMATION
        self.curr_time = scratch.get('curr_time', datetime.now()) # What's the current time?
        self.curr_tile = scratch.get('curr_tile', None) # Where is the person now?
        self.daily_plan_req = scratch.get('daily_plan_req', '') # What's a typical daily plan?
        tile_filename = '/'.join(filename.split('/')[:-4]) + '/environment/0.json'
        curr_tile = json.load(open(tile_filename))[self.name]
        self.curr_tile = (curr_tile['x'], curr_tile['y'])

        # PLANNING VARIABLES
        self.daily_req = scratch.get('daily_req', [])
        self.f_daily_schedule = scratch.get('f_daily_schedule', []) # This is changed every time the hourly activity is decomposed
        self.f_daily_schedule_hourly_org = scratch.get('f_daily_schedule_hourly_org', []) # The original hourly schedule; never decomposed

        # ACTIONS OF THE AGENT
        self.act_address = scratch.get('act_address', None)
        self.act_start_time = scratch.get('act_start_time', None)
        self.act_duration = scratch.get('act_duration', None)
        self.act_description = scratch.get('act_description', None)
        self.act_event = scratch.get('act_event', (self.name, None, None)) # See the section on events.

        # CONVERSATION VARIABLES
        self.chatting_with = scratch.get('chatting_with', None)
        self.chat = scratch.get('chat', None)
        self.chatting_with_buffer = scratch.get('chatting_with_buffer', dict())
        self.chatting_end_time = scratch.get('chatting_end_time', None)

    def get_f_daily_schedule_index(self, advance=0, main=True):
        """
        Returns the index of the action taking place now (advance=0) or `advance` minutes from now.
        """ 
        total_time_elapsed = self.curr_time.hour * 60 
        total_time_elapsed += self.curr_time.minute + advance

        ref_list = self.f_daily_schedule if main else self.f_daily_schedule_hourly_org
        elapsed, curr_index = 0, 0
        for task, duration in ref_list:
            elapsed += duration
            if elapsed > total_time_elapsed:
                return curr_index
            curr_index += 1
        return curr_index

    def get_str_iss(self):
        # ISS stands for Identity Stable Set - a bare minimum description of the persona that is used in prompts that need to call on the persona.
        commonset = ""
        commonset += f"Name: {self.name}\n"
        commonset += f"Age: {self.age}\n"
        commonset += f"Innate traits: {self.innate}\n"
        commonset += f"Learned traits: {self.learned}\n"
        commonset += f"Currently: {self.currently}\n"
        commonset += f"Lifestyle: {self.lifestyle}\n"
        commonset += f"Daily plan requirement: {self.daily_plan_req}\n"
        if self.curr_time:
            commonset += f"Current Date: {self.curr_time.strftime('%A %B %d')}\n"
        return commonset

    def add_new_action(self, 
                       action_address,
                       action_duration,
                       action_description,
                       action_event,
                       chatting_with, 
                       chat, 
                       chatting_with_buffer,
                       chatting_end_time,
                       act_obj_description,
                       act_obj_event,
                       act_start_time=None):
        self.act_address = action_address
        self.act_duration = action_duration
        self.act_description = action_description
        self.act_event = action_event

        self.chatting_with = chatting_with
        self.chat = chat
        if chatting_with_buffer:
            self.chatting_with_buffer.update(chatting_with_buffer)

        self.chatting_end_time = chatting_end_time 
        self.act_start_time = self.curr_time # This is the start time of the action
        self.act_path_set = False

    def act_check_finished(self):
        # Returns True if the action has finished
        if not self.act_address:
            return True

        # Compute the end time for the chat
        if self.chatting_with:
            end_time = self.chatting_end_time
        else:
            x = self.act_start_time
            if x.second != 0:
                x = x.replace(second=0)
                x = (x + timedelta(minutes=1))
            end_time = (x + timedelta(minutes=self.act_duration))

        if end_time < self.curr_time:
            return True

        return False

    def act_summarize_str(self):
        start_datetime_str = self.act_start_time.strftime('%A %B %d -- %H:%M %p')
        x = f"[{start_datetime_str}]\n"
        x += f"Activity: {self.name} is {self.act_description}\n"
        x += f"Address: {self.act_address}\n"
        x += f"Duration in minutes (e.g., x min): {str(self.act_duration)} min\n"
        return x

    def get_curr_event_and_desc(self): 
        if not self.act_address: 
          return (self.name, None, None, None)
        else: 
          return (self.act_event[0], 
                  self.act_event[1], 
                  self.act_event[2],
                  self.act_description)


In [9]:
filename = str(PERSONAS_FOLDER / "Isabella Rodriguez/bootstrap_memory/scratch.json")
stm_mem = Scratch(filename)
print(stm_mem.get_str_iss())

Name: Isabella Rodriguez
Age: 34
Innate traits: friendly, outgoing, hospitable
Learned traits: Isabella Rodriguez is a cafe owner of Hobbs Cafe who loves to make people feel welcome. She is always looking for ways to make the cafe a place where people can come to relax and enjoy themselves.
Currently: Isabella Rodriguez is planning on having a Valentine's Day party at Hobbs Cafe with her customers on February 14th, 2023 at 5pm. She is gathering party material, and is telling everyone to join the party at Hobbs Cafe on February 14th, 2023, from 5pm to 7pm.
Lifestyle: Isabella Rodriguez goes to bed around 11pm, awakes up around 6am.
Daily plan requirement: Isabella Rodriguez opens Hobbs Cafe at 8am everyday, and works at the counter until 8pm, at which point she closes the cafe.



## Integrating Memory into Agents

In this section, we implement the functions responsible for the agent's daily and hourly planning. These plans outline the agents' destinations, the events they will observe, their actions, and the reflections they will undergo as a result. Specifically, we will:

- Establish the daily agenda for each agent.
- Determine the subsequent action for them to undertake.

This structured approach ensures that the agents' activities are both purposeful and reflective of their capabilities and environments.

In [10]:
class Persona:
    def __init__(self, name, folder_mem, curr_time, initiate_plan=True):
        self.name = name
        self.s_mem = SpatialMemoryTree(f"{folder_mem}/bootstrap_memory/spatial_memory.json")
        self.a_mem = AssociativeMemory(f"{folder_mem}/bootstrap_memory/associative_memory")
        self.scratch = Scratch(f"{folder_mem}/bootstrap_memory/scratch.json")
        self.scratch.curr_time = curr_time
        
        if initiate_plan:
            self.generate_day_plan(first_day=True)

    def generate_day_plan(self, first_day):
        # To keep plans continuous across days, update `currently`: a reflection on the past day plus broad goals for the next one
        if not first_day:
            self.scratch.currently = get_new_currently(self)
            
        # Generating persona's daily plan in the short term memory
        self.scratch.daily_req = generate_day_plan(self)
        self.scratch.f_daily_schedule = generate_hourly_schedule(self) # Breaks down the plan into sub units with coherence across the time
        # f_daily_schedule_hourly_org keeps the hourly schedule intact, whereas f_daily_schedule
        # is further decomposed into finer-grained activities, so it changes as the simulation evolves.
        self.scratch.f_daily_schedule_hourly_org[:] = self.scratch.f_daily_schedule

        # Adding the broad plan to the long term memory (associative memory) as thoughts
        curr_date = self.scratch.curr_time.strftime("%A %B %d")
        thought = f"This is {self.scratch.first_name}'s plan for {curr_date}"
        for i in self.scratch.daily_req:
            thought += f" {i},"
        thought = thought[:-1] + "."
        created = self.scratch.curr_time
        expiration =  self.scratch.curr_time + timedelta(days=7) ## EXPIRY = 7 days
        s, p, o = (self.scratch.name, "plan", curr_date)
        keywords = set(["plan"])
        poignancy = 5
        thought_embedding_pair = (thought, get_embedding(thought))
        self.a_mem.add_node("thought", created, expiration, s, p, o,
                               thought, keywords, poignancy, thought_embedding_pair, None)

    def perceive_and_retrieve_and_focus(self):
        pass

    def advance_one_step(self, maze, personas, curr_time):
        if self.scratch.curr_time.strftime('%A %B %d') != curr_time.strftime('%A %B %d'):
            self.generate_day_plan(first_day=False)
            
        self.scratch.curr_time = curr_time
        if self.scratch.act_check_finished():
            new_action = determine_action(self, maze)
            self.scratch.add_new_action(**new_action)

            # determine its location
            action_address = new_action['action_address']
            target_tiles = maze.address_tiles[action_address]
            # Note: we take a random choice from the available tiles; there is more logic in the original Simulacra design
            new_tile = random.sample(list(target_tiles), 1)[0] 
            return new_tile
        return None

    def get_curr_tile(self):
        return self.scratch.curr_tile

    def move(self, new_tile):
        self.scratch.curr_tile = new_tile
        

## Scheduling

Traditionally, agent-based models (or simulations) relied on hardcoded rules to dictate agents' daily lives, covering activities like going to school or work. Hardcoding such behaviors takes extensive effort because of the many possible scenarios, and it often costs heterogeneity, since agents end up following similar patterns.

The advent of Large Language Models (LLMs) has changed this, making it feasible to skip the hardcoding altogether. Given a description of an agent's character, an LLM can be prompted to outline suitable daily activities. The original Simulacra code takes this approach by first defining a broad daily agenda (`generate_day_plan`) built around specific goals (e.g., writing a research paper). This agenda is then broken down into an hourly schedule (`generate_hourly_schedule`) that details actions for each hour to achieve the day's goals (e.g., visiting the library to write). Finally, as the simulation clock reaches each scheduled block, the hourly plan is decomposed into smaller, more detailed tasks (`generate_task_decompose`), such as writing at the desk.
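
To make the data shapes concrete: the hourly schedule ends up as a list of `(activity, duration_in_minutes)` pairs in which consecutive identical hours are merged. The sketch below reproduces that merging step on hand-written activities. The sample data is invented for illustration (in the notebook these strings come from the LLM), and it uses `itertools.groupby` rather than the explicit loop in `generate_hourly_schedule`; the result has the same shape.

```python
from itertools import groupby

# Invented sample: one activity per hour, as an LLM might return them.
hourly_activities = (
    ["sleeping"] * 6
    + ["having breakfast", "commuting to the cafe"]
    + ["working at the cafe counter"] * 12
    + ["having dinner", "relaxing", "relaxing", "sleeping"]
)

def compress(activities):
    """Merge runs of identical hourly activities into (activity, minutes) pairs."""
    return [(activity, 60 * len(list(run))) for activity, run in groupby(activities)]

schedule = compress(hourly_activities)
for activity, minutes in schedule:
    print(f"{minutes:4d} min  {activity}")
```

Because every hour contributes exactly 60 minutes, the durations of a full 24-hour plan always sum to 1440.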

A key feature of Simulacra's design is that the hourly breakdown occurs only at the designated simulation time, allowing the day’s unfolding events to potentially modify future schedules. This flexibility is achieved through the `determine_action` function.
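
As a rough illustration of this lazy decomposition, the sketch below finds the schedule entry that covers the current minute of the day (the same cumulative-duration walk as `Scratch.get_f_daily_schedule_index`) and splices finer-grained sub-tasks in its place. The sub-tasks are hard-coded here, whereas in the notebook they would come from `generate_task_decompose`; the function names `schedule_index` and `splice_subtasks` are hypothetical.

```python
def schedule_index(schedule, minutes_since_midnight):
    """Index of the (activity, minutes) entry covering the given minute of the day."""
    elapsed = 0
    for index, (_, duration) in enumerate(schedule):
        elapsed += duration
        if elapsed > minutes_since_midnight:
            return index
    return len(schedule)  # past the end of the planned day

def splice_subtasks(schedule, index, subtasks):
    """Replace one entry with sub-tasks that add up to the same duration."""
    _, duration = schedule[index]
    if sum(minutes for _, minutes in subtasks) != duration:
        raise ValueError("sub-tasks must cover exactly the original duration")
    return schedule[:index] + subtasks + schedule[index + 1:]

schedule = [("sleeping", 360), ("having breakfast", 60), ("working at the cafe", 1020)]
now = 6 * 60 + 15  # 06:15
idx = schedule_index(schedule, now)
subtasks = [("making coffee", 10), ("cooking eggs", 20), ("eating", 30)]
schedule = splice_subtasks(schedule, idx, subtasks)
print(idx, schedule)
```

Only the block that is about to start gets decomposed, so later blocks can still be rewritten if the day takes an unexpected turn.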

However, LLMs primarily offer textual descriptions of activities. Translating these descriptions into specific locations within the simulation environment requires additional prompting. This translation is handled by `generate_action_sector`, `generate_action_sector_arena`, `generate_action_sector_arena_object`, and `generate_action_event_triple`, which incorporate both environmental and agent-specific information.
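
The chain of prompts narrows the location one level at a time: sector, then arena, then object. The sketch below mimics that chain on a toy tree, with a keyword-overlap function standing in for the LLM. Both `choose` and `resolve_address` are hypothetical placeholders (the notebook's `generate_action_*` functions prompt the model instead), and joining the object as a fourth colon-separated component is an assumption made for this example.

```python
# Toy spatial memory: world -> sector -> arena -> list of objects.
tree = {
    "the Ville": {
        "Hobbs Cafe": {"cafe": ["cooking area", "behind the cafe counter", "piano"]},
        "Johnson Park": {"park": ["park garden"]},
    }
}

def choose(options, activity):
    """Placeholder for an LLM call: pick the option sharing most words with the activity."""
    words = set(activity.lower().split())
    return max(options, key=lambda option: len(words & set(option.lower().split())))

def resolve_address(tree, world, activity):
    """Pick sector, arena and object in turn and join them into one address."""
    sector = choose(list(tree[world]), activity)
    arena = choose(list(tree[world][sector]), activity)
    game_object = choose(tree[world][sector][arena], activity)
    return ":".join([world, sector, arena, game_object])

address = resolve_address(tree, "the Ville", "serving customers behind the cafe counter")
print(address)
```

Each step only sees the options that the persona's spatial memory lists under the previous choice, which keeps the model from naming places the agent does not know about.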

As events unfold and interactions occur throughout the day, agents may need to adjust their priorities for the following day. This involves updating their current goals, a process handled by the `generate_new_currently` function, which leverages `extract_relevant_nodes` to identify significant events from the past.
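
The retrieval function itself is not shown in this section. As one plausible sketch of how "significant" past events could be ranked, the example below combines recency, relevance, and importance using the weights stored in `Scratch` (`recency_w`, `relevance_w`, `importance_w`, `recency_decay`) and helpers equivalent to `_normalize` and `cos_sim`. The `rank_memories` function, the memory layout, and the sample data are all assumptions made for illustration and may differ from the notebook's actual implementation.

```python
import numpy as np

def normalize(values):
    """Min-max scale a list of numbers; the epsilon avoids division by zero."""
    low, high = min(values), max(values)
    return [(v - low) / (1e-6 + high - low) for v in values]

def cos_sim(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def rank_memories(memories, query_embedding, recency_decay=0.99,
                  recency_w=1, relevance_w=1, importance_w=1):
    """Return memory descriptions ordered from highest to lowest combined score.

    `memories` is a list of dicts ordered newest first, each with
    'description', 'embedding' and 'poignancy' keys (an assumed layout).
    """
    recency = normalize([recency_decay ** age for age in range(len(memories))])
    relevance = normalize([cos_sim(m["embedding"], query_embedding) for m in memories])
    importance = normalize([m["poignancy"] for m in memories])
    scores = [recency_w * r + relevance_w * s + importance_w * p
              for r, s, p in zip(recency, relevance, importance)]
    order = sorted(range(len(memories)), key=lambda i: scores[i], reverse=True)
    return [memories[i]["description"] for i in order]

# Invented sample memories with two-dimensional toy embeddings.
memories = [
    {"description": "is idle", "embedding": [0.0, 1.0], "poignancy": 1},
    {"description": "planned a Valentine's Day party", "embedding": [1.0, 0.1], "poignancy": 8},
    {"description": "restocked the refrigerator", "embedding": [0.7, 0.7], "poignancy": 3},
]
ranked = rank_memories(memories, query_embedding=[1.0, 0.0])
print(ranked)
```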

In [11]:
def generate_day_plan(persona):
    """Produces broad agenda for the day. """
    prompt_template_file = str(TEMPLATE_FOLDER / "day_planning.txt")
    prompt_input = [
        persona.scratch.get_str_iss(),
        persona.scratch.lifestyle,
        persona.scratch.curr_time.strftime("%A %B %d"),
        persona.scratch.first_name
    ]
    
    prompt = generate_prompt(prompt_input, prompt_template_file)
    schedule = safe_prompting(prompt, GPT_PARAMS, lambda x:x)
    if PRINT_PROMPTS:
        print_prompt("generate_day_plan", persona, prompt, schedule, GPT_PARAMS)

    schedule = prompt + schedule
    schedule = schedule[schedule.find("1)") + 2:]
    schedule = re.split(r'\d+\)', schedule)
    return [i.strip() for i in schedule]

def generate_hourly_schedule(persona):
    """Uses broad agenda for the day to plan hourly schedule."""
    curr_date = persona.scratch.curr_time.strftime("%A %B %d")
    prompt_template_file = str(TEMPLATE_FOLDER / "hourly_planning.txt")
    # Example of a schedule
    schedule_format = ""
    for hour in HOUR_STR:
        schedule_format += f"[{curr_date} -- {hour}]"
        schedule_format += f" Activity: [Fill in]\n"
    schedule_format = schedule_format[:-1]

    # Broad plan of the persona
    plan_str = f"Here is {persona.scratch.first_name}'s originally intended schedule for today: "
    for count, activity in enumerate(persona.scratch.daily_req):
        plan_str += f"({str(count+1)}) {activity}, "
    plan_str = plan_str[:-2]
    plan_str += f"\nIf {persona.scratch.first_name} is sleeping, use 'sleeping' as the activity"

    prompt_inputs = [schedule_format, persona.scratch.get_str_iss(), plan_str, None, None]

    # today's prior schedule (needed for coherence)
    activities_list = []
    prior_schedule = "\n"
    for count, hour in enumerate(HOUR_STR):
        # prepare the string for prior schedule
        if count > 0:
            prior_schedule += f"{curr_date} -- {HOUR_STR[count-1]} Activity:"
            prior_schedule += f" {persona.scratch.first_name} is {activities_list[count - 1]}\n"
            prompt_inputs[-2] = prior_schedule

        # final prompt to be completed
        final_prompt = f" [{curr_date} -- {hour}] Activity: {persona.scratch.first_name} is"
        prompt_inputs[-1] = final_prompt

        # modify the parameters because we don't need to generate a lot of tokens
        prompt = generate_prompt(prompt_inputs, prompt_template_file)
        params = GPT_PARAMS.copy()
        params['stop'] = ['\n']
        params['temperature'] = 0.5
        params['max_tokens'] = 50
        next_hour_activity = safe_prompting(prompt, params, lambda x:x)

        if PRINT_PROMPTS:
            print_prompt("generate_hourly_schedule", persona, prompt, next_hour_activity, params)
        
        activities_list.append(next_hour_activity.strip())

    # post-processing the output
    compressed_list = [['###', 0]]  # sentinel entry (a list, like the entries appended below); removed after the loop
    for activity in activities_list:
        if compressed_list[-1][0] == activity:
            compressed_list[-1][1] += 1
        else:
            compressed_list.append([activity, 1])
    compressed_list.pop(0)

    return [(x, y*60) for x,y in compressed_list]

def generate_task_decompose(persona, act_desc, act_dura):
    """Generates 5 min increments of the action for the action duration."""
    curr_date = persona.scratch.curr_time.strftime("%A %B %d")
    prompt_template_file = str(TEMPLATE_FOLDER / "decompose_task.txt")
    
    curr_f_org_index = persona.scratch.get_f_daily_schedule_index(main=False) # gets from f_daily_schedule_hourly_org
    # print(curr_f_org_index, persona.scratch.f_daily_schedule_hourly_org, len(persona.scratch.f_daily_schedule_hourly_org))
    # Prepare a summary string to capture an hour before and an hour after the current action event
    summary_str = f"Today is {curr_date}. From "
    for index in [curr_f_org_index- 1, curr_f_org_index, curr_f_org_index+1]:
        if index >= len(persona.scratch.f_daily_schedule_hourly_org) or index < 0:
            continue

        start_min = sum(i[1] for i in persona.scratch.f_daily_schedule_hourly_org[:index])
        action, time_elapsed = persona.scratch.f_daily_schedule_hourly_org[index]
        start_time = datetime.strptime("00:00:00", "%H:%M:%S") + timedelta(minutes=start_min)
        end_time = start_time + timedelta(minutes=time_elapsed)

        # %I (12-hour clock) pairs correctly with %p (AM/PM); %H would yield strings like "14:00PM"
        start_time_str, end_time_str = start_time.strftime('%I:%M%p'), end_time.strftime('%I:%M%p')
        summary_str += f"{start_time_str} ~ {end_time_str}, {persona.name} is planning {action}, "

        if index == curr_f_org_index: # We are interested in decomposing the activity at curr_f_org_index
            curr_time_range = f"{start_time_str} ~ {end_time_str}"
    summary_str = summary_str[:-2] + "."

    prompt_inputs = [
        persona.scratch.get_str_iss(),
        summary_str,
        persona.scratch.first_name,
        act_desc,
        curr_time_range,
        str(act_dura),
    ]

    params = GPT_PARAMS.copy()
    prompt  = generate_prompt(prompt_inputs, prompt_template_file)
    response = safe_prompting(prompt, params, lambda x:x)
    
    if PRINT_PROMPTS:
        print_prompt("generate_task_decompose", persona, prompt, response, params)

    full_str = prompt + response
    rem_str = full_str.split("---")[3]
    schedule = re.split(r'\d+\>', rem_str)
    schedule = [i.strip() for i in schedule if i.strip()]

    # parse each "<task> (duration in minutes: N, ...)" entry into [task, duration]
    activities = []
    for activity in schedule:
        task, rest = activity.split("(duration in minutes:")
        duration = int(rest.split(",")[0])
        activities.append([task.strip(), duration])
    return activities

def determine_action(persona, maze):
    """Determines the agent's next action, decomposing the current activity first if necessary."""
    
    def determine_decompose(act_desc, act_dura):
        if "sleeping" in act_desc:
            return False
        if act_dura < 60:
            return False 
        return True
    
    curr_action_index = persona.scratch.get_f_daily_schedule_index()

    act_desc, act_dura =  persona.scratch.f_daily_schedule[curr_action_index]
    if determine_decompose(act_desc, act_dura):
        persona.scratch.f_daily_schedule[curr_action_index: curr_action_index+1] = (
            generate_task_decompose(persona, act_desc, act_dura) # GPT
        )

        print(f"revised schedule --- {persona.name}\n", persona.scratch.f_daily_schedule)
        # to add up minutes
        total_time_accounted = sum(i[1] for i in persona.scratch.f_daily_schedule)
        if total_time_accounted < 1440:
            persona.scratch.f_daily_schedule += [["sleeping", 1440 - total_time_accounted]]

    act_desc, act_dura = persona.scratch.f_daily_schedule[curr_action_index]

    # Now we determine this action's location to execute
    act_world = maze.access_tile(persona.scratch.curr_tile)['world']
    act_sector = generate_action_sector(act_desc, persona, maze, 
                                         curr_determined_address=act_world) # GPT
    act_arena = generate_action_sector_arena(act_desc, persona, maze, 
                                        curr_determined_address=f"{act_world}:{act_sector}") # GPT
    act_object = generate_action_sector_arena_object(act_desc, persona, maze, 
                                                     curr_determined_address=f"{act_world}:{act_sector}:{act_arena}") # GPT
    new_address = f"{act_world}:{act_sector}:{act_arena}" 
    new_address += f":{act_object}" if act_object else ""
    act_event = generate_action_event_triple(act_desc, persona)
    act_obj_desc = None
    act_obj_event = None

    return {
        "action_address": new_address,
        "action_duration": int(act_dura),
        "action_description": act_desc,
        "action_event": act_event,
        "chatting_with": None, "chat": None, "chatting_with_buffer": None, "chatting_end_time": None,
        "act_obj_description": act_obj_desc,
        "act_obj_event": act_obj_event,
    }
                                   
def generate_action_sector(action, persona, maze, curr_determined_address=None):
    """Returns the appropriate sector to carry out that action."""
    prompt_template_file = str(TEMPLATE_FOLDER / "determine_action_sector.txt")
    curr_tile = persona.scratch.curr_tile
    tile_info = maze.access_tile(curr_tile)
    world, curr_sector = tile_info['world'], tile_info['sector']
    all_sectors = persona.s_mem.get_str_accessible_sectors(f"{world}")
    curr_sector_arenas = persona.s_mem.get_str_accessible_sector_arenas(f"{world}:{curr_sector}")
    
    living_area = persona.scratch.living_area
    living_area_sector = living_area.split(":")[1]
    living_area_arenas = persona.s_mem.get_str_accessible_sector_arenas(f"{world}:{living_area_sector}")
    prompt_inputs = [
        persona.scratch.name,
        living_area_sector,
        living_area_arenas,
        curr_sector,
        curr_sector_arenas,
        all_sectors,
        action
    ]
    
    params = GPT_PARAMS.copy()
    params['max_tokens'] = 20
    params['temperature'] = 0
    params['top_p'] = 1
    prompt  = generate_prompt(prompt_inputs, prompt_template_file)
    response = safe_prompting(prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("generate_action_sector", persona, prompt, response, params)

    return response.split("}")[0]

def generate_action_sector_arena(action, persona, maze, curr_determined_address):
    """Returns the appropriate arena within the sector to carry out that action."""
    prompt_template_file = str(TEMPLATE_FOLDER / "determine_action_arena.txt")
    new_sector = curr_determined_address.split(":")[1]
    new_possible_arenas = persona.s_mem.get_str_accessible_sector_arenas(curr_determined_address)
    prompt_inputs = [
        persona.scratch.name,
        new_sector,
        new_possible_arenas,
        action     
    ]
    params = GPT_PARAMS.copy()
    params['max_tokens'] = 20
    params['temperature'] = 0
    params['top_p'] = 1
    prompt  = generate_prompt(prompt_inputs, prompt_template_file)
    response = safe_prompting(prompt, params, lambda x:x)
    
    if PRINT_PROMPTS:
        print_prompt("generate_action_sector_arena", persona, prompt, response, params)

    return response.split("}")[0]


def generate_action_sector_arena_object(action, persona, maze, curr_determined_address):
    """Returns the appropriate object within the arena to carry out that action."""
    prompt_template_file = str(TEMPLATE_FOLDER / "determine_action_object.txt")
    possible_objects = persona.s_mem.get_str_accessible_arena_game_objects(curr_determined_address)
    prompt_inputs = [
        f"{action}",
        possible_objects    
    ]
    params = GPT_PARAMS.copy()
    params['max_tokens'] = 20
    params['temperature'] = 0.1
    params['top_p'] = 1
    params['stop'] = ["\n"]
    prompt  = generate_prompt(prompt_inputs, prompt_template_file)
    response = safe_prompting(prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("generate_action_sector_arena_object", persona, prompt, response, params)

    # Fail safe mechanism: fall back to a random accessible object.
    # Strip each candidate so that "bed, desk" matches a response of "desk".
    candidates = [obj.strip() for obj in possible_objects.split(",")]
    if response.strip() not in candidates:
        fallback = random.choice(candidates)
        print(f"\nUsing fail safe @ generate_action_sector_arena_object @ {persona.name} @ {curr_determined_address} @ {action} --> {fallback}")
        return fallback

    return response.strip()

def generate_action_event_triple(action, persona):
    """Returns the (subject, predicate, object) decomposition corresponding to the action."""
    prompt_template_file = str(TEMPLATE_FOLDER / "generate_event_triplet.txt")
    prompt_inputs = [
        persona.scratch.name,
        action.lower()
    ]  
    params = GPT_PARAMS.copy()
    params['max_tokens'] = 50
    params['temperature'] = 0
    params['top_p'] = 1
    prompt  = generate_prompt(prompt_inputs, prompt_template_file)
    response = safe_prompting(prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("generate_action_event_triple", persona, prompt, response, params)

    full_str = prompt + response
    output = full_str.split("---")[-1].split("Output:")[-1].strip()[1:]

    output = [i.strip() for i in output.split(")")[0].split(",")]
    if len(output) != 3:
        output = [persona.scratch.name, 'is', output[-1]]
        print(f"\nUsing fail safe @ generate_action_event_triple @ {persona.name} @ {action} --> {output}")
    
    return output


def get_new_currently(persona):
    """Reflects on the day's activity and returns a new `currently` for the persona to take on."""
    name = persona.scratch.name 
    curr_day = persona.scratch.curr_time.strftime("%A %B %d")
    queries = [
        f"{name}'s plan for {curr_day}",
        f"Important recent events for {name}'s life."
    ]
    retrieved = extract_relevant_nodes(persona, queries, count=30)

    # Add statements about the retrieved nodes
    statements = "[Statements]\n"
    for query, nodes in retrieved.items():
        for node in nodes:
            statements += f"{node.created.strftime('%A %B %d -- %I:%M %p')}: {node.embedding_key}\n"

    # Create a broad agenda for the next day
    planning_prompt = f"""{statements}
    Given the statements above, is there anything that {name} should remember as they plan for *{curr_day}*?
    If there is any scheduling information, be as specific as possible (including date, time, and location if stated in the statement).\n
    Write the response from {name}'s perspective.
    """
    params = GPT_PARAMS.copy()
    params['model'] = "gpt-3.5-turbo"
    params['max_tokens'] = 1000
    params['temperature'] = 0.8
    plan_note = safe_prompting(planning_prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("get_new_currently --> plan_note", persona, planning_prompt, plan_note, params)

    thought_prompt = f"""{statements}
    Given the statements above, how might we summarize {name}'s feelings about their days up to now?\n
    Write the response from {name}'s perspective.
    """
    thought_note = safe_prompting(thought_prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("get_new_currently --> thought_note", persona, thought_prompt, thought_note, params)

    prev_currently = persona.scratch.currently
    prev_day = persona.scratch.curr_time - timedelta(days=1)
    prev_day = prev_day.strftime('%A %B %d')
    update_currently_prompt = f"""
    {name}'s status from {prev_day}: {prev_currently}\n\n
    {name}'s thoughts at the end of {prev_day}: {plan_note} {thought_note}\n\n
    It is now {curr_day}. Given the above, write {name}'s status for {curr_day} that reflects {name}'s thoughts at the end of {prev_day}.
    Write this in third-person talking about {name}.
    If there is any scheduling information, be as specific as possible (include date, time, and location if stated in the statement).\n\n
    Follow this format below:\nStatus: <new_status>
    """
    new_currently = safe_prompting(update_currently_prompt, params, lambda x:x)

    if PRINT_PROMPTS:
        print_prompt("get_new_currently --> new_currently", persona, update_currently_prompt, new_currently, params)

    return new_currently

def extract_relevant_nodes(persona, queries, count=30):
    """Retrieves, for each query in `queries`, the `count` most relevant nodes from the agent's memory."""
    nodes = []
    for node in persona.a_mem.seq_thought + persona.a_mem.seq_event: ## ADD seq_chat here
        if "idle" not in node.embedding_key:
            nodes.append([node.last_accessed, node])
    nodes = sorted(nodes, key=lambda x:x[0])
    nodes = [node for _, node in nodes]

    recency_decay = persona.scratch.recency_decay
    recency = _normalize([recency_decay**i for i in range(1, len(nodes) + 1)])
    importance = _normalize([node.poignancy for node in nodes])
    
    retrieved = dict()
    v1, v2, v3 = persona.scratch.recency_w, persona.scratch.relevance_w, persona.scratch.importance_w
    w1, w2, w3 = 0.5, 3, 2 ## HARD CODED WEIGHTS 
    for query in queries:
        query_embedding = get_embedding(query)
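        node_relevance = _normalize([cos_sim(node.embedding, query_embedding) for node in nodes])
        # combine (recency, relevance, importance) in the same order as the weights (v1, v2, v3) and (w1, w2, w3)
        node_scores = [x*v1*w1 + y*v2*w2 + z*v3*w3 for x,y,z in zip(recency, node_relevance, importance)]
        top_nodes = sorted([(val, idx) for idx, val in enumerate(node_scores)], key=lambda x:x[0])[-count:]
        for _, idx in top_nodes:
            nodes[idx].last_accessed = persona.scratch.curr_time
        # return only the top-scoring nodes for this query, not the full memory
        retrieved[query] = [nodes[idx] for _, idx in top_nodes]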

    return retrieved
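
The retrieval step in `extract_relevant_nodes` ranks memories by a weighted sum of recency, relevance, and importance. The following is a minimal, standalone sketch of that idea, not the notebook's implementation: `normalize`, `cos_sim`, and `top_k_indices` are hypothetical helpers written for this example, min-max scaling is an assumed stand-in for `_normalize`, and the toy embeddings and poignancy values are made up. Only the weights `(0.5, 3, 2)` are taken from the code above.

```python
import math

def normalize(values):
    """Min-max scale to [0, 1]; a constant list maps to 0.5 (an assumption)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.5] * len(values)
    return [(v - lo) / (hi - lo) for v in values]

def cos_sim(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k_indices(recency, relevance, importance, k, weights=(0.5, 3, 2)):
    """Return the indices of the k highest-scoring memories, best first."""
    w_rec, w_rel, w_imp = weights
    scores = [
        w_rec * r + w_rel * v + w_imp * p
        for r, v, p in zip(normalize(recency), normalize(relevance), normalize(importance))
    ]
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    return ranked[:k]

# Toy memory: three nodes with made-up 2-d embeddings and poignancy scores.
embeddings = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
poignancy = [1, 8, 5]
recency = [0.99 ** i for i in range(1, 4)]  # exponential decay over list position
query_embedding = [1.0, 0.0]

relevance = [cos_sim(e, query_embedding) for e in embeddings]
best = top_k_indices(recency, relevance, poignancy, k=2)
print(best)
```

Because relevance carries the largest weight, the node whose embedding matches the query ranks first even though its poignancy is the lowest of the three.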

## Simulation loop
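
The loop in the next cell advances a simulated clock in fixed steps rather than following wall-clock time. As a small sketch of that arithmetic (the helper `clock_at` is hypothetical; the start time, step size, and step count mirror the values used in the cell):

```python
from datetime import datetime, timedelta

def clock_at(start, step, seconds_per_step):
    """Simulated time after `step` steps of `seconds_per_step` seconds each."""
    return start + timedelta(seconds=seconds_per_step * step)

start = datetime(2024, 2, 13, 0, 0, 0)  # midnight
seconds_per_step = 10 * 60              # 10 simulated minutes per step
n_steps = 180

end = clock_at(start, n_steps, seconds_per_step)
simulated_hours = n_steps * seconds_per_step / 3600
print(end, simulated_hours)
```

With these values the run covers 30 simulated hours, so it extends past the first day into the following morning.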

In [12]:
open(SIM_LOGFILE, 'w').close()
open(LOGFILE, 'w').close() # Clear its contents

maze = Maze("the Ville")
curr_time = sim_start_time = datetime(2024, 2, 13, 0, 0, 0) # Start at midnight
seconds_per_step = 10 * 60 # 10 minutes
n_steps = 180

personas = []
for persona_folder in PERSONAS_FOLDER.iterdir():
    personas.append(Persona(persona_folder.name, persona_folder, curr_time, initiate_plan=True))

step = 0
personas = personas[:1] # restrict to one agent because interactions don't matter in this notebook
movements = {}
while step < n_steps:
    
    # update and execute activities
    for persona in personas:
        new_tile = persona.advance_one_step(maze, personas, curr_time)
        movements[persona.name] = new_tile

    # update location
    for persona in personas:
        new_tile = movements[persona.name]
        if new_tile:
            # read this persona's own tile before moving, so the correct events are cleared
            curr_tile = persona.get_curr_tile()
            maze.remove_subject_events_from_tile(persona.name, curr_tile)
            maze.add_event_from_tile(persona.scratch.get_curr_event_and_desc(), new_tile)
            persona.move(new_tile)
        
        tile_path = maze.get_tile_path(persona.scratch.curr_tile, level='object')
        string = f"{curr_time.strftime('%H:%M')} {persona.name} {persona.scratch.curr_tile} {persona.scratch.act_description} {persona.scratch.act_address} {tile_path}"
        print_to_file(string, SIM_LOGFILE)
    
    step += 1
    curr_time = sim_start_time + timedelta(seconds=seconds_per_step*step)
    print(curr_time.strftime("%H:%M"))

00:10
00:20
00:30
00:40
00:50
01:00
01:10
01:20
01:30
01:40
01:50
02:00
02:10
02:20
02:30
02:40
02:50
03:00
03:10
03:20
03:30
03:40
03:50
04:00
04:10
04:20
04:30
04:40
04:50
05:00
05:10
05:20
05:30
05:40
05:50
06:00
06:10
06:20
06:30
06:40
06:50
07:00
07:10
revised schedule --- Klaus Mueller
 [('sleeping', 420), ['turning off his alarm.', 1], ['stretching and getting out of bed.', 5], ['washing his face and brushing his teeth.', 5], ['getting dressed.', 5], ['checking his email and social media.', 10], ['making his bed.', 3], ['gathering his school materials for the day.', 10], ['packing his backpack.', 5], ['making breakfast.', 10], ['eating breakfast.', 10], ('having breakfast at Hobbs Cafe', 60), ('heading to the library at Oak Hill College', 60), ('writing his research paper on gentrification', 120), ('taking a short break for lunch', 60), ('continuing to write his research paper on gentrification', 60), ('writing his research paper on gentrification', 180), ('taking a break for di
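
The revised schedule printed above is a list of (activity, minutes) pairs: seven identical hourly "sleeping" entries were merged into one 420-minute block, and the first waking hour was replaced by its decomposed subtasks. The sketch below illustrates the two operations this relies on, under stated assumptions: `compress_hourly` restates the run-length merge from `generate_hourly_schedule`, while `schedule_index` is a hypothetical stand-in for `get_f_daily_schedule_index`, whose actual implementation is not shown in this notebook section.

```python
def compress_hourly(activities):
    """Merge consecutive identical hourly activities into [activity, minutes] pairs."""
    compressed = []
    for activity in activities:
        if compressed and compressed[-1][0] == activity:
            compressed[-1][1] += 60
        else:
            compressed.append([activity, 60])
    return compressed

def schedule_index(schedule, minutes_elapsed):
    """Index of the entry that covers `minutes_elapsed` minutes past midnight."""
    end = 0
    for index, (_, duration) in enumerate(schedule):
        end += duration
        if minutes_elapsed < end:
            return index
    return len(schedule) - 1  # past the last entry: clamp to the final one

hourly = ["sleeping"] * 7 + ["having breakfast", "heading to the library"]
schedule = compress_hourly(hourly)
print(schedule)

# At 07:10 the agent is 430 minutes into the day.
print(schedule_index(schedule, 7 * 60 + 10))
```

Storing durations instead of start times keeps the schedule easy to splice: replacing one entry with its subtasks leaves every other entry valid, at the cost of a linear scan to find the current activity.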

In the next notebook, we will implement interactions between agents and examine the mechanisms that let them communicate with and affect each other within the simulation.