## Single Act LLM Baseline


A single agent with a single module, using Act prompting with just LLMs (no vision aspect).

In [None]:
import re
import ast
import base64
import requests
import json, os
import pandas as pd
from pprint import pprint
import sys
from pathlib import Path

# set parent directory to address relative imports
directory = Path(os.getcwd()).absolute()
sys.path.append(str(directory.parent.parent.parent))

# import environment
from AI2Thor.env_new import AI2ThorEnv
from AI2Thor.base_env import convert_dict_to_string
from AI2Thor.object_actions import get_closest_feasible_action, get_closest_object_id
from AI2Thor.baselines.utils import Logger, AutoConfig

# Set the TOKENIZERS_PARALLELISM switch for this process
# (presumably read by a tokenizer library used by the imported helpers -- confirm).
os.environ["TOKENIZERS_PARALLELISM"] = "true"

# The OpenAI key is read from a JSON file named openai_key.json in the home
# folder; the file must look like {"my_openai_api_key": "INSERT API HERE"}.
with open(os.path.join(os.path.expanduser("~"), "openai_key.json")) as json_file:
    key = json.load(json_file)
api_key = key["my_openai_api_key"]

# HTTP headers sent with every chat-completions request.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {api_key}",
}

In [None]:
# initialize environment parameters
class Config:
    """Default settings object passed to ``AI2ThorEnv`` for the single-task run.

    Every value is a plain instance attribute set in ``__init__``; the meaning
    of the individual ``use_*`` switches is defined by the environment code,
    not in this file.
    """

    def __init__(self):
        # --- scene / episode ---
        self.scene: str = "FloorPlan1"
        self.scene_name: str = "FloorPlan1"
        self.num_agents: int = 2
        self.horizon: int = 30  # upper bound on steps in the single-task loop
        self.forceAction: bool = False

        # --- language model ---
        self.model: str = "gpt-4"
        self.temperature: float = 0.7  # forwarded as the request temperature
        self.use_langchain: bool = False
        self.use_strict_format: bool = True

        # --- summarisers / feedback ---
        self.use_obs_summariser: bool = False
        self.use_act_summariser: bool = False
        self.use_action_failure: bool = True
        self.use_future_message: bool = True

        # --- subtasks, planning and memory ---
        self.use_plan: bool = True
        self.use_shared_subtask: bool = True
        self.use_separate_subtask: bool = False
        self.use_memory: bool = True
        self.use_shared_memory: bool = True
        self.use_separate_memory: bool = False

# Instantiate the default configuration and build an environment from it.
config = Config()
env = AI2ThorEnv(config)
# Stop the controller straight away; the run cells further down construct
# their own AI2ThorEnv before stepping.
env.controller.stop()

In [None]:
# Act PROMPT
# Names of the two robots. They are interpolated into the system prompt below
# and are also the keys looked up in the model's reply by the run loops.
agent_name = ["Alice", "Bob"]

