**<p>Action:</p>**

We have five actions: move left, right, down, or up, and collect the item. The four movement actions are available in every state unless they would take the agent off the grid (e.g. at the top-right corner, only left and down are available). The collect action is available only while the agent is at the item location and has not yet collected the item.
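
As a rough illustration of the boundary rule above, the sketch below lists the movement actions that stay inside the grid. `Move` and `available_moves` are hypothetical stand-ins written for this example, not the notebook's own `Action` enum or `Environment.get_available_actions` method, and the collect action is left out.

```python
from enum import Enum


class Move(Enum):
    # Hypothetical stand-in for the movement part of the notebook's Action enum
    LEFT = 0
    RIGHT = 1
    DOWN = 2
    UP = 3


def available_moves(x: int, y: int, n: int = 5) -> list[Move]:
    """Return the moves that keep the agent inside an n x n grid."""
    moves = []
    if x > 0:
        moves.append(Move.LEFT)
    if x < n - 1:
        moves.append(Move.RIGHT)
    if y > 0:
        moves.append(Move.DOWN)
    if y < n - 1:
        moves.append(Move.UP)
    return moves


print(available_moves(4, 4))  # top-right corner: only LEFT and DOWN remain
print(available_moves(2, 2))  # interior cell: all four moves
```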

In [None]:
from enum import Enum


class Action(Enum):
    # NOTE: QValue matrix used these action values as their indices
    LEFT = 0
    RIGHT = 1
    DOWN = 2
    UP = 3

    # actions when agent just got the item and is moving to item_reached state
    COLLECT = 4  # goes to item reached state

In [None]:
import matplotlib
matplotlib.use('TkAgg')

**<p>State</p>**

We design a state space with 5 dimensions: (agent_location_x, agent_location_y, is_item_collected, item_location_x, item_location_y). On the 5 x 5 grid, the state part of our Q-value table therefore has shape (5, 5, 2, 5, 5), with one value per action stored for each state. Some points to note:
<ol><li>Because the item locations are independent of one another, and to train every item location equally, we train a separate Q-table per item location. Each Q-table covers (agent_location_x, agent_location_y, is_item_collected), and training is repeated for environments with different (item_location_x, item_location_y) pairs.</li>
<li>Because the item should not be at the goal position, the number of possible item locations is 5*5 - 1 rather than 5*5. However, a NumPy array must have the same size along each dimension, so we simply allocate the full 5*5 item space and leave unused the entry where the item position coincides with the goal position.</li>
<li>Technically, we only need one (agent_x, agent_y) table for all item positions once the item has been collected, because the state space after collection is independent of where the item was. However, to simplify the implementation (and because we ran out of time), we keep a redundant (agent_x, agent_y) array in the item-collected dimension for every item position.</li></ol>
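
To make the sizes above concrete, here is a small counting sketch. It assumes the 5 x 5 grid and the default goal cell (4, 0); the names `N`, `GOAL`, `item_locations`, and `sub_table_keys` are illustrative only and do not appear in the notebook.

```python
from itertools import product

N = 5          # grid is N x N (assumed default)
GOAL = (4, 0)  # fixed goal cell (assumed default goal_location)

# The item may sit anywhere except the goal cell
item_locations = [loc for loc in product(range(N), range(N)) if loc != GOAL]

# One sub-table per item location, keyed by (agent_x, agent_y, is_item_collected)
sub_table_keys = list(product(range(N), range(N), (False, True)))

print(len(item_locations))                        # 24 usable item locations
print(len(sub_table_keys))                        # 50 states per sub-table
print(len(item_locations) * len(sub_table_keys))  # 1200 states in total
```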

<br>

**<p>Reward Structure</p>**

For the reward structure, see the get_reward function of the Environment class. We give an item-collection reward of 200 only at the moment the agent collects the item, so revisiting the item location afterwards earns nothing. We give a reward of 300 for reaching the goal state. Note that the goal state exists only in the item_collected dimension and not in the non-item_collected dimension, because reaching the goal position before collecting the item must not count as success. In all other cases, we give a time penalty of -1.
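
The three cases can be sketched as a standalone function. This is a simplified illustration that describes a transition with plain flags and tuples (an assumption made for this example) rather than with the notebook's `State` objects; the function name `reward` and its parameters are hypothetical.

```python
ITEM_REWARD = 200
GOAL_REWARD = 300
TIME_PENALTY = -1


def reward(
    prev_has_item: bool,
    has_item: bool,
    agent_location: tuple[int, int],
    goal_location: tuple[int, int] = (4, 0),
) -> int:
    """Simplified reward: collection first, then goal, otherwise time penalty."""
    if has_item and not prev_has_item:
        # The item was collected on this very step
        return ITEM_REWARD
    if has_item and agent_location == goal_location:
        # The goal only counts once the item has been collected
        return GOAL_REWARD
    return TIME_PENALTY


print(reward(False, True, (2, 3)))   # 200: item just collected
print(reward(True, True, (4, 0)))    # 300: goal reached with the item
print(reward(False, False, (4, 0)))  # -1: goal cell without the item
print(reward(True, True, (2, 3)))    # -1: back at the old item cell
```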

<br>

**<p>Visualization</p>**

Among its parameters, the environment takes n (the grid size, for an n x n grid) and with_animation, a boolean that controls whether the animation is shown.

The **initialize_for_new_episode** function resets the agent and the state, then calls the animate function to draw the initial grid on the matplotlib axes passed to the environment.


The **animate** function:

<ul><li>Sets up the grid of size n x n.</li>
<li>Adds the three icons: A ("agent"), I ("item"), and G ("goal").</li>
<li>Shows the same three icons in a legend.</li>
<li>Always positions G in the bottom-right corner of the grid.</li>
<li>Changes the agent's colour when it picks up the item, and updates the legend to match.</li>
<li>Pauses for 0.5 s so that each change is visible on the grid.</li></ul>

The **step** function:

<ul><li>Is the function that causes the movement.</li>
<li>Receives the action chosen by the agent.</li>
<li>Performs the movement and updates the state, then calls the animate function to show the new state.</li></ul>

Using the visualisation, we can see that in the early stages of learning the agent performs mostly random actions and explores, often moving away from both the item and the goal. After training for many episodes, the agent moves straight towards the item, picks it up, and then heads directly to the goal position G.








In [None]:
from __future__ import annotations
from abc import ABC
from random import randint

import matplotlib.axes
import matplotlib.figure
import numpy as np
import matplotlib
import matplotlib.pyplot as plt


DEFAULT_TIME_PENALTY = -1
GOAL_STATE_REWARD = 300
DEFAULT_ITEM_REWARD = 200


class Environment:
    def __init__(
        self,
        n: int = 5,
        fig: matplotlib.figure.Figure | None = None,
        ax: matplotlib.axes.Axes | None = None,
        item: ItemObject | None = None,
        goal_location: tuple[int, int] = (4, 0),
        time_penalty: int | float = DEFAULT_TIME_PENALTY,
        item_state_reward: int | float = DEFAULT_ITEM_REWARD,
        goal_state_reward: int | float = GOAL_STATE_REWARD,
        with_animation: bool = True,
    ) -> None:
        self.n = n
        self.goal_location = goal_location
        self.time_penalty = time_penalty
        self.item_state_reward = item_state_reward
        self.goal_state_reward = goal_state_reward

        self.item = ItemObject() if item is None else item
        self.agent = AgentObject()
        self.fig = fig
        self.ax = ax

        if self.item.location is None:
            self.item.set_location_randomly(self.n, self.n)

        self.state: State
        # TODO: possibly implement this if there are multiple GridObjects to check for
        # initialize grid and put grid objects on the grid
        # x_agent, y_agent = self.agent.location
        # x_item, y_item = self.item.location
        # self.grid = np.zeros((self.n, self.n))
        # self.grid[x_agent, y_agent] = self.agent
        # self.grid[x_item, y_item] = self.item

        # Setup for animation
        self.with_animation = with_animation

    def initialize_for_new_episode(self, agent_location: tuple[int, int] | None = None) -> None:
        if agent_location is None:
            self.agent.set_location_randomly(self.n, self.n, [self.item.get_location()])
        else:
            self.agent.location = agent_location
        self.agent.has_item = False
        self.state = State(
            agent_location=self.agent.get_location(),
            item_location=self.item.get_location(),
            has_item=self.agent.has_item,
        )
        self.animate()  # Initial drawing of the grid

    def get_state(self) -> State:
        return self.state

    def set_with_animation(self, with_animation: bool) -> None:
        self.with_animation = with_animation

    def get_available_actions(self) -> list[Action]:
        """
        Assumes that the current state is not the goal state
        """
        # logic to determine available actions
        actions = []
        current_state = self.get_state()
        x, y = current_state.agent_location

        if current_state.agent_location == current_state.item_location and not current_state.has_item:
            actions.append(Action.COLLECT)

        # note: technically, whenever the agent is at the item location, the optimal action is to collect the item
        # however, according to the CE, we must ensure that
        # "the agent is supposed to learn (rather than being told) that
        # once it has picked up the load it needs to move to the delivery point to complete its mission. ",
        # implying that the agent must learn to "collect" instead of being told to collect (so add all possible actions)
        if x > 0:
            actions.append(Action.LEFT)  # left
        if x < self.n - 1:
            actions.append(Action.RIGHT)  # right
        if y > 0:
            actions.append(Action.DOWN)  # down
        if y < self.n - 1:
            actions.append(Action.UP)  # up

        return actions

    def get_reward(self, prev_state: State, current_state: State):
        """
        We can actually use self.state but to make it more explicit, we pass the states as an argument
        """
        # TODO: technically, i think it should accept (prev state, action, next state)

        # we ensure that the agent receives the item collection reward iff it has just collected the item at the item location
        # otherwise, in the item-collected space, the agent would be rewarded for returning to where the item was (already collected, so wrong)
        if (
            prev_state.agent_location == current_state.item_location
            and current_state.agent_location == current_state.item_location
            and current_state.has_item
        ):
            return self.item_state_reward
        elif self.is_goal_state(current_state):
            return self.goal_state_reward
        else:
            return self.time_penalty

    def update_state(self, action: Action) -> None:
        """
        Be careful: this method updates the state of the environment
        """
        self.agent.move(action)
        self.state = State(
            agent_location=self.agent.get_location(),
            item_location=self.item.get_location(),
            has_item=self.agent.has_item,
        )

    def is_goal_state(self, state: State) -> bool:
        # Use the state passed in (not self.state) so that any state can be checked
        return state.has_item and self.goal_location == state.agent_location

    def animate(self):
        if not self.with_animation:
            return
        self.ax.clear()
        self.ax.set_xlim(0, self.n)
        self.ax.set_ylim(0, self.n)
        self.ax.set_xticks(np.arange(0, self.n + 1, 1))
        self.ax.set_yticks(np.arange(0, self.n + 1, 1))
        self.ax.grid(True)

        # Plotting the agent, item, and goal
        self.ax.text(
            self.agent.location[0] + 0.5,
            self.agent.location[1] + 0.5,
            "A",
            ha="center",
            va="center",
            fontsize=16,
            color="blue" if not self.agent.has_item else "purple",
        )
        self.ax.text(
            self.item.location[0] + 0.5,
            self.item.location[1] + 0.5,
            "I",
            ha="center",
            va="center",
            fontsize=16,
            color="green",
        )
        self.ax.text(
            self.goal_location[0] + 0.5,
            self.goal_location[1] + 0.5,
            "G",
            ha="center",
            va="center",
            fontsize=16,
            color="red",
        )

        # TODO: add a message saying "item collected" if the agent has collected the item
        # or else there is a single frame where the agent is at the same location twice,
        # so it looks like the agent is not moving
        handles = [
            plt.Line2D([0], [0], marker="o", color="w", markerfacecolor="blue", markersize=8, label="Agent (A)")
            if not self.agent.has_item
            else plt.Line2D(
                [0], [0], marker="o", color="w", markerfacecolor="purple", markersize=8, label="Agent (A) with item"
            ),
            plt.Line2D([0], [0], marker="o", color="w", markerfacecolor="green", markersize=8, label="Item (I)"),
            plt.Line2D([0], [0], marker="o", color="w", markerfacecolor="red", markersize=8, label="Goal (G)"),
        ]
        self.ax.legend(handles=handles, loc="center left", bbox_to_anchor=(1, 0.5))

        plt.subplots_adjust(right=0.75, left=0.1)
        self.fig.canvas.draw_idle()
        plt.pause(0.5)  # Pause to allow visualization of the movement

    def step(self, action: Action) -> tuple[float, State]:
        prev_state = self.get_state()
        self.update_state(action)
        next_state = self.get_state()
        self.animate()
        reward = self.get_reward(prev_state, next_state)
        return reward, next_state


class GridObject(ABC):
    def __init__(self, location: tuple[int, int] | None = None) -> None:
        self.icon: str
        self.location = (
            location  # NOTE: location is a tuple of (x, y) where x and y are coordinates on the grid (not indices)
        )

    def set_location_randomly(
        self, max_x: int, max_y: int, disallowed_locations: list[tuple[int, int]] | None = None
    ) -> tuple[int, int]:
        """
        Note: max_x and max_y are exclusive

        disallowed_locations: list of locations that are not allowed to be placed
        (e.g. agent and item location should not be initialized to the same place)
        """
        # Avoid a mutable default argument; treat None as "no restrictions"
        disallowed = disallowed_locations or []
        # The start, item, and goal locations must be at different positions
        location = None
        while location is None or location in disallowed:
            location = (randint(0, max_x - 1), randint(0, max_y - 1))

        self.location = location
        return location

    def get_location(self) -> tuple[int, int]:
        if self.location is None:
            raise ValueError("Location is not set")
        return self.location


class AgentObject(GridObject):
    def __init__(self, location: tuple[int, int] | None = None) -> None:
        super().__init__(location)
        self.icon = "A"
        self.has_item = False  # TODO: has_item of AgentObject and State must be synched somehow

    def move(self, action: Action) -> None:
        # NOTE: assumes that action is valid (i.e. agent is not at the edge of the grid)
        if self.location is None:
            raise ValueError("Agent location is not set")

        x, y = self.location
        if action == Action.LEFT:
            self.location = (x - 1, y)  # left
        elif action == Action.RIGHT:
            self.location = (x + 1, y)  # right
        elif action == Action.DOWN:
            self.location = (x, y - 1)  # down
        elif action == Action.UP:
            self.location = (x, y + 1)  # up
        elif action == Action.COLLECT:
            self.has_item = True


class ItemObject(GridObject):
    def __init__(self, location: tuple[int, int] | None = None):
        super().__init__(location)
        self.icon = "I"


In [None]:
from dataclasses import dataclass


@dataclass(frozen=True)
class State:
    # State does not hold AgentObject / ItemObject because State should be immutable
    # (enforced with frozen=True). If more attributes are added to State in the future,
    # copies of the AgentObject / ItemObject will be needed.
    agent_location: tuple[int, int]
    item_location: tuple[int, int]

    has_item: bool = False

**<p>Learning Process:</p>**
In tabular Q-learning, an agent learns to make optimal decisions by estimating the expected return of each state-action pair, iteratively updating its Q-values. Our training process is as follows:

<ol><li>Initialize the Q-value matrix with the dimensions explained above, (agent_location_x, agent_location_y, is_item_collected).</li>
<li>Initialize multiple environments, each with a different item position (item_location_x, item_location_y).</li>
<li>For each environment, train the Q-value matrix using hyper-parameters such as the learning rate, discount factor, epsilon (exploration rate), and number of episodes. Training is managed by the Trainer class, while updating Q-values and choosing the best action are handled by the Agent class.</li>
<li>Under the epsilon-greedy policy, the agent occasionally takes a random action to explore new possibilities, and otherwise chooses the action with the highest estimated Q-value.</li>
<li>Following the Q-learning update rule, the agent updates its Q-values based on the reward received and the estimated future rewards.</li>
<li>After each action, the agent moves to the next state and repeats steps 4 and 5 until the goal is reached, which ends the episode.</li>
<li>Steps 4 to 6 are repeated until the predetermined number of episodes is reached.</li>
<li>After training, the agent's performance is evaluated by testing its behaviour in the environments and visualising its path to understand its decision-making.</li></ol>
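
The action-selection and update steps in the list above can be sketched in isolation. The snippet below is a minimal, dictionary-based illustration that uses the Agent defaults (alpha 0.3, discount rate 0.9, epsilon 0.1); `q_update` and `epsilon_greedy` are hypothetical helper names for this example and are not the notebook's `Agent.update` or `Agent.choose_action`.

```python
import random

ALPHA = 0.3    # learning rate (Agent default)
GAMMA = 0.9    # discount rate (Agent default)
EPSILON = 0.1  # exploration rate (Agent default)


def q_update(q_sa: float, reward: float, next_qvals: list[float]) -> float:
    """One tabular Q-learning step:
    Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
    """
    td_target = reward + GAMMA * max(next_qvals)
    return q_sa + ALPHA * (td_target - q_sa)


def epsilon_greedy(
    qvals: dict[str, float], rng: random.Random, epsilon: float = EPSILON
) -> str:
    """Pick a random action with probability epsilon, else a highest-valued one."""
    if rng.random() < epsilon:
        return rng.choice(list(qvals))
    best = max(qvals.values())
    # Break ties between equally good actions at random
    return rng.choice([a for a, q in qvals.items() if q == best])


# Worked example: Q(s, a) = 0, time penalty -1, best next-state value 10
new_q = q_update(0.0, -1.0, [10.0, 4.0])
print(round(new_q, 3))  # 0.3 * (-1 + 0.9 * 10 - 0) = 2.4

rng = random.Random(0)
print(epsilon_greedy({"LEFT": 1.0, "RIGHT": 5.0}, rng, epsilon=0.0))  # RIGHT
```

With `epsilon=0.0` the choice is purely greedy, which mirrors how the notebook evaluates the trained agent with `is_training=False`.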


In [None]:
import numpy as np
import pickle
import random

class QValueMatrix:
    """
    Abstracts the Q-value matrix for the agent,
    to hide different q value matrices for different states and action handling
    """
    def __init__(self, x_max: int, y_max: int, num_max_actions: int) -> None:
        # TODO: check item_location and goal_location are within the grid
        # TODO: the way we are storing the q values is memory inefficient in that
        # not all states have all actions (we store 0 for those)

        self.start_to_item = np.zeros((x_max, y_max, num_max_actions))
        self.item_to_goal = np.zeros((x_max, y_max, num_max_actions))

    def get_state_qvals(self, state: State, actions: list[Action] | Action = []) -> np.ndarray:
        """
        Returns Q(S), or Q(S, A) if actions are provided
        """
        if isinstance(actions, Action):
            actions = [actions]

        x, y = state.agent_location
        if state.has_item:
            return self.item_to_goal[x, y] if not actions else self.item_to_goal[x, y, [action.value for action in actions]]
        else:
            return self.start_to_item[x, y] if not actions else self.start_to_item[x, y, [action.value for action in actions]]

    def update_qval(self, state: State, action: Action, new_qval: float) -> None:
        """
        Updates the Q value for a state-action pair, i.e. Q(S, A) = new_qval
        """
        x, y = state.agent_location
        if state.has_item:
            self.item_to_goal[x, y, action.value] = new_qval
        else:
            self.start_to_item[x, y, action.value] = new_qval

    def increase_qval(self, state: State, action: Action, increment: float) -> None:
        """
        Increases the Q value for a state-action pair, i.e. Q(S, A) += increment
        """
        x, y = state.agent_location
        if state.has_item:
            self.item_to_goal[x, y, action.value] += increment
        else:
            self.start_to_item[x, y, action.value] += increment


def generate_grid_location_list(max_x: int, max_y: int) -> list[tuple[int, int]]:
    return [(i, j) for i in range(max_x) for j in range(max_y)]


def save_trained_qval_matrix(trained_qval_matrix: QValueMatrix, item: ItemObject) -> None:
    if item.location is None:
        raise ValueError("Item location is None")
    with open(f'qval_matrix{item.location[0]}_{item.location[1]}.pickle', "wb") as f:
        pickle.dump(trained_qval_matrix, f)


class Agent:
    def __init__(
        self,
        alpha: float = 0.3,
        discount_rate: float = 0.9,
        epsilon: float = 0.1,
        num_episode_per_intermediate_item: int = 1000,
        grid_size: tuple[int, int] = (5, 5),
        save_weights: bool = False,
    ) -> None:
        self.alpha = alpha  # learning rate
        self.epsilon = epsilon  # exploration rate
        self.discount_rate = discount_rate
        self.num_episode_per_intermediate_item = num_episode_per_intermediate_item
        self.grid_size = grid_size
        self.save_weights = save_weights

        self.trained_qval_matrices: list[QValueMatrix] = []

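    def update(self, current_state: State, next_state: State, reward: float, action: Action, qval_matrix: QValueMatrix) -> None:
        # get_state_qvals returns a length-1 array for a single action; take the scalar
        current_qval = float(qval_matrix.get_state_qvals(current_state, actions=action)[0])
        # Q-learning update: alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
        qval_difference: float = self.alpha * (
            reward
            + self.discount_rate * float(np.max(qval_matrix.get_state_qvals(next_state)))
            - current_qval
        )
        qval_matrix.increase_qval(current_state, action, qval_difference)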

    def choose_action(self, possible_actions: list[Action], state: State, qval_matrix: QValueMatrix, is_training: bool = True) -> Action:
        """
        Epsilon-greedy method to choose an action
        """
        if is_training and random.random() < self.epsilon:
            return random.choice(possible_actions)
        else:
            action_to_qval = list(zip(possible_actions, qval_matrix.get_state_qvals(state, actions=possible_actions)))
            random.shuffle(action_to_qval)  # to break ties randomly
            return max(action_to_qval, key=lambda x: x[1])[0]


class Trainer:
    def __init__(self, agent: Agent, envs: list[Environment]) -> None:
        self.agent = agent
        self.environments = envs

    def train_one(self, num_episodes: int, env: Environment) -> QValueMatrix:
        """
        Conducts training for a given number of episodes.
        """
        qval_matrix = QValueMatrix(self.agent.grid_size[0], self.agent.grid_size[1], len(Action))

        for episode in range(num_episodes):
            env.initialize_for_new_episode()
            current_state = env.get_state()
            while not env.is_goal_state(current_state):
                possible_actions = env.get_available_actions()
                action = self.agent.choose_action(possible_actions, current_state, qval_matrix)
                reward, next_state = env.step(action)
                self.agent.update(current_state, next_state, reward, action, qval_matrix)
                current_state = next_state

        return qval_matrix

    def train(self) -> None:
        """
        We are training for every intermediate goal (item) location in the grid; so an individual state consists of x, y, item_x, item_y, technically speaking.
        However, to ensure that the agent samples from all possible item locations fairly, we train separately for each item location.
        """

        for env in self.environments:
            qval_matrix = self.train_one(self.agent.num_episode_per_intermediate_item, env)

            # Store the trained Q-value matrix in the agent
            self.agent.trained_qval_matrices.append(qval_matrix)

            if self.agent.save_weights:
                save_trained_qval_matrix(qval_matrix, env.item)

### Metric:
$$ \frac{1}{\text{number of episodes}}\sum_{\text{item location}}\sum_{\text{agent location}}\frac{M(\text{agent location}, \text{item location}) + 1 + M(\text{item location}, \text{goal location})}{\text{number of steps taken}} $$
where $M$ represents the Manhattan distance.

  We began by training our Q-learner for all possible item locations and a fixed goal position. After training, we evaluated the agent's performance as the ratio of the shortest possible distance (using Manhattan distance) to the actual number of steps the agent took from the start position, through the item, to the goal (shortest distance / actual distance). The 1 in the numerator of the metric accounts for the extra step needed to pick up the item. This ratio was computed for all possible item and agent start locations in the environment. A ratio of 1 means the actual path matches the shortest path, while a ratio below 1 means a longer path was taken. Averaging these ratios across all scenarios gave a value of roughly 0.97 (0.9699 in the run recorded at the end of this notebook; the exact figure varies between runs because exploration and tie-breaking are random), indicating that our Q-learner learns near-optimal paths in the environment.
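
As a worked example of the metric for a single episode, the sketch below computes the ratio for one start, item, and goal configuration. The helper names `manhattan` and `path_efficiency` and the coordinates are chosen for illustration only.

```python
def manhattan(a: tuple[int, int], b: tuple[int, int]) -> int:
    """Manhattan distance between two grid cells."""
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def path_efficiency(
    start: tuple[int, int],
    item: tuple[int, int],
    goal: tuple[int, int],
    steps_taken: int,
) -> float:
    """Shortest possible number of steps divided by the steps actually taken."""
    # +1 accounts for the explicit collect action at the item location
    shortest = manhattan(start, item) + 1 + manhattan(item, goal)
    return shortest / steps_taken


# Start (0, 0), item (2, 3), goal (4, 0): shortest route is 5 + 1 + 5 = 11 steps
print(path_efficiency((0, 0), (2, 3), (4, 0), steps_taken=11))  # 1.0
print(path_efficiency((0, 0), (2, 3), (4, 0), steps_taken=13))  # 11 / 13
```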


In [None]:
import random

from tqdm import tqdm


class Evaluation:
    def __init__(self, n=5) -> None:
        self.n = n
        self.agent = Agent(num_episode_per_intermediate_item=1000)
        item_grid_locations = generate_grid_location_list(self.n, self.n)
        all_items = [ItemObject(grid_location) for grid_location in item_grid_locations]
        fig, ax = plt.subplots(figsize=(8, 8))
        self.envs = [Environment(item = item, with_animation=False, fig = fig, ax = ax) for item in all_items]

    def run_train(self) -> None:
        """
        Trains the agent in every environment; the trained Q-value matrices are stored on the agent.
        """
        trainer = Trainer(self.agent, self.envs)
        trainer.train()

    @staticmethod
    def calculate_manhattan_distance(start_location: tuple[int, int], goal_location: tuple[int, int]) -> int:
        """
        Calculates the Manhattan distance between two points.
        """
        start_x, start_y = start_location
        goal_x, goal_y = goal_location
        return abs(start_x - goal_x) + abs(start_y - goal_y)

    @staticmethod
    def calculate_metrics_score(shortest_distance: int, distance: int) -> float:
        """
        Calculates the ratio of the shortest distance to the distance actually taken by the Q-learning agent.
        """
        return shortest_distance / distance

    def visualize(self, num_of_vis: int = 5) -> None:
        """
        Visualizes the agent's path after training
        """
        env_indices = random.sample(range(1, self.n*self.n), num_of_vis)
        for index in env_indices:
            env = self.envs[index]
            env.set_with_animation(True)
            env.initialize_for_new_episode()

            # Run the agent in the environment
            current_state = env.get_state()
            while not env.is_goal_state(current_state):
                possible_actions = env.get_available_actions()
                action = self.agent.choose_action(possible_actions, current_state, self.agent.trained_qval_matrices[index], is_training=False)
                _, next_state = env.step(action)
                current_state = next_state

    def performance_test(self):
        """
        Conducts a performance test: averages the shortest-path ratio over all item and agent start locations.
        """
        num_episodes = 0
        total_score = 0
        for env in tqdm(self.envs):
            env.set_with_animation(False)
            for agent_location in tqdm(generate_grid_location_list(self.n, self.n)):
                if agent_location == env.item.location or agent_location == env.goal_location:
                    continue

                env.initialize_for_new_episode(agent_location=agent_location)
                start_location = env.agent.location  # Get the start location of the agent
                item_location = env.item.location  # Get intermediate location of the item

                # Calculate shortest distance from start to item to goal
                shortest_distance = (
                    self.calculate_manhattan_distance(start_location, item_location)
                    + 1
                    + self.calculate_manhattan_distance(item_location, env.goal_location)
                )

                current_state = env.get_state()
                num_steps = 0
                while not env.is_goal_state(current_state):
                    possible_actions = env.get_available_actions()
                    item_x, item_y = current_state.item_location
                    action = self.agent.choose_action(possible_actions, current_state, self.agent.trained_qval_matrices[self.n*item_x+item_y], is_training=False)
                    _, next_state = env.step(action)
                    current_state = next_state
                    num_steps += 1

                # Calculate and accumulate the score
                total_score += self.calculate_metrics_score(shortest_distance, num_steps)

                num_episodes += 1

        # Return the average score across all tests
        return total_score / num_episodes

if __name__ == "__main__":
    evl = Evaluation()
    evl.run_train()

    # Conduct the performance test
    average_score = evl.performance_test()
    print(f"Average performance score (1 is the best): {average_score:.4f}")

    # visualize randomly the environments and show the steps of the agent
    evl.visualize()


Average performance score (1 is the best): 0.9699
