# Tutorial 5 - Additional Context (Advanced)

- Agent takes in one additional parameter: `get_additional_context`
- This is a function that takes in the agent's internal parameters (self) and outputs a string to the LLM to append to the prompts of any LLM-based calls internally, e.g. `get_next_subtask`, `use_llm`, `reply_to_user`
- You have full flexibility to access anything the agent knows and configure an additional prompt to the agent

## Uses
- Used mainly to provide persistent variables to an agent that is not conveniently stored in `subtasks_completed`, e.g. ingredients remaining, location in grid for robot
<br></br>
- Implementing your own specific instructions to the default planner prompt
    - Implement your own memory-based RAG / additional prompt instruction if you need more than what the default prompt can achieve
<br></br>
- Avoid Multiple Similar Subtasks in `subtasks_history`
    - If you have multiple similar subtask names, then it is likely the Agent can be confused and think it has already done the subtask
    - In this case, you can disambiguate by resetting the agent and store the persistent information in `shared_variables` and provide it to the agent using `get_additional_context`

In [1]:
# !pip install taskgen-ai

In [1]:
# Set up API key and do the necessary imports
import os
from taskgen import *
import random

os.environ['OPENAI_API_KEY'] = '<YOUR API KEY HERE>'

# Basic Example: AI-Powered Task Scheduler
- This is how we can do a basic AI-powered chatbot that does scheduling (to be implemented)

# Advanced Example: Maze Navigator
- We can use the additional context to let agent know present state in a 2D grid, and obstacle positions that the agent has seen
- Task: Given current position and end position, navigate to end position using Up, Down, Left, Right

In [3]:
# These are the utility functions
def generate_grid(size, start_pos, exit_pos, obstacles):
    '''Generates a grid with obstacles'''
    grid = [[' ' for _ in range(size)] for _ in range(size)]
    grid[start_pos[0]][start_pos[1]] = 'S'  # Start
    grid[exit_pos[0]][exit_pos[1]] = 'E'  # Exit
    
    # Mark a basic path (optional, for simplicity in ensuring a path)
    # This part could be removed or replaced with a more sophisticated path marking
    path = set()
    for i in range(min(start_pos[0], exit_pos[0]), max(start_pos[0], exit_pos[0]) + 1):
        path.add((i, start_pos[1]))
    for j in range(min(start_pos[1], exit_pos[1]), max(start_pos[1], exit_pos[1]) + 1):
        path.add((exit_pos[0], j))
    
    # Randomly add obstacles
    count = 0
    while count < obstacles:
        r, c = random.randint(0, size-1), random.randint(0, size-1)
        if (r, c) not in path and grid[r][c] != 'O' and (r, c) != start_pos and (r, c) != exit_pos:
            grid[r][c] = 'O'
            count += 1
            
    return grid

def print_grid(grid):
    '''Prints the grid'''
    for row in grid:
        print(' '.join(row))
        
def check_valid_moves(cur_pos, grid, grid_size):
    '''Checks the valid moves in the grid given the cur_pos and grid_size. Returns list of valid moves within action space of Up, Down, Left, Right, Stay'''
    row, col = cur_pos
    mapping = {'Up': (-1, 0), 'Down': (1, 0), 'Left': (0, -1), 'Right': (0, 1), 'Stay': (0, 0)}
    valid_moves = []
    for key, value in mapping.items():
        row_offset, col_offset = value
        # check if valid move
        if 0 <= row+row_offset < grid_size and 0 <= col+col_offset < grid_size and grid[row+row_offset][col+col_offset] != 'O':
            valid_moves.append(key)
    return valid_moves

