# Reinforcement Learning

![Reinforcement learning loop between agent and environment](https://upload.wikimedia.org/wikipedia/commons/thumb/1/1b/Reinforcement_learning_diagram.svg/300px-Reinforcement_learning_diagram.svg.png)

## Introduction

Reinforcement learning is a form of machine learning in which an agent interacts with an environment, observes the effects of its actions, and collects rewards.

The goal of reinforcement learning is to learn an optimal policy, so that, given a state, the agent can decide which action to take next.

In today's workshop we will look into three fundamental algorithms that are capable of solving Markov decision processes (MDPs), namely [Policy Iteration](https://en.wikipedia.org/wiki/Markov_decision_process#Policy_iteration), [Value Iteration](https://en.wikipedia.org/wiki/Markov_decision_process#Value_iteration), and [Q-Learning](https://en.wikipedia.org/wiki/Q-learning).

## Objectives

After this workshop you should know:

- The relevant pieces for a reinforcement learning system
- The basics of *[gym](https://gym.openai.com/envs/#classic_control)* to conduct your own RL experiments
- Why Policy Iteration can be slower than Value Iteration
- How value and policy iteration differ from Q-Learning
- How Q-Learning converges towards a stable policy

## MDP

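A Markov decision process (MDP) is a 4-tuple $(S,A,P_{a},R_{a})$, where $S$ is the set of states, $A$ is the set of actions, $P_{a}(s,s')$ is the probability that action $a$ taken in state $s$ leads to state $s'$, and $R_{a}(s,s')$ is the immediate reward received for that transition.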

![MDP](mdp.png "MDP")
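
To make the 4-tuple concrete, here is a minimal sketch of a hypothetical two-state MDP. It uses the same nested layout that the helper code later reads from `env.unwrapped.P`, where `P[s][a]` is a list of `(probability, next_state, reward, done)` entries. The states, probabilities, rewards and the `expected_reward` helper are made up for illustration.

```python
# Hypothetical two-state MDP: P[s][a] -> list of (prob, next_state, reward, done)
toy_mdp = {
    0: {
        0: [(1.0, 0, 0.0, False)],                       # action 0: stay in state 0
        1: [(0.8, 1, 1.0, True), (0.2, 0, 0.0, False)],  # action 1: usually reach the goal
    },
    1: {
        0: [(1.0, 1, 0.0, True)],  # state 1 is terminal and absorbing
        1: [(1.0, 1, 0.0, True)],
    },
}


def expected_reward(mdp, s, a):
    """Expected immediate reward of action a in state s: sum of p * r."""
    return sum(p * r for p, _, r, _ in mdp[s][a])


states = sorted(toy_mdp)      # S
actions = sorted(toy_mdp[0])  # A
print(states, actions, expected_reward(toy_mdp, 0, 1))
```

The transition probabilities of every `(s, a)` pair sum to one, which is a quick sanity check when writing such a table by hand.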

## Problem

Winter is here. You and your friends were tossing around a frisbee at the park when you made a wild throw that left the frisbee out in the middle of the lake. The water is mostly frozen, but there are a few holes where the ice has melted. If you step into one of those holes, you'll fall into the freezing water. At this time, there's an international frisbee shortage, so it's absolutely imperative that you navigate across the lake and retrieve the disc. (However, the ice is slippery, so you won't always move in the direction you intend.)

## Setup

To begin, install the required Python package dependencies.



In [None]:
# The imports below use gymnasium, so that is the package to install
#!pip install --quiet gymnasium

### Imports and Helper Functions

#### Imports

In [None]:
from typing import Tuple, List
from enum import Enum

# Python imports
import random
import heapq
import collections

# Reinforcement Learning environments
import gymnasium as gym

# Scientific computing
import numpy as np

# Plotting library
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.colors import LinearSegmentedColormap

In [None]:
ALPHA_VALUE = 0.7
TEXT_FONT_SIZE = 10


class Action(Enum):
    """
    An enumeration for the possible actions in the FrozenLake environment.
    """

    LEFT = 0
    DOWN = 1
    RIGHT = 2
    UP = 3


def generate_checkerboard(
    img: np.ndarray, v: np.ndarray
) -> Tuple[np.ndarray, Tuple[int, int]]:
    """
    Generates a checkerboard pattern by mapping matrix V onto image img.
    """
    size_y, size_x = img.shape[:2]
    interpolation_factor_y = size_y // v.shape[0]
    interpolation_factor_x = size_x // v.shape[1]

    # Broadcasting the smaller matrix V to the size of img
    checkerboard = np.repeat(
        np.repeat(v, interpolation_factor_y, axis=0), interpolation_factor_x, axis=1
    )

    return checkerboard, (interpolation_factor_x, interpolation_factor_y)


def generate_colormap():
    """
    Generates a default colormap.
    """
    return LinearSegmentedColormap.from_list(
        "custom_cmap", [(0, "red"), (0.5, "white"), (1, "green")]
    )


def add_labels(
    ax, shape: Tuple[int, int], labels: List, interpolation_factors: Tuple[int, int]
):
    """
    Adds labels to the cells of the visualization.
    Parameters:
        ax (matplotlib.axes.Axes): The axes on which to add labels.
        shape (tuple): The shape of the grid.
        labels (list): The labels to add.
        interpolation_factors (tuple): The interpolation factors for positioning labels.
    """
    for i in range(shape[0]):
        for j in range(shape[1]):
            ax.text(
                interpolation_factors[0] * (j + 0.5),
                interpolation_factors[1] * (i + 0.5),
                labels[i, j],
                ha="center",
                va="center",
                fontsize=TEXT_FONT_SIZE,
                fontweight="bold",
                alpha=ALPHA_VALUE,
            )


def visualize_v(env, v: np.ndarray, ax, title: str) -> None:
    """Visualizes the value function v of the given environment."""
    v = v.reshape(env.unwrapped.desc.shape)
    V_img, interp_factors = generate_checkerboard(env.render(), v)
    V_img = ax.imshow(V_img, cmap=generate_colormap(), alpha=0.5)

    labels = np.vectorize(lambda x: f"{x:.2f}")(v)

    add_labels(ax, env.unwrapped.desc.shape, labels, interp_factors)

    visualize_env(env, ax, title)


def visualize_p(env, v: np.ndarray, p: np.ndarray, ax, title: str) -> None:
    """Visualizes the policy p on top of the value function v of the given environment."""
    v = v.reshape(env.unwrapped.desc.shape)
    V_img, interp_factors = generate_checkerboard(env.render(), v)
    V_img = ax.imshow(V_img, cmap=generate_colormap(), alpha=0.5)

    s = np.arange(env.unwrapped.observation_space.n)
    labels = np.vectorize(lambda x: Action(p[x]).name)(s).reshape(env.unwrapped.desc.shape)

    add_labels(ax, env.unwrapped.desc.shape, labels, interp_factors)

    visualize_env(env, ax, title)


def visualize_env(env, ax, title: str) -> None:
    """
    Visualizes the FrozenLake environment.
    """
    ax.imshow(env.render(), alpha=ALPHA_VALUE)

    ax.axis("off")
    ax.set_title(title)


#### Helper Functions

In [None]:
# Define the default figure size
plt.rcParams["figure.figsize"] = [16, 4]

def compute_v_from_q(env, q: np.ndarray) -> np.ndarray:
    """Compute the v function given the q function, maximizing over the actions of a given state."""
    v = np.zeros(env.observation_space.n)
    n_rows, n_cols = env.unwrapped.desc.shape
    for i in range(n_rows):
        for j in range(n_cols):
            # Row-major state index: row * number_of_columns + column
            s = i * n_cols + j
            v[s] = np.max(q[s, :])
    return v


def compute_policy_from_q(env, q: np.ndarray) -> np.ndarray:
    """Compute the policy function given the q function, finding the action that yields the maximum of a given state."""
    policy = np.zeros(env.observation_space.n, dtype=int)
    n_rows, n_cols = env.unwrapped.desc.shape
    for i in range(n_rows):
        for j in range(n_cols):
            # Row-major state index: row * number_of_columns + column
            s = i * n_cols + j
            policy[s] = np.argmax(q[s, :])
    return policy

#### Policy Evaluation

In [None]:
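def evaluate_episode(env, policy: np.ndarray, discount_factor: float) -> float:
    """Evaluates a policy by running it until the episode ends and collecting its discounted return"""
    state, _ = env.reset()
    total_return = 0
    step = 0
    while True:
        # step() returns (observation, reward, terminated, truncated, info)
        state, reward, terminated, truncated, _ = env.step(int(policy[state]))
        # Add the discounted reward of this step to the return
        total_return += discount_factor**step * reward
        step += 1
        # Stop on a terminal state or when the time limit truncates the episode
        if terminated or truncated:
            break
    return total_return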


def evaluate_policy(
    env, policy: np.ndarray, discount_factor: float, number_episodes: int
) -> float:
    """Evaluates a policy by running it n times"""
    return np.mean(
        [evaluate_episode(env, policy, discount_factor) for _ in range(number_episodes)]
    )
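
As a quick check of what `evaluate_episode` accumulates, the sketch below computes a discounted return by hand. The reward sequence is an assumed example: on the deterministic 4x4 lake the shortest route to the frisbee takes six moves, and only the last one pays a reward of 1. The helper name `discounted_return` is hypothetical.

```python
def discounted_return(rewards, discount_factor):
    """Sum of discount_factor**t * r_t over one episode's reward sequence."""
    return sum(discount_factor**t * r for t, r in enumerate(rewards))


# Assumed episode: five steps without reward, then the goal is reached
rewards = [0, 0, 0, 0, 0, 1]
g = discounted_return(rewards, 0.95)
print(round(g, 4))  # 0.95 ** 5, roughly 0.7738
```

A longer route to the goal earns the same reward of 1 but a smaller return, which is why discounting favours short paths.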

#### Policy and Value Iteration Parameters

In [None]:
# Set parameters
MAX_ITERATIONS = 1000
NUM_EPISODES = 100
DISCOUNT_FACTOR = 0.95

### Environment

In [None]:
# Deterministic environments
env_name = "FrozenLake-v1"
# env_name = 'FrozenLake8x8-v1'

Create the environment with the previously selected name.

In [None]:
# env = gym.make(env_name)
env = gym.make(
    env_name,
    is_slippery=False,
    render_mode="rgb_array",
)

env.reset()
_, ax = plt.subplots()
visualize_env(env, ax=ax, title="Frozen Lake Environment")

#### Understanding the Environment (Object)

**TASK :**
Analyze the environment object and figure out its *observation space* and *action space* as well as its *reward range*.

What is the size of the observation space?

In [None]:
env.observation_space

What is the size of the action space?

In [None]:
env.action_space

What is the range of rewards?

In [None]:
env.reward_range

### Uncertainty in Execution

In [None]:
for action in Action:
    print('{:15} = {}'.format(action.name, action.value))

In [None]:
_, axs = plt.subplots(2, 3, figsize=(12, 6))

s, _ = env.reset()
print(f"the initial state is: {s}")
visualize_env(env, ax=axs[0, 0], title=f"Start | state is: {s}")

# Skip the first axis (it shows the start state) and hide the unused last one
axs = axs.reshape(-1)[1:]
axs[-1].axis("off")

for action, ax in zip(Action, axs):
    env.reset()
    print(f"executing action {action.value}, should go {action.name}")
    s1, r, d, _, _ = env.step(action.value)
    print(f"new state is: {s1} done: {d}")
    visualize_env(env, ax=ax, title=f"{action.name} | state is: {s1}")

## Policy Evaluation
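
The update that `policy_evaluation` repeats can be written as a Bellman expectation backup. The notation below is an assumption layered on the 4-tuple from the MDP section: $\pi(s)$ is the action the policy picks in state $s$, $\gamma$ is the discount factor, and $V_k$ is the value estimate after $k$ sweeps over all states.

$$V_{k+1}(s) = \sum_{s'} P_{\pi(s)}(s, s') \left[ R_{\pi(s)}(s, s') + \gamma V_k(s') \right]$$

In the `"value_iteration"` mode the fixed action $\pi(s)$ is replaced by a maximum over all actions:

$$V_{k+1}(s) = \max_{a} \sum_{s'} P_{a}(s, s') \left[ R_{a}(s, s') + \gamma V_k(s') \right]$$

In both cases the sweeps stop once the summed absolute change $\sum_s |V_{k+1}(s) - V_k(s)|$ drops to $10^{-4}$ or below.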

In [None]:
def policy_evaluation(
    env, policy: np.ndarray, discount_factor: float, mode: str
) -> Tuple[np.ndarray, int]:
    """Iteratively evaluate the value function.

    In "policy_iteration" mode the values follow the given policy; in
    "value_iteration" mode the policy is ignored and the best action is used.
    """
    # Initialize the state value function
    v = np.zeros(env.observation_space.n)
    iteration = 0
    while True:
        iteration += 1
        prev_v = np.copy(v)
        for s in range(env.observation_space.n):
            if mode == "policy_iteration":
                v[s] = evaluate_action(env, s, v, prev_v, policy, discount_factor)
            elif mode == "value_iteration":
                v[s] = evaluate_max_action(env, s, v, prev_v, discount_factor)
        # Stop once the summed absolute change over all states is negligible
        if np.sum(np.fabs(prev_v - v)) <= 1e-4:
            break
    return v, iteration


def evaluate_action(
    env,
    s: int,
    v: np.ndarray,
    prev_v: np.ndarray,
    policy: np.ndarray,
    discount_factor: float,
) -> float:
    """Return the one-step lookahead value of state s under the given policy."""
    # Retrieve the action under the current policy (cast to int for the lookup in P)
    a = int(policy[s])
    expected_reward = 0
    expected_discounted_return = 0
    # Calculate the expected reward and the expected discounted return | p = probability
    for p, s1, r, _ in env.unwrapped.P[s][a]:
        ### TASK: define the expected_reward and the expected_discounted_return
        expected_reward += p * r
        expected_discounted_return += discount_factor * p * prev_v[s1]
    # Calculate the V-Value
    return expected_reward + expected_discounted_return


def evaluate_max_action(
    env, s: int, v: np.ndarray, prev_v: np.ndarray, discount_factor: float
) -> float:
    """Return the best one-step lookahead value of state s over all actions."""
    # Initialize the action values of state s
    q = np.zeros(env.action_space.n)
    # Iterate over each action
    for a in range(env.action_space.n):
        expected_reward = 0
        expected_discounted_return = 0
        # Calculate the expected reward and the expected discounted return | p = probability
        for p, s1, r, _ in env.unwrapped.P[s][a]:
            ### TASK: define the expected_reward and the expected_discounted_return
            expected_reward += p * r
            expected_discounted_return += discount_factor * p * prev_v[s1]
        # Calculate the Q-Value
        q[a] = expected_reward + expected_discounted_return
    ### TASK: define the value function and the policy with respect to q
    # Choose the max q value over all actions
    return np.max(q)

## Policy Improvement

In [None]:
def policy_improvement(
    env, v: np.ndarray, policy: np.ndarray, discount_factor: float
) -> np.ndarray:
    """Improve the policy given a value-function"""
    # Start from a fresh policy; the incoming policy argument is not used
    policy = np.zeros(env.observation_space.n, dtype=int)
    # Initialize the action value function
    q = np.zeros([env.observation_space.n, env.action_space.n])
    for s in range(env.observation_space.n):
        for a in range(env.action_space.n):
            q[s, a] = np.sum(
                [p * (r + discount_factor * v[s1]) for p, s1, r, _ in env.unwrapped.P[s][a]]
            )
        policy[s] = np.argmax(q[s, :])
    return policy

## Policy Iteration
![Policy Iteration](policy_iteration.png "Policy Iteration")

### Algorithm

**TASK :**
Add the missing steps for the policy iteration algorithm.

In [None]:
def policy_iteration(
    env, discount_factor: float, max_iterations: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Policy-Iteration algorithm"""
    # Initialize the policy with action 0 (LEFT) in every state
    policy = np.zeros(env.observation_space.n, dtype=int)
    for i in range(max_iterations):
        # TASK: evaluate the current policy
        v, iteration = policy_evaluation(
            env, policy, discount_factor, "policy_iteration"
        )
        # TASK: define the new policy
        new_policy = policy_improvement(env, v, policy, discount_factor)

        new_v, _ = policy_evaluation(
            env, new_policy, discount_factor, "policy_iteration"
        )
        if np.all(policy == new_policy):
            print(f"Policy-Iteration converged at iteration #{i}")
            break
        # Plot the current policy
        title_p = f"Policy Improvement #{i+1}"
        title_v = f"#Policy Evaluations {iteration}"
        _, ax = plt.subplots(1, 2)
        visualize_v(env, v, ax[0], title_v)
        visualize_p(env, new_v, new_policy, ax[1], title_p)
        policy = new_policy
    return policy, v

Run the algorithm and evaluate the result.

In [None]:
# Determine the optimal value function and policy given the model of the environment
policy_opt, v_opt = policy_iteration(env, DISCOUNT_FACTOR, MAX_ITERATIONS)

# Evaluate the found policy by running it in the environment
policy_return = evaluate_policy(env, policy_opt, DISCOUNT_FACTOR, NUM_EPISODES)
print(f"Average return of the policy: {policy_return:.2f}")

## Value Iteration

![Value Iteration](value_iteration.png "Value Iteration")
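
Before filling in the FrozenLake version, it can help to see value iteration on a problem small enough to check by hand. The sketch below assumes a hypothetical three-state chain in which state 2 is the goal; the table `P`, the constant `GAMMA` and the helper `q_value` are illustrative names, not part of the workshop code.

```python
# Hypothetical 3-state chain: P[s][a] -> list of (prob, next_state, reward, done)
# Action 0 moves left, action 1 moves right; state 2 is terminal.
P = {
    0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 1, 0.0, False)]},
    1: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 2, 1.0, True)]},
    2: {0: [(1.0, 2, 0.0, True)], 1: [(1.0, 2, 0.0, True)]},
}
GAMMA = 0.9


def q_value(v, s, a):
    """One-step lookahead: expected reward plus discounted successor value."""
    return sum(p * (r + GAMMA * v[s1]) for p, s1, r, _ in P[s][a])


v = [0.0, 0.0, 0.0]
for sweep in range(100):
    # Bellman optimality backup for every state, using the previous sweep's values
    new_v = [max(q_value(v, s, a) for a in P[s]) for s in P]
    delta = sum(abs(x - y) for x, y in zip(new_v, v))
    v = new_v
    if delta <= 1e-4:
        break

# Extract the greedy policy from the converged values
policy = [max(P[s], key=lambda a: q_value(v, s, a)) for s in P]
print(v, policy)
```

The values settle at 0.9 for state 0 and 1.0 for state 1, and the greedy policy moves right in both non-terminal states.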


### Algorithm
**TASK :**
Add the missing calculations for the *expected_reward*, the *expected_discounted_return*, *v[s]* and *policy[s]*.

In [None]:
def value_iteration(env, discount_factor: float, max_iterations: int) -> Tuple[np.ndarray, np.ndarray]:
    """Value-Iteration algorithm"""
    # Initialize the policy
    policy = np.zeros(env.observation_space.n)
    for i in range(max_iterations):
        # TASK: compute the optimal value function (the policy argument is ignored in this mode)
        v, iteration = policy_evaluation(
            env, policy, discount_factor, "value_iteration"
        )
        # TASK: define the new policy
        new_policy = policy_improvement(env, v, policy, discount_factor)

        new_v, _ = policy_evaluation(
            env, new_policy, discount_factor, "policy_iteration"
        )
        if np.all(policy == new_policy):
            print(f"Value-Iteration converged at iteration #{i}")
            break
        # Plot the current policy
        title_p = f"Policy Improvement #{i + 1}"
        title_v = f"#Policy Evaluations {iteration}"
        _, ax = plt.subplots(1, 2)
        visualize_v(env, v, ax[0], title_v)
        visualize_p(env, new_v, new_policy, ax[1], title_p)
        policy = new_policy
    return policy, v

Run the algorithm and evaluate the result.

In [None]:
# Determine the optimal value function and policy given the model of the environment
policy_opt, v_opt = value_iteration(env, DISCOUNT_FACTOR, MAX_ITERATIONS)

# Evaluate the found policy by running it in the environment
policy_return = evaluate_policy(env, policy_opt, DISCOUNT_FACTOR, NUM_EPISODES)
print(f"Average return of the policy: {policy_return:.2f}")

## Q-Learning

![Q-Learning](q_learning.png "Q-Learning")

### Temporal Difference Error
### $\delta_t = \underbrace{r_{t}}_{\text{reward}} + \underbrace{\gamma}_{\text{discount factor}} \cdot \underbrace{\max_{a}Q(s_{t+1}, a)}_{\text{estimate of optimal future value}} - \underbrace{Q(s_{t}, a_{t})}_{\text{estimate of optimal current value}}$

### Temporal Difference Update
### $Q^{new}(s_{t},a_{t}) \leftarrow \underbrace{Q(s_{t},a_{t})}_{\text{old value}} + \underbrace{\alpha}_{\text{learning rate}} \cdot \underbrace{\delta_t}_{\text{temporal difference error}}$
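
A single update is easy to follow with numbers. The sketch below applies the two formulas above to one assumed transition on the 4x4 lake: moving RIGHT (action 2) from state 14 into the goal state 15 with a reward of 1. The learning rate, the discount factor and the all-zero Q-table excerpt are assumptions for illustration.

```python
ALPHA = 0.1   # learning rate (assumed value)
GAMMA = 0.95  # discount factor (assumed value)

# Hypothetical Q-table excerpt: q[state] is a list with one entry per action
q = {
    14: [0.0, 0.0, 0.0, 0.0],  # state next to the goal
    15: [0.0, 0.0, 0.0, 0.0],  # goal state (terminal, so it stays at zero)
}

s, a, r, s1 = 14, 2, 1.0, 15  # move RIGHT from 14 into the goal, reward 1

# Temporal difference error: reward + discounted best successor value - current estimate
td_error = r + GAMMA * max(q[s1]) - q[s][a]
# Temporal difference update: move the estimate a fraction ALPHA towards the target
q[s][a] = q[s][a] + ALPHA * td_error
print(td_error, q[s][a])
```

Repeating the same transition shrinks the error each time, since the estimate moves closer to the target of 1.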

### Transition Tuple
For ease of use we define a transition tuple that allows us to combine all the relevant information from one state to another.

In [None]:
# p = priority (only needed for (prioritized) experience replay)
# s = state
# a = action
# s1 = successor state
# r = reward
# td_e = temporal difference error
Transition = collections.namedtuple("Transition", ("p", "s", "a", "s1", "r", "td_e"))

### Replay Memory and Prioritized Experience Replay (optional)

Experience Replay and prioritization of specific experiences are common techniques to make training more data-efficient.

* [Paper - Experience Replay, 1992](https://link.springer.com/content/pdf/10.1007%2FBF00992699.pdf)
* [Paper - Prioritized Experience Replay, 2015](https://arxiv.org/abs/1511.05952)
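
The two sampling modes can be sketched with the standard library alone. The example below is a simplified illustration, not the workshop's `ReplayMemory`: the `Sample` tuple is a hypothetical, trimmed-down transition, and the priority is assumed to be the negated magnitude of the TD error, so that the largest errors sort first in a min-heap.

```python
import heapq
import random
from collections import namedtuple

# Hypothetical, trimmed-down transition: priority first so tuples sort by it
Sample = namedtuple("Sample", ("p", "s", "td_e"))

memory = []
for s, td_e in enumerate([0.1, 0.9, 0.5, 0.05]):
    # Negate the error magnitude: the largest error gets the smallest priority value
    heapq.heappush(memory, Sample(-abs(td_e), s, td_e))

# Prioritized replay: take the entries with the largest TD errors
prioritized_batch = heapq.nsmallest(2, memory)
# Plain experience replay: sample uniformly without replacement
uniform_batch = random.sample(memory, 2)

print([t.s for t in prioritized_batch])
```

The prioritized batch always contains the samples from states 1 and 2 here, whereas the uniform batch changes from run to run.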

In [None]:
class ReplayMemory:
    def __init__(self, config):
        # transitions memory
        self.transitions = []
        # size of the memory
        self.memory_size = config.memory_size
        # size of the batches
        self.batch_size = config.batch_size
        # flag for prioritized experience replay
        self.prioritized = config.prioritized

    def push(self, transition: Transition):
        # If the memory is full, drop the last heap entry first. It is a leaf of
        # the heap (never higher priority than its ancestors, though not
        # necessarily the lowest overall), so removing it keeps the heap valid.
        if len(self.transitions) >= self.memory_size:
            del self.transitions[-1]
        heapq.heappush(self.transitions, transition)

    def replay(self):
        if self.prioritized:
            # Smallest p first: p is the negated TD error, so these are the largest errors
            return heapq.nsmallest(self.batch_size, self.transitions)
        else:
            # Uniform sampling without replacement
            return random.sample(self.transitions, self.batch_size)

    def __len__(self):
        return len(self.transitions)

### The Q Agent
So far we have only defined plain functions such as:
```python
def function_name(arg1, arg2):
    # compute something with arg1 and arg2 and return something
    if arg2 > 0:
        something = other_function(arg1) - arg2
    else:
        something = arg1
    return something
```
However, for more complex tasks it is advisable to write object-oriented code using classes. Classes provide a means of bundling data and functionality together. Creating a new class creates a new type of object, allowing new instances of that type to be made. Each class instance can have attributes attached to it for maintaining its state. Class instances can also have methods (defined by their class) for modifying that state.



Hence we create a class **QAgent** that incorporates all the methods needed for Q-Learning.

```python
class QAgent:

    def __init__(self):  # constructor method that gets called when the object is being created
        ...

    def td_error(self):  # Temporal Difference Error
        ...

    def td_update(self):  # Temporal Difference Update
        ...

    def train(self, env):  # Train the agent
        ...
```

**TASK :**
Add the missing formulas for the TD-error and the TD-update.

In [None]:
class QAgent:
    def __init__(self, config):
        # Maximum length of training
        self.training_length = config.training_length
        # Maximum length of an episode
        self.episode_length = config.episode_length
        # TD error update step size
        self.learning_rate = config.learning_rate
        # Discount factor applied to future rewards
        self.discount_factor = config.discount_factor
        # Enabling experience replay
        self.replay_memory_enabled = config.config_replay_memory is not None
        # Initialize the replay memory of the agent
        if self.replay_memory_enabled:
            self.replay_memory = ReplayMemory(config.config_replay_memory)

    def td_error(self, q: np.ndarray, s: int, a: int, s1: int, r: float) -> float:
        """Calculates the temporal difference error given the current model and transition"""
        # TASK: return the TD-Error
        td_e = r + self.discount_factor * np.max(q[s1, :]) - q[s, a]
        return td_e

    def td_update(self, q: float, t) -> float:
        """Calculates the adjusted action value (q) given the td error from a single transition"""
        # TASK: return the update for the q value
        q = q + self.learning_rate * t.td_e
        return q

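    def td_replay(self, q: np.ndarray, q_target: np.ndarray) -> np.ndarray:
        """Run additional TD updates on a batch drawn from the replay memory"""
        # Use the replay memory to run additional updates
        if len(self.replay_memory) >= self.replay_memory.batch_size:
            # replay() takes no arguments; it already uses the configured batch size
            for t in self.replay_memory.replay():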
                # Recalculate the temporal difference error for this transition
                td_e = self.td_error(q_target, t.s, t.a, t.s1, t.r)
                # Create an updated transition tuple
                updated_t = Transition(-td_e, t.s, t.a, t.s1, t.r, td_e)
                # Save the transition in replay memory
                self.replay_memory.push(updated_t)
                # Update model / q table
                q[t.s, t.a] = self.td_update(q_target[t.s, t.a], updated_t)
        return q

    def epsilon_greedy_noise(self, env, s: int, episode: int) -> Tuple[int, np.ndarray]:
        """Explore by adding Gaussian noise, which shrinks with the episode count, to the q values"""
        epsilon = np.random.randn(1, env.action_space.n) * (1.0 / (episode + 1))
        a = np.argmax(self.q_target[s, :] + epsilon)
        return a, epsilon

    def epsilon_greedy_linear(self, env, s: int, episode: int) -> Tuple[int, float]:
        epsilon = 1 - (episode + 1) / self.training_length
        if epsilon > np.random.rand():
            a = np.random.randint(env.action_space.n)
        else:
            a = np.argmax(self.q_target[s, :])
        return a, epsilon

    def train(self, env):
        # Initialize the model / q table with zeros
        self.q = np.zeros([env.observation_space.n, env.action_space.n])
        # Create a target model / q table.
        # Note: this assignment does not copy, so q_target and q refer to the same array
        self.q_target = self.q

        ### METRICS
        # create lists to contain various metrics that should be tracked during the training process
        self.metrics = {
            "return": np.zeros(self.training_length),
            "q_avg": np.zeros(self.training_length),
            "epsilon": np.zeros(self.training_length),
            "td_error": np.zeros(self.training_length),
        }

        for episode in range(self.training_length):
            # Reset the environment and retrieve the initial state
            s, _ = env.reset()
            # Set the 'done' flag to false
            d = False
            # Set the step of the episode to 0
            step = 0
            # Start the Q-Learning algorithm
            while step < self.episode_length:
                # Derive action from current policy (epsilon_greedy noise)
                # a, epsilon = self.epsilon_greedy_linear(env, s , episode)
                a, epsilon = self.epsilon_greedy_noise(env, s, episode)

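                # Execute the action and generate a successor state as well as receive an immediate reward
                s1, r, terminated, truncated, _ = env.step(a)
                # The episode ends on a terminal state or when the environment's time limit truncates it
                d = terminated or truncated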

                # Calculate the temporal difference error
                td_e = self.td_error(self.q_target, s, a, s1, r)

                # Create a transition tuple
                transition = Transition(-(td_e + 0.001), s, a, s1, r, td_e)

                # Save the transition in replay memory
                if self.replay_memory_enabled:
                    self.replay_memory.push(transition)

                # Update model / q table
                self.q[s, a] = self.td_update(self.q_target[s, a], transition)

                # Assign the current state the value of the successor state
                s = s1

                # Increment the step
                step += 1

                ### METRICS
                # Accumulate the episode return (step was already incremented, hence step - 1)
                self.metrics["return"][episode] += self.discount_factor ** (step - 1) * r
                # Track the temporal difference error
                self.metrics["td_error"][episode] += td_e
                # Track the max epsilon values
                self.metrics["epsilon"][episode] += np.max(epsilon)
                # Accumulate the average q value; it is divided by the step count at episode end
                self.metrics["q_avg"][episode] += np.average(self.q)

                # If we reached a terminal state or the step limit, finish the episode
                if d or step == self.episode_length:
                    # At the end of the episode update the target model with the current model

                    # If experience replay is enabled replay the experience collected so far
                    if self.replay_memory_enabled:
                        self.q_target = self.td_replay(self.q, self.q_target)
                    else:
                        self.q_target = self.q

                    ### METRICS
                    self.metrics["epsilon"][episode] /= step
                    self.metrics["q_avg"][episode] /= step
                    self.metrics["td_error"][episode] /= step
                    break
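
The linear exploration schedule used by `epsilon_greedy_linear` can be tried in isolation. The sketch below is a standalone illustration using only the standard library; the function names `epsilon_greedy` and `linear_epsilon`, the seeded generator and the sample Q-values are assumptions, not part of the agent above.

```python
import random


def epsilon_greedy(q_row, epsilon, rng):
    """Pick a random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_row))
    return max(range(len(q_row)), key=lambda a: q_row[a])


def linear_epsilon(episode, training_length):
    """Linearly decay epsilon from just under 1 down to 0 over training."""
    return 1 - (episode + 1) / training_length


rng = random.Random(0)        # seeded generator, for repeatable runs
q_row = [0.0, 0.2, 0.7, 0.1]  # hypothetical Q-values of one state

# With epsilon = 0 the choice is always greedy: action 2 has the largest value
greedy = epsilon_greedy(q_row, 0.0, rng)
# Early in training almost every action is random, at the end none is
first, last = linear_epsilon(0, 400), linear_epsilon(399, 400)
print(greedy, first, last)
```

Early episodes therefore explore almost uniformly, and the final episode acts purely greedily on the learned Q-values.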

### Configuration Tuple
For ease of use we define a configuration tuple that allows us to combine all the relevant configuration into one object.

In [None]:
ConfigQAgent = collections.namedtuple(
    "ConfigQAgent",
    (
        "learning_rate",
        "training_length",
        "episode_length",
        "discount_factor",
        "config_replay_memory",
    ),
)

ConfigReplayMemory = collections.namedtuple(
    "ConfigReplayMemory", ("memory_size", "batch_size", "prioritized")
)

### Configure and Train the Agent


In [None]:
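# Agent 1
# Replay memory settings; pass this instead of None below to enable experience replay
config_replay_memory = ConfigReplayMemory(memory_size=500, batch_size=50, prioritized=False)
config_q_agent = ConfigQAgent(
    learning_rate=0.1,
    training_length=400,
    episode_length=50,
    discount_factor=DISCOUNT_FACTOR,
    config_replay_memory=None,
)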

q_agent = QAgent(config_q_agent)
q_agent.train(env)
policy = compute_policy_from_q(env, q_agent.q_target)

### Visualize Metrics

In [None]:
print(f"Average return: {evaluate_policy(env, policy, q_agent.discount_factor, 1000):.2f}")
print(
    f"Score over time: {sum(q_agent.metrics['return']) / q_agent.training_length:.2f}"
)

fig, axi = plt.subplots(1, 2)
v = compute_v_from_q(env, q_agent.q_target)
visualize_p(env, v, policy, axi[0], title="Policy for the Frozen Lake")
visualize_v(env, v, axi[1], title="State Value Function for the Frozen Lake")

fig, ax = plt.subplots(1, 4)
# Plot the return over time
ax[0].plot(range(q_agent.training_length), q_agent.metrics["return"], ".")
ax[0].set(xlabel="episode", ylabel="return", title="Return")
ax[0].grid()

# Plot the Q value over time
ax[1].plot(range(q_agent.training_length), q_agent.metrics["q_avg"], ".")
ax[1].set(xlabel="episode", ylabel="Q Value", title="Average Q Value")
ax[1].grid()

# Plot the epsilon over time
ax[2].plot(range(q_agent.training_length), q_agent.metrics["epsilon"], ".")
ax[2].set(xlabel="episode", ylabel="epsilon", title="Epsilon")
ax[2].grid()

# Plot the td error over time
ax[3].plot(range(q_agent.training_length), q_agent.metrics["td_error"], ".")
ax[3].set(xlabel="episode", ylabel="TD Error", title="TD Error")
ax[3].grid()

### Evaluate different Hyperparameters (optional)

In [None]:
def evaluate_training_episodes(number_evaluation_points: int, number_evaluations: int):
    # Evaluate different training lengths (number of training episodes)
    training_lengths = np.linspace(1, 501, number_evaluation_points, dtype=int)
    returns = np.zeros(number_evaluation_points)
    for i in range(number_evaluation_points):
        config_q_agent = ConfigQAgent(
            0.1, training_lengths[i], 100, DISCOUNT_FACTOR, None
        )
        for _ in range(number_evaluations):
            q_agent = QAgent(config_q_agent)
            q_agent.train(env)
            returns[i] += np.max(q_agent.metrics["return"])
        returns[i] /= number_evaluations

    fig, ax = plt.subplots()
    ax.set(xlabel="#Episodes", ylabel="Max. Return", title="#Episodes vs. Return")
    ax.plot(training_lengths, returns, "-o")


evaluate_training_episodes(10, 10)