--- 1. GLOBAL VARIABLES & CONFIGURATION ---

In [1]:
import random
import copy

In [2]:
GRID_WIDTH = 5
GRID_HEIGHT = 5

Using coordinates as (x, y) which map to (col, row)

(0, 0) is top-left

In [3]:
INITIAL_PICKUP_LOCS = {(1, 2): 5, (3, 3): 5}  # (x, y): block_count
INITIAL_DROPOFF_LOCS = {(0, 0): 0, (0, 4): 0, (2, 2): 0, (4, 3): 0} # (x, y): block_count
TOTAL_BLOCKS_AT_START = sum(INITIAL_PICKUP_LOCS.values())

--- Agent Start Positions ---

In [4]:
AGENT_F_START = {'x': 0, 'y': 2, 'has_block': False}
AGENT_M_START = {'x': 4, 'y': 2, 'has_block': False}

--- RL Parameters ---

In [5]:
LEARNING_RATE = 0.3  # Alpha (α)
DISCOUNT_FACTOR = 0.5  # Gamma (γ)
ACTIONS = ['North', 'South', 'East', 'West', 'Pickup', 'Dropoff']

--- Global State Variables (will be modified during the simulation) ---

Agent states (dictionaries)

In [6]:
agent_f = {}
agent_m = {}

World state (dictionaries)

In [7]:
pickup_locs = {}
dropoff_locs = {}
total_blocks_delivered = 0

The Q-Tables. We'll use Option (a) from the prompt: one Q-table per agent

Key: state tuple. Value: dictionary of {action: q-value}

In [8]:
q_table_f = {}
q_table_m = {}

--- 2. ENVIRONMENT & AGENT FUNCTIONS ---

In [9]:
def reset_world_and_agents(): # general function for both sarsa and q-learning
    """Resets the environment and agents to their initial states."""

    global pickup_locs, dropoff_locs, total_blocks_delivered, agent_f, agent_m # sets some variables as global to modify them
    
    # Reset world
    pickup_locs = copy.deepcopy(INITIAL_PICKUP_LOCS) # uses deepcopy to avoid reference issues and 
    dropoff_locs = copy.deepcopy(INITIAL_DROPOFF_LOCS)
    total_blocks_delivered = 0 # resets the delivered blocks count

    # Reset agents
    agent_f = copy.deepcopy(AGENT_F_START) # resets agent F to its initial state
    agent_m = copy.deepcopy(AGENT_M_START) # resets agent M to its initial state

    print("World and agents reset.")

In [10]:
def is_terminal_state(): # general function for both sarsa and q-learning
    """Checks if all blocks have been delivered."""
    return total_blocks_delivered == TOTAL_BLOCKS_AT_START # this checks if this state is terminal (terminal being the end state)

In [11]:
def get_possible_actions(agent, other_agent): # general function for both sarsa and q-learning
    """Returns a list of all valid actions for the agent."""
    
    possible = [] # list of possible actions (since some actions may be invalid such as moving out of bounds or into another agent)
   
    agent_pos = (agent['x'], agent['y']) # current position of the agent
    other_pos = (other_agent['x'], other_agent['y']) # current position of the other agent
    
    # Check movement actions
    for action, (dx, dy) in {'North': (0, -1), 'South': (0, 1), 'East': (1, 0), 'West': (-1, 0)}.items(): # for each movement action and its corresponding delta x and delta y
        next_x, next_y = agent['x'] + dx, agent['y'] + dy # calculate the next position after the move
        
        # Check boundaries
        if 0 <= next_x < GRID_WIDTH and 0 <= next_y < GRID_HEIGHT: # check if the next position is within grid bounds
            # Check collision with other agent
            if (next_x, next_y) != other_pos: # check if the next position does not collide with the other agent
                possible.append(action) # add the action to possible actions list

    # Check Pickup
    if not agent['has_block'] and agent_pos in pickup_locs and pickup_locs[agent_pos] > 0: # if the agent does not have a block and is at a pickup location with available blocks
        possible.append('Pickup')

    # Check Dropoff
    if agent['has_block'] and agent_pos in dropoff_locs: # if the agent has a block and is at a dropoff location
        # Assuming dropoff locs have infinite capacity for this simple version
        # The prompt implies a capacity (e.g., 5), which you can add here
        possible.append('Dropoff')
        
    return possible