def update_obstacles(cur_pos, grid, grid_size, known_obstacle_pos):
    '''Returns the updated known obstacle positions in the current grid given the cur_pos and grid_size'''
    row, col = cur_pos
    mapping = {'Up': (-1, 0), 'Down': (1, 0), 'Left': (0, -1), 'Right': (0, 1), 'Stay': (0, 0)}
    for key, value in mapping.items():
        row_offset, col_offset = value
        next_row, next_col = row+row_offset, col+col_offset
        # check if valid move
        if 0 <= next_row < grid_size and 0 <= next_col < grid_size:
            # adds in obstacle if observed
            if grid[next_row][next_col] == 'O' and (next_row, next_col) not in known_obstacle_pos:
                known_obstacle_pos.append((next_row, next_col))
            # remove obstacle that is not observed
            elif (next_row, next_col) in known_obstacle_pos:
                known_obstacle_pos.remove((next_row, next_col))
    return known_obstacle_pos

In [4]:
def move(shared_variables, action: str):
    ''' Moves the agent according to the action and updates shared_variables '''
    if action not in shared_variables["next_valid_moves"]: 
        # if next move is not valid, reflect to agent
        return f'The current move of {action} is not valid. You must choose one action from {shared_variables["next_valid_moves"]}'
    mapping = {'Up': (-1, 0), 'Down': (1, 0), 'Left': (0, -1), 'Right': (0, 1), 'Stay': (0, 0)}
    
    # Retrieve from shared variables
    row, col = shared_variables["cur_pos"]
    grid = shared_variables["grid"]
    grid_size = shared_variables["grid_size"]
    known_obstacle_pos = shared_variables["known_obstacle_pos"]
    
    # Do processing for the next action
    row_offset, col_offset = mapping[action]
    newpos = (row+row_offset, col+col_offset)
    next_valid_moves = check_valid_moves(newpos, grid, grid_size)
    known_obstacle_pos = update_obstacles(newpos, grid, grid_size, known_obstacle_pos)
    
    # shift the current agent position
    grid[row][col] = ' '
    grid[row+row_offset][col+col_offset] = 'S'
    
    # Store back into shared variables
    shared_variables["cur_pos"] = newpos
    shared_variables["next_valid_moves"] = next_valid_moves
    shared_variables["known_obstacle_pos"] = known_obstacle_pos
    shared_variables["past_grid_states"].append(newpos)
    shared_variables["grid"] = grid
    
    print_grid(grid)
    
    return f'Action successful. Agent moved from {(row, col)} to {newpos}'

In [5]:
fn_list = [
    Function(f'''Moves the agent by <action>. 
From initial position (x, y), this is what you end up with after doing actions
{{'Up': (x-1, y), 'Down': (x+1, y), 'Left': (x, y-1), 'Right': (x, y+1), 'Stay': (x, y)}}''', 
             output_format = {"Status": "Outcome of action"}, external_fn = move)
]

In [6]:
def get_additional_context(agent):
    ''' Outputs additional information to the agent '''
    
    # process additional context based on shared variables (this is what is called persistent variables - variables that will be updated each step)
    additional_context = f'''Agent position in grid: {agent.shared_variables["cur_pos"]}
Exit Position: {agent.shared_variables["exit_pos"]}
Last 10 Visited Grid Positions: {agent.shared_variables["past_grid_states"][:10]}
Known Obstacle Positions: {agent.shared_variables["known_obstacle_pos"]}
Next Valid Moves: {agent.shared_variables["next_valid_moves"]}'''
    
    # you can also influence how the planner performs the plan with additional details
    additional_context += '''For Overall Plan, list the steps in detail, highlighting the predicted start and end positions for each action
Example Task: ```Navigate from (0, 0) to (1, 1)```
Example Overall Plan: ```['Move Down from (0, 0) to (1, 0), 'Move Right from (1, 0) to (1, 1)']```'''
    
    return additional_context

