# INF639 Lab4: Inverse Reinforcement Learning

<img src="https://raw.githubusercontent.com/jeremiedecock/polytechnique-inf581-2023/master/logo.jpg" style="float: left; width: 15%" />

[INF639-2023](https://moodle.polytechnique.fr/course/view.php?id=17866) Lab session #4

2022-2023 Mohamed ALAMI

[![Open in Google Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/jeremiedecock/polytechnique-inf639-2023-students/blob/master/lab4_irl.ipynb)

[![My Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/jeremiedecock/polytechnique-inf639-2023-students/master?filepath=lab4_irl.ipynb)

[![NbViewer](https://raw.githubusercontent.com/jupyter/design/master/logos/Badges/nbviewer_badge.svg)](https://nbviewer.jupyter.org/github/jeremiedecock/polytechnique-inf639-2023-students/blob/master/lab4_irl.ipynb)

[![Local](https://img.shields.io/badge/Local-Save%20As...-blue)](https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_irl.ipynb)

## Introduction

The purpose of this lab is to introduce some classic algorithms of Imitation learning and Inverse Reinforcement Learning. We will see how they work, their caveats and benefits.

You can either:
- open, edit and execute the notebook in *Google Colab* following this link: https://colab.research.google.com/github/jeremiedecock/polytechnique-inf639-2023-students/blob/master/lab4_irl.ipynb ; this is the **recommended** choice as you have nothing to install on your computer
- open, edit and execute the notebook in *MyBinder* (if for any reason the Google Colab solution doesn't work): https://mybinder.org/v2/gh/jeremiedecock/polytechnique-inf639-2023-students/master?filepath=lab4_irl.ipynb
- download, edit and execute the notebook on your computer if Python3 and JypyterLab are already installed: https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_irl.ipynb

If you work with Google Colab or MyBinder, **remember to save or download your work regularly or you may lose it!**

## Setup the Python environment

### Install required libraries

In [None]:
# These installs are necessary for packages compatibility and visualization
!sudo apt install swig
!apt-get install -y xvfb x11-utils
!pip install pyvirtualdisplay==0.2.* PyOpenGL==3.1.* PyOpenGL-accelerate==3.1.*
!apt-get update && apt-get install ffmpeg freeglut3-dev xvfb
!pip install stable-baselines3[extra] pyglet==1.5.27
!pip install sb3_contrib
!pip install box2d-py
!pip install gym[box2d]==0.25

### Import required packages

In [None]:
import stable_baselines3
print(stable_baselines3.__version__)
import gym
print(gym.__version__)
from gym import spaces
import numpy as np
import time
from tqdm import trange, tqdm_notebook
import matplotlib.pyplot as plt
from IPython import display
import matplotlib
from matplotlib.pyplot import figure
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import pyvirtualdisplay
from typing import List, Tuple, Dict, Any, Callable, Optional
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
from torch.utils.data import DataLoader

## Define utility functions

These functions are necessary to produce videos of your agent at work.

In [None]:
def update_scene(num: int, frames: List, patch) -> Tuple:
    """
    Update the scene for animation.

    Parameters
    ----------
    num : int
        The frame number.
    frames : list
        The list of frames.
    patch : matplotlib object
        The image to b updated.

    Returns
    -------
    tuple
        A tuple containing the updated image.
    """
    patch.set_data(frames[num])
    return patch,


def plot_animation(frames: List, repeat: bool = False, interval: int = 40) -> FuncAnimation:
    """
    Plot an animation.

    Parameters
    ----------
    frames : list
        The list of frames.
    repeat : bool, optional
        Whether to repeat the animation, by default False.
    interval : int, optional
        The interval between frames, by default 40.

    Returns
    -------
    FuncAnimation
        The animation object.
    """
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim


def generate_video(env: gym.Env, model: torch.nn.Module, n_timesteps: int = 300, initial_state=None) -> matplotlib.animation.ArtistAnimation:
    """
    Generate a video of the environment based on the model's actions.

    Parameters
    ----------
    env : gym.Env
        The environment to generate the video from.
    model : torch.nn.Module
        The model that predicts the actions.
    n_timesteps : int, optional
        The number of timesteps to run the simulation for, by default 300.
    initial_state : array_like, optional
        The initial state to start the simulation from, by default None.

    Returns
    -------
    matplotlib.animation.ArtistAnimation
        The generated video as a matplotlib animation.

    Raises
    ------
    Exception
        If the model fails to predict an action.
    """
    if initial_state is None:
        obs = env.reset()
    else:
        env = env.unwrapped  # to access the inner functionalities of the class
        env.state = initial_state
        obs = env.state

    figure(figsize=(8, 6), dpi=80)

    # use False con Xvfb
    _display = pyvirtualdisplay.Display(visible=False, size=(1400, 900))
    _ = _display.start()
    #model = model.float()

    frames = []

    for t in range(n_timesteps):
        #BEGIN ANSWER

        # TODO: ... (here you have to play for n_timesteps)

        #END ANSWER

        frame = env.render(mode='rgb_array')
        frames.append(frame)
        #env.render()
        time.sleep(.025)

    anim = plot_animation(frames)
    return anim

## Part 1: Behavioral Cloning

*Behavioral Cloning* ([D. A. Pomerleau, *Efficient Training of Artificial Neural Networks for Autonomous Navigation*, Neural Computation, vol. 3, no. 1, pp. 88–97, 1991](https://cours.etsmtl.ca/sys843/REFS/ORG/pomerleau_alvinn.pdf)) represents the most fundamental approach to imitation learning. The principle is simple: an *expert* plays a game perfectly and the learning agent has just to act as a copy cat. For each state, it has to predict what the expert would have done.
Usually the expert demonstrations are typically retrieved from recording human behaviour, but for the purpose of the exercise, we well consider fully trained RL models as expert and we will try to recover their optimal policy through behavioral cloning.

The algorithm is recalled below.

<b>Input</b>:<br>
	$\quad\quad$ Environment and expert model<br>
<b>Algorithm parameter</b>:<br>
	$\quad\quad$ number of episodes $n$; number of epochs: epochs<br>
<br>
Train the expert model and get $\pi_{\text{exp}}$ <br>
For episode in range($n$): <br>
$\quad$ Initialise environment at state $s$ <br>
$\quad$ While not done: <br>
$\quad \quad$ action $a = \pi_{exp}(s)$ <br>
$\quad \quad$  $s_{1}$, done = env($a$) <br>
$\quad \quad$ store ($s,a$) in dictionary<br>
$\quad \quad$ $s=s_1$ <br>

Initialise model $\pi$ <br>
For $k$ in range(epochs) <br>
$\quad$ Train $\pi$ for each $(s,a)$ in dictionary such as $\pi(s)=a$

You can find additional information about *Behavioral Cloning* in section 18.1 of *Algorithms for decision making* by M.J. Kochenderfer, T.A. Wheeler and K.H. Wray at MIT press (freely available online at https://algorithmsbook.com/files/dm.pdf).

### Import required packages

#### Cartpole

We will use The Cartpole environment as a first example. More details on this environment are available [here](https://gymnasium.farama.org/environments/classic_control/cart_pole/).

In [None]:
env = gym.make("CartPole-v1", render_mode="rgb_array")

The aim of this lab is not to train the expert so we can either consider models that are already trained on Cartpole or use Stable_baselines, a framework that allows to train several heavily tested famous algorithms in one line. More details [here](https://stable-baselines.readthedocs.io/en/master/).

You can either train the expert model or load a pretrained model.

Uncomment the following lines if you want to train the expert model (should take 10 min max):

In [None]:
## We first initialise the model by saying that we will use the PPO algorithm and use a classic
## MLP parameterisation
# cartpole_expert = PPO(MlpPolicy, env, verbose=1)

## Then the model can easily be trained in just one line
# cartpole_expert.learn(total_timesteps=250000)

## You can save or load your model easily
# cartpole_expert.save("lab4_cartpole.zip")

Otherwise, uncomment the following cell to load a pretrained expert model:

In [None]:
#!wget https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_cartpole.zip

In [None]:
cartpole_expert = PPO.load("lab4_cartpole.zip")

In [None]:
env = gym.make('CartPole-v1', new_step_api=True)
anim = generate_video(env, cartpole_expert)
HTML(anim.to_html5_video())

This function is used to create expert trajectories that will be used in our Behavior Cloning model

In [None]:
def generate_expert_traj(model: torch.nn.Module, env: gym.Env, n_episodes: int = 100) -> Dict[str, np.array]:
    """
    Generate expert trajectories using the provided model and environment.

    Parameters
    ----------
    model : torch.nn.Module
        The model that predicts the actions.
    env : gym.Env
        The environment to generate the trajectories from.
    n_episodes : int, optional
        The number of episodes to run the simulation for, by default 100.

    Returns
    -------
    dict
        A dictionary containing the actions, observations, rewards, episode returns, and episode starts.
    """
    # Sanity check
    assert (isinstance(env.observation_space, spaces.Box) or
            isinstance(env.observation_space, spaces.Discrete)), "Observation space type not supported"

    assert (isinstance(env.action_space, spaces.Box) or
            isinstance(env.action_space, spaces.Discrete)), "Action space type not supported"

    actions = []
    observations = []
    rewards = []
    episode_returns = np.zeros((n_episodes,))
    episode_starts = []  #Index of new episodes initial states

    obs = env.reset()
    episode_starts.append(True)
    reward_sum = 0.0
    idx = 0

    for ep_idx in tqdm_notebook(range(n_episodes), desc='Epoch', leave=True):

        #BEGIN ANSWER

        # TODO: ... (play the game using your expert model and store observations, actions and rewards in lists)

        #END ANSWER

    if isinstance(env.observation_space, spaces.Box):
        observations = np.concatenate(observations).reshape((-1,) + env.observation_space.shape)
    elif isinstance(env.observation_space, spaces.Discrete):
        observations = np.array(observations).reshape((-1, 1))

    if isinstance(env.action_space, spaces.Box):
        actions = np.concatenate(actions).reshape((-1,) + env.action_space.shape)
    elif isinstance(env.action_space, spaces.Discrete):
        actions = np.array(actions).reshape((-1, 1))

    rewards = np.array(rewards)
    episode_starts = np.array(episode_starts[:-1])

    assert len(observations) == len(actions)

    numpy_dict = {
        'actions': actions,
        'obs': observations,
        'rewards': rewards,
        'episode_returns': episode_returns,
        'episode_starts': episode_starts
    }

    for key, val in numpy_dict.items():
        print(key, val.shape)

    env.close()

    return numpy_dict

In [None]:
# Generate expert trajectories
cartpole_demos = generate_expert_traj(cartpole_expert, env)

The Behavioral Cloning model is defined in the following cell.

In [None]:
class Net(nn.Module):
    """
    A feed-forward neural network.

    Attributes
    ----------
    fc1 : torch.nn.Linear
        The first fully connected layer.
    fc2 : torch.nn.Linear
        The second fully connected layer.
    fc3 : torch.nn.Linear
        The third fully connected layer.

    Methods
    -------
    forward(x)
        Passes the input through the network.
    """

    def __init__(self):
        """
        Initializes the layers of the network.
        """
        super(Net, self).__init__()

        self.fc1 = nn.Linear(4, 32)
        self.fc2 = nn.Linear(32, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        """
        Passes the input through the network.

        Parameters
        ----------
        x : torch.Tensor
            The input to the network.

        Returns
        -------
        torch.Tensor
            The output of the network.
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))

        return x

This function transforms the dataset of expert state-actions pairs into a Dataset class. That makes it easier to train in batches

In [None]:
class NumpyDataset(data.Dataset):
    def __init__(self, expert_trajectories_dict: Dict[str, np.array], transform: Optional[Callable] = None):
        """
        Initialize the dataset.

        Parameters
        ----------
        array : Dict[str, torch.Tensor]
            The input data as a dictionary.
        transform : Optional[Callable], optional
            A function/transform that takes in an torch.Tensor and returns a transformed version.
        """
        super().__init__()
        self.obs = torch.FloatTensor(expert_trajectories_dict["obs"])
        self.actions = torch.FloatTensor(expert_trajectories_dict["actions"])
        self.transform = transform

    def __len__(self) -> int:
        """
        Get the length of the dataset.

        Returns
        -------
        int
            The length of the dataset.
        """
        return len(self.obs)

    def __getitem__(self, index: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Get the data at the given index.

        Parameters
        ----------
        index : int
            The index to get the data from.

        Returns
        -------
        Tuple[torch.Tensor, torch.Tensor]
            The observation and action at the given index.
        """
        obs = self.obs[index]
        action = self.actions[index]
        if self.transform:
            obs = self.transform(obs)
            action = self.transform(action)
        return obs, action

In [None]:
loader_args = dict(batch_size=128, shuffle=True)
train_dset = NumpyDataset(cartpole_demos)
train_loader = data.DataLoader(train_dset, **loader_args)
device = "cpu"

In [None]:
def plot_train_curves(epochs: int, train_losses: List[float], test_losses: Optional[List[float]] = None, title: str = '') -> None:
    """
    Plot training and testing losses over epochs.

    Parameters
    ----------
    epochs : int
        The number of epochs.
    train_losses : List[float]
        The list of training losses.
    test_losses : Optional[List[float]], optional
        The list of testing losses, by default None.
    title : str, optional
        The title of the plot, by default ''.
    """
    x = np.linspace(0, epochs, len(train_losses))
    plt.figure()
    plt.plot(x, train_losses, label='train_loss')
    if test_losses:
        plt.plot(x, test_losses, label='test_loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(title)
    plt.legend()
    plt.show()


def train(model: nn.Module, train_loader: DataLoader, criterion: nn.Module, optimizer: optim.Optimizer) -> float:
    """
    Train the model for one epoch.

    Parameters
    ----------
    model : nn.Module
        The model to train.
    train_loader : DataLoader
        The DataLoader for the training data.
    criterion : nn.Module
        The loss function.
    optimizer : optim.Optimizer
        The optimizer.

    Returns
    -------
    float
        The average loss for this epoch.
    """
    # the criterion parameter is a placeholder for the loss function you might choose afterwards
    model.train()
    total_loss = 0

    for obs, action in train_loader:
        #BEGIN ANSWERS

        # TODO: ... (train the model and return the average loss)

        #END ANSWER

        return avg_loss.item()


def train_epochs(model: nn.Module, train_loader: DataLoader, criterion: nn.Module, test_loader: Optional[DataLoader] = None, train_args: Optional[Dict[str, int]] = None, plot: bool = True) -> Tuple[List[float], List[float]]:
    """
    Train the model for multiple epochs.

    Parameters
    ----------
    model : nn.Module
        The model to train.
    train_loader : DataLoader
        The DataLoader for the training data.
    criterion : nn.Module
        The loss function.
    test_loader : Optional[DataLoader], optional
        The DataLoader for the testing data, by default None.
    train_args : Optional[Dict[str, int]], optional
        The training arguments (epochs and learning rate), by default None.
    plot : bool, optional
        Whether to plot the training curve, by default True.

    Returns
    -------
    Tuple[List[float], List[float]]
        The training and testing losses for each epoch.
    """
    # training parameters
    epochs, lr = train_args['epochs'], train_args['lr']
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_losses, test_losses = [], []
    for epoch in tqdm_notebook(range(epochs), desc='Epoch', leave=False):
        model.train()
        train_loss = train(model, train_loader, criterion, optimizer)
        train_losses.append(train_loss)

    if plot:
        plot_train_curves(epochs, train_losses, test_losses, title='Training Curve')
    return train_losses, test_losses

In [None]:
criterion = nn.BCELoss()

cartpole_BC_model = Net()
train_args = dict(epochs=3000, lr=0.0001)
train_losses, test_losses = train_epochs(cartpole_BC_model, train_loader, criterion, train_args=train_args)

In [None]:
anim = generate_video(env, cartpole_BC_model)
HTML(anim.to_html5_video())

Even with a very quick training, we see that the agent is pretty good. But is it that good ?

Now let's try with a different initial state

In [None]:
obs = env.reset()
state_init = np.array([1.0, obs[1], obs[2], obs[3]]) #we slightly move the initial position of the cart

anim = generate_video(env, cartpole_BC_model, initial_state=state_init)
HTML(anim.to_html5_video())

When changing the initial state, we put the agent in a situation the expert has never seen; therefore the agent is lost and does not know what is the good action to perform.

Actually it is not even necessary to change the initial state. The agent is never absolutely perfect, so with a long enough horizon, it will make some mistakes that stack up and drive it into states that the expert has never seen (as it only explores best states).  

### Lunar Lander

Let's test with the Lunar Lander environment. The documentation of this environment is available here: https://gymnasium.farama.org/environments/box2d/lunar_lander/.

In [None]:
env = gym.make('LunarLander-v2')

You can either train the expert model or load a pretrained model.

Uncomment the following lines if you want to train the expert model (should take 10 min max):

In [None]:
# model = PPO(MlpPolicy, env, verbose=1)
# model.learn(total_timesteps=250000)

# model.save("lab4_lander.zip")

Otherwise, uncomment the following cell to load a pretrained expert model:

In [None]:
#!wget https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_lander.zip

In [None]:
expert_lander = PPO.load("lab4_lander.zip")

In [None]:
env = gym.make('LunarLander-v2', new_step_api=True)
anim = generate_video(env, expert_lander, 400)
HTML(anim.to_html5_video())

In [None]:
Lander_demos = generate_expert_traj(expert_lander, env)

The Behavioral Cloning model is defined in the following cell.

In [None]:
class Lander_Net(nn.Module):
    """
    A feed-forward neural network for the Lunar Lander task.

    Attributes
    ----------
    fc1 : torch.nn.Linear
        The first fully connected layer.
    fc2 : torch.nn.Linear
        The second fully connected layer.
    fc3 : torch.nn.Linear
        The third fully connected layer.
    fc4 : torch.nn.Linear
        The fourth fully connected layer.
    fc5 : torch.nn.Linear
        The fifth fully connected layer.

    Methods
    -------
    forward(x)
        Passes the input through the network.
    """

    def __init__(self):
        """
        Initializes the layers of the network.
        """
        super(Lander_Net, self).__init__()

        self.fc1 = nn.Linear(8, 24)
        self.fc2 = nn.Linear(24, 64)
        self.fc3 = nn.Linear(64, 128)
        self.fc4 = nn.Linear(128, 256)
        self.fc5 = nn.Linear(256, 4)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Passes the input through the network.

        Parameters
        ----------
        x : torch.Tensor
            The input to the network.

        Returns
        -------
        torch.Tensor
            The output of the network.
        """
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        x = F.softmax(self.fc5(x))

        return x

In [None]:
loader_args = dict(batch_size=128, shuffle=True)
train_dset = NumpyDataset(Lander_demos)
train_loader = data.DataLoader(train_dset, **loader_args)

#BEGIN ANSWER
criterion = ... # TODO: choose the right loss function
#END ANSWER

Lander_BC_model = Lander_Net()
train_args = dict(epochs=2000, lr=0.001)
train_losses, test_losses = train_epochs(Lander_BC_model, train_loader, criterion, train_args=train_args)

In [None]:
env = gym.make('LunarLander-v2', new_step_api=True)
anim = generate_video(env, Lander_BC_model, 500)
HTML(anim.to_html5_video())

### Lunar Lander stochastic

Now let's try something else. We will add wind in the environment to make the dynamics of the environement stochastic.

In [None]:
env = gym.make('LunarLander-v2', enable_wind=True)

Again, you can either train the expert model or load a pretrained model.

Uncomment the following lines if you want to train the expert model (should take 10 min max):

In [None]:
# model = PPO(MlpPolicy, env, verbose=1)
# model.learn(total_timesteps=250000)
# model.save("lab4_windy_lander.zip")

Or, uncomment the following cell to load a pretrained expert model:

In [None]:
#!wget https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_windy_lander.zip

In [None]:
expert_windy_lander = PPO.load("lab4_windy_lander.zip")

In [None]:
env = gym.make('LunarLander-v2', enable_wind=True, new_step_api=True)
anim = generate_video(env, expert_windy_lander, 400)
HTML(anim.to_html5_video())

In [None]:
Lander_demos = generate_expert_traj(expert_windy_lander, env)

In [None]:
device = "cpu"
loader_args = dict(batch_size=128, shuffle=True)
train_dset = NumpyDataset(Lander_demos)
train_loader = data.DataLoader(train_dset, **loader_args)

#BEGIN ANSWER
criterion = ... # TODO: choose the right loss function
#END ANSWER

Windy_Lander_BC_model = Lander_Net()
train_args = dict(epochs=2000, lr=0.001)
train_losses, test_losses = train_epochs(Windy_Lander_BC_model, train_loader, criterion, train_args=train_args)

In [None]:
anim = generate_video(env, Windy_Lander_BC_model, 500)
HTML(anim.to_html5_video())

The agent fails because stochastic environments increases "mistakes" as well as the chances to end in non optimal states that the expert never encountered

## Part 2: DAgger

To mitigate that problem, a classic solution is to use DAgger algorithm ([S. Ross, G. J. Gordon, and J. A. Bagnell, *A Reduction of Imitation Learning and Structured Prediction to No-Regret Online Learning*, in International Conference on Artificial Intelligence and Statistics (AISTATS), vol. 15, 2011](http://proceedings.mlr.press/v15/ross11a/ross11a.pdf)). The idea is simple. We just have to create a feedback loop between the agent and the expert. After each roll or episode of the agent, its trajectory (the list of states) it encountered are given back to the expert, who attribute to them the right action to perform. The nex state-action pairs are then added to the original dataset, and a new training is done on the aggregated dataset

Below is the algorithm's pseudocode: <br>
<b>Input</b>:<br>
environment, expert model, agent model, number of iterations, number of trajectories to sample from the agent at each iteration <br>

<b>Algorithm</b>: <br>

Initialise dataset <br>
<b>FOR</b> $i$ in range(number of iterations): <br>
$\quad$ traj $\xleftarrow{}$ generate model trajectories <br>
$\quad$ get states from traj <br>
$\quad$ get expert actions for states <br>
$\quad$ add states and actions to dataset <br>
$\quad$ train agent on dataset


You can find additional information about *DAgger* in section 18.2 of *Algorithms for decision making* by M.J. Kochenderfer, T.A. Wheeler and K.H. Wray at MIT press (freely available online at https://algorithmsbook.com/files/dm.pdf).

In [None]:
def get_expert_actions(model: nn.Module, states: np.ndarray) -> np.ndarray:
    """
    Extract expert actions from given states.

    Parameters
    ----------
    model : nn.Module
        The model to predict actions.
    states : np.ndarray
        The states to predict actions for.

    Returns
    -------
    np.ndarray
        The predicted actions.
    """
    # BEGIN ANSWER
    actions = ...  # TODO
    # END ANSWER
    return actions

def DAgger(env: gym.Env, expert: nn.Module, model: nn.Module, n_iterations: int = 10, n_traj: int = 10, iter_plot: int = 1) -> None:
    """
    Implementation of the DAgger algorithm.

    Parameters
    ----------
    env : gym.Env
        The environment to perform the algorithm in.
    expert : nn.Module
        The expert model.
    model : nn.Module
        The model to train.
    n_iterations : int, optional
        The number of iterations to perform, by default 10.
    n_traj : int, optional
        The number of trajectories to generate, by default 10.
    iter_plot : int, optional
        The number of iterations between plots, by default 1.
    """
    dataset = {'obs': None, 'actions': None}
    losses = []
    # BEGIN ANSWER
    criterion = ...   # TODO
    # END ANSWER

    for i in tqdm_notebook(range(n_iterations)):
        # BEGIN ANSWER

        # TODO...

        train_losses, test_losses = ... # TODO

        # END ANSWER

        if losses == []:
            losses = train_losses
        else:
            losses = np.concatenate((losses, np.array(train_losses)))
        if n_iterations % iter_plot == 0:
            plt.plot(losses)
            plt.show()

In [None]:
Dagger_Lander = Lander_Net()
device = "cpu"
loader_args = dict(batch_size=128, shuffle=True)

train_args = dict(epochs=100, lr=0.001)

env = gym.make('LunarLander-v2', new_step_api=True)
DAgger(env, expert_lander, Dagger_Lander)

In [None]:
anim = generate_video(env, Dagger_Lander, 500)
HTML(anim.to_html5_video())

In [None]:
Windy_Dagger_Lander = Lander_Net()
device = "cpu"
loader_args = dict(batch_size=128, shuffle=True)
train_args = dict(epochs=100, lr=0.001)

DAgger(env, expert_windy_lander, Windy_Dagger_Lander, n_iterations = 10, n_traj=50, iter_plot=1)

In [None]:
anim = generate_video(env, Windy_Dagger_Lander, 500)
HTML(anim.to_html5_video())

## Part 3: MaxEnt IRL (bonus)

It would be great if we could avoid having the expert in the loop while making sure that different initial states or stochastic dynamics have a limited impact on the agent performance. That is the aim of Inverse Reinforcement Learning. Instead of learning a policy given a reward, we learn a reward given expert demonstrations. The aim is to retrieve the reward function that the expert followed and then use that learned reward to train a classic RL algorithm.

Below is the *Maximum Entropy Inverse Reinforcement Learning* pseudocode ([B. D. Ziebart, A. Maas, J. A. Bagnell, and A. K. Dey, *Maximum Entropy Inverse Reinforcement Learning*, in AAAI Conference on Artificial Intelligence (AAAI), 2008](https://cdn.aaai.org/AAAI/2008/AAAI08-227.pdf)).

<b>Initialise</b>:<br>
- feature_matrix as an identity matrix of dimension number_of_states <br>
- expert feature expectations as a vector of size number_of_states
- environment to get initial state $s$<br>
- q_table $Q$<br>
- $\theta$ and learner_feature_expectations as vectors of size number_of_states ($\theta$ values are negative)<br>
- $\gamma$ the discount factor
- learning_rate
- update_freq: frequency of reward update
<b>FOR</b> episode in number_episodes: <br>
$\quad$ <b> IF </b> episode % update==0: <br>
$\quad \quad$ gradient = expert_feature_expectations - learner_feature_expectations/episode <br>
$\quad \quad$ $\theta$ += learning_rate*gradient <br>
$\quad \quad$ clip $\theta$: all values that are greater to 0 are fixed to 0 <br>
$\quad$ <b> While</b> True: <br>
$\quad \quad$ Draw $a\sim\mathcal{U}[0,1]$ <br>
$\quad \quad$ <b>IF</b>: $a<\epsilon$ <br>
$\quad \quad\quad$ Draw action randomly for action space <br>
$\quad \quad$ <b> ELSE </b>: action = $\arg\max_a$ q_table[$s$] <br>
$\quad \quad$ next_state = env(action) <br>
$\quad \quad$ reward = $\theta$.feature_matrix[next_state]$^T$ <br>
$\quad \quad$ $Q(s,action) = reward+\gamma \max_aQ(next state,a)$ <br>
$\quad \quad$  learner_feature_expectations += feature_matrix(state) <br>
$\quad \quad$ <b>IF</b> done: break

You can find additional information about *Maximum Entropy Inverse Reinforcement Learning* in section 18.5 of *Algorithms for decision making* by M.J. Kochenderfer, T.A. Wheeler and K.H. Wray at MIT press (freely available online at https://algorithmsbook.com/files/dm.pdf).

Let's test with the Mountain Car environment. The documentation of this environment is available here: https://gymnasium.farama.org/environments/classic_control/mountain_car/.

In [None]:
env = gym.make("MountainCar-v0")

You can either train the expert model or load a pretrained model.

Uncomment the following lines if you want to train the expert model (should take 10 min max):

In [None]:
# model = PPO(MlpPolicy, env, verbose=1)
# model.learn(total_timesteps=250000)
# model.save("lab4_mountain_car.zip")

Otherwise, uncomment the following cell to load a pretrained expert model:

In [None]:
#!wget https://github.com/jeremiedecock/polytechnique-inf639-2023-students/raw/master/lab4_mountain_car.zip

In [None]:
expert_car = PPO.load("lab4_mountain_car.zip")

In [None]:
env = gym.make("MountainCar-v0", new_step_api=True)
anim = generate_video(env, expert_car, 400)
HTML(anim.to_html5_video())

In [None]:
car_demos = generate_expert_traj(expert_car, env)

In [None]:
# The state space of mountain car is continous. As we use Q-learning we need finite state spaces. We therefore discretize it
# The state space will become an array n_bins*n_bins (one dimension per state dimension)
n_actions = env.action_space.n
n_bins = 30
n_states = n_bins**2
feature_matrix = np.eye((n_states))

def feature_vector(state: np.ndarray, env: gym.Env = env, bins: int = n_bins) -> Tuple[np.ndarray, int]:
    """
    For each state, this function associates it to the corresponding state index and builds the feature vector.
    The feature vector is a vector of shape n_bins with a 1 at the state index.

    Parameters
    ----------
    state : np.ndarray
        The state to convert into a feature vector.
    env : gym.Env, optional
        The environment the state belongs to, by default env.
    bins : int, optional
        The number of bins to use for discretization, by default n_bins.

    Returns
    -------
    Tuple[np.ndarray, int]
        The feature vector and the vector id.
    """
    try:
        idx = np.array(np.arange(bins**2)).reshape(bins,bins)
        bin_length = (env.observation_space.high - env.observation_space.low)/(bins-1)
        bin_id = (state - env.observation_space.low)/bin_length
        vector_id = idx[int(bin_id[0]), int(bin_id[1])]
    except:
        print(state, env.observation_space.high, env.observation_space.low,  bin_length, bin_id)
    return feature_matrix[vector_id], vector_id


def process_demos(demos: Dict[str, np.ndarray]) -> Dict[str, List[np.ndarray]]:
    """
    Classify the state-action pairs that belong to the same trajectory.

    Parameters
    ----------
    demos : Dict[str, np.ndarray]
        The demonstrations containing state-action pairs.

    Returns
    -------
    Dict[str, List[np.ndarray]]
        The demonstrations classified by trajectory.
    """
    demonstrations = {}
    demonstrations['obs'] = []
    demonstrations["actions"] = []
    new_episodes_idx = np.where(demos["episode_starts"]==True)[0]
    for i in range(new_episodes_idx.shape[0]-1):
        demonstrations['obs'].append(demos['obs'][new_episodes_idx[i]:new_episodes_idx[i+1]])
        demonstrations['actions'].append(demos['actions'][new_episodes_idx[i]:new_episodes_idx[i+1]])
    return demonstrations


def expert_feature_expectations(feature_matrix: np.ndarray, demonstrations: Dict[str, List[np.ndarray]], n_states: int = n_states) -> np.ndarray:
    """
    Build expert_feature_expectations from demonstrations.
    Feature expectation is a matrix of size (n_demonstrations, n_states), each line is the sum of all feature_vector for each state encountered by the expert
    in a given trajectory. More specifically, the feature vector of state 1 can be something like [1,0,0]. If the expert followed the trajectory:
    state1, state2, state1, state 3, then the feature expectation for this demonstration is [2,1,1]. Then we take the mean of the matrix for all demonstrations
    to get the expert_feature expectations.

    Parameters
    ----------
    feature_matrix : np.ndarray
        The feature matrix to use for building the feature expectations.
    demonstrations : Dict[str, List[np.ndarray]]
        The demonstrations to use for building the feature expectations.
    n_states : int, optional
        The number of states, by default n_states.

    Returns
    -------
    np.ndarray
        The expert feature expectations.
    """
    n_demonstrations = len(demonstrations['obs'])
    feature_expectations = np.zeros((n_demonstrations,n_states))

    #BEGIN ANSWER
    
    # TODO...

    feature_expectations = ... # TODO

    #END ANSWER

    return feature_expectations

In [None]:
demonstrations = process_demos(car_demos)
expert_expectations = expert_feature_expectations(feature_matrix, demonstrations)
print(expert_expectations.shape)

In [None]:
def maxent_irl(expert: np.ndarray, learner: np.ndarray, theta: np.ndarray, learning_rate: float) -> np.ndarray:
    """
    This function calculates the gradient and updates theta.

    Parameters
    ----------
    expert : np.ndarray
        The expert feature expectations.
    learner : np.ndarray
        The learner feature expectations.
    theta : np.ndarray
        The current theta values.
    learning_rate : float
        The learning rate for the theta update.

    Returns
    -------
    np.ndarray
        The updated theta values.
    """
    #BEGIN ANSWER
    
    # TODO...

    #END ANSWER

    return theta


def get_reward(state: np.ndarray, theta: np.ndarray) -> float:
    """
    Calculates the irl_reward of a given state.

    Parameters
    ----------
    state : np.ndarray
        The state for which to calculate the reward.
    theta : np.ndarray
        The current theta values.

    Returns
    -------
    float
        The calculated reward.
    """
    #BEGIN ANSWER
    irl_reward = ... # TODO
    #END ANSWER
    return irl_reward


def update_q_table(q_table: np.ndarray, state: int, action: int, reward: float, next_state: int) -> np.ndarray:
    """
    Performs Q-Learning update.

    Parameters
    ----------
    q_table : np.ndarray
        The current Q-table.
    state : int
        The current state.
    action : int
        The action taken.
    reward : float
        The reward received.
    next_state : int
        The state transitioned to.

    Returns
    -------
    np.ndarray
        The updated Q-table.
    """
    #BEGIN ANSWER

    # TODO...

    #END ANSWER
    return q_table

In [None]:
env = gym.make("MountainCar-v0")
learner_feature_expectations = np.zeros(n_states)
theta = - np.random.uniform(size=(n_states,))
q_table = np.zeros((n_states, n_actions))
gamma = 0.99
q_learning_rate = 0.03
theta_learning_rate = 0.05
eps = 0.2

episodes, scores, mean_scores = [], [], []

for episode in tqdm_notebook(range(30000),desc="Episode", leave=True):
    state = env.reset()
    score = 0

    if episode !=0 and episode % 5000 == 0:
        learner = learner_feature_expectations / episode
        theta = maxent_irl(expert_expectations, learner, theta, theta_learning_rate)

    if episode !=0 and episode % 1000 == 0:
        plt.plot(mean_scores)
        plt.show()

    while True:
        _, state_id = feature_vector(state)
        a = np.random.uniform()
        if a < eps:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state_id])

        next_state, reward, done, _ = env.step(action)
        irl_reward = get_reward(state, theta)
        _, next_state_id = feature_vector(next_state)
        q_table = update_q_table(q_table, state_id, action, irl_reward, next_state_id)
        learner_feature_expectations += feature_matrix[int(state_id)]
        score += reward
        state = next_state

        if done:
            scores.append(score)
            episodes.append(episode)
            mean_scores.append(np.mean(scores))
            break

In [None]:
def generate_car_video(env: gym.Env, q_table: np.ndarray, n_timesteps: int = 300) -> matplotlib.animation.FuncAnimation:
    """
    Generate a video of the car's movement based on the Q-table.

    Parameters
    ----------
    env : gym.Env
        The environment in which the car is moving.
    q_table : np.ndarray
        The Q-table guiding the car's movements.
    n_timesteps : int, optional
        The number of timesteps to include in the video, by default 300.

    Returns
    -------
    matplotlib.animation.FuncAnimation
        The animation object representing the video.
    """
    obs = env.reset()
    figure(figsize=(8, 6), dpi=80)

    # use False con Xvfb
    _display = pyvirtualdisplay.Display(visible=False, size=(1400, 900))
    _ = _display.start()

    frames = []
    for t in range(n_timesteps):
        state_id = feature_vector(obs)[1]
        action = np.argmax(q_table[state_id])
        obs, rewards, done, info = env.step(action)
        frame = env.render(mode='rgb_array')
        frames.append(frame)
        time.sleep(.025)

    anim = plot_animation(frames)
    return anim

In [None]:
anim = generate_car_video(env, q_table)
HTML(anim.to_html5_video())

## Further readings

We were able to solve mountain car using our learned reward. A final remark is that each time the reward is updates, the MDP has to be solved with the new reward. Solving an RL problem is usually hard enough so solving it multiple times is nearly computationally  infeasible for environment that are bigger than mountain car.

Getting rid of that constraint is one of the main research topic on this subject. Several approaches solved it. Those interested can have a look at:
- [GAIL](https://arxiv.org/abs/1606.03476)
- [Guided Cost Learning](https://arxiv.org/abs/1603.00448)