In [12]:
def apply_action(agent, other_agent, action): # general function for both sarsa and q-learning
    """
    Applies an agent's action to the world.
    action is one of ['North', 'South', 'East', 'West', 'Pickup', 'Dropoff'].
    MODIFIES the global 'agent', 'pickup_locs', 'dropoff_locs', 'total_blocks_delivered'.
    Returns (reward, new_agent_state_tuple)
    """
    global total_blocks_delivered # We need to modify this global 
    agent_pos = (agent['x'], agent['y']) # get current position of the agent
    other_pos = (other_agent['x'], other_agent['y']) # get current position of the other agent

    reward = -1  # Default cost for a movement step
    is_valid = True

    new_x, new_y = agent['x'], agent['y'] # Initialize new position variables

    if action == 'North': # if the following action is taken
        new_y -= 1
    elif action == 'South':
        new_y += 1
    elif action == 'East':
        new_x += 1
    elif action == 'West':
        new_x -= 1
    elif action == 'Pickup': # however, if the action is pickup, then 
        # check if pickup is valid
        if not agent['has_block'] and agent_pos in pickup_locs and pickup_locs[agent_pos] > 0:
            agent['has_block'] = True # agent now has a block
            pickup_locs[agent_pos] -= 1 # decrease the number of blocks at that pickup location
            reward = 13 # reward for successful pickup. this number can be adjusted
        else:
            is_valid = False # invalid action
            reward = -10  # Penalize invalid action (trying to pick up when already carrying a block)
    elif action == 'Dropoff': # if the action is dropoff
        # check if dropoff is valid
        if agent['has_block'] and agent_pos in dropoff_locs:
            agent['has_block'] = False # agent no longer has a block
            dropoff_locs[agent_pos] += 1 # increase the number of blocks at that dropoff location
            total_blocks_delivered += 1 # increment the total blocks delivered
            reward = 13 # reward for successful dropoff. this number can be adjusted
        else:
            is_valid = False # invalid action
            reward = -10 # Penalize invalid action (trying to drop off without a block or at an invalid location)

    # Check for boundary violations (for move actions)
    if action in ['North', 'South', 'East', 'West']: # if the action is a movement action
        # check for out of bounds
        if not (0 <= new_x < GRID_WIDTH and 0 <= new_y < GRID_HEIGHT):
            is_valid = False
            reward = -10 # penalize for moving out of bounds
            new_x, new_y = agent['x'], agent['y']  # stay in place
        
        # Check for blockage (collision)
        elif (new_x, new_y) == other_pos:
            is_valid = False
            reward = -10 # penalize for collision with other agent
            new_x, new_y = agent['x'], agent['y']  # Stay in place

    # If a move was valid, update agent's position
    if action in ['North', 'South', 'East', 'West'] and is_valid:
        agent['x'], agent['y'] = new_x, new_y

    # Return the reward and the *new* state of the agent who just moved
    new_state = get_state(agent, other_agent) 
    return reward, new_state

--- 3. REINFORCEMENT LEARNING FUNCTIONS ---

In [13]:
def get_state(agent, other_agent): # general function for both sarsa and q-learning
    """
    Generates the state tuple. This is the key to the Q-table.
    State = (agent_x, agent_y, agent_has_block, other_agent_x, other_agent_y)
    This implements option (a) from the prompt [cite: 173-176]
    """
    # returns the positions and block status of both agents as a tuple
    return (agent['x'], agent['y'], agent['has_block'], other_agent['x'], other_agent['y']) 

In [14]:
def get_q_value(q_table, state, action): # general function for both sarsa and q-learning 
    """Helper to get Q-value for a state-action pair, initializing if not present. this gets the q value from the q-table"""
    
    if state not in q_table: # if the state is not in the q-table
        # Initialize all actions with 0 for this new state
        q_table[state] = {act: 0.0 for act in ACTIONS} # each action needs to have a position + initial Q-value of 0.0
        
    # This handles cases where the action might be invalid (e.g., 'Pickup' at a D loc)
    # We still want to be able to look it up without an error
    if action not in q_table[state]:
        q_table[state][action] = 0.0
            
    return q_table[state][action]


In [15]:
def get_max_q_action(q_table, state, possible_actions): # general function for both sarsa and q-learning
    """Finds the action with the highest Q-value from the possible actions."""
    if not possible_actions:
        return None, 0.0  # No action possible (pickup/dropoff not available. obviously should not happen in practice)
    
    max_q = -float('inf')
    best_actions = []

    for action in possible_actions: # for each possible action (north, south, east, west, pickup, dropoff)
        q_val = get_q_value(q_table, state, action) # get the q value for that state and action
        if q_val > max_q: # if the q value is greater than the current max q value
            max_q = q_val # update max q value
            best_actions = [action] # start a new list of best actions
        elif q_val == max_q:
            best_actions.append(action)

    # "break ties by rolling a dice"
    return random.choice(best_actions), max_q

