In [1]:
from generativeAgent import GAGroup
import time

In [None]:
"""
# * Preparing Agents
change the city as you wish
"""
# Build the agent group from ./config.yaml; per the note above, the last argument is the city name.
# NOTE(review): the meaning of the second argument ("GA") is not visible in this notebook — confirm against GAGroup.
ag = GAGroup("./config.yaml", "GA", "san_francisco")

In [None]:
"""
# agents initialization
arg_1: number of agents you want to simulate
arg_2: Day
arg_3: (Optional) path to pre-sampled agent profiles; if you have already sampled profiles for the agents, pass it to accelerate the initialization, otherwise profiles will be sampled according to the city
"""
# City used to locate the pre-sampled profile file below.
# NOTE(review): presumably this has to match the city passed to GAGroup above — confirm.
city = "san_francisco"
# Top-level await: this cell only runs in an environment that supports it (e.g. IPython/Jupyter).
# Arguments, per the description above: number of agents, day, optional profile file.
await ag.prepare_agents(1000, "Sunday", f"prototype/{city}/agents_meta.json")

In [4]:
import json
city = 'san_francisco'  # change city's name when you use
# Load the prototype (group) descriptions from the pre-stored file.
with open(f'prototype/{city}/protos.json', 'r') as f:
    group_des = json.load(f)

# Map every prototype name to the indices of the agents assigned to it.
# Keys follow the order of `group_des`; indices are stored in ascending order.
groups = {proto: [] for proto in group_des}
for agent_idx in range(len(ag.agents)):
    groups[ag.agent_groups[agent_idx]].append(agent_idx)

# Resolve the indices into the actual agent objects of each group.
# Comprehensions keep helper names out of the notebook's global namespace
# (the previous loop variable `id` shadowed the builtin for all later cells).
agent_groups = {
    proto: [ag.agents[member_idx] for member_idx in member_ids]
    for proto, member_ids in groups.items()
}

In [5]:
def sample_from_groups(groups, noa):
    """Proportionally sub-sample members from each group.

    Use this when only some of the agents should take part in the simulation.

    groups: dict mapping a group name to a list of its members.
    noa: total number of agents to keep across all groups.

    Returns a new dict with the same keys (in the same order); each value holds
    the leading members of the original list. The quota of a group is its
    proportional share of `noa`, truncated to an integer; the remainder lost to
    truncation is handed out one by one, round-robin, starting with the first
    group. A group can never yield more members than it has, so the result may
    hold fewer than `noa` members. When there is nothing to sample from (no
    groups, or only empty groups) every group maps to an empty list.
    """
    key_name = list(groups.keys())
    group_sizes = [len(groups[key]) for key in key_name]
    total = sum(group_sizes)
    if total == 0:
        # Nothing to sample from: avoids dividing by zero and indexing an empty quota list.
        return {key: groups[key][:0] for key in key_name}

    # Same arithmetic as before (noa * share, truncated), so quotas are unchanged.
    quotas = [int(noa * (size / total)) for size in group_sizes]
    leftover = noa - sum(quotas)
    for offset in range(leftover):  # no iteration when leftover <= 0
        quotas[offset % len(quotas)] += 1
    return {key: groups[key][:quota] for key, quota in zip(key_name, quotas)}

In [7]:
# * OpenCity

In [5]:
import os
import re
from datetime import datetime, timedelta
import random