# System prompt (constant for the whole run). It is an f-string, so literal
# braces in the output-format examples are escaped as {{ and }}.
PROMPT = f"""You are an excellent planner who is tasked with helping 2 embodied robots named {agent_name[0]} and {agent_name[1]} carry out a task. Both robots have a partially observable view of the environment. Hence they have to explore around in the environment to do the task.
They can perform the following actions: ["navigate to object <object_id>", "rotate in <rotation> direction", "pick up object <object_id>", "put object on <receptacle_id>", "open object <object_id>", "close object <object_id>", "slice object <object_id>", "toggle object <object_id> on", "toggle object <object_id> off", "clean object <object_id>", "look up by angle <angle>", "look down by angle <angle>", "move in <translation> direction", "stay idle", "Done"]
Here "Done" is used when all the robots have completed the main task. Only use it when you think all the subtasks are complete.
"stay idle" is used when you want the robot to stay idle for one time step. This could be used to wait for the other robot to complete its subtask. Use it only when you think it is necessary.
Here <rotation> can be one of ["Right", "Left"].
Here <angle> is the angle in degrees and can only be one of [30, 60, 90, 120, 150, 180].
Here <translation> can be one of ["Ahead", "Back", "Left", "Right"].

You need to suggest the action that each robot should take at the current time step.

### Important Notes ###
* The robots can hold only one object at a time.
For example: If {agent_name[0]} is holding an apple, it cannot pick up another object until it puts the apple down.
* Even if the robot can see objects, it might not be able to interact with them if they are too far away. Hence you will need to make the robot navigate closer to the objects they want to interact with.
For example: An action like "pick up <object_id>" is feasible only if robot can see the object and is close enough to it. So you will have to navigate closer to it before you can pick it up.
In some scenarios, the agents might not see the objects that they want to interact with. In such cases, you will have to make the robot explore the environment to find the object.
In such scenarios you can use actions to rotate in place or look up / down or navigate to explore the environment.
* If you open an object, please ensure that you close it before you navigate to a different place.
* Opening object like drawers, cabinets, fridge can block the path of the robot. So open objects only when you think it is necessary.

### INPUT FORMAT ###
* You will get a description of the task robots are supposed to do.
* You will get an image of the environment at the current time step from {agent_name[0]}'s perspective and {agent_name[1]}'s perspective as the observation input. To help you with detecting objects in the image, you will also get a list objects each agent is able to see in the environment. Here the objects are named as "<object_name>_<object_id>". 
* You will get a trace of the steps taken by the robots and the actions they took at each time step and whether it was successful or not.

### OUTPUT FORMAT ###
In your output, do not have any extra text or content outside of the python dictionary as below. Do NOT put any text, spaces, or enter keys (i.e. "\\n") outside of it.
Your output should ONLY be in the form of a python dictionary, without any reasoning or extra text, as shown below:
{{"{agent_name[0]}": "action to be taken by {agent_name[0]}", "{agent_name[1]}": "action to be taken by {agent_name[1]}"}}

For example: If you think {agent_name[0]} should pick up an apple and {agent_name[1]} should navigate to the fridge, you will have to give the output as:
{{"{agent_name[0]}": "pick up apple", "{agent_name[1]}": "navigate to fridge"}}

* NOTE: DO NOT OUTPUT ANYTHING EXTRA OTHER THAN WHAT HAS BEEN SPECIFIED
"""


In [None]:
def get_action_llm_input(env):
    """
    Collect the per-agent observation entries from the environment.

    For each agent index below ``env.num_agents`` the key
    "<agent name>'s observation" is looked up in ``env.input_dict``.
    This version does not include the "Task" entry; a later definition of
    the same name in this file adds it.

    Returns:
        dict mapping "<agent name>'s observation" to the value stored under
        that key in ``env.input_dict`` (in agent order).
    """
    observation_keys = [
        env.agent_names[idx] + "'s observation" for idx in range(env.num_agents)
    ]
    return {obs_key: env.input_dict[obs_key] for obs_key in observation_keys}

In [None]:
def convert_obs_list_str(string_list):
    """Turn the text form of a list of strings into one comma-separated string.

    Example: "['Stove_1', 'Drawer_1']" -> "Stove_1, Drawer_1"
    """
    # literal_eval parses the list literal; the items are then joined with ", "
    return ", ".join(ast.literal_eval(string_list))

In [None]:
def prepare_prompt(user_prompt:str, step_num:int):
    """Build the (system prompt, user prompt) pair for the current step.

    The running ``user_prompt`` is extended with a "Step <step_num>:" header
    followed by one "<agent> observes <objects>" line per agent, taken from
    the module-level ``env``.

    Returns:
        tuple (system_prompt, user_prompt) where the system prompt is the
        module-level ``PROMPT`` and the user prompt is the extended text.
    """
    observations = get_action_llm_input(env)
    pieces = [user_prompt, f"\nStep {step_num}:\n"]
    for idx in range(env.num_agents):
        name = env.agent_names[idx]
        visible = convert_obs_list_str(observations[f"{name}'s observation"])
        pieces.append(f"{name} observes {visible}\n")
    return PROMPT, "".join(pieces)