In [16]:
def choose_action(q_table, state, possible_actions, policy): # general function for both sarsa and q-learning
    """Chooses an action based on the current policy. Current policy being one of 'PRANDOM', 'PEXPLOIT', 'PGREEDY'."""
    
    if not possible_actions: # if there are no possible actions
        return None  # Agent is trapped
    
    
    # "If pickup and dropoff is applicable, choose this operator" 
    # This rule overrides all policies
    if 'Pickup' in possible_actions:
        return 'Pickup'
    if 'Dropoff' in possible_actions:
        return 'Dropoff'

    # --- PRANDOM --- 
    if policy == 'PRANDOM':
        return random.choice(possible_actions) # choose a random action from possible actions

    # --- PEXPLOIT --- 
    elif policy == 'PEXPLOIT': 
        if random.random() < 0.8:  # 80% chance to exploit
            # we call the general function to get the best action for that state + all possible actions
            best_action, _ = get_max_q_action(q_table, state, possible_actions)
            return best_action 
        else:  # 20% chance to explore - we go against the best action
            # we still need to get the best action to avoid it
            best_action, _ = get_max_q_action(q_table, state, possible_actions) 
            
            # "choose a different applicable operator randomly"
            # we make sure to not choose the best action so we choose from the other actions
            exploration_choices = [a for a in possible_actions if a != best_action]
            if not exploration_choices:
                return best_action  # No other choice, so just return the best one
            return random.choice(exploration_choices)

    # --- PGREEDY --- 
    elif policy == 'PGREEDY': 
        # we call the general function to get the best action for that state + all possible actions
        best_action, _ = get_max_q_action(q_table, state, possible_actions)
        return best_action

    raise ValueError(f"Unknown policy: {policy}")


In [17]:
def update_q_table(q_table, old_state, action, reward, new_state, new_possible_actions): # this function is only for q-learning
    """Performs the Q-Learning update rule."""
    # Get the Q-value for the action we took
    # this works by calling the general function to get the q value for that state and action
    old_q = get_q_value(q_table, old_state, action) 

    # get the max Q-value for the next state
    # using the new state and possible actions, we call the general function to get the max q action
    _ , max_next_q = get_max_q_action(q_table, new_state, new_possible_actions)

    # q-learning formula: Q(s,a) = Q(s,a) + α * [R + γ * max_a' Q(s',a') - Q(s,a)]
    temporal_difference = reward + (DISCOUNT_FACTOR * max_next_q) - old_q
    new_q = old_q + (LEARNING_RATE * temporal_difference)

    q_table[old_state][action] = new_q # make sure to update the q-table specific to the agent

In [18]:
def update_sarsa_table(q_table, old_state, action, reward, new_state, next_action): # this function is only for sarsa
    """Performs the SARSA update rule."""
    old_q = get_q_value(q_table, old_state, action) # get the q value for the old state and action
    next_q = get_q_value(q_table, new_state, next_action) # get the q value for the next state and next action
    # sarsa formula Q(s,a) = Q(s,a) + α * [R + γ * Q(s',a') - Q(s,a)]
    temporal_difference = reward + (DISCOUNT_FACTOR * next_q) - old_q 
    new_q = old_q + (LEARNING_RATE * temporal_difference)
    q_table[old_state][action] = new_q # make sure to update the q-table specific to the agent

--- 4. MAIN SIMULATION LOOP (Example: Experiment 1.c) ---

