# CSC-52081-EP Lab6: Policy Gradient

<img src="https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/refs/heads/main/assets/logo.jpg" style="float: left; width: 15%" />

[CSC-52081-EP-2026](https://moodle.ip-paris.fr/course/view.php?id=10691) Lab session #6

2019-2026 Jérémie Decock

[![Open in Google Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/jeremiedecock/polytechnique-csc-52081-ep-2026-students/blob/main/lab6_rl4_policy_gradient.ipynb)

[![My Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main?filepath=lab6_rl4_policy_gradient.ipynb)

[![NbViewer](https://raw.githubusercontent.com/jupyter/design/main/logos/Badges/nbviewer_badge.svg)](https://nbviewer.jupyter.org/github/jeremiedecock/polytechnique-csc-52081-ep-2026-students/blob/main/lab6_rl4_policy_gradient.ipynb)

[![Local](https://img.shields.io/badge/Local-Save%20As...-blue)](https://github.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/raw/main/lab6_rl4_policy_gradient.ipynb)

## Introduction

The aim of this lab is to provide an in-depth exploration of policy-based reinforcement learning techniques, with a particular focus on the *Monte Carlo Policy Gradient (REINFORCE)*.

As for previous labs, you can either:
- open, edit and execute the notebook in *Google Colab* following this link: https://colab.research.google.com/github/jeremiedecock/polytechnique-csc-52081-ep-2026-students/blob/main/lab6_rl4_policy_gradient.ipynb ; this is the **recommended** choice as you have nothing to install on your computer
- open, edit and execute the notebook in *MyBinder* (if for any reason the Google Colab solution doesn't work): https://mybinder.org/v2/gh/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main?filepath=lab6_rl4_policy_gradient.ipynb
- download, edit and execute the notebook on your computer if Python3 and JupyterLab are already installed: https://github.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/raw/main/lab6_rl4_policy_gradient.ipynb

If you work with Google Colab or MyBinder, **remember to save or download your work regularly or you may lose it!**

## Lab Submission

Please submit your completed notebook in [Moodle : "Lab 6 - Submission"](https://moodle.ip-paris.fr/mod/assign/view.php?id=367817).

### Submission Guidelines

1. **File Naming:** Rename your notebook as follows: **`firstname_lastname-06.ipynb`** where `firstname` and `lastname` match your email address. *Example: `jesse_read-06.ipynb`*
2. **Clear Output Cells:** To reduce file size (**must be under 500 KB**), clear all output cells before submitting. This includes rendered images, videos, plots, and dataframes...
   - **JupyterLab:**
     - Click **"Kernel" → "Restart Kernel and Clear Outputs of All Cells..."**
     - Then go to **"File" → "Save Notebook As..."**
   - **Google Colab:**
     - Click **"Edit" → "Clear all outputs"**
     - Then go to **"File" → "Download" → "Download .ipynb"**
   - **VSCode:**
     - Click **"Clear All Outputs"**
     - Then **save your file**
3. **Upload Your File:** Only **`.ipynb`** files are accepted.

**Note:** Bonus parts (if any) are optional, as their name suggests.


## Set up the Python environment

This notebook relies on several libraries including `gymnasium[classic-control]` (v1.0.0), `ipywidgets`, `matplotlib`, `moviepy`, `numpy`, `pandas`, `pygame`, `seaborn`, `torch`, and `tqdm`.
A complete list of dependencies can be found in the following [requirements-lab6.txt](https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main/requirements-lab6.txt) file.

### If you use Google Colab

If you use Google Colab, execute the next cell to install required libraries.

In [None]:
import sys
import subprocess


def is_colab():
    return "google.colab" in sys.modules


def run_subprocess_command(cmd):
    # run the command
    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
    # print the output
    for line in process.stdout:
        print(line.decode().strip())


if is_colab():
    # run_subprocess_command("apt install xvfb x11-utils")
    run_subprocess_command(
        "pip install -r https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main/requirements-lab6-google-colab.txt"
    )

### If you have downloaded the notebook on your computer and execute it in your own Python environment

To set up the necessary dependencies, run the following commands to establish a [Python virtual environment (venv)](https://docs.python.org/3/library/venv.html) that includes all the essential libraries for this lab.

#### On POSIX systems (Linux, macOS, WSL, ...)

```bash
python3 -m venv env-lab6
source env-lab6/bin/activate
python3 -m pip install --upgrade pip
python3 -m pip install -r https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main/requirements-lab6.txt
```

#### On Windows

```bash
python3 -m venv env-lab6
env-lab6\Scripts\activate.bat
python3 -m pip install --upgrade pip
python3 -m pip install -r https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main/requirements-lab6.txt
```

### Run CSC-52081-EP notebooks locally in a dedicated Docker container

If you are familiar with Docker, an image is available on Docker Hub for this lab:

```bash
docker run -it --rm --user root -p 8888:8888 -e NB_UID=$(id -u) -e NB_GID=$(id -g) -v "${PWD}":/home/jovyan/work jdhp/csc-52081-ep:latest
```

### Import required packages

In [None]:
import gymnasium as gym
import numpy as np
# from numpy.typing import NDArray
import pandas as pd
from pathlib import Path
import torch
from typing import cast, List, Tuple, Union

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt

import seaborn as sns
from tqdm.notebook import tqdm

In [None]:
from IPython.display import Video
from ipywidgets import interact

In [None]:
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

In [None]:
sns.set_context("talk")

In [None]:
FIGS_DIR = Path("figs/") / "lab6"       # Where to save figures (.gif or .mp4 files)
PLOTS_DIR = Path("figs/") / "lab6"      # Where to save plots (.png or .svg files)
MODELS_DIR = Path("models/") / "lab6"   # Where to save models (.pth files)

In [None]:
# Create the output directories if they do not exist yet
for directory in (FIGS_DIR, PLOTS_DIR, MODELS_DIR):
    directory.mkdir(parents=True, exist_ok=True)

## Define some parameters

### Number of trainings

To achieve more representative outcomes at the conclusion of each exercise, we average the results across multiple training sessions. The `DEFAULT_NUMBER_OF_TRAININGS` variable specifies the number of training sessions conducted before the results are displayed.

We recommend setting a lower value (such as 2 or 3) during the development and testing phases of your implementations. Once you have completed your work and are confident in its functionality, you can increase the number of training sessions to minimize the variance in results. Be aware that a higher number of training sessions will extend the execution time, so adjust this setting in accordance with your computer's capabilities.

Additionally, you have the option to assign a specific value to the `NUMBER_OF_TRAININGS` variable for each exercise directly within the cells where the training loop is defined.

In [None]:
DEFAULT_NUMBER_OF_TRAININGS = 3

## Define the video selector widget

The `video_selector` function, defined in the next cell, will be used in the following exercises to display different episodes of the trained agent.

In [None]:
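def video_selector(file_path: Path) -> Video:
    # `interact` calls this function with the single path selected in the widget, not with the whole list
    return Video(file_path, embed=True, html_attributes="controls autoplay loop")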

## PyTorch Refresher and Cheat Sheet

In this lab, we will be implementing our reinforcement learning algorithms using PyTorch.
If you need a refresher, you might find this [PyTorch Cheat Sheet](https://web.archive.org/web/20241215022731/https://pytorch.org/tutorials/beginner/ptcheat.html) helpful. It provides a quick reference for many of the most commonly used PyTorch functions and concepts, and can be a valuable resource as you work through this lab.

Autograd, the core concept behind automatic differentiation in PyTorch, is introduced [here](https://pytorch.org/docs/stable/autograd.html).

You can also refer to the [official documentation](https://pytorch.org/docs/stable/index.html).

## PyTorch setup

PyTorch can run on both CPUs and GPUs. The following cell will determine the device PyTorch will use. If a GPU is available, PyTorch will use it; otherwise, it will use the CPU.

In [None]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Set the device to CUDA if available, otherwise use CPU

To use a GPU on Google Colab, you must also enable it by following the steps outlined [here](https://colab.research.google.com/notebooks/gpu.ipynb).

In [None]:
print("Available GPUs:")
if torch.cuda.is_available():
    for i in range(torch.cuda.device_count()):
        print(f"- Device {i}: {torch.cuda.get_device_name(i)}")
else:
    print("- No GPU available.")

If you have a recent GPU and want to use it, you may need to install a specific version of PyTorch compatible with your CUDA version. For this, you will have to edit the [requirements-lab6.txt](https://raw.githubusercontent.com/jeremiedecock/polytechnique-csc-52081-ep-2026-students/main/requirements-lab6.txt) file and replace the current version of PyTorch with one compatible with your CUDA version. Check the [official PyTorch website](https://pytorch.org/get-started/locally/) for more information.

Note that a GPU is not very useful in this lab because CartPole is a simple and fast problem to solve, and CUDA spends more time transferring data between the CPU and GPU than processing it directly on the CPU.

You can uncomment the next cell to explicitly instruct PyTorch to train neural networks using the CPU.

In [None]:
# DEVICE = "cpu"

In [None]:
print(f"PyTorch will train and test neural networks on {DEVICE}")

## Set up the Gymnasium environment

As in the previous lab, we will try to solve the CartPole-v1 environment (see: https://gymnasium.farama.org/environments/classic_control/cart_pole/), which offers a continuous state space and a discrete action space.

**Reminder**: the CartPole task consists of keeping a pole upright by moving the cart to which it is attached via a joint.
No friction is considered.
The task is considered solved if the pole stays upright (within 12 degrees of the vertical, the termination threshold used by CartPole-v1) for 500 steps on average over 100 episodes, while keeping the cart position within reasonable bounds.
The state is given by $\{x,\dot{x},\phi,\dot{\phi}\}$ where $x$ is the position of the cart and $\phi$ is the angle between the pole and the vertical.
There are only two possible actions: $a \in \{0, 1\}$ where $a = 0$ means "push the cart to the LEFT" and $a = 1$ means "push the cart to the RIGHT".

In [None]:
env = gym.make("CartPole-v1", render_mode="rgb_array")

cartpole_observation_space = cast(gym.spaces.Box, env.observation_space)
cartpole_action_space = cast(gym.spaces.Discrete, env.action_space)

cartpole_observation_dim:int = cartpole_observation_space.shape[0]    # Number of *dimensions* in the observation space (i.e. number of *elements* in the observation vector)
cartpole_num_actions:int = cast(int, cartpole_action_space.n.item())  # Number of possible actions

print(f"Observation space is: {cartpole_observation_space}")
print(f"Action space is: {cartpole_action_space}")
print("Actions are: {" + ", ".join([str(a) for a in range(cartpole_num_actions)]) + "}")

env.close()

## Notation Table

The following table summarizes the main symbols and notations used throughout this lab.

| Symbol                                                                  | Meaning                                                                                |
|-------------------------------------------------------------------------|----------------------------------------------------------------------------------------|
| $s, s_t$                                                                | State (or observation) / state (or observation) at time step $t$                       |
| $a, a_t$                                                                | Action / action at time step $t$                                                       |
| $r_t$ or $r(s_t, a_t)$                                                  | Reward received at time step $t$                                                       |
| $T$                                                                     | Horizon (maximum number of steps per episode)                                          |
| $\pi_{\boldsymbol{\theta}}$                                             | Parameterized stochastic policy with parameters $\boldsymbol{\theta}$                  |
| $\pi_{\boldsymbol{\theta}}(a \mid s)$                                   | Probability of taking action $a$ in state $s$ under policy $\pi_{\boldsymbol{\theta}}$ |
| $\boldsymbol{\theta} \in \mathbb{R}^d$                                  | Policy parameters (neural network weights)                                             |
| $\alpha$                                                                | Learning rate                                                                          |
| $J(\boldsymbol{\theta})$                                                | Optimization criterion (expected total return)                                         |
| $V^{\pi_{\boldsymbol{\theta}}}(s)$                                      | State-value function under policy $\pi_{\boldsymbol{\theta}}$                          |
| $Q^{\pi_{\boldsymbol{\theta}}}(s, a)$                                   | Action-value function under policy $\pi_{\boldsymbol{\theta}}$                         |
| $G_t = \sum_{k=t}^{T-1} r_k$                                            | Monte Carlo return from time step $t$                                                  |
| $\nabla_{\boldsymbol{\theta}} \log \pi_{\boldsymbol{\theta}}(a \mid s)$ | Score function (gradient of the log-policy)                                            |
| $\tau$                                                                  | Trajectory $\{ s_0, a_0, r_0, s_1, a_1, \dots, r_{T-1}, s_T \}$                        |
| $\mathbb{E}_{\pi_{\boldsymbol{\theta}}}[\cdot]$                         | Expectation under policy $\pi_{\boldsymbol{\theta}}$                                   |

## Part 1: Monte Carlo Policy Gradient (REINFORCE)

### The Policy Gradient theorem

We will solve the CartPole environment using a policy gradient method which directly searches, within a family of parameterized policies $\pi_\theta$, for an optimal policy.

This method performs gradient ascent in the policy space so that the total return is maximized.
We will restrict our work to episodic tasks, *i.e.* tasks that have a starting state and last for a finite and fixed number of steps $T$, called the horizon.

More formally, we define an optimization criterion that we want to maximize:

$$J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^{T-1} r(s_t,a_t)\right],$$

where $\mathbb{E}_{\pi_\theta}$ denotes the expectation over trajectories in which each action is drawn as $a_t \sim \pi_\theta(\cdot|s_t)$, and $T$ is the horizon of the episode.
In other words, we want to maximize the value of the starting state: $V^{\pi_\theta}(s)$.
The policy gradient theorem tells us that:

$$
\nabla_\theta J(\theta) = \nabla_\theta V^{\pi_\theta}(s) = \mathbb{E}_{\pi_\theta} \Bigl[\nabla_\theta \log \pi_\theta (a|s) ~ Q^{\pi_\theta}(s,a) \Bigr],
$$

where the $Q$-function is defined as:

$$Q^{\pi_\theta}(s,a) = \mathbb{E}_{\pi_\theta} \left[\sum_{t=0}^{T-1} r(s_t,a_t) \,\middle|\, s_0=s, a_0=a\right].$$
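
For intuition, the gradient above can be recovered with the log-derivative trick. The following is only a sketch for the trajectory form of the objective. The symbols $p_\theta(\tau)$ (probability of a trajectory $\tau$ under $\pi_\theta$), $R(\tau) = \sum_{t=0}^{T-1} r(s_t,a_t)$, $\rho$ (initial state distribution) and $P$ (transition probabilities) are notation introduced here for illustration and are not used elsewhere in this lab.

$$
\begin{aligned}
p_\theta(\tau) &= \rho(s_0) \prod_{t=0}^{T-1} \pi_\theta(a_t|s_t) \, P(s_{t+1}|s_t,a_t), \\
\nabla_\theta J(\theta) &= \nabla_\theta \int p_\theta(\tau) \, R(\tau) \, d\tau
= \int p_\theta(\tau) \, \nabla_\theta \log p_\theta(\tau) \, R(\tau) \, d\tau \\
&= \mathbb{E}_{\pi_\theta} \left[ R(\tau) \sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t|s_t) \right].
\end{aligned}
$$

The last step uses the fact that $\rho$ and $P$ do not depend on $\theta$, so only the policy terms survive in $\nabla_\theta \log p_\theta(\tau)$. This is why no model of the dynamics is needed to estimate the gradient from sampled trajectories.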

The policy gradient theorem is particularly effective because it allows gradient computation without needing to model the system's dynamics, as long as the $Q$-function (or an estimate of it) is available. By simply applying the policy and observing one-step transitions, sufficient information can be gathered. Using stochastic gradient ascent and substituting $Q^{\pi_\theta}(s_t,a_t)$ with a Monte Carlo estimate $G_t = \sum_{t'=t}^{T-1} r(s_{t'},a_{t'})$ for a single trajectory, we obtain the REINFORCE algorithm.
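
As an illustration of the Monte Carlo estimate $G_t$, the sketch below computes the return from every time step of one episode. The helper name `returns_to_go` and the example rewards are hypothetical and are not part of the lab code.

```python
from typing import List


def returns_to_go(rewards: List[float]) -> List[float]:
    """Return [G_0, ..., G_{T-1}] with G_t = r_t + r_{t+1} + ... + r_{T-1}."""
    returns = [0.0] * len(rewards)
    running_sum = 0.0
    # Walk backwards so that each G_t reuses G_{t+1}: G_t = r_t + G_{t+1}
    for t in reversed(range(len(rewards))):
        running_sum += rewards[t]
        returns[t] = running_sum
    return returns


# CartPole gives a reward of 1 at every step, so G_t is the number of remaining steps
example_rewards = [1.0, 1.0, 1.0, 1.0]
print(returns_to_go(example_rewards))  # [4.0, 3.0, 2.0, 1.0]
```

Computing the sums backwards costs one pass over the episode, whereas recomputing each sum from scratch is quadratic in the episode length. Either approach gives the same values.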

The REINFORCE algorithm, introduced by Williams in 1992, is a Monte Carlo policy gradient method. It updates the policy parameters in a direction that increases the expected return, using full-episode returns to build an unbiased estimate of the gradient. Each iteration consists of generating an episode with the current policy, computing the gradient estimate, and updating the policy parameters. The algorithm is simple and requires no model of the environment dynamics, but its gradient estimates can have high variance, which tends to slow down learning.

For further reading and a deeper understanding, refer to Williams' seminal paper (https://link.springer.com/article/10.1007/BF00992696) and the comprehensive text on reinforcement learning by Richard S. Sutton and Andrew G. Barto: "Reinforcement Learning: An Introduction", chap.13 (http://incompleteideas.net/book/RLbook2020.pdf).

Here is the REINFORCE algorithm.

### Monte Carlo policy gradient (REINFORCE)

<b>REQUIRE</b> <br>
$\quad$ A differentiable policy $\pi_{\boldsymbol{\theta}}$ <br>
$\quad$ A learning rate $\alpha \in \mathbb{R}^+$ <br>
<b>INITIALIZATION</b> <br>
$\quad$ Initialize parameters $\boldsymbol{\theta} \in \mathbb{R}^d$ <br>
<br>
<b>FOR EACH</b> episode <br>
$\quad$ Generate full trace $\tau = \{ \boldsymbol{s}_0, \boldsymbol{a}_0, r_0, \boldsymbol{s}_1, \boldsymbol{a}_1, \dots, r_{T-1}, \boldsymbol{s}_T \}$ following $\pi_{\boldsymbol{\theta}}$ <br>
$\quad$ <b>FOR</b> $~ t=0,\dots,T-1$ <br>
$\quad\quad$ $G \leftarrow \sum_{k=t}^{T-1} r_k$ <br>
$\quad\quad$ $\boldsymbol{\theta} \leftarrow \boldsymbol{\theta} + \alpha ~ \underbrace{G ~ \nabla_{\boldsymbol{\theta}} \ln \pi_{\boldsymbol{\theta}}(\boldsymbol{a}_t|\boldsymbol{s}_t)}_{\nabla_{\boldsymbol{\theta}} J(\boldsymbol{\theta})}$ <br>
<br>
<b>RETURN</b> $\boldsymbol{\theta}$
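
To make the update rule concrete, here is a minimal sketch of a single update for a hypothetical state-free softmax policy whose two logits are the parameters $\boldsymbol{\theta}$. The values of `action`, `G` and `alpha` are made up for illustration. The sketch also checks the score function computed by autograd against its closed form for a softmax, $\nabla_{\boldsymbol{\theta}} \ln \pi_{\boldsymbol{\theta}}(a) = \text{one\_hot}(a) - \pi_{\boldsymbol{\theta}}$.

```python
import torch

# Hypothetical toy policy: the logits themselves are the parameters theta
theta = torch.tensor([0.5, -0.5], requires_grad=True)
action = 1   # pretend this action was sampled at step t
G = 3.0      # pretend this is the return observed from step t
alpha = 0.1  # learning rate

# Score function via autograd: gradient of ln pi_theta(action) w.r.t. theta
log_probs = torch.log_softmax(theta, dim=-1)
log_probs[action].backward()

# Closed form for a softmax over logits: one_hot(action) - pi_theta
probs = torch.softmax(theta.detach(), dim=-1)
expected_score = torch.nn.functional.one_hot(torch.tensor(action), num_classes=2) - probs

# One REINFORCE step: theta <- theta + alpha * G * grad ln pi_theta(a_t | s_t)
with torch.no_grad():
    new_theta = theta + alpha * G * theta.grad

print("autograd score:   ", theta.grad)
print("closed-form score:", expected_score)
print("updated theta:    ", new_theta)
```

With a positive return, the logit of the sampled action increases and the other one decreases, so the action becomes more likely under the updated policy.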

### Exercise 1: REINFORCE for discrete action spaces (CartPole)

#### Policy Implementation

We will implement a stochastic policy to control the cart using a simple one-layer neural network. Given the simplicity of the problem, a single layer will suffice. We will not incorporate a bias term in this layer.

This neural network will output the probabilities of each possible action (in this case, there are only two actions: "push left" or "push right") given the input vector $s$ (the 4-dimensional state vector).

**Task 1.1**: Implement the `PolicyNetwork` defined as follows.

The network takes an input tensor representing the state of the environment and outputs a tensor of action probabilities.
The network has the following components:

- `layer1`: This is a linear (fully connected) layer that takes `n_observations` as input and outputs `n_actions`. It does not include a bias term due to the symmetry of the CartPole problem around 0.

- `forward` method: This method defines the forward pass of the network. It takes a state tensor as input and returns a tensor of action probabilities. It first applies the linear layer to the input state tensor to get the logits (the raw, unnormalized scores for each action), and then applies the [softmax function](https://docs.pytorch.org/docs/stable/generated/torch.softmax.html#torch-softmax) to the logits to get the action probabilities. The [softmax function](https://docs.pytorch.org/docs/stable/generated/torch.softmax.html#torch-softmax) ensures that the action probabilities are positive and sum to 1, so they can be interpreted as probabilities.

This network is quite simple and may not perform well on complex tasks with large state or action spaces. However, it can be a good starting point for simple reinforcement learning tasks, and can be easily extended with more layers or different types of layers (such as convolutional layers for image inputs) to handle more complex tasks.
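
The effect of the softmax and of the missing bias term can be checked on a small example. The sketch below is not the `PolicyNetwork` requested in Task 1.1: it uses an explicit, randomly drawn weight matrix (a hypothetical stand-in for the linear layer) to show that the outputs sum to 1, that the all-zero state yields equal probabilities, and that, with two actions, negating the state swaps the two probabilities.

```python
import torch

torch.manual_seed(0)

# Hypothetical weights for illustration: 4 state features -> 2 action logits, no bias
weights = torch.randn(2, 4)


def action_probabilities(states: torch.Tensor) -> torch.Tensor:
    logits = states @ weights.T           # shape (N, 2): raw, unnormalized scores
    return torch.softmax(logits, dim=-1)  # normalize over the action dimension


arbitrary_state = torch.tensor([0.1, -0.2, 0.05, 0.3])
states = torch.stack([
    torch.zeros(4),    # centered, motionless cart with a vertical pole
    arbitrary_state,   # an arbitrary state
    -arbitrary_state,  # the mirrored state
])

probs = action_probabilities(states)
print(probs)
print(probs.sum(dim=-1))
```

Without a bias, the logits of the zero state are all zero, so neither action is preferred there. A bias term would break this left/right symmetry.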

In [None]:
class PolicyNetwork(torch.nn.Module):
    """
    A neural network used as a policy for the REINFORCE algorithm.

    Attributes
    ----------
    layer1 : torch.nn.Linear
        A fully connected layer.

    Methods
    -------
    forward(state: torch.Tensor) -> torch.Tensor
        Define the forward pass of the PolicyNetwork.
    """

    def __init__(self, n_observations: int, n_actions: int):
        """
        Initialize a new instance of PolicyNetwork.

        Parameters
        ----------
        n_observations : int
            The size of the observation space.
        n_actions : int
            The size of the action space.
        """
        super(PolicyNetwork, self).__init__()

        self.layer1 = ... # TODO


    def forward(self, state_tensor: torch.Tensor) -> torch.Tensor:
        """
        Calculate the probability of each action for the given state.

        Parameters
        ----------
        state_tensor : torch.Tensor
            The input tensor (state).
            The shape of the tensor should be (N, dim),
            where N is the number of state vectors in the batch
            and dim is the dimension of state vectors.

        Returns
        -------
        torch.Tensor
            The output tensor (the probability of each action for the given state).
        """

        logits = ... # TODO
        out = ...    # TODO

        return out

#### Implement the `sample_discrete_action` function

**Task 1.2**: Complete the `sample_discrete_action` function. This function is used to sample a discrete action based on a given state and a policy network. It first converts the state into a tensor and passes it through the policy network to get the parameters of the action probability distribution. Then, it creates a [categorical distribution](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical) from these parameters and [samples an action](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical.sample) from this distribution. It also [calculates the log probability](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical.log_prob) of the sampled action according to the distribution. The function returns the sampled action and its log probability.

**Practical tips**: To complete this task, you need to be familiar with the following PyTorch concepts:

- [torch.distributions.Categorical](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical): the categorical distribution.
- [torch.distributions.Categorical.sample()](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical.sample): sample from the categorical distribution.
- [torch.distributions.Categorical.log_prob()](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical.log_prob): calculate the log probability of an action.
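
The three calls listed above can be tried in isolation before wiring them to the policy network. This is a minimal sketch with hand-written probabilities (hypothetical values standing in for the network output for a batch of one state).

```python
import torch

torch.manual_seed(0)

# Hypothetical action probabilities for a batch of one state, shape (1, 2)
probs = torch.tensor([[0.8, 0.2]])

distribution = torch.distributions.Categorical(probs=probs)
action_tensor = distribution.sample()            # shape (1,)
log_prob = distribution.log_prob(action_tensor)  # shape (1,)
action = int(action_tensor.item())               # plain Python integer

print("sampled action:", action)
print("log probability:", log_prob)
```

When the probabilities come from a network, the tensor returned by `log_prob` stays attached to the computation graph, which is what allows the loss to be backpropagated to the policy parameters later on.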

In [None]:
def sample_discrete_action(
    policy_nn: PolicyNetwork, state: np.ndarray
) -> Tuple[int, torch.Tensor]:
    """
    Sample a discrete action based on the given state and policy network.

    This function takes a state and a policy network, and returns a sampled action and its log probability.
    The action is sampled from a categorical distribution defined by the output of the policy network.

    Parameters
    ----------
    policy_nn : PolicyNetwork
        The policy network that defines the probability distribution of the actions.
    state : np.ndarray
        The state based on which an action needs to be sampled.

    Returns
    -------
    Tuple[int, torch.Tensor]
        The sampled action and its log probability.
    """

    # Convert the state into a tensor, specify its data type as float32, and send it to the device (CPU or GPU).
    # The unsqueeze(0) function is used to add an extra dimension to the tensor to match the input shape required by the policy network.
    state_tensor = ... # TODO

    # Pass the state tensor through the policy network to get the parameters of the action probability distribution.
    actions_probability_distribution_params = ... # TODO

    # Create the categorical distribution used to sample an action from the parameters obtained from the policy network.
    # See https://pytorch.org/docs/stable/distributions.html#categorical
    actions_probability_distribution = ... # TODO

    # Sample an action from the categorical distribution.
    sampled_action_tensor = ... # TODO

    # Convert the tensor containing the sampled action into a Python integer.
    sampled_action = ... # TODO

    # Calculate the log probability of the sampled action according to the categorical distribution.
    sampled_action_log_probability = ... # TODO

    # Return the sampled action and its log probability.
    return sampled_action, sampled_action_log_probability

**Task 1.3**: Test the `sample_discrete_action` function on a random state using an untrained policy network.

In [None]:
env = gym.make("CartPole-v1")

policy_nn = ... # TODO

state = ... # TODO
theta = ... # TODO
action, action_log_probability = ... # TODO

print("state:", state)
print("theta:", theta)
print("sampled action:", action)
print("log probability of the sampled action:", action_log_probability)

env.close()

#### Implement the `sample_one_episode` function

Remember that in the REINFORCE algorithm, we need to generate a complete trajectory, denoted as $\tau = \{ \boldsymbol{s}_0, \boldsymbol{a}_0, r_0, \boldsymbol{s}_1, \boldsymbol{a}_1, \dots, r_{T-1}, \boldsymbol{s}_T \}$. This trajectory includes the states, actions, and rewards at each time step, as outlined in the REINFORCE algorithm.

**Task 1.4**: Implement the `sample_one_episode` function. It should play one episode using the given policy $\pi_\theta$ and return the resulting trajectory (states, actions, rewards, and action log-probabilities). The episode is limited to a fixed horizon $T$, which is the maximum number of steps.

In [None]:
def sample_one_episode(
    env: gym.Env, policy_nn: PolicyNetwork, max_episode_duration: int
) -> Tuple[List[np.ndarray], List[int], List[float], List[torch.Tensor]]:
    """Execute one episode in `env` using the policy defined by `policy_nn`.

    Parameters
    ----------
    env : gym.Env
        The environment to play in.
    policy_nn : PolicyNetwork
        The policy neural network.
    max_episode_duration : int
        The maximum duration of the episode.

    Returns
    -------
    Tuple[List[np.ndarray], List[int], List[float], List[torch.Tensor]]
        The states, actions, rewards, and log-probability of the action for each time step in the episode.
    """
    state_t, info = env.reset()

    episode_states = []
    episode_actions = []
    episode_log_prob_actions = []
    episode_rewards = []
    episode_states.append(state_t)

    for t in range(max_episode_duration):

        # Sample a discrete action and its log probability from the policy network based on the current state
        action_t, log_prob_action_t = ... # TODO

        # Execute the sampled action in the environment, which returns the new state, reward, and whether the episode has terminated or been truncated
        state_t, reward_t, terminated, truncated, info = env.step(action_t)

        # Check if the episode is done, either due to termination (reaching a terminal state) or truncation (reaching a maximum number of steps)
        done = terminated or truncated

        # Append the new state, action, action log probability and reward to their respective lists

        episode_states.append(state_t)
        episode_actions.append(action_t)
        episode_log_prob_actions.append(log_prob_action_t)
        episode_rewards.append(float(reward_t))

        if done:
            break

    return episode_states, episode_actions, episode_rewards, episode_log_prob_actions

**Task 1.5:** Test this function on the untrained agent.

In [None]:
VIDEO_PREFIX_EX1_REINFORCE_UNTRAINED = "lab6_ex1_reinforce_untrained"

NUM_EPISODES = 3

file_path_list = [
    FIGS_DIR / f"{VIDEO_PREFIX_EX1_REINFORCE_UNTRAINED}-episode-{episode_index}.mp4"
    for episode_index in range(NUM_EPISODES)
]

for file_path in file_path_list:
    file_path.unlink(missing_ok=True)

env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder=str(FIGS_DIR),
    name_prefix=VIDEO_PREFIX_EX1_REINFORCE_UNTRAINED,
    episode_trigger=lambda x: True,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=NUM_EPISODES)

for episode_index in range(NUM_EPISODES):
    policy_nn = ... # TODO
    episode_states, episode_actions, episode_rewards, episode_log_prob_actions = ... # TODO

print(f"Episode time taken: {env.time_queue}")
print(f"Episode total rewards: {env.return_queue}")
print(f"Episode lengths: {env.length_queue}")

env.close()

print("\nSelect the episode to play here 👇\n")

interact(video_selector, file_path=file_path_list);

In [None]:
episode_states

In [None]:
episode_actions

In [None]:
episode_rewards

#### Implement a test function

**Task 1.6**: Implement the `avg_return_on_multiple_episodes` function that tests the given policy $\pi_\theta$ over `num_test_episode` episodes (with a fixed horizon $T$) and returns the average return across these episodes.

The function `avg_return_on_multiple_episodes` is designed to play multiple episodes of a given environment using a specified policy neural network and to compute the average return. It takes as input the environment to play in, the policy neural network to use, the number of episodes to play, and the maximum duration of an episode.
In each episode, it uses the `sample_one_episode` function to play the episode and collect the rewards. The function then returns the average of these cumulative returns.

`avg_return_on_multiple_episodes` will be used for evaluating the performance of a policy over multiple episodes.
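
The averaging step itself is small. As a sketch on made-up data (the reward lists below are hypothetical and do not come from the environment), the return of an episode is the sum of its rewards and the score is the mean of these sums:

```python
from statistics import mean

# Hypothetical rewards of three test episodes lasting 12, 30 and 18 steps
rewards_per_episode = [[1.0] * 12, [1.0] * 30, [1.0] * 18]

episode_returns = [sum(episode_rewards) for episode_rewards in rewards_per_episode]
average_return = mean(episode_returns)

print(episode_returns)  # [12.0, 30.0, 18.0]
print(average_return)   # 20.0
```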

In [None]:
def avg_return_on_multiple_episodes(
    env: gym.Env,
    policy_nn: PolicyNetwork,
    num_test_episode: int,
    max_episode_duration: int,
) -> float:
    """
    Play multiple episodes of the environment and calculate the average return.

    Parameters
    ----------
    env : gym.Env
        The environment to play in.
    policy_nn : PolicyNetwork
        The policy neural network.
    num_test_episode : int
        The number of episodes to play.
    max_episode_duration : int
        The maximum duration of an episode.

    Returns
    -------
    float
        The average return.
    """

    ... # TODO

    return average_return

**Task 1.7:** Test this function on the untrained agent.

In [None]:
env = gym.make("CartPole-v1")

policy_nn = ... # TODO
average_return = ... # TODO

print(average_return)

env.close()

#### Implement the train function

**Task 1.8**: Implement the `train_reinforce_discrete` function, used to train a policy network using the REINFORCE algorithm in the given environment. This function takes as input the environment, the number of training episodes, the number of tests to perform per episode, the maximum duration of an episode, and the learning rate for the optimizer.

The function first initializes a policy network and an Adam optimizer. Then, for each training episode, it generates an episode using the current policy and calculates the return at each time step. It uses this return and the log probability of the action taken at that time step to compute the loss, which is the negative of the product of the return and the log probability. Because PyTorch optimizers minimize, taking an optimizer step on this negated quantity amounts to gradient ascent on $J(\theta)$.

After each training episode, the function tests the current policy by playing a number of test episodes and calculating the average return. This average return is added to a list for monitoring purposes.

The function returns the trained policy network and the list of average returns for each episode. This function encapsulates the main loop of the REINFORCE algorithm, including the policy update step.
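
The per-step quantities described above can be sketched in plain Python, independently of PyTorch. This is only an illustration under stated assumptions: the helper names `rewards_to_go` and `reinforce_step_losses` are hypothetical (they are not part of the lab code), the return is undiscounted, and the log-probabilities are made-up numbers rather than outputs of a real policy network.

```python
from typing import List


def rewards_to_go(rewards: List[float]) -> List[float]:
    """Return [G_0, ..., G_{T-1}] with G_t = r_t + r_{t+1} + ... + r_{T-1} (no discounting)."""
    returns = [0.0] * len(rewards)
    running_sum = 0.0
    # Walk backwards so that each G_t reuses G_{t+1} instead of re-summing the tail.
    for t in reversed(range(len(rewards))):
        running_sum += rewards[t]
        returns[t] = running_sum
    return returns


def reinforce_step_losses(rewards: List[float], log_probs: List[float]) -> List[float]:
    """Per-step REINFORCE loss: -G_t * log pi(a_t | s_t)."""
    return [-g * lp for g, lp in zip(rewards_to_go(rewards), log_probs)]


# Toy episode: CartPole gives +1 per step, so G_t is the number of remaining steps.
toy_rewards = [1.0, 1.0, 1.0]
toy_log_probs = [-0.7, -0.5, -0.2]  # made-up log-probabilities of the chosen actions

print(rewards_to_go(toy_rewards))  # [3.0, 2.0, 1.0]
print(reinforce_step_losses(toy_rewards, toy_log_probs))
```

In the actual training loop the log-probabilities are tensors attached to the computation graph, so calling `backward()` on such a loss propagates the return-weighted gradient to the policy parameters.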

In [None]:
def train_reinforce_discrete(
    env: gym.Env,
    num_train_episodes: int,
    num_test_per_episode: int,
    max_episode_duration: int,
    learning_rate: float,
) -> Tuple[PolicyNetwork, List[float]]:
    """
    Train a policy using the REINFORCE algorithm.

    Parameters
    ----------
    env : gym.Env
        The environment to train in.
    num_train_episodes : int
        The number of training episodes.
    num_test_per_episode : int
        The number of tests to perform per episode.
    max_episode_duration : int
        The maximum length of an episode.
    learning_rate : float
        The learning rate for the Adam optimizer.

    Returns
    -------
    Tuple[PolicyNetwork, List[float]]
        The final trained policy and the average evaluation returns computed after each training episode.
    """
    episode_avg_return_list = []

    policy_nn = PolicyNetwork(cartpole_observation_dim, cartpole_num_actions).to(DEVICE)
    optimizer = torch.optim.Adam(policy_nn.parameters(), lr=learning_rate)

    for episode_index in tqdm(range(num_train_episodes)):

        # Generate an episode following the current policy
        _, _, episode_reward_list, episode_log_prob_action_list = ... # TODO

        # Iterate over the episode
        for t in range(len(episode_reward_list)):
            # Calculate the return at time t
            future_return = ... # TODO

            # Convert the future_return to a PyTorch tensor
            returns_tensor = ... # TODO

            # Get the log probability of the action taken at time t
            log_prob_actions_tensor = ... # TODO

            # Gradient descent on the negated objective (equivalent to gradient ascent on J(θ))
            loss = ... # TODO

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Test the current policy
        test_avg_return = avg_return_on_multiple_episodes(
            env=env,
            policy_nn=policy_nn,
            num_test_episode=num_test_per_episode,
            max_episode_duration=max_episode_duration,
        )

        # Monitoring
        episode_avg_return_list.append(test_avg_return)

    return policy_nn, episode_avg_return_list

#### Train the agent

In [None]:
env = gym.make("CartPole-v1")

NUMBER_OF_TRAININGS = DEFAULT_NUMBER_OF_TRAININGS  # Change the default (global) value here if you want a specific number of trainings for this exercise
reinforce_trains_result_list: List[List[Union[int, float]]] = [[], [], []]

for train_index in range(NUMBER_OF_TRAININGS):
    # Train the agent
    reinforce_policy_nn, episode_reward_list = train_reinforce_discrete(
        env=env,
        num_train_episodes=150,
        num_test_per_episode=5,
        max_episode_duration=500,
        learning_rate=0.005,
    )

    reinforce_trains_result_list[0].extend(range(len(episode_reward_list)))
    reinforce_trains_result_list[1].extend(episode_reward_list)
    reinforce_trains_result_list[2].extend([train_index for _ in episode_reward_list])

reinforce_trains_result_df = pd.DataFrame(
    np.array(reinforce_trains_result_list).T,
    columns=["num_episodes", "mean_final_episode_reward", "training_index"],
)
reinforce_trains_result_df["agent"] = "REINFORCE"

# Save the policy network of the last training run

torch.save(reinforce_policy_nn, MODELS_DIR / "lab6_reinforce_policy_network.pth")

env.close()

#### Plot results

In [None]:
g = sns.relplot(
    x="num_episodes",
    y="mean_final_episode_reward",
    kind="line",
    hue="agent",
    estimator=None,
    units="training_index",
    data=reinforce_trains_result_df,
    height=7,
    aspect=2,
    alpha=0.5,
)
plt.savefig(PLOTS_DIR / "lab6_reinforce_cartpole_trains_result.png")

In [None]:
all_trains_result_df = pd.concat(
    [
        reinforce_trains_result_df,
    ]
)
g = sns.relplot(
    x="num_episodes",
    y="mean_final_episode_reward",
    kind="line",
    hue="agent",
    data=all_trains_result_df,
    height=7,
    aspect=2,
)
plt.savefig(PLOTS_DIR / "lab6_reinforce_cartpole_trains_result_agg.png")

#### Test final policy

In [None]:
VIDEO_PREFIX_EX1_REINFORCE_TRAINED = "lab6_ex1_reinforce_trained"

NUM_EPISODES = 3

file_path_list = [
    FIGS_DIR / f"{VIDEO_PREFIX_EX1_REINFORCE_TRAINED}-episode-{episode_index}.mp4"
    for episode_index in range(NUM_EPISODES)
]

for file_path in file_path_list:
    file_path.unlink(missing_ok=True)

env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder=str(FIGS_DIR),
    name_prefix=VIDEO_PREFIX_EX1_REINFORCE_TRAINED,
    episode_trigger=lambda x: True,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=NUM_EPISODES)

for episode_index in range(NUM_EPISODES):
    episode_states, episode_actions, episode_rewards, episode_log_prob_actions = ... # TODO

print(f"Episode time taken: {env.time_queue}")
print(f"Episode total rewards: {env.return_queue}")
print(f"Episode lengths: {env.length_queue}")

env.close()

print("\nSelect the episode to play here 👇\n")

interact(video_selector, file_path=file_path_list);

In [None]:
reinforce_trains_result_df

#### Score

In [None]:
score_ex1 = reinforce_trains_result_df[["num_episodes", "mean_final_episode_reward"]].groupby("num_episodes").mean().max()
score_ex1

### Exercise 2: REINFORCE with Baseline

In the basic REINFORCE algorithm, the policy parameters are updated in proportion to the product of the gradient of the policy log-probability and the cumulative reward (return) from a state-action pair. However, this approach can lead to high variance in policy updates, making learning slower.

In this exercise, a *baseline* is introduced to reduce this variance. It is a value subtracted from the cumulative reward when calculating the policy gradient. The key property of the baseline is that it does not affect the expected value of the gradient estimate, which means it doesn't bias the learning process but reduces the variance of the updates.

The baseline can be thought of as a reference point or an "average" expectation of reward. By comparing the actual rewards to this baseline, we can determine whether the outcomes of certain actions are better or worse than this "average" performance.

A common choice for the baseline is the value function of the current policy, $\hat{V}_{\boldsymbol{\omega}}$. By using the value function as a baseline, the algorithm adjusts the policy towards actions that perform better than the average.

To incorporate the baseline into REINFORCE, you modify the update rule. Instead of using the total return $G$ directly, you subtract the baseline value $\hat{V}_{\boldsymbol{\omega}}$ from $G$ in the policy gradient estimate.

By centering the rewards around a baseline, the variance of the policy gradient estimates is reduced. This leads to more stable and efficient learning, as the updates are less noisy and more focused on improving relative to the average performance.
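
The two claims above (unchanged expected gradient, lower variance) can be checked exactly on a toy problem. The sketch below is an assumption-laden illustration, not the lab's setting: it uses a single-state, two-action problem with a softmax policy and made-up rewards, and the helper `gradient_mean_and_variance` is a hypothetical name. It enumerates all actions instead of sampling, so the result does not depend on a random seed.

```python
import math
from typing import List, Tuple


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax."""
    m = max(logits)
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def gradient_mean_and_variance(
    logits: List[float], rewards: List[float], baseline: float
) -> Tuple[List[float], float]:
    """Exact mean and total variance of (r(a) - b) * grad_theta log pi(a) for a ~ pi."""
    probs = softmax(logits)
    n = len(probs)
    # For a softmax policy, grad_theta log pi(a) = onehot(a) - probs.
    samples = [
        [(rewards[a] - baseline) * ((1.0 if i == a else 0.0) - probs[i]) for i in range(n)]
        for a in range(n)
    ]
    # Expectation over actions drawn from the policy.
    mean = [sum(probs[a] * samples[a][i] for a in range(n)) for i in range(n)]
    # Sum over components of the per-component variances.
    variance = sum(
        probs[a] * sum((samples[a][i] - mean[i]) ** 2 for i in range(n))
        for a in range(n)
    )
    return mean, variance


logits = [0.0, 0.0]      # uniform policy over two actions
rewards = [10.0, 11.0]   # made-up rewards, both far from zero
expected_reward = sum(p * r for p, r in zip(softmax(logits), rewards))

mean_plain, var_plain = gradient_mean_and_variance(logits, rewards, baseline=0.0)
mean_base, var_base = gradient_mean_and_variance(logits, rewards, baseline=expected_reward)

print("no baseline  :", mean_plain, var_plain)
print("with baseline:", mean_base, var_base)
```

In this toy case both estimators have the same mean, while the variance falls from about 55 to 0 once the expected reward is used as the baseline. The variance vanishes entirely only because the example is so small; in a real environment the baseline reduces the variance without removing it.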

#### REINFORCE with Baseline

<b>REQUIRE</b> <br>
 $\quad$ A differentiable policy $\pi_{\boldsymbol{\theta}}$ <br>
 $\quad$ A differentiable baseline function $\hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s})$ <br>
 $\quad$ A learning rate $\alpha_1 \in \mathbb{R}^+$ for the policy <br>
 $\quad$ A learning rate $\alpha_2 \in \mathbb{R}^+$ for the baseline <br>
<b>INITIALIZATION</b> <br>
 $\quad$ Initialize parameters $\boldsymbol{\theta} \in \mathbb{R}^d$ <br>
 $\quad$ Initialize parameters $\boldsymbol{\omega} \in \mathbb{R}^{d'}$ <br>
<br>
<b>FOR EACH</b> episode <br>
 $\quad$ Generate full trace $\tau = \{ \boldsymbol{s}_0, \boldsymbol{a}_0, r_0, \boldsymbol{s}_1, \boldsymbol{a}_1, \dots, r_{T-1}, \boldsymbol{s}_T \}$ following $\pi_{\boldsymbol{\theta}}$ <br>
 $\quad$ <b>FOR</b> $~ t=0,\dots,T-1$ <br>
  $\quad\quad$ $G \leftarrow \sum_{k=t}^{T-1} r_k$ <br>
  $\quad\quad$ $\delta_t \leftarrow G - \hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s}_t)$ <br>
  $\quad\quad$ $\boldsymbol{\theta} \leftarrow \boldsymbol{\theta} + \alpha_1 ~ \delta_t ~ \nabla_{\boldsymbol{\theta}} \ln \pi_{\boldsymbol{\theta}}(\boldsymbol{a}_t|\boldsymbol{s}_t)$ <br>
  $\quad\quad$ $\boldsymbol{\omega} \leftarrow \boldsymbol{\omega} + \alpha_2 ~ \delta_t \nabla_{\boldsymbol{\omega}}\hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s}_t) $ <br>
<br>
<b>RETURN</b> $\boldsymbol{\theta}$
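
The baseline update in the algorithm above can be illustrated without PyTorch when $\hat{V}_{\boldsymbol{\omega}}$ is linear. The following is a minimal sketch under assumptions: the value function is $\hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s}) = \boldsymbol{w}^\top \boldsymbol{s} + b$ (so its gradient with respect to $\boldsymbol{w}$ is $\boldsymbol{s}$ and with respect to $b$ is 1), the state and return are made-up numbers, and `linear_value` and `baseline_update` are hypothetical helper names. Fitting a single (state, return) pair repeatedly is done only to make the effect of the rule visible.

```python
from typing import List, Tuple


def linear_value(weights: List[float], bias: float, state: List[float]) -> float:
    """Linear value estimate V(s) = w . s + b."""
    return sum(w * s for w, s in zip(weights, state)) + bias


def baseline_update(
    weights: List[float], bias: float, state: List[float], return_t: float, lr: float
) -> Tuple[List[float], float, float]:
    """One step of omega <- omega + lr * delta * grad_omega V(s), with delta = G - V(s)."""
    delta = return_t - linear_value(weights, bias, state)
    new_weights = [w + lr * delta * s for w, s in zip(weights, state)]  # dV/dw = s
    new_bias = bias + lr * delta                                        # dV/db = 1
    return new_weights, new_bias, delta


state = [0.1, -0.2, 0.05, 0.3]  # made-up 4-dimensional observation
weights, bias = [0.0, 0.0, 0.0, 0.0], 0.0
return_t = 20.0                 # made-up return observed from this state

for _ in range(200):
    weights, bias, delta = baseline_update(weights, bias, state, return_t, lr=0.1)

# The estimate moves towards the observed return and delta shrinks towards 0.
print(linear_value(weights, bias, state), delta)
```

The same step is what a gradient descent update on the squared error $\frac{1}{2}(G - \hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s}_t))^2$ produces, which is why an MSE loss minimized by an optimizer can stand in for the explicit $\boldsymbol{\omega}$ update.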

**Task 2.1**: Implement the `ValueNetwork` ($\hat{V}_{\boldsymbol{\omega}}$ in the algorithm) defined as follows.

`ValueNetwork` is a simple linear model. It takes an input tensor representing the state of the environment and outputs a tensor representing the estimated value of that state. The input tensor's shape should be (N, dim), where N is the number of state vectors in the batch and dim is the dimension of the state vectors.

The network has the following components:
- `linear_layer`: This is a linear (fully connected) layer that takes `n_observations` as input and outputs a single value.
- `forward` method: This method defines the forward pass of the network. It takes a state tensor as input and returns a tensor representing the estimated value of the state. It applies the linear layer to the input to get the final output.

This network is quite simple and may not perform well on complex tasks with large state spaces. However, it can be a good starting point for simple reinforcement learning tasks, and can be easily extended with more layers or different types of layers (such as convolutional layers for image inputs) to handle more complex tasks.

In [None]:
class ValueNetwork(torch.nn.Module):
    """
    A linear model that estimates the value of a state.

    Parameters
    ----------
    n_observations : int
        The number of observations in the state.

    Attributes
    ----------
    linear_layer : torch.nn.Linear
        A fully connected layer.
    """

    def __init__(self, n_observations: int):
        super(ValueNetwork, self).__init__()

        self.linear_layer = ... # TODO


    def forward(self, observation_tensor: torch.Tensor) -> torch.Tensor:
        """
        Perform a forward pass through the network.

        Parameters
        ----------
        observation_tensor : torch.Tensor
            The input tensor representing the observation.

        Returns
        -------
        torch.Tensor
            The output tensor representing the value of the observation.
        """

        x = ... # TODO

        return x

#### Implement the train function

**Task 2.2**: Implement the `train_reinforce_baseline_discrete` function, used to train a policy network and a value network using the REINFORCE with baseline algorithm in a given environment.

The function first initializes a policy network and a value network, along with their respective Adam optimizers. Then, for each training episode, it generates an episode using the current policy and calculates the return at each time step. It uses this return, the log probability of the action taken at that time step, and the estimated value of the state to compute the policy and value losses. Both losses are minimized by their optimizers: minimizing the policy loss (the negated objective) amounts to gradient ascent on $J(\boldsymbol{\theta})$, while the value loss is the squared difference between the estimated value $\hat{V}_{\boldsymbol{\omega}}(\boldsymbol{s}_t)$ and the actual return $G_t$.

In [None]:
def train_reinforce_baseline_discrete(
    env: gym.Env,
    num_train_episodes: int,
    num_test_per_episode: int,
    max_episode_duration: int,
    policy_learning_rate: float,
    value_learning_rate: float,
) -> Tuple[PolicyNetwork, List[float]]:
    """
    Train a policy using the REINFORCE with baseline algorithm.

    Parameters
    ----------
    env : gym.Env
        The environment to train in.
    num_train_episodes : int
        The number of training episodes.
    num_test_per_episode : int
        The number of tests to perform per episode.
    max_episode_duration : int
        The maximum length of an episode.
    policy_learning_rate : float
        The policy learning rate.
    value_learning_rate : float
        The value learning rate.

    Returns
    -------
    Tuple[PolicyNetwork, List[float]]
        The final trained policy and the average returns for each episode.
    """
    episode_avg_return_list = []

    state_size = env.observation_space.shape[0]
    action_size = int(env.action_space.n)  # works whether n is a NumPy integer or a plain int

    policy_nn = PolicyNetwork(state_size, action_size).to(DEVICE)
    policy_optimizer = torch.optim.Adam(policy_nn.parameters(), lr=policy_learning_rate)

    value_nn = ValueNetwork(state_size).to(DEVICE)
    value_optimizer = torch.optim.Adam(value_nn.parameters(), lr=value_learning_rate)

    value_loss_fn = torch.nn.MSELoss()

    for episode_index in tqdm(range(num_train_episodes)):
        # Generate an episode following the current policy
        episode_state_list, _, episode_reward_list, episode_log_prob_action_list = (
            sample_one_episode(
                env=env, policy_nn=policy_nn, max_episode_duration=max_episode_duration
            )
        )

        # Iterate over the episode
        for t in range(len(episode_reward_list)):

            # Calculate the return G_t at time step t
            G_t = ... # TODO

            # Convert G_t to a PyTorch tensor
            returns_tensor = ... # TODO

            # Get the log probability of the action taken at time t
            log_prob_actions_tensor = ... # TODO

            # Convert the state s_t to a PyTorch tensor
            state_tensor = ... # TODO

            # Compute the baseline value V(s_t)
            state_value = ... # TODO

            # Compute the advantage: delta_t = G_t - V(s_t)
            advantage = ... # TODO

            # Compute the policy loss (negative because we maximize J(θ))
            policy_loss = ... # TODO

            # Update the policy network
            policy_optimizer.zero_grad()
            policy_loss.backward()
            policy_optimizer.step()

            # Compute the value loss (MSE between predicted value and actual return)
            value_loss = ... # TODO

            # Update the value network
            value_optimizer.zero_grad()
            value_loss.backward()
            value_optimizer.step()

        # Test the current policy
        test_avg_return = avg_return_on_multiple_episodes(
            env=env,
            policy_nn=policy_nn,
            num_test_episode=num_test_per_episode,
            max_episode_duration=max_episode_duration
        )

        # Monitoring
        episode_avg_return_list.append(test_avg_return)

    return policy_nn, episode_avg_return_list

#### Train the agent

In [None]:
env = gym.make("CartPole-v1")

NUMBER_OF_TRAININGS = DEFAULT_NUMBER_OF_TRAININGS  # Change the default (global) value here if you want a specific number of trainings for this exercise
reinforce_baseline_trains_result_list = [[], [], []]

for train_index in range(NUMBER_OF_TRAININGS):
    # Train the agent
    reinforce_baseline_policy_nn, episode_reward_list = (
        train_reinforce_baseline_discrete(
            env=env,
            num_train_episodes=150,
            num_test_per_episode=5,
            max_episode_duration=500,
            policy_learning_rate=0.005,
            value_learning_rate=0.1,
        )
    )

    reinforce_baseline_trains_result_list[0].extend(range(len(episode_reward_list)))
    reinforce_baseline_trains_result_list[1].extend(episode_reward_list)
    reinforce_baseline_trains_result_list[2].extend(
        [train_index for _ in episode_reward_list]
    )

reinforce_baseline_trains_result_df = pd.DataFrame(
    np.array(reinforce_baseline_trains_result_list).T,
    columns=["num_episodes", "mean_final_episode_reward", "training_index"],
)
reinforce_baseline_trains_result_df["agent"] = "REINFORCE baseline"

# Save the policy network of the last training run

torch.save(
    reinforce_baseline_policy_nn,
    MODELS_DIR / "lab6_reinforce_with_baseline_policy_network.pth",
)

env.close()

#### Plot results

In [None]:
g = sns.relplot(
    x="num_episodes",
    y="mean_final_episode_reward",
    kind="line",
    hue="agent",
    estimator=None,
    units="training_index",
    data=reinforce_baseline_trains_result_df,
    height=7,
    aspect=2,
    alpha=0.5,
)
plt.savefig(PLOTS_DIR / "lab6_reinforce_with_baseline_cartpole_trains_result.png")

In [None]:
g = sns.relplot(
    x="num_episodes",
    y="mean_final_episode_reward",
    hue="agent",
    kind="line",
    data=reinforce_baseline_trains_result_df,
    height=7,
    aspect=2,
)
plt.savefig(PLOTS_DIR / "lab6_reinforce_with_baseline_cartpole_trains_result_agg.png")

#### Test final policy

In [None]:
VIDEO_PREFIX_EX2_REINFORCE_WITH_BASELINE_TRAINED = "lab6_reinforce_with_baseline_trained"

NUM_EPISODES = 3

file_path_list = [
    FIGS_DIR / f"{VIDEO_PREFIX_EX2_REINFORCE_WITH_BASELINE_TRAINED}-episode-{episode_index}.mp4"
    for episode_index in range(NUM_EPISODES)
]

for file_path in file_path_list:
    file_path.unlink(missing_ok=True)

env = gym.make("CartPole-v1", render_mode="rgb_array")
env = gym.wrappers.RecordVideo(
    env,
    video_folder=str(FIGS_DIR),
    name_prefix=VIDEO_PREFIX_EX2_REINFORCE_WITH_BASELINE_TRAINED,
    episode_trigger=lambda x: True,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=NUM_EPISODES)

for episode_index in range(NUM_EPISODES):
    episode_states, episode_actions, episode_rewards, episode_log_prob_actions = ... # TODO

print(f"Episode time taken: {env.time_queue}")
print(f"Episode total rewards: {env.return_queue}")
print(f"Episode lengths: {env.length_queue}")

env.close()

print("\nSelect the episode to play here 👇\n")

interact(video_selector, file_path=file_path_list);

In [None]:
reinforce_baseline_trains_result_df

#### Score

In [None]:
score_ex2 = reinforce_baseline_trains_result_df[["num_episodes", "mean_final_episode_reward"]].groupby("num_episodes").mean().max()
score_ex2