# Customise your grid here
# O means obstacles, S means start, E means end, blank means nothing there
grid_size = 5  # Grid size
start_pos = (random.randint(0, grid_size//2 - 1), random.randint(0, grid_size//2 - 1))  # Starting position
exit_pos = (random.randint(grid_size//2, grid_size-1), random.randint(grid_size//2, grid_size-1))  # Exit position
num_obstacles = 5  # Number of obstacles

grid = generate_grid(grid_size, start_pos, exit_pos, num_obstacles)
valid_moves = check_valid_moves(start_pos, grid, grid_size)

# Assign your agent
my_agent = Agent('Maze Navigator', 
                 f'''You are to move to the Exit Position of the maze. Task is completed when Agent's position is at Exit Position.
At each step, you can only move Up, Down, Left, Right or Stay.
You can only move to cells without obstacles. The valid moves will be made known to you at each time step.
Top left of grid is (0, 0), bottom right is {(grid_size, grid_size)}.
Grid position is referred to by (row, col)''', 
                 shared_variables = {
                    "cur_pos": start_pos,
                    "exit_pos": exit_pos,
                    "past_grid_states": [],
                    "next_valid_moves": valid_moves,
                    "known_obstacle_pos": [],
                    "grid_size": grid_size,
                    "grid": grid}, 
                 max_subtasks = 10,
                 get_additional_context = get_additional_context, # this is something new to store persistent states
                 default_to_llm = False).assign_functions(fn_list)

In [7]:
print('### Starting Grid ###')
print_grid(grid)
output = my_agent.run(f'Navigate from {start_pos} to {exit_pos}')

### Starting Grid ###
O        
  S      
    E    
  O     O
  O O    
Subtask identified: Move Down from (1, 1) to (2, 1)
Calling function move with parameters {'action': 'Down'}
O        
         
  S E    
  O     O
  O O    
> {'Status': 'Action successful. Agent moved from (1, 1) to (2, 1)'}

Subtask identified: Move Right from (2, 1) to (2, 2)
Calling function move with parameters {'action': 'Right'}
O        
         
    S    
  O     O
  O O    
> {'Status': 'Action successful. Agent moved from (2, 1) to (2, 2)'}

Task completed successfully!



In [8]:
my_agent.status()

Agent Name: Maze Navigator
Agent Description: You are to move to the Exit Position of the maze. Task is completed when Agent's position is at Exit Position.
At each step, you can only move Up, Down, Left, Right or Stay.
You can only move to cells without obstacles. The valid moves will be made known to you at each time step.
Top left of grid is (0, 0), bottom right is (5, 5).
Grid position is referred to by (row, col)
Available Functions: ['end_task', 'move']
Shared Variables: ['cur_pos', 'exit_pos', 'past_grid_states', 'next_valid_moves', 'known_obstacle_pos', 'grid_size', 'grid']
Task: Navigate from (1, 1) to (2, 2)
Subtasks Completed:
Subtask: Move Down from (1, 1) to (2, 1)
{'Status': 'Action successful. Agent moved from (1, 1) to (2, 1)'}

Subtask: Move Right from (2, 1) to (2, 2)
{'Status': 'Action successful. Agent moved from (2, 1) to (2, 2)'}

Is Task Completed: True


## Advanced: Avoiding Multiple Similar Subtasks
- In the above example, we made the Agent state the start and end position along with the action for each subtask, so as to disambiguate
- This does not work for all positions since an agent might have to repeat that transition again sometime later

- Solution:
    - Preserve only the overall state in `shared_variables`
    - Reset agent at each time step manually so `subtasks_history` will be reset

In [16]:
def get_additional_context(agent):
    ''' Outputs additional information to the agent '''
    
    # process additional context based on shared variables (this is what is called persistent variables - variables that will be updated each step)
    additional_context = f'''Agent position in grid: {agent.shared_variables["cur_pos"]}
Exit Position: {agent.shared_variables["exit_pos"]}
Last 10 Visited Grid Positions: {agent.shared_variables["past_grid_states"][:10]}
Known Obstacle Positions: {agent.shared_variables["known_obstacle_pos"]}
Next Valid Moves: {agent.shared_variables["next_valid_moves"]}'''
    
    return additional_context

# Customise your grid here
# O means obstacles, S means start, E means end, blank means nothing there
grid_size = 5  # Grid size
start_pos = (random.randint(0, grid_size//2 - 1), random.randint(0, grid_size//2 - 1))  # Starting position
exit_pos = (random.randint(grid_size//2, grid_size-1), random.randint(grid_size//2, grid_size-1))  # Exit position
num_obstacles = 5  # Number of obstacles

grid = generate_grid(grid_size, start_pos, exit_pos, num_obstacles)
valid_moves = check_valid_moves(start_pos, grid, grid_size)

# Assign your agent
my_agent = Agent('Maze Navigator', 
                 f'''You are to move to the Exit Position of the maze. Task is completed when Agent's position is at Exit Position.
At each step, you can only move Up, Down, Left, Right or Stay.
You can only move to cells without obstacles. The valid moves will be made known to you at each time step.
Top left of grid is (0, 0), bottom right is {(grid_size, grid_size)}.
Grid position is referred to by (row, col)''', 
                 shared_variables = {
                    "cur_pos": start_pos,
                    "exit_pos": exit_pos,
                    "past_grid_states": [],
                    "next_valid_moves": valid_moves,
                    "known_obstacle_pos": [],
                    "grid_size": grid_size,
                    "grid": grid}, 
                 max_subtasks = 10,
                 get_additional_context = get_additional_context, # this is something new to store persistent states
                 default_to_llm = False).assign_functions(fn_list)

In [17]:
print('### Starting Grid ###')
print_grid(grid)

num_moves = 0
# Keep resetting subtask's history so that we don't confuse the agent due to previous moves
# Also, if plan fails, start task from a subset of the earlier task by changing start position to current position
while num_moves < 50:
    my_agent.reset()
    my_agent.run(f"Navigate from {my_agent.shared_variables['cur_pos']} to {my_agent.shared_variables['exit_pos']}", num_subtasks = 1)
    # use rule-based task checks as agent may not get it right all the time
    if my_agent.shared_variables['cur_pos'] == my_agent.shared_variables['exit_pos']: 
        my_agent.task_completed = True
        break

### Starting Grid ###
        O
S        
        E
O     O  
O     O  
Subtask identified: Move Right
Calling function move with parameters {'action': 'Right'}
        O
  S      
        E
O     O  
O     O  
> {'Status': 'Action successful. Agent moved from (1, 0) to (1, 1)'}

Subtask identified: Move Right
Calling function move with parameters {'action': 'Right'}
        O
    S    
        E
O     O  
O     O  
> {'Status': 'Action successful. Agent moved from (1, 1) to (1, 2)'}

Subtask identified: Move Right
Calling function move with parameters {'action': 'Right'}
        O
      S  
        E
O     O  
O     O  
> {'Status': 'Action successful. Agent moved from (1, 2) to (1, 3)'}

Subtask identified: Move Right
Calling function move with parameters {'action': 'Right'}
        O
        S
        E
O     O  
O     O  
> {'Status': 'Action successful. Agent moved from (1, 3) to (1, 4)'}

Subtask identified: Move down
Calling function move with parameters {'action': 'Down'}
     

In [18]:
my_agent.status()

Agent Name: Maze Navigator
Agent Description: You are to move to the Exit Position of the maze. Task is completed when Agent's position is at Exit Position.
At each step, you can only move Up, Down, Left, Right or Stay.
You can only move to cells without obstacles. The valid moves will be made known to you at each time step.
Top left of grid is (0, 0), bottom right is (5, 5).
Grid position is referred to by (row, col)
Available Functions: ['end_task', 'move']
Shared Variables: ['cur_pos', 'exit_pos', 'past_grid_states', 'next_valid_moves', 'known_obstacle_pos', 'grid_size', 'grid']
Task: Navigate from (1, 4) to (2, 4)
Subtasks Completed:
Subtask: Move down
{'Status': 'Action successful. Agent moved from (1, 4) to (2, 4)'}

Is Task Completed: True


## Can we do better? (To be added)
- LLMs are not known for their planning abilities
- Perhaps we can use an in-built planner to decide what to do for the next moves, based on what we know of the current position, exit position, obstacle positions
- Imbue the plan as part of additional_context