"""
CoT for Distill meta-prompt generation
1. Conclusion and Summarization
2. Context Share Cue
3.1 With variable share cue
3.2 With only context share cue
"""
def distill_meta_prompt_generate(llm, func_des, var_des, var_share, o_prompt) -> str:
    """Ask the LLM to rewrite an original prompt into a batch ("distilled") prompt.

    The rewrite is obtained through a three-turn conversation; every reply is
    appended to the message history before the next question is asked:
    1. summarise the prompt (description, variable description, raw prompt);
    2. ask which context can be reused across multiple requests;
    3. ask for the rewritten prompt, with a three-section layout when
       `var_share` is non-empty and a two-section layout otherwise.

    llm: object exposing a synchronous `text_request(messages) -> str`.
    func_des: description of what the original prompt is for.
    var_des: description of the variables used by the original prompt.
    var_share: collection of variable names that are identical across requests.
    o_prompt: the original prompt text.

    Returns the LLM's reply to the final rewrite request.

    NOTE(review): the prompt texts are sent to the LLM verbatim, so their
    wording (including typos) is intentionally left untouched here. They ask
    for '!<INPUT-X>!' placeholders while BatchRunner.get_prompt substitutes
    '!<INPUT k>!' markers — confirm which form the stored batch prompts use.
    """
    # step 1: Summary
    messages = [{
        'role': 'user',
        'content': f"""
Part One: {func_des};
Part Two:{var_des};
Part Three:{o_prompt};

The text above including 3 parts:
Part One is the target of the original prompt; Part Two is the variable description in the original prompt; Part Three is the original prompt which is used to send to the request an answer after insert specific variable values. 
Can you sum up the the upper text? Include a summary and key points."""
    }]
    summary = llm.text_request(messages)
    messages.append({'role': 'assistant', 'content': summary})

    # step 2: Context Share Cue
    messages.append({
        'role': 'user',
        'content': "Based on your analysis, if I have multiple requests use the same prompt shown abouv, what context information can be reused?"})
    can_share_context = llm.text_request(messages)
    messages.append({'role': 'assistant', 'content': can_share_context})

    # step 3: pick the rebuild instruction; only the wording differs between the two cases
    if len(var_share) > 0:
        # Variable Share Cue and Rebuild
        rebuild_request = f"""
Beside context, these variables {var_share} are identical across multiple requests.
Can you rewrite the prompt with three sections?
The first section is '## Shared Background', including shareable context and identical variables.
The second section is '## Requriements', including the functionality and output format
The third section is '## Situations', only remains a '!<INPUT-X>!' here.

## Requirement
1. For those variables that are identical, place it in the first section, replace other variables to a '!<INPUT-X>!' in third section, in which 'X' is 1 plus number of identical variables
1. Your output only contains the rewrite version of Part Three.
2. Remove as much honorary, repetitive information as possible.

## Output Format
You output is a string.

Your output:
"""
    else:
        # Rebuild prompt with context sharing only
        rebuild_request = f"""
Can you rewrite the prompt with two sections?
The first section is Shared Background, including shareable context and identical variables.
The second part is Requests, including variables that can not be shared across multiple requests.

## Requirement
1. Your output only contains the rewrite version of Part Three.
2. Remove as much honorary, repetitive information as possible.

## Output Format
You output is a string.

Your output:
"""
    messages.append({'role': 'user', 'content': rebuild_request})
    return llm.text_request(messages)