In [19]:
def run_simulation():
    """Main experiment loop."""
    # --- Experiment Setup ---
    # Let's set up Experiment 1.c: 500 steps PRANDOM, 7500 steps PEXPLOIT
    TOTAL_STEPS = 8000
    POLICY_SCHEDULE = [(500, 'PRANDOM'), (7500, 'PEXPLOIT')]
    # CURRENT_ALGORITHM = 'Q_LEARNING' # vs 'SARSA'
    CURRENT_ALGORITHM = 'SARSA'


    # --- Statistics Tracking ---
    total_reward_f = 0
    total_reward_m = 0
    terminal_states_reached = 0
    steps_per_run = [] # List to store how many steps it took to reach each terminal state
    current_run_steps = 0

    # --- Initialization ---
    reset_world_and_agents()

    # The simulation alternates turns: F, M, F, M, ...
    # "female agent acting first" [cite: 169]
    agents = [agent_f, agent_m]
    q_tables = [q_table_f, q_table_m]
    agent_names = ['F', 'M']

    current_policy = POLICY_SCHEDULE[0][1]
    policy_switch_step = POLICY_SCHEDULE[0][0]
    policy_index = 0

    print(f"Starting simulation: {TOTAL_STEPS} steps.")
    print(f"Initial policy: {current_policy}")

    for step in range(TOTAL_STEPS):
        
        # --- Policy Switching Logic ---
        if policy_index < len(POLICY_SCHEDULE) and step >= policy_switch_step:
            policy_index += 1
            if policy_index < len(POLICY_SCHEDULE):
                switch_step, new_policy = POLICY_SCHEDULE[policy_index]
                current_policy = new_policy
                policy_switch_step += switch_step # Next switch step is cumulative
                print(f"--- Step {step}: Switching policy to {current_policy} ---")
            
        # --- Agent Turn ---
        agent_turn = step % 2  # 0 for F, 1 for M
        
        acting_agent = agents[agent_turn]
        other_agent = agents[1 - agent_turn]
        acting_q_table = q_tables[agent_turn]
        
        # --- 1. Get State & Possible Actions ---
        old_state = get_state(acting_agent, other_agent)
        possible_actions = get_possible_actions(acting_agent, other_agent)
        
        # --- 2. Choose Action ---
        action = choose_action(acting_q_table, old_state, possible_actions, current_policy)
        
        if action is None:
            # Agent is trapped, no action to take. Skip turn.
            continue
            
        # --- 3. Apply Action & Get Reward ---
        # This function modifies the agent's state *in-place*
        reward, new_state = apply_action(acting_agent, other_agent, action)
        
        # --- 4. Get New State's Possible Actions (for Q-Learning) ---
        # Note: The 'other_agent' for this *new* state is the same 'other_agent'
        # because it hasn't moved yet.
        new_possible_actions = get_possible_actions(acting_agent, other_agent)

        # --- 5. Update Q-Table ---
        if CURRENT_ALGORITHM == 'Q_LEARNING':
            update_q_table(acting_q_table, old_state, action, reward, new_state, new_possible_actions)
        
        elif CURRENT_ALGORITHM == 'SARSA':
            # For SARSA, you would choose the *next* action right here
            next_action = choose_action(acting_q_table, new_state, new_possible_actions, current_policy)
            update_sarsa_table(acting_q_table, old_state, action, reward, new_state, next_action)
            
        # --- 6. Stats & Terminal State Check ---
        if agent_turn == 0:
            total_reward_f += reward
        else:
            total_reward_m += reward
            
        current_run_steps += 1
            
        if is_terminal_state():
            print(f"--- Step {step}: Terminal state {terminal_states_reached + 1} reached in {current_run_steps} steps! ---")
            terminal_states_reached += 1
            steps_per_run.append(current_run_steps)
            current_run_steps = 0
            
            # "if a terminal state is reached, restart... but do not reset the Q-table" [cite: 211-212]
            reset_world_and_agents() 

    print("\n--- Simulation Finished ---")
    print(f"Total Steps: {TOTAL_STEPS}")
    print(f"Total Reward (Agent F): {total_reward_f}")
    print(f"Total Reward (Agent M): {total_reward_m}")
    print(f"Total Terminal States Reached: {terminal_states_reached}")
    if steps_per_run:
        print(f"Avg steps per run: {sum(steps_per_run) / len(steps_per_run):.2f}")
    print(f"Total Q-Table size (F): {len(q_table_f)} states")
    print(f"Total Q-Table size (M): {len(q_table_m)} states")

In [20]:
if __name__ == "__main__":
    run_simulation()

# You can add visualization code here, e.g.:
print("\n--- Q-Table (Agent F) Sample ---")
for i, (state, actions) in enumerate(q_table_f.items()):
    if i > 10: break
    print(f"State: {state}")
    print(f"  Actions: {actions}")

World and agents reset.
Starting simulation: 8000 steps.
Initial policy: PRANDOM
--- Step 166: Terminal state 1 reached in 167 steps! ---
World and agents reset.
--- Step 483: Terminal state 2 reached in 317 steps! ---
World and agents reset.
--- Step 500: Switching policy to PEXPLOIT ---
--- Step 710: Terminal state 3 reached in 227 steps! ---
World and agents reset.
--- Step 850: Terminal state 4 reached in 140 steps! ---
World and agents reset.
--- Step 1002: Terminal state 5 reached in 152 steps! ---
World and agents reset.
--- Step 1265: Terminal state 6 reached in 263 steps! ---
World and agents reset.
--- Step 1459: Terminal state 7 reached in 194 steps! ---
World and agents reset.
--- Step 1650: Terminal state 8 reached in 191 steps! ---
World and agents reset.
--- Step 1819: Terminal state 9 reached in 169 steps! ---
World and agents reset.
--- Step 2089: Terminal state 10 reached in 270 steps! ---
World and agents reset.
--- Step 2275: Terminal state 11 reached in 186 steps! 