# Testing the effect of environment complexity on learning

We built a simple Q-learning agent to understand the basics. Now we will write code to benchmark the agent's performance across environments of varying complexity.

This code can also be found in the Gymnasium documentation [here](https://gymnasium.farama.org/tutorials/training_agents/FrozenLake_tuto/#sphx-glr-tutorials-training-agents-frozenlake-tuto-py).

We went through the code carefully and made sure we understood it before adding it to our project. It helps us present the key observations from our project as plots and graphs.

## Initial Setup
Install the required libraries:

- `gymnasium`: for the environment

- `matplotlib`: for plotting

- `numpy`: for arrays

- `pandas`: for dataframes

- `seaborn`: for heatmaps

- `tqdm`: for progress bars

In [None]:
! pip install matplotlib numpy pandas seaborn tqdm gymnasium

Next we import them into our project.

In [8]:
from pathlib import Path  # to work with file paths
from typing import NamedTuple  # for type safety and for better code documentation

import matplotlib.pyplot as plt  # for plotting purposes
import numpy as np  # to work with arrays
import pandas as pd  # to work with DataFrames
import seaborn as sns  # to make heatmaps
from tqdm import tqdm  # to visualise iterables like loops in the form of a progress bar

import gymnasium as gym  # reinforcement learning library
# to make custom sized maps
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

sns.set_theme()  # setting the default theme

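Next we create a class called `Params` that holds all the parameters used in our code. It inherits from `NamedTuple`, which gives us named, immutable fields with type annotations. The annotations document the expected types but are not enforced at runtime. We then create an instance with some default values.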

In [9]:
class Params(NamedTuple):
    total_episodes: int  # Total episodes
    learning_rate: float  # Learning rate
    gamma: float  # Discounting rate
    epsilon: float  # Exploration probability
    map_size: int  # Number of tiles of one side of the squared environment
    seed: int  # Define a seed so that we get reproducible results
    is_slippery: bool  # If True, the player moves in the intended direction with probability 1/3, or in either perpendicular direction with probability 1/3 each
    n_runs: int  # Number of runs
    action_size: int  # Number of possible actions
    state_size: int  # Number of possible states
    proba_frozen: float  # Probability that a tile is frozen
    savefig_folder: Path  # Root folder where plots are saved


params = Params(
    total_episodes=1000,
    learning_rate=0.8,
    gamma=0.95,
    epsilon=0.1,
    map_size=5,
    seed=123,
    is_slippery=False,
    n_runs=20,
    action_size=None,  # filled in later from the environment
    state_size=None,  # filled in later from the environment
    proba_frozen=0.9,
    savefig_folder=Path("plots"),
)
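
Because a `NamedTuple` is immutable, fields such as `action_size` and `state_size` cannot be assigned in place once the environment is known. `_replace` returns a modified copy instead, which is what the training loop later in this notebook relies on. The following is a minimal sketch using a hypothetical `Config` class (it is not part of the project) that illustrates this behaviour and shows that the type annotations are not checked at runtime:

```python
from typing import NamedTuple


class Config(NamedTuple):  # hypothetical stand-in for Params
    map_size: int
    action_size: int


# Annotations are not enforced at runtime, so None is accepted here
cfg = Config(map_size=5, action_size=None)

# Fields cannot be reassigned in place
try:
    cfg.action_size = 4
    assignment_failed = False
except AttributeError:
    assignment_failed = True

# _replace returns a new tuple and leaves the original unchanged
cfg_updated = cfg._replace(action_size=4)

print(assignment_failed)        # True
print(cfg.action_size)          # None
print(cfg_updated.action_size)  # 4
```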

Next we initialise the random number generator using the seed. We also create a directory to save our plots.

In [10]:
rng = np.random.default_rng(params.seed)

params.savefig_folder.mkdir(parents=True, exist_ok=True)

Now we create a `Qlearning` class, which holds the Q-table and its update rule and is used to train our agent. We also create an `EpsilonGreedy` class, which decides whether the agent explores or exploits at each step.

In [11]:
class Qlearning:
    def __init__(self, learning_rate, gamma, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.reset_qtable()

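    def update(self, state, action, reward, new_state):
        """Return the updated Q-value: Q(s,a) + lr * [R + gamma * max Q(s',a') - Q(s,a)]."""
        # Temporal-difference error: target value minus the current estimate
        delta = (
            reward
            + self.gamma * np.max(self.qtable[new_state, :])
            - self.qtable[state, action]
        )
        q_update = self.qtable[state, action] + self.learning_rate * delta
        return q_update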

    def reset_qtable(self):
        self.qtable = np.zeros((self.state_size, self.action_size))

    def adjust_reward(self, reward):
        return reward

class EpsilonGreedy:
    def __init__(self, epsilon):
        self.epsilon = epsilon

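    def choose_action(self, action_space, state, qtable):
        # Draw a random number to decide between exploring and exploiting
        explor_exploit_tradeoff = rng.uniform(0, 1)

        if explor_exploit_tradeoff < self.epsilon:
            # Exploration: take a random action
            action = action_space.sample()

        else:
            # Exploitation: take the action with the highest Q-value.
            # If all Q-values in this state are equal, break the tie randomly,
            # because np.argmax would always return the first action.
            if np.all(qtable[state, :] == qtable[state, 0]):
                action = action_space.sample()
            else:
                action = np.argmax(qtable[state, :])
        return action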


The `Qlearning` class contains the following methods:

- `update`: Computes the new Q-value for a given state-action pair from the reward and the maximum Q-value of the next state. The caller writes the returned value into the Q-table.

- `reset_qtable`: Resets the Q-table to zero.

- `adjust_reward`: A hook for reward shaping, which could be used to improve the agent's performance. For now it returns the reward unchanged, so we keep the baseline reward structure.

The `EpsilonGreedy` class contains the following method:

- `choose_action`: Chooses an action based on the exploration-exploitation tradeoff: a random action with probability `epsilon`, otherwise the action with the highest Q-value.
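
To make the update concrete, here is a small worked example of one tabular Q-learning step, `Q(s, a) ← Q(s, a) + lr * (r + gamma * max Q(s', ·) − Q(s, a))`. The Q-table below is made up for illustration (2 states, 2 actions) and is not taken from FrozenLake; only the learning rate and discount rate match the defaults in `Params`. The last lines show why a tie among Q-values needs special handling during exploitation.

```python
import numpy as np

learning_rate = 0.8  # same default as in Params
gamma = 0.95         # same default as in Params

# Made-up Q-table: 2 states (rows) x 2 actions (columns)
qtable = np.array([[0.0, 0.0],
                   [0.5, 1.0]])

state, action, reward, new_state = 0, 1, 0.0, 1

# Temporal-difference error: target minus current estimate
delta = reward + gamma * np.max(qtable[new_state, :]) - qtable[state, action]
qtable[state, action] = qtable[state, action] + learning_rate * delta

# 0.8 * (0 + 0.95 * 1.0 - 0), i.e. approximately 0.76
print(qtable[state, action])

# In a state where nothing has been learned, every Q-value ties at zero
untrained_row = np.zeros(4)
print(np.argmax(untrained_row))                   # 0: argmax picks the first action on ties
print(np.all(untrained_row == untrained_row[0]))  # True: all values are equal
```

Without the tie check, an untrained agent would always pick action 0 when exploiting, which biases its early behaviour towards one direction.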


Before visualising, we collect the recorded data into DataFrames, which are easier to use with seaborn.

In [12]:
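def postprocess(episodes, params, rewards, steps, map_size):
    # rewards and steps have shape (total_episodes, n_runs). Flatten them
    # column by column (order="F") so that each run's episodes stay together
    # and line up with the tiled "Episodes" column.
    res = pd.DataFrame(
        data={
            "Episodes": np.tile(episodes, reps=params.n_runs),
            "Rewards": rewards.flatten(order="F"),
            "Steps": steps.flatten(order="F"),
        }
    )
    # Running total of rewards within each run
    res["cum_rewards"] = rewards.cumsum(axis=0).flatten(order="F")
    res["map_size"] = np.repeat(f"{map_size}x{map_size}", res.shape[0])

    # Number of steps per episode, averaged over runs
    st = pd.DataFrame(data={"Episodes": episodes, "Steps": steps.mean(axis=1)})
    st["map_size"] = np.repeat(f"{map_size}x{map_size}", st.shape[0])
    return res, st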
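
`np.tile(episodes, reps=params.n_runs)` repeats the full episode range once per run, so any value column placed next to it has to list all episodes of run 0, then all episodes of run 1, and so on. For arrays shaped `(episodes, runs)` that is column-major order, i.e. `flatten(order="F")`. A minimal sketch with made-up numbers (3 episodes, 2 runs) shows the difference from the default row-major flattening:

```python
import numpy as np

n_episodes, n_runs = 3, 2
episodes = np.arange(n_episodes)

# Made-up rewards, shape (episodes, runs): column 0 is run 0, column 1 is run 1
rewards = np.array([[0, 1],
                    [0, 1],
                    [1, 1]])

episode_col = np.tile(episodes, reps=n_runs)             # [0 1 2 0 1 2]
run_major = rewards.flatten(order="F")                   # [0 0 1 1 1 1]
episode_major = rewards.flatten()                        # [0 1 0 1 1 1]
cum_rewards = rewards.cumsum(axis=0).flatten(order="F")  # [0 0 1 1 2 3]

# run_major[i] is the reward of episode episode_col[i]; episode_major[i] is not
print(episode_col)
print(run_major)
print(episode_major)
print(cum_rewards)
```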


Next we create the visualisation functions that show our agent's performance as graphs and heatmaps.
The first one extracts the best action for each state from the Q-table and maps it to an arrow.

In [13]:
def qtable_directions_map(qtable, map_size):
    # Best Q-value and best action for each state, laid out like the map
    qtable_val_max = qtable.max(axis=1).reshape(map_size, map_size)
    qtable_best_action = np.argmax(qtable, axis=1).reshape(map_size, map_size)
    directions = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    qtable_directions = np.empty(qtable_best_action.flatten().shape, dtype=str)
    eps = np.finfo(float).eps  # machine epsilon, used as a "greater than zero" threshold
    for idx, val in enumerate(qtable_best_action.flatten()):
        # Only show an arrow where a positive Q-value has been learned
        if qtable_val_max.flatten()[idx] > eps:
            qtable_directions[idx] = directions[val]
    qtable_directions = qtable_directions.reshape(map_size, map_size)
    return qtable_val_max, qtable_directions
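
As an illustration of the idea, the sketch below applies the same mapping to a made-up Q-table for a 2x2 map. It is a compact rewrite for demonstration purposes, not the function used in the project, and the Q-values are invented.

```python
import numpy as np

directions = {0: "←", 1: "↓", 2: "→", 3: "↑"}

# Made-up Q-table for a 2x2 map: 4 states (rows) x 4 actions (columns)
qtable = np.array([
    [0.0, 0.2, 0.9, 0.0],  # state 0: best action is 2 (right)
    [0.0, 0.8, 0.1, 0.0],  # state 1: best action is 1 (down)
    [0.0, 0.0, 0.0, 0.0],  # state 2: nothing learned yet
    [0.0, 0.0, 0.0, 0.0],  # state 3: nothing learned yet
])

best_value = qtable.max(axis=1)
best_action = qtable.argmax(axis=1)
eps = np.finfo(float).eps

# Show an arrow only where a positive Q-value has been learned
arrows = np.array(
    [directions[a] if v > eps else "" for a, v in zip(best_action, best_value)]
).reshape(2, 2)

print(arrows)
```

States with no learned value get an empty label, so they appear as blank cells in the heatmap annotation.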

Next come two plotting functions. The first shows the last rendered frame next to a heatmap of the Q-table, and the second shows the distribution of visited states and chosen actions.

In [14]:
def plot_q_values_map(qtable, env, map_size):
    qtable_val_max, qtable_directions = qtable_directions_map(qtable, map_size)

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
    ax[0].imshow(env.render())
    ax[0].axis("off")
    ax[0].set_title("Last frame")

    sns.heatmap(
        qtable_val_max,
        annot=qtable_directions,
        fmt="",
        ax=ax[1],
        cmap=sns.color_palette("Blues", as_cmap=True),
        linewidths=0.7,
        linecolor="black",
        xticklabels=[],
        yticklabels=[],
        annot_kws={"fontsize": "xx-large"},
    ).set(title="Learned Q-values\nArrows represent best action")
    for _, spine in ax[1].spines.items():
        spine.set_visible(True)
        spine.set_linewidth(0.7)
        spine.set_color("black")
    img_title = f"frozenlake_q_values_{map_size}x{map_size}.png"
    fig.savefig(params.savefig_folder / img_title, bbox_inches="tight")
    plt.show()

def plot_states_actions_distribution(states, actions, map_size):
    labels = {"LEFT": 0, "DOWN": 1, "RIGHT": 2, "UP": 3}

    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
    sns.histplot(data=states, ax=ax[0], kde=True)
    ax[0].set_title("States")
    sns.histplot(data=actions, ax=ax[1])
    ax[1].set_xticks(list(labels.values()), labels=labels.keys())
    ax[1].set_title("Actions")
    fig.tight_layout()
    img_title = f"frozenlake_states_actions_distrib_{map_size}x{map_size}.png"
    fig.savefig(params.savefig_folder / img_title, bbox_inches="tight")
    plt.show()


We also have a function that plots the cumulative rewards and the average number of steps over the episodes for each map size, which gives us an idea of how well the agent is training.

In [15]:
def plot_steps_and_rewards(rewards_df, steps_df):
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
    sns.lineplot(
        data=rewards_df, x="Episodes", y="cum_rewards", hue="map_size", ax=ax[0]
    )
    ax[0].set(ylabel="Cumulative rewards")

    sns.lineplot(data=steps_df, x="Episodes",
                 y="Steps", hue="map_size", ax=ax[1])
    ax[1].set(ylabel="Average number of steps")

    for axi in ax:
        axi.legend(title="map size")
    fig.tight_layout()
    img_title = "frozenlake_steps_and_rewards.png"
    fig.savefig(params.savefig_folder / img_title, bbox_inches="tight")
    plt.show()


With all that out of the way, we can write the `run_env` function, which trains our agent over multiple runs to account for randomness. It stores the Q-table after each run, along with the rewards and the number of steps taken in each episode.

We then call all the functions defined above to train our agent on maps of different sizes.

In [None]:
def run_env():
    rewards = np.zeros((params.total_episodes, params.n_runs))
    steps = np.zeros((params.total_episodes, params.n_runs))
    episodes = np.arange(params.total_episodes)
    qtables = np.zeros((params.n_runs, params.state_size, params.action_size))
    all_states = []
    all_actions = []

    for run in range(params.n_runs):
        learner.reset_qtable()

        for episode in tqdm(
            episodes, desc=f"Run {run}/{params.n_runs} - Episodes", leave=False
        ):
            state = env.reset(seed=params.seed)[0]
            step = 0
            done = False
            total_rewards = 0

            while not done:
                action = explorer.choose_action(
                    action_space=env.action_space, state=state, qtable=learner.qtable
                )

                all_states.append(state)
                all_actions.append(action)

                new_state, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                adjusted_reward = learner.adjust_reward(reward)

                learner.qtable[state, action] = learner.update(
                    state, action, adjusted_reward, new_state
                )

                total_rewards += adjusted_reward
                step += 1

                state = new_state

            rewards[episode, run] = total_rewards
            steps[episode, run] = step
        qtables[run, :, :] = learner.qtable

    return rewards, steps, episodes, qtables, all_states, all_actions

map_sizes = [4, 7, 9, 11]
res_all = pd.DataFrame()
st_all = pd.DataFrame()

for map_size in map_sizes:
    env = gym.make(
        "FrozenLake-v1",
        is_slippery=params.is_slippery,
        render_mode="rgb_array",
        desc=generate_random_map(
            size=map_size, p=params.proba_frozen, seed=params.seed
        ),
    )

    params = params._replace(action_size=env.action_space.n)
    params = params._replace(state_size=env.observation_space.n)
    # Seed the action space so that sampled actions are reproducible
    env.action_space.seed(params.seed)
    learner = Qlearning(
        learning_rate=params.learning_rate,
        gamma=params.gamma,
        state_size=params.state_size,
        action_size=params.action_size,
    )
    explorer = EpsilonGreedy(
        epsilon=params.epsilon,
    )

    print(f"Map size: {map_size}x{map_size}")
    rewards, steps, episodes, qtables, all_states, all_actions = run_env()
    print(f"Average Steps {np.mean(steps)} for map {map_size}x{map_size}")
    print(f"Average Rewards {np.mean(rewards)} for map {map_size}x{map_size}")

    res, st = postprocess(episodes, params, rewards, steps, map_size)
    res_all = pd.concat([res_all, res])
    st_all = pd.concat([st_all, st])
    qtable = qtables.mean(axis=0)

    plot_states_actions_distribution(
        states=all_states, actions=all_actions, map_size=map_size
    )
    plot_q_values_map(qtable, env, map_size)

    env.close()

plot_steps_and_rewards(res_all, st_all)

## How do we interpret the findings from the plots and graphs?
The first two plots (the distributions of states and actions) give us an idea of the agent's behaviour, not necessarily of how well it performs. The next pair of images tells us more about the performance of the policy. If the last frame shows the agent at the goal position, it usually means that the policy derived during training is good. This is not always the case, since the frame only shows how the final episode ended, but it offers at least some idea about our training. The heatmap is a visual representation of the policy learned by the agent: the arrows show the best action in each state.

The final graph summarises the learning behaviour of our agent across all the map sizes we trained it on. If the agent is learning, the cumulative rewards should keep increasing (a flat curve means the agent is not reaching the goal) and the average number of steps should decrease. Comparing the curves for the different map sizes also gives us an idea of how learning behaviour changes with environment complexity.
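
One way to put a number on these trends is to compare the agent's success rate early and late in training. The sketch below uses a small made-up `rewards` array with the same `(episodes, runs)` layout that `run_env` returns. Since FrozenLake gives a reward of 1 for reaching the goal and 0 otherwise by default, the mean reward over a window is the fraction of successful episodes. The helper name `success_rate` and the choice of two equal windows are assumptions for illustration, not part of the project code.

```python
import numpy as np


def success_rate(rewards, first, last):
    """Fraction of episodes in rows [first, last) that reached the goal, over all runs."""
    return rewards[first:last, :].mean()


# Made-up rewards, shape (episodes, runs): 1 = reached the goal, 0 = did not
rewards = np.array([
    [0, 0],
    [0, 0],
    [0, 1],
    [1, 0],
    [1, 1],
    [1, 1],
])

early = success_rate(rewards, 0, 3)  # first half of training
late = success_rate(rewards, 3, 6)   # second half of training
print(early, late)

# Cumulative rewards per run: a curve that keeps rising means the agent keeps reaching the goal
cumulative = rewards.cumsum(axis=0)
print(cumulative[-1])  # total successes per run
```

A late success rate that is clearly higher than the early one is consistent with the agent having learned a useful policy.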