class BatchRunner:
    def __init__(self, agents, prompt_dir, batch_dir:bool=False, max_size:int=None):
        """
        agents: agents for the group
        group_des: description of the agents for the group, profile unified description information
        prompt_dir: path to the original prompt store used to generate the batch_prompt.
        batch_dir: default None, if this is not None, the batch prompt will not be regenerated, but will reuse the prompt in this folder.
        max_size: default None, indicates the size of the batch, if None means infinite.
        """
        self.hub = False
        self.prompts_raw = {}
        self.prompt_vars = {}
        self.prompt_des = {}
        self.prompt_var_share = {}
        self.share_pool = {}
        self.prompts_batch = {}
        self.num_group = 1
        self.root_dir = prompt_dir
        self.agents = agents
        if self.agents != None and len(self.agents) > 0:
            self.llm = self.agents[0].Soul
        else:
            self.llm = None
        for item in os.listdir(prompt_dir):
            file_path = os.path.join(prompt_dir, item)
            file_name = item
            if os.path.isfile(file_path):
                ext = os.path.splitext(file_name)[1].lower()
                if ext == '.json':
                    with open(file_path, 'r') as file:
                        share_config = json.load(file)
                    for var in share_config['share']:
                        self.share_pool[var] = None
                else:
                    f = open(file_path, "r")
                    name = file_name.split('.')[0]
                    prompt_des, prompt_var, prompt_raw = f.read().split("<commentblockmarker>###</commentblockmarker>")
                    self.prompts_raw[name] = prompt_raw
                    self.prompt_vars[name] = prompt_var
                    self.prompt_des[name] = prompt_des
                    f.close()
                    
        for key, value in self.prompt_vars.items():
            self.prompt_var_share[key] = []
            var_used = re.findall(r'--\s*(\w+)', value)
            for var in var_used:
                if var in self.share_pool.keys():
                    self.prompt_var_share[key].append(var)
                    
        if max_size != None:
            self.num_group = int(len(agents)/max_size)
            if len(agents)%max_size != 0:
                self.num_group += 1

        if batch_dir:
            # Load prebuild batch prompts
            for dirpath, dirnames, filenames in os.walk(prompt_dir):
                if "batch_prompt" in dirnames:
                    target_folder_path = os.path.join(prompt_dir, "batch_prompt")
                    for file in os.listdir(target_folder_path):
                        name = file.split('.')[0]
                        file_path = os.path.join(target_folder_path, file)
                        with open(file_path, 'r') as f:
                            self.prompts_batch[name] = json.load(f)
        else:
            print("""You have not set [batch_dir], default [root_dir/batch_prompt]:
            1. You can run [generate_batch_prompt] to generate batch prompt for Batch group or 
            2. You can run [load_batch_prompt] to load exist batch prompt.""")

    def generate_batch_prompt(self):
        """
        Generate batch prompt using LLM-based method.

        For every raw prompt loaded from the prompt folder, [self.llm] is asked (through
        [distill_meta_prompt_generate]) to rewrite it, and the result is stored in
        [self.prompts_batch] under the same name.
        """
        print("Start Generation Batch Prompt...")
        for key, o_prompt in self.prompts_raw.items():
            # Read the prompt metadata from this runner itself, not from a notebook-global
            # runner that may not exist or may belong to another batch.
            self.prompts_batch[key] = distill_meta_prompt_generate(
                self.llm,
                self.prompt_des[key],
                self.prompt_vars[key],
                self.prompt_var_share[key],
                o_prompt,
            )
        print("Generation Success.")

    def load_batch_prompt(self, prompt_dir):
        """
        Load exsit batch prompt
        """
        for dirpath, dirnames, filenames in os.walk(prompt_dir):
            if "batch_prompt" in dirnames:
                target_folder_path = os.path.join(prompt_dir, "batch_prompt")
                for file in os.listdir(target_folder_path):
                    name = file.split('.')[0]
                    file_path = os.path.join(target_folder_path, file)
                    with open(file_path, 'r') as f:
                        self.prompts_batch[name] = json.load(f)
        print("Load Batch Prompt Success.")

    def set_llm(self, llm_agent):
        """
        Replace the LLM agent used by this runner.

        By default BatchRunner uses the same llm agent as its agents do (the first agent's
        [Soul], set in __init__); call this to use a different one.
        The llm_agent has to support:
        1. [.text_request(messages)->string] for sync llm text request
        2. [.atext_request(messages)->string] for async llm text request
        Within this file [messages] is always a list of {'role': ..., 'content': ...} dicts.
        You can use [UrbanLLM] agent from pycityagent repo
        """
        self.llm = llm_agent

    def set_share_pool(self, target, value):
        """
        Set value for share pool
        """
        if target not in self.share_pool.keys():
            print(f"Error: no share entity nameed {target}")
        self.share_pool[target] = value

    def save_batch_prompt(self, folder:str=None) -> None:
        """
        save batch prompt into json files
        default in [self.root_dir/batch_prompt]
        if signed [folder], then save in [folder]
        """
        if len(list(self.prompts_batch.keys())) == 0:
            print("Nothing to save")
            return
        else:
            if folder == None:
                save_folder = os.path.join(self.root_dir, 'batch_prompt')
            else:
                save_folder = folder
            if not os.path.exists(save_folder):
                print(f"Creating subfolder [batch_prompt] in {self.root_dir}")
                os.makedirs(save_folder)
            for key, value in self.prompts_batch.items():
                with open(f"{save_folder}/{key}.json", "w") as file:
                    json.dump(value, file, indent=4)

    def to_batch_prompt_str(self, values:list, names:list):
        result_str = """"""
        for i in range(len(self.agents)):
            single_situation = """"""
            single_situation += f"### Situation_{i+1}:\n"
            for j in range(len(names)):
                single_situation += f"{names[j]}: {values[j][i]}\n"
            single_situation += "\n"
            result_str += single_situation
        return result_str

    def gather_vars(self, target):
        """
        gather variable values from agents, return a list group, for example:
        """
        results_list = []
        for agent in self.agents:
            single = agent.Brain.Memory.Working.Reason[target]
            results_list.append(single)
        return results_list

    def get_prompt(self, target, values:list):
        """
        set values to target prompt and return the target prompt
        """
        prompt = self.prompts_batch[target]
        curr_input = [str(value) for value in values]
        for i, value in enumerate(curr_input):  
            prompt = prompt.replace(f"!<INPUT {i}>!", value)
        return prompt

    async def perceive(self):
        for agent in self.agents:
            pois = agent._simulator.map.query_pois(
                center = (agent.motion['position']['xy_position']['x'], agent.motion['position']['xy_position']['y']), 
                radius = 1000,  
                category_prefix= "", 
                limit = 20      # 限制了perceive的POI数量，防止过多
            )
            for poi, _ in pois:
                agent.Brain.Memory.Working.Reason['spatial'].add_memory_from_poi(poi, "You can see it from here.", "scene")
            agent.Brain.Memory.Working.Reason["perceived"] = agent.Brain.Memory.Working.Reason['spatial'].to_str("all")

    async def generate_wake_up_hour(self):
        # All share
        commonset = self.share_pool['Commonset']
        lifestyle = self.share_pool['Lifestyle']
        day = self.share_pool['Day']
        bprompt = self.get_prompt('wake_up_hour', [commonset, lifestyle, day])
        sysprompt = "Just answer a number x that refers to a hour of in 24-hour, no reasons needed."
        messages = [
            {"role": "system", "content": sysprompt},
            {"role": "user", "content": bprompt}
        ]
        hours = await self.llm.atext_request(messages)
        if '0' == hours[0]:
            hours = hours[1:]
        return int(eval(hours))

    async def generate_daily_plan(self):
        # All share
        commonset = self.share_pool['Commonset']
        lifestyle = self.share_pool['Lifestyle']
        day = self.share_pool['Day']
        wake_up_hour = self.share_pool['Wake_Up_Hour']
        bprompt = self.get_prompt('daily_planning', [commonset, lifestyle, day, wake_up_hour])
        messages = [{"role":"user", "content": bprompt}]
        daily_plan = await self.llm.atext_request(messages)
        return daily_plan

    async def generate_action_from_plan(self):
        """
        Plan the next action of every agent in the batch with one LLM request and apply it.

        Steps, as implemented below:
        1. Fill the 'action_planning' batch prompt with the shared 'Commonset', 'Daily_Plan'
           and 'Current_Time' plus one situation block per agent ('memory' and 'perceived').
        2. Parse the reply as JSON (optionally wrapped in a ```json fence). It must hold one
           entry per agent with the keys 'arrangement', 'next_place', 'hours' and 'minutes'.
           The target place is looked up in the agent's perceived text; when it cannot be
           resolved, a random entry of the agent's spatial scene is used.
        3. On any exception in step 2, every agent instead gets the fallback arrangement
           "go to this place" at a random scene entry with a random duration.
        4. For each agent, compute the end time (duration plus a noise of -10..10 minutes) and
           update the agent's spatial memory, 'history', 'intention', 'memory', 'trajectory'
           and 'nowPlace'.
        5. Update the shared 'Break' and 'Current_Time' from the FIRST agent's end time:
           'Break' becomes True when that action crosses midnight or ends at 23:59; otherwise
           the shared time advances to the end time plus a random gap.

        Returns None.

        NOTE(review): steps 3-5 index the first agent / first result, so an empty batch
        raises IndexError; a scene without entries makes random.randint fail.
        """
        def __checkPlace(loc_str, nxt):
            # Find the first line of the perceived-places text that mentions `nxt` and parse the
            # part after its first ':' as JSON (single quotes swapped for double quotes).
            # Returns the parsed value, or None when no line matches or parsing fails.
            # NOTE(review): the bare `except` also hides unrelated errors.
            loc_list = loc_str.split('\n')
            for loc in loc_list:
                if nxt in loc:
                    try:
                        p = json.loads(loc[loc.find(':')+1:].replace('\'', '\"'))
                    except:
                        p = None
                    return p
            return None

        def sampleNoiseTime():
            # Random jitter in minutes; random.randint includes both bounds.
            noise = random.randint(-10, 10)
            return noise


        def add_time(start_time, minutes):
            """
            Calculate the time after the end, giving the start time and the incremental time (in minutes)
            :param start_time: start time in the format '%H:%M'
            :param minutes: incremental time in minutes
            :return: the time after the end, in the format '%H:%M'; a flag for whether the day has been spanned
            """
            # The date part is an arbitrary anchor; only the change of day is inspected.
            start_datetime = datetime.strptime('2000-01-01 ' + start_time, '%Y-%m-%d %H:%M')
            end_datetime = start_datetime + timedelta(minutes=minutes)
            if end_datetime.day != start_datetime.day:
                cross_day = True
            else:
                cross_day = False
            end_time = end_datetime.strftime('%H:%M')
            return end_time, cross_day

        def getTimeFromZone(timeZone0):
            # `timeZone0` is 'a-b'; both bounds are halved, a value is drawn uniformly between
            # them and returned as whole minutes.
            time0, time1 = timeZone0.split('-')
            time0 = float(time0)/2  # already converted to hours at this point
            time1 = float(time1)/2
            sampleResult = random.uniform(time0, time1)  # sample a concrete time value, in hours
            minutes = int(sampleResult*60)
            return minutes
        
        def sampleGapTime():
            minutes = getTimeFromZone('0-2')  # the gap between two events is set to 0 to 1 hour
            return minutes

        commonset = self.share_pool['Commonset']
        daily_plan = self.share_pool['Daily_Plan']
        current_time = self.share_pool['Current_Time']
        # individual: memory, perceived
        memory_list = self.gather_vars('memory')
        perceived_list = self.gather_vars('perceived')
        batch_prompt_str = self.to_batch_prompt_str([memory_list, perceived_list], ["Memory", "Surrounding Places"])
        bprompt = self.get_prompt('action_planning', [commonset, daily_plan, current_time, batch_prompt_str])
        messages = [{"role":"user", "content": bprompt}]
        # Per-agent results, index-aligned with self.agents.
        arrangements = []
        nxt_locs = []
        nxt_pois = []
        hours = []
        minutes = []
        try:
            response = await self.llm.atext_request(messages)
            # Accept both a fenced ```json block and a bare JSON document.
            if "```json" in response:
                response = json.loads(response.split('```json')[1].split('```')[0])
            else:
                response = json.loads(response)
            if len(self.agents) != len(response):
                raise Exception("Error number of response")
        
            index = 0
            for resp in response:
                arrangement = resp['arrangement']
                nxt_loc = resp['next_place']
                nxt_poi = __checkPlace(perceived_list[index], nxt_loc)
                hour = int(resp['hours'])
                minute = int(resp['minutes'])
                if nxt_poi is None: 
                    # The named place could not be resolved: pick a random entry of the scene.
                    scence_length = len(self.agents[index].Brain.Memory.Working.Reason['spatial'].scene)
                    cid = random.randint(0, scence_length-1)
                    node_id = self.agents[index].Brain.Memory.Working.Reason['spatial'].scene[cid]
                    nxt_poi = self.agents[index].Brain.Memory.Working.Reason['spatial'].spatial_dict[node_id]
                arrangements.append(arrangement)
                nxt_locs.append(nxt_loc)
                nxt_pois.append(nxt_poi)
                hours.append(hour)
                minutes.append(minute)
                index += 1
        except Exception as err:
            # Fallback for the whole batch: discard any partial results and send every agent to
            # a random scene entry for a random duration.
            # NOTE(review): `err` is never reported, so request/format failures are silent.
            arrangements = []
            nxt_locs = []
            nxt_pois = []
            hours = []
            minutes = []
            for i in range(len(self.agents)):
                arrangements.append("go to this place")
                scence_length = len(self.agents[i].Brain.Memory.Working.Reason['spatial'].scene)
                cid = random.randint(0, scence_length-1)
                node_id = self.agents[i].Brain.Memory.Working.Reason['spatial'].scene[cid]
                nxt_poi = self.agents[i].Brain.Memory.Working.Reason['spatial'].spatial_dict[node_id]
                nxt_loc = nxt_poi['name']
                hours.append(random.randint(1, 5))
                # NOTE(review): randint includes both bounds, so 60 minutes can be drawn.
                minutes.append(random.randint(0, 60))
                nxt_locs.append(nxt_loc)
                nxt_pois.append(nxt_poi)

        # Turn every duration into an end time relative to the shared current time.
        end_times = []
        cross_days = []
        for i in range(len(hours)):
            increment_minutes = 60*hours[i]+minutes[i]
            noiseTime = sampleNoiseTime()
            # Apply the noise only when the duration stays positive.
            if increment_minutes + noiseTime > 0:
                increment_minutes = increment_minutes + noiseTime
            end_time, cross_day = add_time(current_time, increment_minutes)
            end_times.append(end_time)
            cross_days.append(cross_day)
        # Record the decided action in every agent's memories.
        for i in range(len(self.agents)):
            nxt_poi = nxt_pois[i]
            current_intention = arrangements[i]
            end_time = end_times[i]
            cross_day = cross_days[i]
            self.agents[i].Brain.Memory.Working.Reason['spatial'].add_memory_from_node(nxt_poi, f"You went here for {current_intention} from {current_time} to {end_time}.")
            # An action that runs past midnight is recorded as ending at 23:59.
            if cross_day or end_time == "23:59":
                seTime = "("+ current_time +", 23:59)"
                self.agents[i].Brain.Memory.Working.Reason['history'].append([current_intention, seTime])
            else:
                seTime = "("+ current_time+", "+end_time+")"
                self.agents[i].Brain.Memory.Working.Reason['history'].append([current_intention, seTime])
            self.agents[i].Brain.Memory.Working.Reason['intention'] = current_intention
            # Memory entries are prefixed with their position in the list: "(n). ...".
            count = len(self.agents[i].Brain.Memory.Working.Reason['memory'])
            self.agents[i].Brain.Memory.Working.Reason['memory'].append(f"({count}). {current_time}: you decide to {current_intention};")
            nextPlace = (nxt_poi['name'], nxt_poi['id'])
            intent, setime = self.agents[i].Brain.Memory.Working.Reason['history'][-1]
            thisThing = [intent, setime, nextPlace]
            self.agents[i].Brain.Memory.Working.Reason['trajectory'].append(thisThing)
            self.agents[i].Brain.Memory.Working.Reason['nowPlace'] = nextPlace
            count = len(self.agents[i].Brain.Memory.Working.Reason['memory'])
            self.agents[i].Brain.Memory.Working.Reason['memory'].append(f"({count}). you go to {nextPlace[0]};")

        # The shared clock of the batch follows the first agent only.
        if cross_days[0] or end_times[0] == "23:59":
            self.share_pool['Break'] = True
        else:
            gapTime = sampleGapTime()
            tmpnowTime, cross_day = add_time(end_times[0], gapTime)  
            # If the gap itself would cross midnight, continue from the end time without the gap.
            if cross_day:
                self.share_pool['Current_Time'] = end_times[0]
            else:
                self.share_pool['Current_Time'] = tmpnowTime
            self.share_pool['Break'] = False
        return None

    async def memory_reflection(self):
        if len(self.agents[0].Brain.Memory.Working.Reason['memory']) < 10:
            return
        memory_list = self.gather_vars('memory')
        batch_prompt_str = self.to_batch_prompt_str([memory_list], ["Memory"])
        bprompt = self.get_prompt('memory_reflection', [batch_prompt_str])
        messages = [{"role":"user", "content": bprompt}]
        response = await self.llm.atext_request(messages)
        try:
            response = json.loads(response.split('```json')[1].split('```')[0])
            index = 0
            for resp in response:
                self.agents[index].Brain.Memory.Working.Reason['memory'] = [f"(0). {resp}"] + [f"({i+1}). {m[m.find('.'):]}" for i, m in enumerate(self.agents[index].Brain.Memory.Working.Reason['memory'][10:])]
        except Exception as err:
            pass
    
    async def run(self, log:bool=False):
        """main entrance"""
        for agent in self.agents:
            agent.Brain.Memory.Working.Reason['spatial'].clear_scene()
        hour = await self.generate_wake_up_hour()
        self.set_share_pool('Wake_Up_Hour', hour)
        if int(hour) >= 10:
            self.set_share_pool('Current_Time', f"{hour}:00")
        else:
            self.set_share_pool('Current_Time', f"0{hour}:00")
        intent = "Sleep at home"
        set_time = "("+"00:00"+", "+self.share_pool['Current_Time']+")"
        for agent in self.agents:
            thisThing = [intent, set_time, agent.Brain.Memory.Working.Reason['Homeplace']]
            agent.Brain.Memory.Working.Reason['trajectory'].append(thisThing)
            agent.Brain.Memory.Working.Reason['history'].append([intent, set_time])
            count = len(agent.Brain.Memory.Working.Reason['memory'])
            agent.Brain.Memory.Working.Reason['memory'].append(f"({count}). 00:00: you decide to sleep at {agent.Brain.Memory.Working.Reason['Homeplace'][0]};")
        
        daily_plan = await self.generate_daily_plan()
        self.set_share_pool('Daily_Plan', daily_plan)
        for agent in self.agents:
            count = len(agent.Brain.Memory.Working.Reason['memory'])
            agent.Brain.Memory.Working.Reason['memory'].append(f"({count}). {self.share_pool['Current_Time']}: you make the plan: {self.share_pool['Daily_Plan']};")
        while True:
            for agent in self.agents:
                agent.Brain.Memory.Working.Reason['spatial'].clear_scene()
                
            # perceive
            await self.perceive()
            # plan
            await self.generate_action_from_plan()
            # reflection
            await self.memory_reflection()
            
            for agent in self.agents:
                await agent.set_position_poi(agent.Brain.Memory.Working.Reason['nowPlace'][1], hub=self.hub)
            if self.share_pool['Break']:
                break
        if log:
            index = 0
            for agent in self.agents:
                print(f"Agent{index}: {agent.Brain.Memory.Working.Reason['trajectory']}")
                index += 1

