## 2 Player Escape Room Game

- Plays until Stage 10
- 200 turns before Game Over
- Control player 1 with W, A, S, D and player 2 with the Up, Down, Left, Right arrow keys (press Q to quit the current stage)
- Get both players to the blue squares
- Updated 21 Aug

In [17]:
import pygame
import sys
import random
import os
import openai
import json
import re

#API Keys
os.environ['OPENAI_API_TOKEN'] = 'YOUR_API_KEY_HERE'
openai.api_key = os.environ['OPENAI_API_TOKEN']

## Strict JSON Framework

- Taken from: https://github.com/tanchongmin/strictjson

- **system_prompt**: Write in whatever you want GPT to become. "You are a \<purpose in life\>"
- **user_prompt**: The user input. Later, when we use it as a function, this is the function input
- **output_format**: JSON format with the key as the output key, and the value as the output description
    - The output keys will be preserved exactly, while GPT will generate content to match the description of the value as best as possible

#### Example Usage
```python
res = strict_output(system_prompt = 'You are a classifier',
                    user_prompt = 'It is a beautiful day',
                    output_format = {"Sentiment": "Type of Sentiment",
                                    "Tense": "Type of Tense"})
                                    
print(res)
```

#### Example output
```{'Sentiment': 'Positive', 'Tense': 'Present'}```
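
#### Offline check of a reply
The parsing and key check that `strict_output` applies to a reply can be tried without calling the API. The sketch below reproduces only the quote normalisation and the missing-key check on a hard-coded reply string. `check_reply` is a hypothetical helper name used for illustration, not part of the strictjson library.

```python
import json
import re

def check_reply(raw_reply, output_format):
    """Hypothetical helper: normalise quotes, parse the JSON, check the expected keys."""
    # swap single quotes for double quotes so json.loads accepts Python-style dicts
    res = raw_reply.replace("'", '"')
    # restore apostrophes that sit between two word characters, e.g. don"t -> don't
    res = re.sub(r'(\w)"(\w)', r"\1'\2", res)
    output = json.loads(res)
    for key in output_format:
        # keys containing < or > are generated by the model, so they cannot be checked
        if '<' in key or '>' in key:
            continue
        if key not in output:
            raise ValueError(f"{key} not in json output")
    return output

output_format = {"Sentiment": "Type of Sentiment", "Tense": "Type of Tense"}
good = check_reply("{'Sentiment': 'Positive', 'Tense': 'Present'}", output_format)
print(good)  # {'Sentiment': 'Positive', 'Tense': 'Present'}
```

A reply that lacks one of the keys raises an error here, which is the point at which `strict_output` would feed the error message back to GPT and retry.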


In [56]:
def strict_output(system_prompt, user_prompt, output_format, default_category = "", output_value_only = False,
                  model = 'gpt-3.5-turbo', temperature = 0, num_tries = 2, verbose = False):
    ''' Ensures that OpenAI will always adhere to the desired output json format. 
    Uses rule-based iterative feedback to ask GPT to self-correct.
    Keeps trying up to num_tries times if it does not. Returns an empty json if still unable to after num_tries iterations.
    If an output field is a list, treats it as a classification problem and outputs the best classification category.
    Text enclosed within < > will be generated by GPT accordingly'''

    # if the user input is in a list, we also process the output as a list of json
    list_input = isinstance(user_prompt, list)
    # if the output format contains dynamic elements of < or >, then add to the prompt to handle dynamic elements
    dynamic_elements = '<' in str(output_format)
    # if the output format contains list elements of [ or ], then we add to the prompt to handle lists
    list_output = '[' in str(output_format)
    
    # start off with no error message
    error_msg = ''
    
    for i in range(num_tries):
        
        output_format_prompt = f'''\nYou are to output the following in json format: {output_format}. 
Do not put quotation marks or escape character \\ in the output fields.'''
        
        if list_output:
            output_format_prompt += f'''\nIf output field is a list, classify output into the best element of the list.'''
        
        # if output_format contains dynamic elements, process it accordingly
        if dynamic_elements: 
            output_format_prompt += f'''
Any text enclosed by < and > indicates you must generate content to replace it. Example input: Go to <location>, Example output: Go to the garden
Any output key containing < and > indicates you must generate the key name to replace it. Example input: {{'<location>': 'description of location'}}, Example output: {{'school': 'a place for education'}}'''

        # if input is in a list format, ask it to generate json in a list
        if list_input:
            output_format_prompt += '''\nGenerate a list of json, one json for each input element.'''
            
        # Use OpenAI to get a response
        response = openai.ChatCompletion.create(
          temperature = temperature,
          model=model,
          messages=[
            {"role": "system", "content": system_prompt + output_format_prompt + error_msg},
            {"role": "user", "content": str(user_prompt)}
          ]
        )

        res = response['choices'][0]['message']['content'].replace('\'', '"')
        
        # ensure that we don't replace away apostrophes in text 
        res = re.sub(r"(\w)\"(\w)", r"\1'\2", res)

        if verbose:
            print('System prompt:', system_prompt + output_format_prompt + error_msg)
            print('\nUser prompt:', str(user_prompt))
            print('\nGPT response:', res)
    
        # try-catch block to ensure output format is adhered to
        try:
            output = json.loads(res)
            if isinstance(user_prompt, list):
                if not isinstance(output, list): raise Exception("Output format not in a list of json")
            else:
                output = [output]
                
            # check for each element in the output_list, the format is correctly adhered to
            for index in range(len(output)):
                for key in output_format.keys():
                    # unable to ensure accuracy of dynamic output header, so skip it
                    if '<' in key or '>' in key: continue
                    # if output field missing, raise an error
                    if key not in output[index]: raise Exception(f"{key} not in json output")
                    # if the expected value is a list of choices, treat the field as a classification
                    if isinstance(output_format[key], list):
                        choices = output_format[key]
                        # ensure output is not a list
                        if isinstance(output[index][key], list):
                            output[index][key] = output[index][key][0]
                        # output the default category (if any) if GPT is unable to identify the category
                        if output[index][key] not in choices and default_category:
                            output[index][key] = default_category
                        # if the output is a description format, get only the label
                        if ':' in output[index][key]:
                            output[index][key] = output[index][key].split(':')[0]
                            
                # if we just want the values for the outputs
                if output_value_only:
                    output[index] = [value for value in output[index].values()]
                    # just output without the list if there is only one element
                    if len(output[index]) == 1:
                        output[index] = output[index][0]
                    
            return output if list_input else output[0]

        except Exception as e:
            error_msg = f"\n\nResult: {res}\n\nError message: {str(e)}"
            print("An exception occurred:", str(e))
            print("Current invalid json format:", res)
         
    return {}

## Agents
- MetaData Format (Dictionary)
    - 'agent1': tuple containing x and y position of agent1
    - 'agent2': tuple containing x and y position of agent2
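    - 'active_agent': 'agent1' or 'agent2', the current active agent to make an action for
    - 'instruction': text description of the task, e.g. "Step on the two blue squares"
    - 'blue_squares': list of (x, y) positions of blue squares
    - 'grid_size': side length of the grid including the boundary walls, so playable cells run from 1 to grid_size - 2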
    - 'valid_moves': ['Up', 'Down', 'Left', 'Right', 'None']
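
As an illustration of this format, the sketch below builds one metadata dictionary and applies a move to the active agent. The offsets follow the game loop (Up is (0, -1), Right is (1, 0)). `MOVE_OFFSETS` and `apply_move` are hypothetical names introduced for this example only.

```python
# Offsets for each valid move as (dx, dy); y grows downwards on screen
MOVE_OFFSETS = {'Up': (0, -1), 'Down': (0, 1), 'Left': (-1, 0),
                'Right': (1, 0), 'None': (0, 0)}

def apply_move(pos, move, grid_size):
    """Hypothetical helper: return the new position, staying put if the move hits a wall.
    grid_size includes the boundary walls, so playable cells are 1 .. grid_size - 2."""
    dx, dy = MOVE_OFFSETS[move]
    x, y = pos[0] + dx, pos[1] + dy
    if 0 < x < grid_size - 1 and 0 < y < grid_size - 1:
        return (x, y)
    return pos

metadata = {
    'agent1': (1, 1),
    'agent2': (5, 5),
    'active_agent': 'agent2',
    'blue_squares': [(4, 2), (5, 2)],
    'grid_size': 7,
    'valid_moves': ['Up', 'Down', 'Left', 'Right', 'None'],
}

active = metadata['active_agent']
print(apply_move(metadata[active], 'Up', metadata['grid_size']))     # (5, 4)
print(apply_move(metadata[active], 'Right', metadata['grid_size']))  # (5, 5), blocked by the wall
```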

In [87]:
def findnearest(startpos, endposlist):
    ''' Returns the end position nearest to startpos (by Manhattan distance) and the first move towards it '''
    # direction defaults to 'None' so that the result is always one of valid_moves
    nearest, nearestpos, direction = 1e9, None, 'None'
    for endpos in endposlist:
        totaldist = abs(endpos[0]-startpos[0])+abs(endpos[1]-startpos[1])
        if totaldist < nearest:
            nearest, nearestpos = totaldist, endpos
            if endpos[1] > startpos[1]: direction = 'Down'
            elif endpos[1] < startpos[1]: direction = 'Up'
            elif endpos[0] > startpos[0]: direction = 'Right'
            elif endpos[0] < startpos[0]: direction = 'Left'
            else: direction = 'None'
    # return the nearest position found, not the last one iterated over
    return nearestpos, direction
    
def NoneAgent(metadata, memory):
    ''' Does nothing'''
    return ''

def RandomAgent(metadata, memory):
    ''' Makes a move randomly '''
    return random.choice(metadata['valid_moves'])

def NearestCellAgent(metadata, memory):
    ''' Makes a move to nearest blue cell '''
    active_agent = metadata['active_agent']
    start_cell = metadata[active_agent]
    endpos, direction = findnearest(start_cell, metadata['blue_squares'])
    return direction

def NearestCellCoopAgent(metadata, memory):
    ''' Makes a move to nearest blue cell, does not move to cell other agent is occupying '''
    active_agent = metadata['active_agent']
    start_cell = metadata[active_agent]
    other_agent = 'agent1' if active_agent == 'agent2' else 'agent2'
    blue_squares = metadata['blue_squares']
    # remove from choice if other agent is within cell
    if metadata[other_agent] in blue_squares: 
        blue_squares.remove(metadata[other_agent])
    endpos, direction = findnearest(start_cell, blue_squares)
    return direction

In [103]:
def LLMAgent(metadata, memory):
    ''' Uses LLM to decide what action to take. Make sure action lies within valid_moves '''
    ### TO-DO use LLM decision making agent to make a move   
    res = strict_output(system_prompt = f'''You are given the following metadata: {metadata}.
Past memory: {memory}
You are one of the agents in the world and your aim is to achieve the task collaboratively.
The grid world has coordinates (row, col).
Right moves in positive row direction, Down moves in positive col direction.
Choose one of the moves in valid_moves to achieve the task.
''', 
        user_prompt = '',
        output_format = {
"Current Position": "Your current position, for example (0, 1)",
"Destination": "Your desired destination to reach, for example (1, 2)",
"Next Move": "A move within valid_moves that will get you closer to the destination"},
verbose = True)
    
    print('Metadata')
    print(metadata)
    print('GPT output')
    print(res)
    print()
    
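    # fall back to 'None' if GPT returned no move or a move outside valid_moves
    move = res.get("Next Move", 'None')
    if move not in metadata['valid_moves']:
        move = 'None'

    # add to memory
    if 'history' not in memory:
        memory['history'] = []
    active_agent = metadata['active_agent']
    start_cell = metadata[active_agent]
    memory['history'].append(str(start_cell))
    memory['history'].append(move)

    return move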

def Conversation(memory1, memory2):
    ''' Uses the memory to converse between two AI agents '''
    ### TO-DO Implement conversation
    
    return memory1, memory2
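
The `Conversation` stub leaves open how the two agents coordinate. One rule-based possibility, shown here only as a sketch and not as the intended design, is to assign each agent a different blue square so that the total Manhattan distance is smallest, and to store the result in each memory. The function name `AssignGoals`, the extra `metadata` argument, and the `'goal'` memory key are all assumptions made for this example, since the stub itself takes only the two memories.

```python
def manhattan(a, b):
    """Manhattan distance between two (x, y) positions."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def AssignGoals(memory1, memory2, metadata):
    """Hypothetical coordination step: give each agent a distinct blue square."""
    first, second = metadata['blue_squares']
    pos1, pos2 = metadata['agent1'], metadata['agent2']
    # compare the two possible pairings and keep the cheaper one
    straight = manhattan(pos1, first) + manhattan(pos2, second)
    swapped = manhattan(pos1, second) + manhattan(pos2, first)
    if straight <= swapped:
        memory1['goal'], memory2['goal'] = first, second
    else:
        memory1['goal'], memory2['goal'] = second, first
    return memory1, memory2

goal_memory1, goal_memory2 = AssignGoals({}, {}, {'agent1': (1, 1), 'agent2': (5, 5),
                                                  'blue_squares': [(4, 2), (5, 2)]})
print(goal_memory1['goal'], goal_memory2['goal'])  # (4, 2) (5, 2)
```

Because the goals sit in memory, an agent such as `LLMAgent` would see them through the `Past memory` part of its prompt without any change to its signature.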

In [105]:
# Initialize pygame
pygame.init()

# set the AI agents here
# we have NoneAgent, RandomAgent, NearestCellAgent, NearestCellCoopAgent, LLMAgent
## TO-DO Implement LLM Agent here
ai_agent1 = NearestCellAgent
ai_agent2 = LLMAgent

# Constants
WIDTH, HEIGHT = 800, 800
GRID_SIZE = random.randint(5, 7)
CELL_SIZE = min(WIDTH, HEIGHT) // (GRID_SIZE + 2)  # +2 for walls
RED = (255, 0, 0)
BLUE = (0, 0, 255)
WHITE = (255, 255, 255)
BLACK = (0, 0, 0)
WALL_COLOR = (50, 50, 50)
TOTAL_TURNS = 200

# Set up screen
screen = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption('Two Actor Grid World')

instruction = "Step on the two blue squares"

class Actor:
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.pos = (self.x, self.y)

    def move(self, dx, dy):
        if 0 < self.x + dx < GRID_SIZE+1 and 0 < self.y + dy < GRID_SIZE+1:
            self.x += dx
            self.y += dy
        self.pos = (self.x, self.y)

def draw_grid():
    for y in range(GRID_SIZE + 2):
        for x in range(GRID_SIZE + 2):
            color = WHITE
            if x == 0 or x == GRID_SIZE+1 or y == 0 or y == GRID_SIZE+1:
                color = WALL_COLOR
            elif (x, y) in blue_squares:
                color = BLUE
            pygame.draw.rect(screen, color, pygame.Rect(x * CELL_SIZE, y * CELL_SIZE, CELL_SIZE, CELL_SIZE))
            pygame.draw.line(screen, BLACK, (x * CELL_SIZE, 0), (x * CELL_SIZE, HEIGHT))
            pygame.draw.line(screen, BLACK, (0, y * CELL_SIZE), (WIDTH, y * CELL_SIZE))

stage = 1

while stage <= 10:  # play stages 1 through 10
    reward = 0
    memory1 = {}
    memory2 = {}
    
    GRID_SIZE = random.randint(5, 7)
    CELL_SIZE = min(WIDTH, HEIGHT) // (GRID_SIZE + 2)  # +2 for walls
    # Main game loop
    agent1 = Actor(1, 1)
    agent2 = Actor(GRID_SIZE, GRID_SIZE)
    blue_squares = [(random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE)),
            (random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE))]
    while blue_squares[0] == blue_squares[1]:
        blue_squares = [(random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE)),
                        (random.randint(2, GRID_SIZE), random.randint(2, GRID_SIZE))]

    turns = 0
    running = True
    while running:
        moved = False
        # have conversation between two AI actors
        # store the coordinated goals for each agent in that memory
        memory1, memory2 = Conversation(memory1, memory2)
        
        action1, action2 = '', ''
        # do AI actions over here
        # moves_with_description = [
# {"Name": "Up", "Description": "move upwards with offset (0, -1)", "Example": "(1, 1) -> (1, 0)"},
# {"Name": "Down", "Description": "move down with offset (0, 1)", "Example": "(1, 1) -> (1, 2)"},
# {"Name": "Left", "Description": "move left with offset (-1, 0)", "Example": "(1, 1) -> (0, 1)"},
# {"Name": "Right", "Description": "move right with offset (1, 0)", "Example": "(1, 1) -> (2, 1)"},
# {"Name": "None", "Description": "stay in current square", "Example": "(1, 1) -> (1, 1)"}]
        moves_with_description = ['Up', 'Down', 'Left', 'Right', 'None']
        action1 = ai_agent1({'agent1': agent1.pos, 'agent2': agent2.pos, 'active_agent': 'agent1', 'instruction': instruction,
                         'blue_squares': blue_squares.copy(), 'grid_size': GRID_SIZE + 2, 
                         'valid_moves': moves_with_description}, memory1)
        action2 = ai_agent2({'agent1': agent1.pos, 'agent2': agent2.pos, 'active_agent': 'agent2', 'instruction': instruction,
                         'blue_squares': blue_squares.copy(), 'grid_size': GRID_SIZE + 2, 
                         'valid_moves': moves_with_description}, memory2)
        
        print('Selected moves for each agent:', action1, action2)
        
        for action, agent in [(action1, agent1), (action2, agent2)]:
            if action != '':
                moved = True
                if action == 'Left':
                    agent.move(-1, 0)
                elif action == 'Up':
                    agent.move(0, -1)
                elif action == 'Down':
                    agent.move(0, 1)
                elif action == 'Right':
                    agent.move(1, 0)
                    
        # if both agents choose None, that is the end of the turn too
        if action1 == 'None' and action2 == 'None': moved = True
        
        # do Player actions over here
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            if event.type == pygame.KEYDOWN:
                moved = True
                if event.key == pygame.K_a:  # Agent 1
                    agent1.move(-1, 0)
                elif event.key == pygame.K_w:
                    agent1.move(0, -1)
                elif event.key == pygame.K_s:
                    agent1.move(0, 1)
                elif event.key == pygame.K_d:
                    agent1.move(1, 0)
                elif event.key == pygame.K_UP:  # Agent 2
                    agent2.move(0, -1)
                elif event.key == pygame.K_DOWN:
                    agent2.move(0, 1)
                elif event.key == pygame.K_LEFT:
                    agent2.move(-1, 0)
                elif event.key == pygame.K_RIGHT:
                    agent2.move(1, 0)
                elif event.key == pygame.K_q:
                    running = False

        if (agent1.x, agent1.y) in blue_squares and (agent2.x, agent2.y) in blue_squares and (agent1.x, agent1.y) != (agent2.x, agent2.y):
            reward = 1
            running = False

        screen.fill(WHITE)

        draw_grid()

        pygame.draw.circle(screen, BLACK, (agent1.x * CELL_SIZE + CELL_SIZE // 2, agent1.y * CELL_SIZE + CELL_SIZE // 2), CELL_SIZE // 4)
        pygame.draw.circle(screen, BLACK, (agent2.x * CELL_SIZE + CELL_SIZE // 2, agent2.y * CELL_SIZE + CELL_SIZE // 2), CELL_SIZE // 4, 2)  # outline for agent2

        # Displaying texts
        font = pygame.font.Font(None, 36)
        stage_text = font.render(f"Stage: {stage}", True, RED)
        turns_text = font.render(f"Turns: {turns}", True, RED)
        instruction_text = font.render(instruction, True, RED)
        screen.blit(stage_text, (10, 10))
        screen.blit(turns_text, (10, 50))
        screen.blit(instruction_text, (WIDTH // 2 - instruction_text.get_width() // 2, 10))

        pygame.display.flip()

        if moved:
            turns += 1
        if turns > TOTAL_TURNS:
            running = False

    if reward == 1:
        print(f'Completed Stage {stage}!')
    else:
        print(f'Did not complete Stage {stage}!')
        
    stage += 1
    
pygame.quit()
try:
    sys.exit()
except SystemExit:
    pass

System prompt: You are given the following metadata: {'agent1': (1, 1), 'agent2': (5, 5), 'active_agent': 'agent2', 'instruction': 'Step on the two blue squares', 'blue_squares': [(4, 2), (5, 2)], 'grid_size': 7, 'valid_moves': ['Up', 'Down', 'Left', 'Right', 'None']}.
Past memory: {}
You are one of the agents in the world and your aim is to achieve the task collaboratively.
The grid world has coordinates (row, col).
Right moves in positive row direction, Down moves in positive col direction.
Choose one of the moves in valid_moves to achieve the task.

You are to output the following in json format: {'Current Position': 'Your current position, for example (0, 1)', 'Destination': 'Your desired destination to reach, for example (1, 2)', 'Next Move': 'A move within valid_moves that will get you closer to the destination'}. 
Do not put quotation marks or escape character \ in the output fields.

User prompt: 

GPT response: {"Current Position": "(5, 5)", "Destination": "(4, 2)", "Next Move

KeyboardInterrupt: 