## 2 Player Escape Room Game

- Can 2 LLM Agents fulfil an arbitrary task in an Escape Room-like gridworld?
- Let's find out!
- **Created:** 19 Aug 2023
- **Updated:** 22 Aug 2023

In [1]:
import pygame
import sys
import random
import os
import openai
import json
import re
import time

# API Keys
# Prefer a key that is already exported in the environment; the placeholder below is only a
# fallback so that a real key is never overwritten (and never has to be typed into the notebook).
os.environ.setdefault('OPENAI_API_TOKEN', 'YOUR_API_KEY_HERE')
openai.api_key = os.environ['OPENAI_API_TOKEN']

pygame 2.5.1 (SDL 2.28.2, Python 3.11.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


## Strict JSON Framework

- Taken from: https://github.com/tanchongmin/strictjson

- **system_prompt**: Write in whatever you want GPT to become. "You are a \<purpose in life\>"
- **user_prompt**: The user input. Later, when we use it as a function, this is the function input
- **output_format**: JSON format with the key as the output key, and the value as the output description
    - The output keys will be preserved exactly, while GPT will generate content to match the description of the value as best as possible

#### Example Usage
```python
res = strict_output(system_prompt = 'You are a classifier',
                    user_prompt = 'It is a beautiful day',
                    output_format = {"Sentiment": "Type of Sentiment",
                                    "Tense": "Type of Tense"})
                                    
print(res)
```

#### Example output
```{'Sentiment': 'Positive', 'Tense': 'Present'}```


In [3]:
def strict_output(system_prompt, user_prompt, output_format, default_category = "", output_value_only = False,
                  model = 'gpt-3.5-turbo', temperature = 0, num_tries = 2, verbose = False):
    ''' Ensures that OpenAI will always adhere to the desired output json format.
    Uses rule-based iterative feedback to ask GPT to self-correct: when a reply cannot be parsed
    or misses a key, the reply and the error are appended to the system prompt of the next try.
    Keeps trying up to num_tries if it does not. Returns empty json if unable to after num_tries iterations.
    If output field is a list, will treat as a classification problem and output best classification category.
    Text enclosed within < > will be generated by GPT accordingly.

    Parameters:
        system_prompt: role/instructions for the model
        user_prompt: the input; if it is a list, a list of json (one per element) is expected back
        output_format: dict of output key -> description (or list of choices)
        default_category: replaces a classification value that is not one of the choices (if non-empty)
        output_value_only: return only the values (a single value if there is just one key)
        model, temperature: passed to openai.ChatCompletion.create
        num_tries: maximum number of calls to the model
        verbose: print the prompts and the raw reply of every try

    Returns:
        dict (or list of dict for list input; values only if output_value_only), or {} on failure '''

    # if the user input is in a list, we also process the output as a list of json
    list_input = isinstance(user_prompt, list)
    # if the output format contains dynamic elements of < or >, then add to the prompt to handle dynamic elements
    dynamic_elements = '<' in str(output_format)
    # if the output format contains list elements of [ or ], then we add to the prompt to handle lists
    list_output = '[' in str(output_format)

    # the format instructions do not change between tries, so build them once
    # (the backslash is written as \\ because "\ " is not a valid escape sequence)
    output_format_prompt = (f'\nYou are to output the following in json format: {output_format}. \n'
                            'Do not put quotation marks or escape character \\ in the output fields.')

    if list_output:
        output_format_prompt += '\nIf output field is a list, classify output into the best element of the list.'

    # if output_format contains dynamic elements, process it accordingly
    if dynamic_elements:
        output_format_prompt += (
            '\nAny text enclosed by < and > indicates you must generate content to replace it. '
            'Example input: Go to <location>, Example output: Go to the garden'
            '\nAny output key containing < and > indicates you must generate the key name to replace it. '
            "Example input: {'<location>': 'description of location'}, "
            "Example output: {'school': 'a place for education'}")

    # if input is in a list format, ask it to generate json in a list
    if list_input:
        output_format_prompt += '\nGenerate a list of json, one json for each input element.'

    # start off with no error message
    error_msg = ''

    for _ in range(num_tries):
        full_system_prompt = system_prompt + output_format_prompt + error_msg

        # Use OpenAI to get a response
        response = openai.ChatCompletion.create(
          temperature = temperature,
          model=model,
          messages=[
            {"role": "system", "content": full_system_prompt},
            {"role": "user", "content": str(user_prompt)}
          ]
        )

        # json needs double quotes, so single quotes in the reply are converted
        res = response['choices'][0]['message']['content'].replace('\'', '"')

        # ensure that we don't replace away apostrophes in text
        res = re.sub(r"(\w)\"(\w)", r"\1'\2", res)

        if verbose:
            print('System prompt:', full_system_prompt)
            print('\nUser prompt:', str(user_prompt))
            print('\nGPT response:', res)

        # try-catch block to ensure output format is adhered to
        try:
            output = json.loads(res)
            if list_input:
                if not isinstance(output, list): raise Exception("Output format not in a list of json")
            else:
                output = [output]

            # check for each element in the output_list, the format is correctly adhered to
            for index in range(len(output)):
                for key in output_format.keys():
                    # unable to ensure accuracy of dynamic output header, so skip it
                    if '<' in key or '>' in key: continue
                    # if output field missing, raise an error
                    if key not in output[index]: raise Exception(f"{key} not in json output")
                    # a list as the format value means "classify into one of these choices"
                    if isinstance(output_format[key], list):
                        choices = output_format[key]
                        # ensure output is not a list
                        if isinstance(output[index][key], list):
                            output[index][key] = output[index][key][0]
                        # output the default category (if any) if GPT is unable to identify the category
                        if output[index][key] not in choices and default_category:
                            output[index][key] = default_category
                        # if the output is a description format, get only the label
                        # (only text can carry a description; numbers and booleans are kept as they are)
                        if isinstance(output[index][key], str) and ':' in output[index][key]:
                            output[index][key] = output[index][key].split(':')[0]

                # if we just want the values for the outputs
                if output_value_only:
                    output[index] = [value for value in output[index].values()]
                    # just output without the list if there is only one element
                    if len(output[index]) == 1:
                        output[index] = output[index][0]

            return output if list_input else output[0]

        except Exception as e:
            # feed the invalid reply and the reason back to GPT on the next try
            error_msg = f"\n\nResult: {res}\n\nError message: {str(e)}"
            print("An exception occurred:", str(e))
            print("Current invalid json format:", res)

    return {}

# Agents
- MetaData Format (Dictionary)
    - 'agent1': list [x, y] containing the x (column) and y (row) position of agent1
    - 'agent2': list [x, y] containing the x (column) and y (row) position of agent2
    - 'instruction': task description
    - 'world_description': description of the grid world and how the actions impact the agent
    - 'blue_squares': list of (x, y) positions of blue squares
    - 'grid_size': dimension of grid, boundary of grid is a wall
    - 'valid_moves': ['Up', 'Down', 'Left', 'Right', 'None'] and their descriptions

In [51]:
def findnearest(startpos, endposlist):
    ''' Finds the square of endposlist that is nearest to startpos (Manhattan distance).

    Positions are [x, y]; y grows downwards. On equal distance the earlier square in the list wins.

    Returns (nearest square, first move towards it). The move is 'Down'/'Up' while the rows differ,
    then 'Right'/'Left', and 'None' when startpos is already on the square.
    Returns (None, 'None') when endposlist is empty. '''
    nearest_dist, nearest_pos, direction = float('inf'), None, 'None'
    for endpos in endposlist:
        totaldist = abs(endpos[0]-startpos[0])+abs(endpos[1]-startpos[1])
        if totaldist < nearest_dist:
            nearest_dist = totaldist
            # remember the winning square itself; the loop variable keeps moving on to later squares
            nearest_pos = endpos
            if endpos[1] > startpos[1]: direction = 'Down'
            elif endpos[1] < startpos[1]: direction = 'Up'
            elif endpos[0] > startpos[0]: direction = 'Right'
            elif endpos[0] < startpos[0]: direction = 'Left'
            else: direction = 'None'
    return nearest_pos, direction
    
def NoneAgent(metadata, agent, memory):
    ''' Idle agent: ignores all inputs and returns the empty string, i.e. no action is chosen '''
    no_action = ''
    return no_action

def RandomAgent(metadata, agent, memory):
    ''' Draws one entry of metadata['valid_moves'] with random.choice and returns its 'Name' '''
    chosen_move = random.choice(metadata['valid_moves'])
    return chosen_move['Name']

def NearestCellAgent(metadata, agent, memory):
    ''' Heads for the blue square that findnearest reports for this agent.
    Stores that square in memory['Goal'] and returns the move findnearest suggests '''
    goal, first_step = findnearest(metadata[agent], metadata['blue_squares'])
    memory['Goal'] = goal
    return first_step

def NearestCellCoopAgent(metadata, agent, memory):
    ''' Makes a move to nearest blue cell, does not move to cell other agent is occupying.

    Stores the chosen cell in memory['Goal'] and returns the move reported by findnearest.
    metadata is left untouched: the same dict is passed to both agents within a turn, so
    removing a square from it here would hide that square from the other agent as well. '''
    start_cell = metadata[agent]
    other_agent = 'agent1' if agent == 'agent2' else 'agent2'
    # leave out the cell the other agent is standing on (filtered copy, no in-place remove)
    free_blue_squares = [square for square in metadata['blue_squares'] if square != metadata[other_agent]]
    endpos, direction = findnearest(start_cell, free_blue_squares)
    memory['Goal'] = endpos
    return direction

In [56]:
def _as_position(value):
    ''' Normalises a position: text such as "[1, 2]" and tuples become lists, anything else is returned unchanged '''
    if isinstance(value, str):
        try:
            return json.loads(value)
        except ValueError:
            return value
    if isinstance(value, tuple):
        return list(value)
    return value

def LLMAgent(metadata, agent, memory):
    ''' Uses LLM to decide what action to take. Make sure action lies within valid_moves.

    The LLM is asked for a whole list of moves, which is kept in memory['List of Next Moves'] and
    played back one move per call; the LLM is only asked again once that list is used up.
    Every returned move is also logged (with the position it was taken from) in memory['history'].

    Returns the name of the move, or 'None' (stay) when the agent is on its goal or when the LLM
    gave no usable move. '''
    # If the agent is already at destination, no need to ask LLM again
    # (the goal may be missing, or may have been returned by the LLM as text like "[1, 2]")
    if 'Goal' in memory and _as_position(memory['Goal']) == _as_position(metadata[agent]):
        return 'None'

    # If there is already a series of moves, then no need to ask LLM again
    if not memory.get('List of Next Moves'):

        ### Use LLM decision making agent to plan a series of moves
        res = strict_output(
            system_prompt = (f'You are {agent}.\n'
                             f'You are given the following metadata: {metadata}.\n'
                             f'Past memory: {memory}\n'
                             'Choose one of the moves in valid_moves to achieve the task.\n'),
            user_prompt = '',
            output_format = {
                "Current Position": "Your current position, for example [0, 1]",
                "Goal": "Your desired goal destination to reach, for example [1, 2]",
                "Offset": "Offset from Current Position to Goal. For example, for Current Position [1, 1] to Goal [3, 0], Offset is [2, -1]",
                "Thoughts": "How to get to goal destination from current position. Do not use quotation marks",
                "List of Next Moves": "A list of valid_moves to be executed that will get you to the destination. For example, ['Down', 'Right']."},
            verbose = True)

        # strict_output returns {} when the LLM never produced valid json
        planned = res.get('List of Next Moves', []) if isinstance(res, dict) else []
        if isinstance(planned, str):
            # the list may come back as text, e.g. "['Down', 'Right']"
            planned = re.findall(r'[A-Za-z]+', planned)
        elif not isinstance(planned, list):
            planned = []

        # keep only moves that exist in valid_moves
        valid_names = [entry['Name'] if isinstance(entry, dict) else entry for entry in metadata['valid_moves']]
        planned = [step for step in planned if step in valid_names]

        # Add List of Moves; with nothing usable, stay put this turn and ask again next turn
        memory['List of Next Moves'] = planned if planned else ['None']

    move = memory['List of Next Moves'].pop(0)
    # if last move, then ask the LLM Agent again what to do the next turn by deleting the memory
    if len(memory['List of Next Moves']) == 0: del memory['List of Next Moves']

    # Add to memory
    if 'history' not in memory:
        memory['history'] = []
    memory['history'].append(str(metadata[agent]))
    memory['history'].append(move)

    return move

def Conversation(memory1, memory2, metadata, controllable_agents, turns):
    ''' Uses the memory to converse between two AI agents.

    Currently a central planner: one LLM call assigns a goal to each agent, stored in
    memory1['Goal'] and memory2['Goal']. Once agent 1 has a goal, the planner is only asked
    again when turns is a multiple of 10.

    If the planner gives no goal for an agent, the agent keeps its previous goal; if it has none
    yet, 'Goal' is set to None so that the planner is asked again on the next call.

    Returns (memory1, memory2), the same dicts that were passed in. '''
    # If there is already a goal, no need to keep asking
    # check the goal once every 10 turns
    if memory1.get('Goal') is not None and turns%10 != 0:
        return memory1, memory2

    ### TO-DO Implement conversation (Now it is a central planner. Possible to have agents free-flow communicate)
    ### TO-DO Implement dynamic goal planner if the agent is not listening
    res = strict_output(
        system_prompt = ('You are a central planner for agents.\n'
                         f'You can only control {controllable_agents}. \n'
                         f'Metadata: {metadata}\n'
                         'You are to output the goal for the two agents based on the task at hand'),
        user_prompt = '',
        output_format = {
            "Thoughts": "How to achieve the goal using the two agents. Do not use quotation marks.",
            "Goal 1": "Goal destination for Agent 1, for example [1, 2]",
            "Goal 2": "Goal destination for Agent 2, for example [1, 2]"},
        verbose = True)

    # strict_output returns {} when the LLM never produced valid json
    if not isinstance(res, dict):
        res = {}

    for memory, goal_key in ((memory1, 'Goal 1'), (memory2, 'Goal 2')):
        if res.get(goal_key) is not None:
            memory['Goal'] = res[goal_key]
        else:
            # keep the previous goal if there is one, otherwise mark the goal as unknown
            memory.setdefault('Goal', None)

    return memory1, memory2

## Environment
- Uses 5 different iterations of grid size 5x5 to 20x20
- Ends at 100 turns
- 6 different agent types
     + NoneAgent: Does nothing
     + RandomAgent: Does a random action
     + NearestCellAgent: Moves to nearest cell
     + NearestCellCoopAgent: Moves to nearest cell. If nearest cell is occupied, moves to another.
     + PlayerAgent: Player controls agent. Agent 1 is W, A, S, D. Agent 2 is Up, Down, Left, Right
     + LLMAgent: Uses environmental description to choose an action/list of actions
- Conversation: If one agent is an LLMAgent, have a central planner to assign goals to each agent

In [57]:
# Initialize pygame
pygame.init()

# set the AI agents here
# we have NoneAgent, RandomAgent, NearestCellAgent, NearestCellCoopAgent, LLMAgent
# each one is called by the game loop as agent(metadata, agent_name, memory) and returns a move name
ai_agent1 = LLMAgent
ai_agent2 = LLMAgent

# Constants
# window size in pixels
WIDTH, HEIGHT = 800, 800
# number of playable cells per side; this initial draw is replaced by a new one at the start of every stage
GRID_SIZE = random.randint(5, 20)
CELL_SIZE = min(WIDTH, HEIGHT) // (GRID_SIZE + 2)  # +2 for walls
# RGB colors
RED = (255, 0, 0)
GREEN = (0, 255, 0)
GOLD = (255, 200, 0)
BLUE = (0, 0, 255)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
WALL_COLOR = (50, 50, 50)
# turn limit of a stage
TOTAL_TURNS = 100

# Set up screen
screen = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption('Two Actor Grid World')

# fill in number of squares here
NUM_BLUE = 2
NUM_RED = 2
NUM_GREEN = 2
# task text shown on screen and passed to the agents in metadata['instruction'];
# keep it in sync with win_condition in the game loop
instruction = "Step on one green square and one blue square"

class Actor:
    ''' An agent on the grid. x and y are its cell coordinates, pos holds the same values as an [x, y] list '''

    def __init__(self, x, y):
        self.x, self.y = x, y
        self.pos = [x, y]

    def move(self, dx, dy):
        ''' Shifts the actor by (dx, dy) if the target cell is inside 1..GRID_SIZE on both axes,
        otherwise the actor stays. pos is rebuilt as a new list in either case '''
        target_x = self.x + dx
        target_y = self.y + dy
        if 0 < target_x < GRID_SIZE+1 and 0 < target_y < GRID_SIZE+1:
            self.x, self.y = target_x, target_y
        self.pos = [self.x, self.y]

def draw_grid():
    ''' Paints the board onto screen: a (GRID_SIZE + 2) x (GRID_SIZE + 2) field of cells whose outer
    ring is the wall. Blue, green and red squares get their color (checked in that order), every
    other cell is white. After each cell, black grid lines through its left and top edge are drawn '''
    last_index = GRID_SIZE + 1
    for row in range(GRID_SIZE + 2):
        for col in range(GRID_SIZE + 2):
            cell = [col, row]
            if col == 0 or col == last_index or row == 0 or row == last_index:
                fill = WALL_COLOR
            elif cell in blue_squares:
                fill = BLUE
            elif cell in green_squares:
                fill = GREEN
            elif cell in red_squares:
                fill = RED
            else:
                fill = WHITE

            left = col * CELL_SIZE
            top = row * CELL_SIZE
            pygame.draw.rect(screen, fill, pygame.Rect(left, top, CELL_SIZE, CELL_SIZE))
            pygame.draw.line(screen, BLACK, (left, 0), (left, HEIGHT))
            pygame.draw.line(screen, BLACK, (0, top), (WIDTH, top))

# Main game: plays NUM_STAGES stages. Each stage draws a new random grid and ends when the win
# condition is met, when TOTAL_TURNS turns have been played, or when the window is closed.

# number of stages to play (the Environment notes describe 5 iterations)
NUM_STAGES = 5

# --- things that do not change between frames are built once ---
font = pygame.font.Font(None, 30)

# keyboard overrides: Agent 1 is W, A, S, D. Agent 2 is Up, Down, Left, Right
AGENT1_KEYS = {pygame.K_a: 'Left', pygame.K_w: 'Up', pygame.K_s: 'Down', pygame.K_d: 'Right'}
AGENT2_KEYS = {pygame.K_UP: 'Up', pygame.K_DOWN: 'Down', pygame.K_LEFT: 'Left', pygame.K_RIGHT: 'Right'}
# (dx, dy) of every move that changes the position; any other action leaves the agent in place
MOVE_DELTAS = {'Left': (-1, 0), 'Up': (0, -1), 'Down': (0, 1), 'Right': (1, 0)}

## Replace these with relevant descriptions of the world
moves_with_description = [
    {"Name": "Up", "Use Case": "used when row offset is negative, for example Offset of [0, -1]",
     "Description": "move a step towards negative row direction"},
    {"Name": "Down", "Use Case": "used when row offset is positive, for example Offset of [0, 1]",
     "Description": "move a step towards positive row direction"},
    {"Name": "Left", "Use Case": "used when col offset is negative, for example Offset of [-1, 0]",
     "Description": "move a step towards negative col direction"},
    {"Name": "Right", "Use Case": "used when col offset is positive, for example Offset of [1, 0]",
     "Description": "move a step towards positive col direction"},
    {"Name": "None", "Use Case": "used when Offset is [0, 0]",
     "Description": "stay in current square"}]

world_description = ('The grid coordinates are given as [col, row]. \n'
                     'Down moves in the positive row direction, Right moves in the positive col direction.\n'
                     'The two agents have to accomplish a given task cooperatively.')

# have conversation between two AI actors (only when one is an LLMAgent)
controllable_agents = ''
if ai_agent1 == LLMAgent:
    controllable_agents += ' agent1'
if ai_agent2 == LLMAgent:
    controllable_agents += ' agent2'

stage = 1
quit_requested = False

while stage <= NUM_STAGES:
    reward = 0
    memory1 = {}
    memory2 = {}

    GRID_SIZE = random.randint(5, 20)
    CELL_SIZE = min(WIDTH, HEIGHT) // (GRID_SIZE + 2)  # +2 for walls
    agent1 = Actor(1, 1)
    agent2 = Actor(GRID_SIZE, GRID_SIZE)

    # forms the random squares; redraw all of them until no two squares share a cell
    repeat = True
    while repeat:
        blue_squares = [[random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE)] for _ in range(NUM_BLUE)]
        green_squares = [[random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE)] for _ in range(NUM_GREEN)]
        red_squares = [[random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE)] for _ in range(NUM_RED)]

        all_squares = blue_squares + green_squares + red_squares
        repeat = any(all_squares.count(square) > 1 for square in all_squares)

    turns = 0
    running = True
    while running:
        ### AGENT LOGIC GOES HERE ###
        # the square lists are copied so that agents cannot change the board through metadata
        metadata = {'agent1': agent1.pos, 'agent2': agent2.pos, 'instruction': instruction,
                    'blue_squares': blue_squares.copy(), 'green_squares': green_squares.copy(),
                    'red_squares': red_squares.copy(), 'grid_size': GRID_SIZE + 2,
                    'world_description': world_description,
                    'valid_moves': moves_with_description}

        moved = False
        # store the coordinated goals for each agent in that memory
        if len(controllable_agents) > 0:
            memory1, memory2 = Conversation(memory1, memory2, metadata, controllable_agents, turns)

        # do AI actions over here
        action1 = ai_agent1(metadata, 'agent1', memory1)
        action2 = ai_agent2(metadata, 'agent2', memory2)

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # window closed: stop the whole game instead of ignoring the request
                quit_requested = True
            elif event.type == pygame.KEYDOWN:
                # a key press overrides the move chosen by the AI agent
                action1 = AGENT1_KEYS.get(event.key, action1)
                action2 = AGENT2_KEYS.get(event.key, action2)

        if quit_requested:
            break

        # print('Selected moves for each agent:', action1, action2)

        for action, agent in [(action1, agent1), (action2, agent2)]:
            if action != '':
                moved = True
                if isinstance(action, str) and action in MOVE_DELTAS:
                    agent.move(*MOVE_DELTAS[action])

        # if both agents choose None, that is the end of the turn too
        if action1 == 'None' and action2 == 'None': moved = True

        # Change this here for the win condition. Make sure it matches with the instruction
        win_condition = ((agent1.pos in blue_squares and agent2.pos in green_squares) or (agent1.pos in green_squares and agent2.pos in blue_squares))
        if win_condition and agent1.pos != agent2.pos:
            reward = 1
            running = False

        ### Display Screen ###
        screen.fill(WHITE)

        draw_grid()

        pygame.draw.circle(screen, GOLD, (agent1.x * CELL_SIZE + CELL_SIZE // 2, agent1.y * CELL_SIZE + CELL_SIZE // 2), CELL_SIZE // 4)
        pygame.draw.circle(screen, BLACK, (agent2.x * CELL_SIZE + CELL_SIZE // 2, agent2.y * CELL_SIZE + CELL_SIZE // 2), CELL_SIZE // 4)

        # Displaying texts
        stage_text = font.render(f"Stage: {stage}", True, WHITE)
        turns_text = font.render(f"Turns: {turns}", True, RED)
        instruction_text = font.render(instruction, True, WHITE)
        screen.blit(stage_text, (10, 10))
        screen.blit(turns_text, (10, 50))
        screen.blit(instruction_text, (WIDTH // 2 - instruction_text.get_width() // 2, 10))

        # per agent: last move and goal, and below that the next (up to 5) planned moves
        for label, action, memory, color, top in (('Agent 1', action1, memory1, GOLD, 90),
                                                  ('Agent 2', action2, memory2, BLACK, 170)):
            text = f'{label} Last Move: {action}'
            if 'Goal' in memory:
                text += f', Goal: {memory["Goal"]}'
            text = font.render(text, True, color)
            screen.blit(text, (WIDTH // 2 - text.get_width() // 2, top))
            if 'List of Next Moves' in memory:
                text = f'List of Next Moves: {memory["List of Next Moves"][:5]}'
                text = font.render(text, True, color)
                screen.blit(text, (WIDTH // 2 - text.get_width() // 2, top + 40))

        # have a slight delay
        time.sleep(0.5)
        pygame.display.flip()

        if moved:
            turns += 1
        # the stage ends once TOTAL_TURNS turns have been played
        if turns >= TOTAL_TURNS:
            running = False

    if quit_requested:
        break

    if reward == 1:
        print(f'Completed Stage {stage}!')
    else:
        print(f'Did not Complete {stage}!')

    stage += 1

pygame.quit()
# SystemExit is caught right away, so the notebook kernel keeps running
try:
    sys.exit()
except SystemExit:
    pass

System prompt: You are a central planner for agents.
You can only control  agent1 agent2. 
Metadata: {'agent1': [1, 1], 'agent2': [20, 20], 'instruction': 'Step on one green square and one blue square', 'blue_squares': [[10, 6], [15, 6]], 'green_squares': [[11, 5], [18, 10]], 'red_squares': [[9, 19], [7, 15]], 'grid_size': 22, 'world_description': 'The grid coordinates are given as [col, row]. \nDown moves in the positive row direction, Right moves in the positive col direction.\nThe two agents have to accomplish a given task cooperatively.', 'valid_moves': [{'Name': 'Up', 'Use Case': 'used when row offset is negative, for example Offset of [0, -1]', 'Description': 'move a step towards negative row direction'}, {'Name': 'Down', 'Use Case': 'used when row offset is positive, for example Offset of [0, 1]', 'Description': 'move a step towards positive row direction'}, {'Name': 'Left', 'Use Case': 'used when col offset is negative, for example Offset of [-1, 0]', 'Description': 'move a ste

KeyboardInterrupt: 