In [6]:
# Create the batch runners: every prototype group is split into batches of at most
# `max_batch_size` agents, and each batch gets its own BatchRunner with the group's shared values.
brs = []
max_batch_size = 10
for key, agents in agent_groups.items():
    # Every agent starts the simulation with an empty memory and an empty trajectory.
    for agent in agents:
        agent.Brain.Memory.Working.Reason['memory'] = []
        agent.Brain.Memory.Working.Reason['trajectory'] = []
    # Number of batches needed for this group (ceiling division).
    group_num = -(-len(agents) // max_batch_size)
    for i in range(group_num):
        # A slice that runs past the end of the list simply stops at the end, so the last
        # (possibly shorter) batch needs no special case.
        batch_agents = agents[i*max_batch_size:(i+1)*max_batch_size]
        br = BatchRunner(batch_agents, "./prompt_template/generative_agent", batch_dir=True)
        br.set_share_pool('Commonset', group_des[key])
        br.set_share_pool('Lifestyle', "It depends on weekday or weekend. When weekday people generally go to work and do some other things between work. It is important to note that people generally do not work on weekends and prefer entertainment, sports and leisure activities. There will also be more freedom in the allocation of time.")
        br.set_share_pool('Day', 'Sunday')
        br.set_share_pool('Current_Time', '00:00')
        br.set_share_pool('Break', False)
        brs.append(br)

In [None]:
import asyncio

print("Start simulation...")
start = time.time()
tasks = [brs[i].run() for i in range(len(brs))]
await asyncio.gather(*tasks)
end = time.time()
print("Finished simulation...")
print(f"Time: {end-start:.2f} s")

In [8]:
import json
import os

"""For storage output"""
def get_profile(agent):
    """Extract the agent's profile values from its working memory.

    Each of the four description entries is reduced to the text between its first ':'
    and the following ';' (or the next ':' / end of the text), with surrounding
    whitespace removed.

    Returns [gender, education, consumption, occupation].
    """
    reason = agent.Brain.Memory.Working.Reason
    fields = (
        'genderDescription',
        'educationDescription',
        'consumptionDescription',
        'occupationDescription',
    )
    return [reason[field].split(':')[1].split(';')[0].strip() for field in fields]

def get_output2folder(ag, output_folder, start_index:int=0):
    """Dump every agent's trajectory and profile into [output_folder].

    Two JSON files are written per agent, named after the agent's user id:
    '<user_id>_traj.json' holds the 'trajectory' entry of its working memory and
    '<user_id>_profile.json' holds the list returned by get_profile.

    ag: agent group; [ag.agents] are the agents and [ag.ids] their user ids.
    output_folder: target folder, created when missing.
    start_index: offset into [ag.ids]; the k-th agent (0-based) uses ag.ids[start_index + k].
    """
    os.makedirs(output_folder, exist_ok=True)
    for id_index, agent in enumerate(ag.agents, start=start_index):
        user_id = ag.ids[id_index]

        # traj
        # The JSON is written with ensure_ascii=False, so the encoding is fixed to UTF-8
        # instead of depending on the platform's default.
        path = os.path.join(output_folder, f"{user_id}_traj.json")
        with open(path, 'w', encoding='utf-8') as json_file:
            json.dump(agent.Brain.Memory.Working.Reason['trajectory'], json_file, indent=4, ensure_ascii=False)

        # profile
        profile = get_profile(agent)
        path = os.path.join(output_folder, f"{user_id}_profile.json")
        with open(path, 'w', encoding='utf-8') as json_file:
            json.dump(profile, json_file, indent=4, ensure_ascii=False)

In [9]:
# Store every agent's trajectory and profile under output/<city>/even_output_<number of agents>/.
city = 'san_francisco'
get_output2folder(ag, f'output/{city}/even_output_{len(ag.agents)}/')