In [None]:
def encode_image(image_path:str):
    """Read the file at ``image_path`` and return its bytes as base64 text."""
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
    
def get_action_llm_input(env):
    """
    Select the entries of ``env.input_dict`` used as LLM input.

    The result holds the "Task" entry first, then one
    "<agent name>'s observation" entry for every agent index below
    ``env.num_agents``.

    Returns:
        dict with those keys and the corresponding ``env.input_dict`` values.
    """
    wanted_keys = ["Task"]
    wanted_keys += [
        env.agent_names[idx] + "'s observation" for idx in range(env.num_agents)
    ]
    return {wanted: env.input_dict[wanted] for wanted in wanted_keys}

def prepare_payload(user_prompt, step_num:int):
    """Build the chat-completions request body for the current step.

    The payload consists of
    * the system prompt (constant),
    * the user prompt (grows with the state / action trace),
    * one base64-encoded frame per agent, in agent order.

    Frames are collected for every agent index below ``env.num_agents``
    rather than for the fixed indices 0 and 1, matching how
    ``prepare_prompt`` enumerates agents.

    Args:
        user_prompt: running user prompt accumulated so far.
        step_num: 1-based step number, forwarded to ``prepare_prompt``.

    Returns:
        tuple (payload, user_prompt): the request dict and the user prompt
        extended with this step's observations.
    """
    system_prompt, user_prompt = prepare_prompt(user_prompt, step_num)

    # env.get_frame(i) yields the image path for agent i; each is inlined as a data URL.
    image_parts = []
    for agent_idx in range(env.num_agents):
        encoded_frame = encode_image(env.get_frame(agent_idx))
        image_parts.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded_frame}"},
            }
        )

    payload = {
        "model": "gpt-4-vision-preview",
        "messages": [
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": system_prompt},
                ],
            },
            {
                "role": "user",
                "content": [{"type": "text", "text": user_prompt}, *image_parts],
            },
        ],
        "max_tokens": 1000,
        "temperature": config.temperature,
    }
    return payload, user_prompt


def get_action(response):
    """Extract the per-agent action dictionary from a chat-completions reply.

    Args:
        response: object whose ``.json()`` returns the decoded API reply.

    Returns:
        The dict parsed from the first ``{ ... }`` span in the model's
        message, or ``None`` (after printing diagnostics) when the reply has
        no message content or no parseable dictionary.
    """
    response_dict = response.json()
    try:
        output = response_dict["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        # e.g. an API error body without "choices"; there is nothing to parse
        pprint(f"error: choices not in response_dict\n {response_dict}\n")
        return None
    if not isinstance(output, str):
        pprint(f"error: choices not in response_dict\n {response_dict}\n")
        return None

    dict_match = re.search(r'\{.*\}', output, re.DOTALL)

    if dict_match:
        candidate = dict_match.group()
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            # The prompt asks for a "python dictionary", so the model may
            # answer with single quotes, which is valid Python but not JSON.
            try:
                parsed = ast.literal_eval(candidate)
            except (ValueError, SyntaxError):
                parsed = None
            if isinstance(parsed, dict):
                return parsed

    # parsing error
    pprint(f"response_dict\n {response_dict}\n")
    pprint(f"output\n {output}\n")
    pprint(f"dict_match\n {dict_match}\n")
    return None

def get_gpt_response(user_prompt, step_num:int, timeout=120):
    """Send the current step's payload to the OpenAI chat-completions endpoint.

    Args:
        user_prompt: running user prompt accumulated so far.
        step_num: 1-based step number, forwarded to ``prepare_payload``.
        timeout: seconds to wait for the HTTP request before ``requests``
            raises a timeout error; pass ``None`` to wait indefinitely
            (the previous behaviour).

    Returns:
        tuple (response, user_prompt): the ``requests`` response object and
        the user prompt extended with this step's observations.
    """
    payload, user_prompt = prepare_payload(user_prompt, step_num)
    # Without a timeout a stalled connection would block the run forever.
    response = requests.post(
        "https://api.openai.com/v1/chat/completions",
        headers=headers,
        json=payload,
        timeout=timeout,
    )
    return response, user_prompt


#  "look up by angle <angle>", "look down by angle <angle>"
def action_checker(actions):
    """
    Convert the model's natural-language actions into environment actions.

    Each entry is first passed through ``get_closest_feasible_action``. When
    the resulting action type (the text before the first "(") is one that
    targets an object, the action is additionally passed through
    ``get_closest_object_id`` together with ``env.object_dict``.
    Eg (per the original note): "pick up the apple" -> "PickupObject(Apple_1)"

    Returns:
        list of checked actions, in the same order as ``actions``.
    """
    object_action_types = (
        'PickupObject',
        'PutObject',
        'OpenObject',
        'CloseObject',
        'SliceObject',
        'NavigateTo',
        'ToggleObjectOn',
        'ToggleObjectOff',
        'CleanObject',
    )

    def _resolve(raw_action):
        feasible = get_closest_feasible_action(raw_action)
        if feasible.split("(")[0] in object_action_types:
            feasible = get_closest_object_id(feasible, env.object_dict)
        return feasible

    return [_resolve(raw_action) for raw_action in actions]

In [None]:
def prepare_prompt_post_action(user_prompt, action, action_successes):
    """Append one outcome line per agent to the running user prompt.

    For every agent index below ``env.num_agents`` a line of the form
    "<agent>'s action was <action> and it was successful|unsuccessful" is
    added, using the module-level ``env`` for the agent names.

    Returns:
        The extended user prompt.
    """
    outcome_lines = []
    for idx in range(env.num_agents):
        name = env.agent_names[idx]
        taken = action[idx]
        verdict = 'successful' if action_successes[idx] else 'unsuccessful'
        outcome_lines.append(f"{name}'s action was {taken} and it was {verdict}\n")
    return user_prompt + "".join(outcome_lines)

In [9]:
# modify parent directory to address relative imports in autoconfig file

# Run all tasks and floorplans in config.json

# Sweep over every task and, for each task, every floorplan that AutoConfig
# exposes. Each (task, floorplan) pair is attempted up to max_retries times.
auto=AutoConfig()
amt_tasks=auto.get_amt_tasks()
for task_index in range(amt_tasks):

    auto.set_task(task_index)
    amt_floorplans=auto.get_amt_floorplans(task_index)

    for fp_index in range(amt_floorplans):

        auto.set_floorplan(fp_index)
        # step budget for this task; used as the upper bound of the step loop
        timeout=auto.get_task_timeout()

        max_retries = 3
        retries = 0

        while retries < max_retries:
            try: # sometimes rare errors happen, so give each task and floorplan a few tries

                # a fresh environment (and a fresh prompt/logger) per attempt
                env = AI2ThorEnv(auto.config())
                task = auto.task_string()
                d = env.reset(task=task)  # return value is not used below

                # the user prompt grows every step with observations and action outcomes
                user_prompt = f"Task: {task}"

                logger=Logger('ActTest', env) # changed from Act to ActTest so that Act baselines are not overwritten
                for step_num in range(1, timeout+1):
                    response, user_prompt = get_gpt_response(user_prompt, step_num)
                    # NOTE(review): get_action can return None on a parse failure; the
                    # subscript below then raises and the attempt is retried.
                    outdict = get_action(response)
                    # get closest feasible action
                    action_texts = [outdict[agent_name[0]], outdict[agent_name[1]]]
                    action = action_checker(action_texts)
                    # execute action in environment
                    d, action_successes = env.step(action)
                    # update user prompt with action taken and its success
                    user_prompt = prepare_prompt_post_action(user_prompt, action_texts, action_successes)
                    # read the task-progress metrics from the environment's checker
                    coverage = env.checker.get_coverage()
                    transport_rate = env.checker.get_transport_rate()
                    finished = env.checker.check_success()
                    # logging
                    logger.log_step(step=step_num, preaction=action_texts, action=action, success=action_successes, coverage=coverage, transport_rate=transport_rate, finished=finished)
                    print('_'*50)
                    print(f"Step {step_num}")
                    print(f"Completed Subtasks: ")
                    print("\n".join(env.checker.subtasks_completed))
                    # if the model outputs "Done" for both agents, break
                    # NOTE(review): this compares the checked actions, not the raw model
                    # text -- confirm action_checker yields exactly 'Done' for "Done".
                    if all(status == 'Done' for status in action):
                        break

                # attempt finished without an exception: shut down and leave the retry loop
                env.controller.stop()
                break
            except Exception as e:
                print(f"Error occurred: {e}. Retrying...")
                # NOTE(review): if AI2ThorEnv(...) itself raised, `env` still refers to the
                # previously created environment here -- confirm stop() is safe to repeat.
                env.controller.stop()
                retries += 1
                # the while condition would also end the loop; this branch only adds the message
                if retries >= max_retries:
                    print("Max retries reached. Moving to next floorplan or task.")
                    break

In [None]:
# Print the summary of whichever Logger was created last (the `logger`
# name is rebound on every attempt in the sweep above).
pprint(logger.summarize())

In [None]:
# Run only the given task and floorplan

# Per-step results table for the single-task run below; rows are added through append_row.
df = pd.DataFrame(columns=['Step', 'Action', 'Success', 'Coverage', 'Transport Rate'])

def append_row(df, step, action, success, coverage, transport_rate):
    """Return ``df`` with one more row describing a single step.

    Args:
        df: table with the columns Step, Action, Success, Coverage,
            Transport Rate.
        step, action, success, coverage, transport_rate: the cell values for
            the new row, in column order.

    Returns:
        A new DataFrame; the input ``df`` is not modified.
    """
    row = pd.DataFrame(
        [[step, action, success, coverage, transport_rate]],
        columns=['Step', 'Action', 'Success', 'Coverage', 'Transport Rate'],
    )
    # ignore_index renumbers the rows 0..n-1; without it every appended row
    # keeps the single-row frame's index label 0.
    return pd.concat([df, row], ignore_index=True)

# Single run on the hand-written Config above, with a fixed task string.
env = AI2ThorEnv(config)
task = "Put the bread, lettuce, and tomato in the fridge"
d = env.reset(task=task)  # return value is not used below
# the user prompt grows every step with observations and action outcomes
user_prompt = f"Task: {task}"

logger=Logger('ActTest', env)
# at most config.horizon steps; there is no retry wrapper here, so any
# exception raised inside the loop ends the run
for step_num in range(1, config.horizon+1):
    response, user_prompt = get_gpt_response(user_prompt, step_num)
    # NOTE(review): get_action can return None on a parse failure, in which
    # case the subscript below raises.
    outdict = get_action(response)
    # get closest feasible action
    action_texts = [outdict[agent_name[0]], outdict[agent_name[1]]]
    action = action_checker(action_texts)
    # execute action in environment
    d, action_successes = env.step(action)
    # update user prompt with action taken and its success
    user_prompt = prepare_prompt_post_action(user_prompt, action_texts, action_successes)
    # read the task-progress metrics from the environment's checker
    coverage = env.checker.get_coverage()
    transport_rate = env.checker.get_transport_rate()
    finished = env.checker.check_success()
    # record the step both in the dataframe and in the logger
    df = append_row(df, step_num, action, action_successes, coverage, transport_rate)
    logger.log_step(step=step_num, preaction=action_texts, action=action, success=action_successes, coverage=coverage, transport_rate=transport_rate, finished=finished)
    print('_'*50)
    print(f"Step {step_num}")
    print(f"Completed Subtasks: ")
    print("\n".join(env.checker.subtasks_completed))
    # if the model outputs "Done" for both agents, break
    # NOTE(review): this compares the checked actions, not the raw model text --
    # confirm action_checker yields exactly 'Done' for "Done".
    if all(status == 'Done' for status in action):
        break

# bare expression: shows the table as the notebook cell output
df