# Introduction to Deep Reinforcement Learning

**Author: Logan Thomson**

[![GitHub](https://img.shields.io/badge/GitHub-Connect-181717?logo=github&logoColor=white)](https://github.com/xycoord)
[![LinkedIn](https://img.shields.io/badge/LinkedIn-Connect-blue)](https://www.linkedin.com/in/logan-thomson-01a4942ab)

[![GitHub](https://img.shields.io/badge/Star_Repository-gold?logo=github&logoColor=white)](https://github.com/xycoord/deep-rl-course)
[![GitHub Issues](https://img.shields.io/badge/Submit_Issue-red?logo=github)](https://github.com/xycoord/deep-rl-course/issues)

Welcome to this introduction to Deep Reinforcement Learning. In this notebook we introduce the core abstractions and framework used in reinforcement learning as well as the policy gradients algorithm.

The approach is to provide rigorous mathematical definitions and explanations alongside implementations. I encourage students to fully read and understand all three of these and not rush ahead. I don't expect you to complete the notebook in one sitting if you are to fully understand all the content.

## Prerequisites

- Machine Learning (Gradient Descent, Neural Networks)
- Basic Probability Theory (Expectations and Distributions)
- Multivariate Calculus
- Python and PyTorch

## Contents

1. Reinforcement Learning Basics (Environments, Agents and Policies)
2. The Reinforcement Learning Loop (Experiences and Trajectories)
3. The Reinforcement Learning Objective
4. Policy Gradients algorithm (Theory and Implementation)
5. Variance
6. Reward-to-go



# Dependencies & Utils
Run to install and import all dependencies and set up some utility functions for use with PyTorch. *For this notebook, a CPU runtime is sufficient.*


In [None]:
%%capture
!pip install swig
!pip install gymnasium[Box2D,mujoco]

In [None]:
from typing import Union, List, Callable
from abc import ABC, abstractmethod
from dataclasses import dataclass

import numpy as np
import torch
from torch import nn, optim, distributions

import gymnasium as gym

import itertools
from tqdm import tqdm
import matplotlib.pyplot as plt

## PyTorch Utils

In [None]:
device = torch.device("cpu")
dtype = torch.float32

Activation = Union[str, nn.Module]

_str_to_activation = {
    'relu': nn.ReLU(),
    'tanh': nn.Tanh(),
    'leaky_relu': nn.LeakyReLU(),
    'sigmoid': nn.Sigmoid(),
    'selu': nn.SELU(),
    'softplus': nn.Softplus(),
    'identity': nn.Identity(),
}

@dataclass
class MLPConfig:
    input_size: int
    output_size: int
    n_layers: int = 2
    size: int = 64
    activation: Activation = 'tanh'
    output_activation: Activation = 'identity'

    def __post_init__(self):
        if isinstance(self.activation, str):
            self.activation = _str_to_activation[self.activation]
        if isinstance(self.output_activation, str):
            self.output_activation = _str_to_activation[self.output_activation]

def build_mlp(config: MLPConfig) -> nn.Module:
    """
        Builds a feedforward neural network specified by config
    """

    layers = []
    in_size = config.input_size
    for _ in range(config.n_layers):
        layers.append(nn.Linear(in_size, config.size))
        layers.append(config.activation)
        in_size = config.size
    layers.append(nn.Linear(in_size, config.output_size))
    layers.append(config.output_activation)

    mlp = nn.Sequential(*layers)
    mlp.to(device)
    return mlp


def combined_shape(length, shape=None):
    if shape is None:
        return (length,)
    return (length, shape) if np.isscalar(shape) else (length, *shape)

def set_seed(seed):
    """Set all the seeds for reproducibility"""
    np.random.seed(seed)             # NumPy
    torch.manual_seed(seed)          # PyTorch on CPU
    torch.cuda.manual_seed(seed)     # PyTorch on GPU (single GPU)
    torch.cuda.manual_seed_all(seed) # PyTorch on all GPUs (multi-GPU)
    torch.backends.cudnn.deterministic = True  # Make CUDA deterministic
    torch.backends.cudnn.benchmark = False     # Disable CUDA benchmarking

# Reinforcement Learning Basics

> **Reinforcement Learning** is a framework in which an *agent* learns to perform tasks through trial-and-error interaction with its *environment*, receiving *rewards/penalties* as feedback from actions.

In this first section, we'll introduce all the formal definitions for the key terms in this definition (shown in *italics*) along with their implementations.

## Environments

> An **environment** is a 5-tuple $(\mathcal S, \mathcal A, p, p_0, r)$ where:
> - $\mathcal S$ is the State Space, the set of all states. A state $s\in\mathcal S$ is a complete description of the environment
> - $\mathcal A$ is the Action Space, the set of all actions an agent can take in the environment
> - $p: \mathcal S\times\mathcal A\times\mathcal S\to[0,1]$ is the transition distribution s.t. $p(s_{t+1} | s_t,a_t)$ is the probability that the environment transitions from state $s_t$ to state $s_{t+1}$ when action $a_t$ is taken
> - $p_0:\mathcal S\to[0,1]$ is the initial state distribution s.t. $p_0(s)$ is the probability that $s_0 = s$
> - $r:\mathcal S\times\mathcal A\to\mathbb R$ is the reward function s.t. $r(s_t,a_t)$ is the reward given when the environment is in state $s_t$ and action $a_t$ is taken

This formalization is a **Markov Decision Process (MDP)**, the mathematical framework used in reinforcement learning for modelling sequential decision-making problems. MDPs capture problems where outcomes are partly random and partly under the control of a decision maker. Their key characteristic is the Markov Property:

> The **Markov Property**: the probability of transitioning to the next state $s_{t+1}$ depends only on the current state $s_t$ and $a_t$, not on the history of previous states or actions.
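
To make the 5-tuple concrete, here is a minimal sketch of a tabular environment written with plain dictionaries. The two states, the probabilities and the helper `sample` are all made up for illustration; they are not part of `gymnasium` or of any environment used later in this notebook.

```python
import random

# Hypothetical two-state, two-action MDP; every name and number here is illustrative.
STATES = ["upright", "fallen"]
ACTIONS = [0, 1]

# Initial state distribution p_0(s)
p0 = {"upright": 1.0, "fallen": 0.0}

# Transition distribution p(s' | s, a), stored as {(s, a): {s': probability}}.
# The next state is looked up from the current (s, a) pair alone: the Markov Property.
p = {
    ("upright", 0): {"upright": 0.9, "fallen": 0.1},
    ("upright", 1): {"upright": 0.6, "fallen": 0.4},
    ("fallen", 0): {"upright": 0.0, "fallen": 1.0},
    ("fallen", 1): {"upright": 0.0, "fallen": 1.0},
}

def r(s, a):
    """Reward function r(s, a): +1 whenever the current state is upright."""
    return 1.0 if s == "upright" else 0.0

def sample(dist, rng):
    """Draw one outcome from a {outcome: probability} dictionary."""
    outcomes = list(dist)
    return rng.choices(outcomes, weights=[dist[o] for o in outcomes], k=1)[0]

rng = random.Random(0)
s0 = sample(p0, rng)            # s_0 ~ p_0
a0 = rng.choice(ACTIONS)        # an arbitrary action
s1 = sample(p[(s0, a0)], rng)   # s_1 ~ p(. | s_0, a_0)
print(s0, a0, r(s0, a0), s1)
```

Because the transition table is keyed only by the current state and action, no history is needed to sample the next state.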

For this tutorial, we will use `gymnasium` to make and work with environments. It is a library that collates many standard environments used for testing RL algorithms, all with a common interface. We have imported it as `gym` for convenience.

One of these is Cart Pole, a classic control problem with the aim of balancing the pole on the cart by applying left and right forces:
![cart_pole.gif](https://gymnasium.farama.org/_images/cart_pole.gif)

Let's make an instance using `gym.make`

In [None]:
cartpole_env = gym.make("CartPole-v1")
# Reset the environment to an initial state sampled from p_0
observation, info = cartpole_env.reset()

### Observations

In reality, we are rarely fortunate enough to know the whole state of the environment and the best we can get is a partial observation. Let's add this to the formalism:

> $\mathcal O$ is the observation space, the set of all possible observations. An observation $o\in\mathcal O$ is a partial description of the environment.

Let's take a look at the observation space of our environment and the observation we get of the initial state.

In [None]:
cartpole_env.observation_space

In [None]:
observation

This is a numpy ndarray with shape (4,). The [Docs](https://gymnasium.farama.org/environments/classic_control/cart_pole/) tell us that these values refer to:

`[Cart Position, Cart Velocity, Pole Angle, Pole Angular Velocity]`

and the `observation_space` property tells us the minimum and maximum values.

Cart Pole has a continuous state space and observation space but these can also be discrete. For example, consider what the state space of a noughts and crosses environment would be.

Throughout this course, I'll use states and observations somewhat interchangeably to maintain notational simplicity. When discussing theoretical concepts and derivations, I'll primarily refer to states ($s$) to align with standard RL literature. In the code implementations, however, I'll use observations since Gymnasium environments provide observations rather than full states.

It's worth noting that CartPole, which we'll use throughout this notebook, is a fully-observable environment where the observation effectively represents the complete state of the system. Therefore, with this environment, the distinction is less important. However, this isn't always the case.

### Actions and Action Spaces

In [None]:
cartpole_env.action_space

This tells us that the action space of Cart Pole is discrete, containing two actions: $\mathcal A :=\{0,1\}$

    0: Push cart to the left
    1: Push cart to the right

We can sample the action space:

In [None]:
cartpole_env.action_space.sample()

Admittedly, sampling $\{0,1\}$ is quite underwhelming. Just as with state and observation spaces, action spaces can also be continuous.

Let's look at a different example environment.

In [None]:
half_cheetah_env = gym.make("HalfCheetah-v5")
observation, info = half_cheetah_env.reset()

print("Observation Space:", half_cheetah_env.observation_space)
print("Initial Observation:", observation)
print("Action Space:", half_cheetah_env.action_space)
print("Sample Action:", half_cheetah_env.action_space.sample())

See the [Docs](https://gymnasium.farama.org/environments/mujoco/half_cheetah/) for more info on the Half Cheetah environment.

### Take a Step
Environments exist throughout time, transitioning between states when actions are taken. At each *timestep* $t$,

1. We receive an observation $o_t\in\mathcal O$ of the environment's state $s_t\in\mathcal S$

2. We select an action $a_t\in\mathcal A$

3. The environment transitions to a new state $s_{t+1}$ sampled as:

    $s_{t+1} \sim p(\cdot|s_t,a_t)$

4. The reward function provides a reward based on the action taken:

    $r_t = r(s_t, a_t)$

    Higher rewards indicate that a good thing has happened.

With `gymnasium` we take an action, incrementing the timestep, using `env.step(action)`

In [None]:
action = cartpole_env.action_space.sample()
observation, reward, terminated, truncated, info = cartpole_env.step(action)

This gives us an observation $o_{t+1}$ of the new state $s_{t+1}$ and the reward $r_t$.

In Cart Pole, the aim is to keep the pole upright for as long as possible. Hence, a reward of $+1$ is given at every timestep until the episode ends.

### Episodes, Termination and Truncation

> A **Terminal State** is one that cannot be left (such as winning, losing, dying etc.)

This leads to a distinction between two categories of task:

> An **Episodic Task** has a *terminal state*
>
> e.g. a game of chess, a level in a video game, landing a rocket

> A **Continuing Task** has no *terminal state*
>   
> e.g. trading on the stock market, walking, keeping the pole upright

When an environment enters a terminal state, we say that the episode has ended. In `gymnasium`, we are notified of this by the `terminated` flag returning `True`.

> **Truncation** is the act of forcibly ending interaction with the environment before reaching a natural conclusion

We can make a continuing task episodic, or truncate an episodic task by enforcing a time limit (i.e. the maximum number of timesteps). Once the time limit is reached, the `truncated` flag returns `True`.

While both termination and truncation end an episode it is important to keep them distinct, since when an episode is truncated, it has not entered a terminal state.

Cart Pole has a terminal state (when the pole falls or the cart leaves the track) and a time limit of 500 steps.
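
The difference between the two flags can be seen in a toy environment. The sketch below uses a hypothetical `CoinFlipEnv` class that only mimics the five-value return of `env.step`; it is not a `gymnasium` environment. As a simplification, this toy never sets both flags on the same step.

```python
import random

class CoinFlipEnv:
    """Hypothetical toy environment that mimics the five-value return of `env.step`.

    The episode terminates when a coin flip comes up tails, and it is
    truncated once `max_steps` steps have been taken without terminating.
    """

    def __init__(self, max_steps, p_tails, seed=0):
        self.max_steps = max_steps
        self.p_tails = p_tails
        self.rng = random.Random(seed)
        self.t = 0

    def reset(self):
        self.t = 0
        return 0, {}  # (observation, info)

    def step(self, action):
        self.t += 1
        # Termination: the environment entered a terminal state
        terminated = self.rng.random() < self.p_tails
        # Truncation: the time limit was reached without terminating
        truncated = (not terminated) and self.t >= self.max_steps
        reward = 1.0
        return self.t, reward, terminated, truncated, {}

def run_episode(env):
    """Step until the episode ends; report its length and why it ended."""
    env.reset()
    steps = 0
    while True:
        _, _, terminated, truncated, _ = env.step(0)
        steps += 1
        if terminated or truncated:
            return steps, terminated, truncated

# With p_tails=0.0 the coin never lands tails, so only the time limit ends the episode.
print(run_episode(CoinFlipEnv(max_steps=5, p_tails=0.0)))  # (5, False, True)
# With p_tails=1.0 the episode terminates on the first step.
print(run_episode(CoinFlipEnv(max_steps=5, p_tails=1.0)))  # (1, True, False)
```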

## Agents and Policies

So far we have directly sampled actions from the action space. However, in RL, it is the role of the agent to select actions.

> An **agent** is an entity that interacts with an environment by taking actions according to its policy.

> A **policy**, $\pi:\mathcal O\to P(\mathcal A)$, is a function from observations to distributions from which an action may be sampled (selected) given an observation:
> $a_t \sim \pi(\cdot|o_t)$
>
> Equivalently, $\pi(a_t|o_t)$ is the probability that action $a_t$ is selected given observation $o_t$

Let's set up some abstract base classes `Policy` and `Agent` to ensure that all the policies and agents we write in this tutorial have the same interface and meet the definitions.

In [None]:
class Policy(ABC):

    @abstractmethod
    def get_action(self, observation: np.ndarray) -> np.ndarray:
        """ Select an action given the observation.
        Args:
            observation: an observation of the environment
        Returns:
            action: an action to take
        """

In [None]:
class Agent(ABC):

    @property
    @abstractmethod
    def policy(self) -> Policy:
        """ The agent's policy """

    @abstractmethod
    def update(self, experiences):
        """ Update the agent's policy """
        raise NotImplementedError

For the next sections, let's define a very simple agent and policy where the actions are sampled at random from the action space. This doesn't provide any new functionality, but it will allow us to implement and test some key bits of infrastructure before implementing proper agents.

In [None]:
class RandomPolicy(Policy):
    def __init__(self, action_space: gym.Space):
        self.action_space = action_space

    def get_action(self, observation: np.ndarray) -> np.ndarray:
        # Sample a random action from the action space
        action = self.action_space.sample()
        return action

In [None]:
class RandomAgent(Agent):
    def __init__(self, action_space: gym.Space):
        self._policy = RandomPolicy(action_space)

    @property
    def policy(self) -> Policy:
        return self._policy

    def update(self, experiences):
        pass

In [None]:
random_agent = RandomAgent(cartpole_env.action_space)

It's worth noting that while in some cases the agent is no more than a thin wrapper around the policy, the concepts are distinct. In more complex algorithms than we discuss in this part, the agent has additional functionality.

# The RL Loop

Let us now rewrite the 4 stages of a timestep from above, this time with the agent selecting the actions. This gives us the **RL Loop**:
1. The agent receives an observation $o_t\in\mathcal O$ from the environment
2. The agent selects an action $a_t$ given $o_t$ using its policy $\pi$
  
   $a_t\sim\pi(\cdot|o_t)$
3. The environment transitions to a new state $s_{t+1}$ sampled as:

   $s_{t+1} \sim p(\cdot|s_t,a_t)$

4. The agent receives a reward $r_t$ from the environment for taking $a_t$:

   $r_t = r(s_t, a_t)$

In this section, we will implement the RL Loop.

## Experiences

In each cycle of the loop, i.e. at each timestep, the agent receives $o_t$, selects $a_t$ and receives $r_t$.

> An **experience** is the triple $(o_t, a_t, r_t)$

It is the atomic unit of agent-environment interaction. In reinforcement learning, the agent learns from experiences.

Let's create a dataclass to record experiences:

In [None]:
@dataclass
class ExperienceData:
    observations: torch.Tensor
    actions: torch.Tensor
    rewards: torch.Tensor

An experience is just a single sample and, as we know from supervised learning, it can be helpful to learn from multiple samples at once, i.e. a batch. For this reason, our experience dataclass is flexible enough to record either one or multiple experiences.

## Trajectories
> A **trajectory** $\tau$ is a sequence of experiences from an agent interacting with its environment according to its policy until it reaches a terminal state or is truncated.

Note that two trajectories from the same policy and environment may vary considerably due to the stochasticity in both.

Let's make a dataclass for trajectories:

In [None]:
@dataclass
class Trajectory():
    observations: torch.Tensor
    actions: torch.Tensor
    rewards: torch.Tensor
    terminals: torch.Tensor
    truncateds: torch.Tensor
    total_reward: float
    length: int

While this appears similar to our `ExperienceData` class, trajectories have distinct properties that warrant their own class representation:
- **Sequential integrity**: experiences in a trajectory follow a strict temporal order, where each directly follows the previous one.
- **Bounded Completion**: a trajectory ends exactly once, either by termination or truncation. Hence, the final experience must have at least one of its terminated or truncated flags set to true, and all other experiences in the trajectory must have both flags set to false.

To enforce, or at least encourage these properties, we can make a `TrajectoryBuilder` class to record experiences as we run through the RL Loop.
This implementation:
- allows us to record an experience using `add_experience`
- keeps track of whether the trajectory is finished
- enforces Bounded Completion through assertions
- encourages Sequential Integrity since only one experience may be added at a time
- uses lists for efficiency until the trajectory is complete

In [None]:
class TrajectoryBuilder():

    def __init__(self):
        self._observations = []
        self._actions = []
        self._rewards = []
        self._terminals = []
        self._truncateds = []
        self._done = False
        self._length = 0

    def add_experience(self, observation, action, reward, terminal, truncated):
        assert not self._done, "Trajectory is already done. Call get_trajectory()"

        self._done = terminal or truncated

        self._observations.append(observation)
        self._actions.append(action)
        self._rewards.append(reward)
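        # Note: `terminals` records the done flag (terminated OR truncated);
        # the `truncateds` list is what tells the two cases apart
        self._terminals.append(self._done)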
        self._truncateds.append(truncated)

        self._length += 1

        return self._done

    @property
    def done(self) -> bool:
        return self._done

    @property
    def trajectory(self) -> Trajectory:
        assert self._done, "Trajectory is not complete. It must be either terminated or truncated"

        def list_to_tensor(x):
            return torch.tensor(np.array(x), dtype=dtype, device=device, requires_grad=False)

        # Convert lists to tensors
        observations = list_to_tensor(self._observations)
        actions = list_to_tensor(self._actions)
        rewards = list_to_tensor(self._rewards)
        terminals = list_to_tensor(self._terminals)
        truncateds = list_to_tensor(self._truncateds)

        # Convert to a Python float to match the `total_reward: float` field
        total_reward = rewards.sum().item()

        return Trajectory(
            observations,
            actions,
            rewards,
            terminals,
            truncateds,
            total_reward,
            self._length
        )

Now we're ready to implement the RL Loop!

In [None]:
def sample_trajectory(env: gym.Env, agent: Agent) -> Trajectory:
    """Sample a trajectory in the environment from a policy."""

    # Reset the environment to start a new trajectory
    observation, info = env.reset()

    traj_builder = TrajectoryBuilder()

    # RL Loop
    while not traj_builder.done:

        # Select the action based on the observation
        action = agent.policy.get_action(observation)

        # Use that action to take a step in the environment
        new_observation, reward, terminated, truncated, info = env.step(action)

        # Record the experience
        # Note that we record the o_t not o_t+1
        traj_builder.add_experience(observation, action, reward, terminated, truncated)

        # Update the observation for the next loop
        observation = new_observation

    return traj_builder.trajectory

Let's test it!

In [None]:
sample_trajectory(cartpole_env, random_agent)

# Measuring Performance

## Rewards

A key idea in RL is that rewards provide feedback to the agent such that it can learn to perform the task better. Formally:

> A **reward**, $r_t$, is a numerical feedback signal provided by the reward function at timestep $t$

> A **reward function**, $r: \mathcal S \times \mathcal A \to \mathbb R$, is a function such that the accumulation of rewards over a trajectory corresponds to performance on the task in that trajectory.

## Cumulative Reward

We can overload the definition of $r$ to accept trajectories giving us a measure of performance in a trajectory:

$$r(\tau) = r(s_0, a_0, ..., s_{T-1}, a_{T-1}) = \sum_{t=0}^{T-1} r(s_t, a_t)$$

Let's look at some examples of reward functions. For each, consider how the cumulative reward relates to the goal.

**Cart Pole**

     Goal: keep the pole upright for as long as possible
     Reward: +1 for every timestep the pole remains upright

**Lunar Lander**

     Goal: Safely land a spacecraft on the moon's surface with minimal fuel usage and proper positioning
     Reward:
        Increases/decreases based on proximity to landing pad (closer = higher reward)
        Increases/decreases based on spacecraft velocity (slower = higher reward)
        Decreases based on tilt angle (penalty for non-horizontal orientation)
        +10 for each leg in contact with the ground
        -0.03 for each frame a side engine is firing
        -0.3 for each frame the main engine is firing
        +100 bonus for landing safely
        -100 penalty for crashing

You may be tempted to characterise rewards as being received for "good actions" or "making progress towards the goal".
While rewards are given for the state-action pairs at a particular timestep, it's more accurate to start from the cumulative reward $r(\tau)$ as the measure of performance and work backwards from there to design a reward function.
This is because it is the task performance over the whole trajectory that actually matters rather than the actions taken at any individual timestep.
The relationship between immediate rewards and overall task performance can sometimes be non-obvious or counterintuitive, and designing effective reward functions plays a large part in reinforcement learning in practice.

## Trajectory Distribution

To measure the success of a policy $\pi_\theta$ parameterised by $\theta$, we also need the probability of a trajectory occurring under that policy, the initial state distribution $p_0$ and the transition distribution $p$.

$$p_\theta(\tau) = p_\theta(s_0, a_0, ..., s_{T-1}, a_{T-1}) = p_0(s_0)\pi_\theta(a_0|s_0)\prod_{t=1}^{T-1} p(s_t|s_{t-1}, a_{t-1})\pi_\theta(a_t|s_t)$$

This is just the chain rule of probabilities used to break down the trajectory into its individual steps.
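
As a small worked example of this factorisation, the sketch below evaluates the probability of one short trajectory in a made-up tabular setting. The tables `p0`, `p` and `pi` hold illustrative numbers only. The product is accumulated in log space, which is a design choice of this sketch to avoid underflow on long trajectories.

```python
import math

# Hypothetical tabular quantities (illustrative numbers only)
p0 = {"A": 0.5, "B": 0.5}                       # p_0(s)
p = {                                           # p(s' | s, a)
    ("A", 0): {"A": 0.8, "B": 0.2},
    ("A", 1): {"A": 0.1, "B": 0.9},
    ("B", 0): {"A": 0.5, "B": 0.5},
    ("B", 1): {"A": 0.3, "B": 0.7},
}
pi = {                                          # pi(a | s)
    "A": {0: 0.6, 1: 0.4},
    "B": {0: 0.25, 1: 0.75},
}

def trajectory_log_prob(states, actions):
    """log p(tau): initial state term, then one transition and one policy term per step."""
    log_prob = math.log(p0[states[0]]) + math.log(pi[states[0]][actions[0]])
    for t in range(1, len(states)):
        log_prob += math.log(p[(states[t - 1], actions[t - 1])][states[t]])
        log_prob += math.log(pi[states[t]][actions[t]])
    return log_prob

states, actions = ["A", "A", "B"], [0, 1, 1]
prob = math.exp(trajectory_log_prob(states, actions))
# 0.5 * 0.6 * 0.8 * 0.4 * 0.9 * 0.75 = 0.0648
print(prob)
```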

## The RL Objective

In reinforcement learning, we seek to find policy parameters $\theta$ with maximal task performance. We do so by optimising the RL Objective - a measure of the policy's average performance. It is the expected cumulative reward over trajectories from the policy. Formally:

$$J(\theta)=\mathbb E_{\tau\sim p_\theta(\tau)}[r(\tau)]$$

In other words, this objective quantifies how well a policy performs across all possible trajectories it might generate, weighted by how likely each trajectory is to occur.
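
To see the objective as a number, here is a sketch on a made-up task loosely inspired by Cart Pole. Every step earns $+1$, a "bad" action (taken with probability $1-q$) ends the episode, and the episode is truncated after `horizon` steps. The task, the parameter `q` and the function names are assumptions for illustration. For this task the expectation has a closed form, so we can compare it with an average over sampled trajectories.

```python
import random

def sample_return(q, horizon, rng):
    """Cumulative reward r(tau) of one sampled trajectory."""
    total = 0.0
    for _ in range(horizon):        # truncation after `horizon` steps
        total += 1.0                # +1 reward for every step taken
        if rng.random() >= q:       # a "bad" action, with probability 1 - q
            break                   # termination
    return total

def estimate_objective(q, horizon, n, rng):
    """Average the cumulative reward over n sampled trajectories."""
    return sum(sample_return(q, horizon, rng) for _ in range(n)) / n

q, horizon = 0.8, 10
# Step k+1 is reached only if the first k actions were good, so
# J = sum_{k=0}^{H-1} q^k = (1 - q^H) / (1 - q)
exact = (1 - q ** horizon) / (1 - q)
estimate = estimate_objective(q, horizon, 20_000, random.Random(0))
print(f"exact J = {exact:.3f}, sampled estimate = {estimate:.3f}")
```

In realistic environments no closed form is available, which is why sampling is the only practical way to evaluate the objective.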

# Vanilla Policy Gradient Algorithm

In this section, we will introduce and implement the simplest algorithm to optimise the RL objective. Up to this point, we have considered the policy as an arbitrary parameterised function but in deep reinforcement learning, it is specifically a neural network. As with most deep learning, the tool of choice to optimise model parameters to maximise an objective is gradient ascent. Therefore, we need to find the gradient of $J(\theta)$.

$$\nabla_\theta J(\theta)= \nabla_\theta\mathbb E_{\tau\sim p_\theta(\tau)}[r(\tau)]$$

Express the expectation as an integral:

$$
= \nabla_\theta \int p_\theta(\tau)r(\tau) \rm d\tau
$$

Take gradient inside the integral:
$$
= \int\nabla_\theta p_\theta(\tau)r(\tau) \rm d\tau
$$

Since the agent doesn't know the transition probabilities, $\nabla_\theta p_\theta(\tau)$ cannot be found directly. However, as we will see, $\nabla_\theta \log p_\theta(\tau)$ can be computed without the transition probabilities. The next step uses a convenient identity derived from the derivative of a logarithm:

$$
\nabla_\theta p_\theta(\tau)
= p_\theta(\tau)\frac{\nabla_\theta p_\theta(\tau)}{p_\theta(\tau)}
= p_\theta(\tau)\nabla_\theta \log p_\theta(\tau)
$$

Substituting this in gives:
$$
\nabla_\theta J(\theta)
= \int p_\theta(\tau)\nabla_\theta \log p_\theta(\tau)r(\tau) \rm d\tau
$$

Which is in fact just an expectation over trajectories:
$$ = \mathbb E_{\tau\sim p_\theta(\tau)}[\nabla_\theta \log p_\theta(\tau)r(\tau)]$$
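
We can sanity-check this result numerically on a problem small enough to enumerate. The sketch below assumes a one-step task with two actions, so a trajectory is a single action and $p_\theta(\tau) = \pi_\theta(a)$. The rewards and the Bernoulli policy with logit $\theta$ are made up for illustration. It compares a finite-difference gradient of $J(\theta)$ with the expectation of $\nabla_\theta \log p_\theta(\tau)\, r(\tau)$.

```python
import math

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

REWARDS = {0: 1.0, 1: 3.0}   # hypothetical rewards r(a) for the two actions

def action_probs(theta):
    """pi_theta(a): a Bernoulli policy whose logit for action 1 is theta."""
    p1 = sigmoid(theta)
    return {0: 1.0 - p1, 1: p1}

def objective(theta):
    """J(theta) = sum_a pi_theta(a) r(a), computed exactly."""
    return sum(prob * REWARDS[a] for a, prob in action_probs(theta).items())

def score(theta, a):
    """d/dtheta log pi_theta(a) for the Bernoulli-logit policy."""
    return a - sigmoid(theta)

def log_derivative_gradient(theta):
    """E_{a ~ pi_theta}[score(a) * r(a)], computed exactly by enumeration."""
    return sum(prob * score(theta, a) * REWARDS[a]
               for a, prob in action_probs(theta).items())

theta, eps = 0.5, 1e-5
finite_difference = (objective(theta + eps) - objective(theta - eps)) / (2 * eps)
print(finite_difference, log_derivative_gradient(theta))
```

The two printed values agree up to finite-difference error, as the identity predicts.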


Now we expand the gradient part to show that it doesn't require the transition probabilities:
$$
\nabla_\theta \log p_\theta(\tau)
= \nabla_\theta \log \left(p_0(s_0)\pi_\theta(a_0|s_0)\prod_{t=1}^{T-1} p(s_t|s_{t-1}, a_{t-1})\pi_\theta(a_t|s_t)\right)
\\=\nabla_\theta \left(\log p_0(s_0) + \log \pi_\theta(a_0|s_0) + \sum_{t=1}^{T-1} \big(\log p(s_t|s_{t-1}, a_{t-1}) + \log \pi_\theta(a_t|s_t)\big)\right)
$$

The $p_0$ and $p$ terms don't depend on $\theta$ so their gradients are zero.
$$
=\nabla_\theta \big(\log \pi_\theta(a_0|s_0) + \sum_{t=1}^{T-1} \log \pi_\theta(a_t|s_t)\big)
\\
=\nabla_\theta \sum_{t=0}^{T-1} \log \pi_\theta(a_t|s_t)
$$

Plugging this back into the expectation and expanding the cumulative reward, we get:
$$\nabla_\theta J(\theta) = \mathbb E_{\tau\sim p_\theta(\tau)}\left[\left(\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_{t}|s_{t})\right)\left(\sum_{t=0}^{T-1} r(s_{t}, a_{t})\right)\right]$$

Since we can't enumerate all possible trajectories, we cannot compute this expectation exactly. However, we can use a Monte Carlo approximation by sampling $N$ trajectories:

$$
\approx \frac{1}{N}\sum_{i=1}^N \left(\sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_{it}|s_{it})\right)\left(\sum_{t=0}^{T-1} r(s_{it}, a_{it})\right)
$$

Where $i$ is the index of the trajectory and $t$ is the timestep within each trajectory.

Finally, distributing the cumulative reward across the gradient sum gives:

$$
= \frac{1}{N}\sum_{i=1}^N \sum_{t=0}^{T-1} \left[\nabla_\theta\log \pi_\theta(a_{it}|s_{it}) \cdot \left(\sum_{k=0}^{T-1} r(s_{ik}, a_{ik})\right)\right]
$$


This can be implemented as a weighted log likelihood loss where the log likelihood of a state-action pair is weighted by the cumulative reward of the trajectory to which it belongs. We call this a surrogate loss, since it serves as a substitute objective whose gradient matches that of our true objective $J(\theta)$.

$$
L_{\text{surrogate}}(\theta) = \frac{1}{N}\sum_{i=1}^N \sum_{t=0}^{T-1} \log \pi_\theta(a_{it}|s_{it}) \cdot \left(\sum_{k=0}^{T-1} r(s_{ik}, a_{ik})\right)
$$

Unlike in supervised learning, where all examples are good examples and have weight 1, in the surrogate loss, the cumulative reward measures how good the policy decisions in that trajectory are. The rigorous derivation was essential to ensure that gradient ascent on this loss genuinely optimises our original objective, but this interpretation also provides a reassuring intuition for why it works. Indeed, when we used the convenient identity to remove the dependence on the transition distributions, we changed the objective from $J(\theta)$ to the surrogate loss. While these objectives are not equivalent, they have identical gradients, so gradient ascent on the surrogate loss will also maximise $J(\theta)$.
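
To make the arithmetic of the surrogate loss explicit, here is a sketch that evaluates it on plain Python floats. The function name `surrogate_loss` and the numbers are illustrative assumptions. In a real implementation the log-probabilities would come from the policy network so that automatic differentiation can produce the gradient.

```python
def surrogate_loss(log_probs_per_trajectory, rewards_per_trajectory):
    """(1/N) * sum_i sum_t log pi(a_it | s_it) * r(tau_i), using plain floats."""
    n = len(log_probs_per_trajectory)
    total = 0.0
    for log_probs, rewards in zip(log_probs_per_trajectory, rewards_per_trajectory):
        weight = sum(rewards)  # cumulative reward r(tau_i), shared by every step
        total += sum(lp * weight for lp in log_probs)
    return total / n

# Two made-up trajectories: one of length 2 and one of length 1
log_probs = [[-0.5, -1.0], [-0.25]]
rewards = [[1.0, 1.0], [1.0]]
loss = surrogate_loss(log_probs, rewards)
# ((-0.5 - 1.0) * 2 + (-0.25) * 1) / 2 = -1.625
print(loss)
```

Every step in the first trajectory is weighted by 2, the total reward of that trajectory, while the single step in the second is weighted by 1.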


## Algorithm

Having derived the gradient, we can now formalise the policy gradient algorithm:

1. Sample $N$ trajectories $\{\tau^i\}$ by running the policy
2. Calculate the gradient
$
\nabla_\theta J(\theta) \approx \sum_i \sum_t \left[\nabla_\theta\log \pi_\theta(a_{it}|s_{it}) \cdot \left(\sum_k r(s_{ik}, a_{ik})\right)\right]
$
3. Take a gradient ascent step $\theta \leftarrow \theta + \alpha\nabla_\theta J(\theta)$

  where $\alpha$ is the learning rate

Repeat steps 1 to 3.

We omit the constant $\frac{1}{N}$ in the gradient since it will just scale the learning rate.
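
Before implementing this for neural network policies, the three steps can be sketched on a tiny one-step task where the gradient of the log-probability is available by hand. The task, the rewards and the function `train_bandit` are assumptions for illustration. The policy is a Bernoulli distribution whose logit for action 1 is $\theta$, for which $\nabla_\theta \log \pi_\theta(a) = a - \pi_\theta(1)$.

```python
import math
import random

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

def train_bandit(n_iterations=200, n_trajectories=32, alpha=0.1, seed=0):
    rng = random.Random(seed)
    rewards = {0: 1.0, 1: 2.0}   # hypothetical task: action 1 earns more
    theta = 0.0                  # policy parameter: logit of choosing action 1
    for _ in range(n_iterations):
        p1 = sigmoid(theta)
        # 1. Sample N (one-step) trajectories by running the policy
        actions = [1 if rng.random() < p1 else 0 for _ in range(n_trajectories)]
        # 2. Estimate the gradient: sum_i grad log pi(a_i) * r(tau_i),
        #    where grad log pi(a) = a - p1 for this Bernoulli-logit policy
        grad = sum((a - p1) * rewards[a] for a in actions)
        # 3. Take a gradient ascent step
        theta += alpha * grad
    return theta

theta = train_bandit()
print(f"P(action 1) after training: {sigmoid(theta):.3f}")
```

The probability of the better action rises towards 1 as training proceeds, even though individual gradient estimates are noisy.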

## Implementation

Let's work through the algorithm step-by-step.

### Sampling Trajectories

The first step is to sample $N$ trajectories. We can do this with a simple `for` loop and our `sample_trajectory` function.

In [None]:
def sample_N_trajectories(environment, agent, N) -> List[Trajectory]:
    trajectories = []

    for _ in range(N):
        trajectory = sample_trajectory(environment, agent)
        trajectories.append(trajectory)

    return trajectories

In environments with terminal states, we may find that early in training, the trajectories are much shorter than later on. This happens with Cart Pole. Therefore, with this sampling function the gradient steps early in training are based on fewer experiences than those later on. This leads to less stable gradient updates when the agent is learning most rapidly. Another approach is to fix the number of timesteps/experiences we sample per gradient update. As in supervised learning, this number is called the batch size.

To avoid truncating any trajectories, we fix a minimum batch size and sample just enough trajectories to fill it. This ensures we always have sufficient data for stable gradient estimates while respecting episode boundaries. Here's the updated implementation:

In [None]:
def sample_trajectories(environment, agent, batch_size) -> List[Trajectory]:
    trajectories = []

    timesteps_so_far = 0
    while timesteps_so_far < batch_size:
        trajectory = sample_trajectory(environment, agent)
        trajectories.append(trajectory)
        timesteps_so_far += trajectory.length

    return trajectories

Test it!

In [None]:
trajectories = sample_trajectories(cartpole_env, random_agent, 10)
print(trajectories)

### From Trajectories to Experiences

Previously, our ExperienceData class stored rewards for individual state-action pairs. However, our theoretical derivation shows that in the policy gradient calculation, these individual rewards aren't directly used. Instead, what matters is the weight applied to each log probability.
To better align with this understanding, let's update our ExperienceData class:

In [None]:
@dataclass
class ExperienceData:
    observations: torch.Tensor
    actions: torch.Tensor
    weights: torch.Tensor

We can now write a function which takes a list of sampled trajectories and returns an experience data object to update the policy with. We abstract the weights calculation using a function `calculate_weights` that we pass in.

In [None]:
def compile_trajectories(trajectories: list[Trajectory], calculate_weights: Callable[[torch.Tensor], torch.Tensor]) -> ExperienceData:

    # Use torch.cat to directly concatenate tensors from all trajectories
    observations = torch.cat([t.observations for t in trajectories], dim=0)
    actions = torch.cat([t.actions for t in trajectories], dim=0)

    # Calculate weights for each trajectory from its rewards
    weights = torch.cat([calculate_weights(t.rewards) for t in trajectories], dim=0)

    return ExperienceData(observations, actions, weights)

Let's implement the particular function we use to calculate the weights for a trajectory. This takes the sum of rewards across the trajectory and broadcasts it to the length of the trajectory such that there is one weight for each experience.

In [None]:
def sum_of_rewards(rewards: torch.Tensor) -> torch.Tensor:
    total_reward = rewards.sum()
    return torch.ones_like(rewards) * total_reward

Test it!

In [None]:
discrete_experience_data = compile_trajectories(trajectories, sum_of_rewards)
print(discrete_experience_data)

### Policy

#### Network Architecture

Our policy will use a fully connected MLP which takes an observation as input and outputs a distribution over actions. The policy handles both discrete and continuous action spaces using PyTorch's built-in distribution classes, which support sampling and differentiation.

For discrete action spaces, we use a `Categorical` distribution parameterized by a vector of logits. The MLP outputs this vector with one logit for each possible action. When we sample from this distribution, we get a single integer action.

For continuous action spaces, we use a `MultivariateNormal` distribution parameterized by a mean vector and covariance matrix. The MLP outputs the mean vector directly, with its size matching the action space dimension. The covariance matrix is diagonal, with each diagonal element being the variance for a corresponding action dimension. We learn these variances as separate parameters outside the MLP. This choice simplifies things by removing state-dependence from the variances. When we sample from this distribution, we get a continuous vector action.
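
To show what these two distributions compute when asked for a log-probability, here is a dependency-free sketch of both. The function names are hypothetical, and parameterising the Gaussian by a log standard deviation per dimension is an assumption of this sketch rather than a statement about how the policy in this notebook stores its variances.

```python
import math

def categorical_log_prob(logits, action):
    """log pi(action) for a categorical distribution given unnormalised logits."""
    m = max(logits)  # subtract the maximum for numerical stability
    log_normaliser = m + math.log(sum(math.exp(l - m) for l in logits))
    return logits[action] - log_normaliser

def diagonal_gaussian_log_prob(action, mean, log_std):
    """Log-density of `action` under a normal with diagonal covariance.

    With a diagonal covariance the density factorises, so the log-density is
    a sum of independent one-dimensional normal log-densities.
    """
    total = 0.0
    for a, mu, ls in zip(action, mean, log_std):
        z = (a - mu) / math.exp(ls)
        total += -0.5 * z * z - ls - 0.5 * math.log(2.0 * math.pi)
    return total

# Equal logits give a uniform distribution over the two actions: log(0.5)
print(categorical_log_prob([0.0, 0.0], 1))
# A 2D standard normal evaluated at its mean: -log(2 * pi)
print(diagonal_gaussian_log_prob([0.0, 0.0], [0.0, 0.0], [0.0, 0.0]))
```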

Note that even though we will only demonstrate the algorithm on a discrete environment, we implement the code for continuous environments too for completeness.
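
To make the two cases concrete, here is a minimal sketch of both distribution types outside of any network. The logits, mean and action dimensions are toy values chosen for illustration, not outputs of the policy defined below.

```python
import torch
from torch import distributions

# Discrete case: one logit per action (3 hypothetical actions)
logits = torch.tensor([0.0, 1.0, -1.0])
categorical = distributions.Categorical(logits=logits)
discrete_action = categorical.sample()                      # scalar integer tensor
discrete_log_prob = categorical.log_prob(discrete_action)   # scalar log-probability

# Continuous case: mean vector plus a diagonal covariance built from log-variances
mean = torch.tensor([0.5, -0.5])
logvar = torch.zeros(2)                                      # exp(0) = 1, i.e. unit variances
covariance_matrix = torch.diag(torch.exp(logvar))
gaussian = distributions.MultivariateNormal(mean, covariance_matrix)
continuous_action = gaussian.sample()                        # vector with one entry per action dimension
continuous_log_prob = gaussian.log_prob(continuous_action)   # scalar log-density

print(discrete_action.shape, continuous_action.shape)
```

In both cases `log_prob` returns one number per sampled action, which is what lets the same update code handle discrete and continuous policies.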

To simplify construction of the MLP, we use the helper function `build_mlp`, which returns a fully connected MLP specified by an `MLPConfig`.

```python
@dataclass
class MLPConfig:
    input_size: int
    output_size: int
    n_layers: int = 2
    size: int = 64
    activation: Activation = 'tanh'
    output_activation: Activation = 'identity'
```
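
For readers curious what such a helper might look like, here is a minimal sketch. It is not the notebook's actual `build_mlp`: the names `SketchMLPConfig`, `sketch_build_mlp` and `_ACTIVATIONS` are hypothetical, activations are restricted to strings, and I assume `n_layers` counts hidden layers and `size` is their width.

```python
from dataclasses import dataclass

import torch
from torch import nn

# Hypothetical name-to-module mapping; the real helper may support more options
_ACTIVATIONS = {"tanh": nn.Tanh, "relu": nn.ReLU, "identity": nn.Identity}

@dataclass
class SketchMLPConfig:
    input_size: int
    output_size: int
    n_layers: int = 2            # assumed: number of hidden layers
    size: int = 64               # assumed: width of each hidden layer
    activation: str = "tanh"
    output_activation: str = "identity"

def sketch_build_mlp(config: SketchMLPConfig) -> nn.Sequential:
    layers = []
    in_size = config.input_size
    # Hidden layers: Linear followed by the hidden activation
    for _ in range(config.n_layers):
        layers.append(nn.Linear(in_size, config.size))
        layers.append(_ACTIVATIONS[config.activation]())
        in_size = config.size
    # Output layer: Linear followed by the output activation
    layers.append(nn.Linear(in_size, config.output_size))
    layers.append(_ACTIVATIONS[config.output_activation]())
    return nn.Sequential(*layers)

mlp = sketch_build_mlp(SketchMLPConfig(input_size=4, output_size=2))
batch_output = mlp(torch.zeros(8, 4))   # batch of 8 observations -> shape (8, 2)
```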

#### Sampling Actions

Because we're using PyTorch distributions, sampling an action given an observation is as easy as running a forward pass and sampling the resulting distribution. It is worth noting that we do this with `@torch.no_grad()` since, in general, we only need the gradients when performing an update step.
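
As a small illustration of what `torch.no_grad()` changes, the sketch below runs the same forward pass with and without it. The single linear layer is a toy stand-in for the policy network.

```python
import torch
from torch import nn, distributions

net = nn.Linear(4, 2)          # toy stand-in for the policy network
observation = torch.zeros(4)

# Sampling: no computation graph is recorded
with torch.no_grad():
    logits_no_grad = net(observation)
    action = distributions.Categorical(logits=logits_no_grad).sample()

# Update-style forward pass: the graph is recorded so we can backpropagate
logits_with_grad = net(observation)

print(logits_no_grad.requires_grad, logits_with_grad.requires_grad)  # False True
```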

#### Update

The `update` method follows the classic PyTorch gradient descent structure:
1. Zero Gradients
2. Forward Pass
3. Compute Loss
4. Backward Pass (compute gradients)
5. Optimiser Step

We perform the forward pass again here except with gradients on. Another possible implementation would be to perform only a single forward pass, when sampling an action. However, this would require remembering the `log_prob` throughout the pipeline, which is less clear.

Note that since PyTorch optimisers perform gradient *descent*, not gradient *ascent*, we negate the objective to obtain the loss that we minimise.
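
The five steps above can be sketched on a toy problem. Here the "policy" is just a pair of learnable logits with no observation input, the actions and weights are made up, and plain SGD is used instead of Adam to keep the arithmetic easy to follow.

```python
import torch
from torch import distributions

logits = torch.zeros(2, requires_grad=True)    # toy policy over two actions
optimizer = torch.optim.SGD([logits], lr=0.1)

# Made-up batch: action 0 was taken twice with weight 1, action 1 once with weight 0
actions = torch.tensor([0, 0, 1])
weights = torch.tensor([1.0, 1.0, 0.0])

prob_before = torch.softmax(logits, dim=0)[0].item()

optimizer.zero_grad()                                                    # 1. zero gradients
log_probs = distributions.Categorical(logits=logits).log_prob(actions)  # 2. forward pass
surrogate_loss = -(log_probs * weights).mean()                          # 3. negated objective
surrogate_loss.backward()                                               # 4. backward pass
optimizer.step()                                                        # 5. optimiser step

prob_after = torch.softmax(logits, dim=0)[0].item()
print(prob_before, prob_after)
```

Minimising the negated objective raises the probability of the positively weighted action, which is the gradient ascent behaviour we want.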

#### Optimizer

We use the Adam optimiser for more stable training; however, you can think of this as just a gradient descent optimiser which implements the gradient step.

In [None]:
class PGPolicy(Policy, nn.Module):
    def __init__(
        self,
        mlp_config: MLPConfig,
        discrete: bool,
        learning_rate: float,
    ):
        # Call both parent initializers
        Policy.__init__(self)
        nn.Module.__init__(self)

        # Define the network
        if discrete:
            self.logits_net = build_mlp(mlp_config)
            parameters = self.logits_net.parameters()
        else:
            self.mean_net = build_mlp(mlp_config)
            # For continuous policies, logvar needs to match the action dimension (output_size).
            # We parameterise log-variances rather than variances: exp() maps any real
            # value to a positive variance, so the optimiser can update them freely.
            self.logvar = nn.Parameter(
                torch.zeros(mlp_config.output_size, dtype=dtype)
            )
            parameters = itertools.chain([self.logvar], self.mean_net.parameters())

        self.discrete = discrete

        # Initialise optimizer
        self.optimizer = optim.Adam(
            parameters,
            learning_rate,
        )

        # Move entire model to device
        self.to(device)


    def forward(self, observation: torch.FloatTensor) -> distributions.Distribution:
        """
        This function defines the forward pass of the network.
        """
        if self.discrete:
            logits = self.logits_net(observation)
            return distributions.Categorical(logits=logits)
        else:
            mean = self.mean_net(observation)
            covariance_matrix = torch.diag(torch.exp(self.logvar))
            return distributions.MultivariateNormal(mean, covariance_matrix)


    @torch.no_grad()
    def get_action(self, observation: np.ndarray) -> np.ndarray:
        """Takes a single observation (as a numpy array) and returns a single action (as a numpy array)."""
        observation = torch.tensor(observation, dtype=dtype, device=device)
        distribution = self.forward(observation)
        action = distribution.sample()
        action = action.to('cpu').detach().numpy()
        return action


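    def update(self, experiences: ExperienceData) -> float:
        """Performs one gradient step on the surrogate loss and returns its value."""
        # 1. Zero gradients
        self.optimizer.zero_grad()

        # 2. Forward pass (with gradients enabled this time)
        distribution = self.forward(experiences.observations)
        log_probs = distribution.log_prob(experiences.actions)

        # 3. Surrogate loss, negated because the optimiser minimises
        surrogate_loss = -(log_probs * experiences.weights).mean()

        # 4. Backward pass and 5. optimiser step
        surrogate_loss.backward()
        self.optimizer.step()

        return surrogate_loss.item()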


### Agent

In the vanilla policy gradients algorithm, the agent is just a thin wrapper around the policy, which makes it very simple to implement.

In [None]:
class PGAgent(Agent):
    def __init__(self, policy_args):
        self._policy = PGPolicy(**policy_args)

    @property
    def policy(self) -> Policy:
        return self._policy

    def update(self, experiences: ExperienceData) -> float:
        return self._policy.update(experiences)

In [None]:
discrete = isinstance(cartpole_env.action_space, gym.spaces.Discrete)
ac_dim = cartpole_env.action_space.n if discrete else cartpole_env.action_space.shape[0]
ob_dim = cartpole_env.observation_space.shape[0]

mlp_config = MLPConfig(
    input_size=ob_dim,
    output_size=ac_dim,
    n_layers=2,
    size=64,
    activation='tanh',
    output_activation='identity',
)

policy_args = {
    "mlp_config": mlp_config,
    "discrete": discrete,
    "learning_rate": 5e-3,
}
pgagent = PGAgent(policy_args)

### Training Loop

Finally, we can put this all together into a training loop.

In [None]:
def train_agent_simple(agent, environment, batch_size, num_updates):

    for step in range(num_updates):
        trajectories = sample_trajectories(environment, agent, batch_size)
        experience_data = compile_trajectories(trajectories, sum_of_rewards)
        loss = agent.update(experience_data)

    environment.close()
    return agent

While this perfectly implements the algorithm, it would be useful to track how the training has gone and plot the metrics. This `Tracker` class lets us do just that. Don't worry too much about the implementation.

In [None]:
class Tracker:
    def __init__(self):
        self.metrics = {}

    def log(self, metrics_dict, step=None):
        """Append each metric value to its history (`step` is accepted but currently unused)"""
        for key, value in metrics_dict.items():
            if key not in self.metrics:
                self.metrics[key] = []
            self.metrics[key].append(value)

    def plot(self, metric_name, y_max=500):
        """Plot a single metric"""
        if metric_name in self.metrics:
            plt.figure(figsize=(8, 4))
            plt.plot(self.metrics[metric_name])
            plt.title(metric_name.capitalize())
            plt.xlabel('Step')
            plt.ylabel(metric_name)
            plt.ylim(top=y_max)
            plt.grid(True)
            plt.show()

    def plot_all(self, y_max=500):
        """Plot all metrics"""
        for metric_name in self.metrics:
            self.plot(metric_name, y_max=y_max)

Now we add a tracker to the training function and plot the metrics at the end of training.

In [None]:
def train_agent(agent, environment, batch_size, num_updates):

    tracker = Tracker()

    # tqdm renders a progress bar during training
    for step in tqdm(range(num_updates)):

        trajectories = sample_trajectories(environment, agent, batch_size)
        experience_data = compile_trajectories(trajectories, sum_of_rewards)
        surrogate_loss = agent.update(experience_data)

        average_cumulative_reward = np.mean([t.total_reward for t in trajectories])
        tracker.log({
            "Surrogate Loss": surrogate_loss,
            "Average Cumulative Reward": average_cumulative_reward,
        })

    tracker.plot_all()

    environment.close()
    return agent

### Seeds

In reinforcement learning, many components involve randomness: environment transitions, reward functions, neural network initialisation, and action sampling from policy distributions. This randomness can significantly impact training outcomes. By setting specific random seeds, we ensure that these random processes produce the same sequence of values each time, making experiments reproducible.

The `set_seed` function I've provided sets seeds for both PyTorch (controlling network initialisation and policy sampling) and NumPy (which Gymnasium uses internally). Additionally, we must set the seed when resetting each environment with `env.reset(seed=seed)`. Once seeded, both the neural networks and environment will behave deterministically unless explicitly reseeded.

To make sure you have a successful first run, I have pre-selected `seed=2` since it performed well in my tests.
However, when evaluating algorithms, we should try multiple seeds (typically 5-10) to account for the variance in performance that different random initialisations can produce. When comparing different algorithms or hyperparameters, use the same set of seeds across all variations to ensure fair comparisons.
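
To see what seeding buys us, here is a minimal sketch. `set_seed_sketch` is a hypothetical stand-in written for this example; the notebook's own `set_seed` may do more.

```python
import numpy as np
import torch

def set_seed_sketch(seed: int) -> None:
    # Hypothetical stand-in for the notebook's set_seed helper
    torch.manual_seed(seed)
    np.random.seed(seed)

# Seeding twice with the same value replays the same random sequence
set_seed_sketch(2)
first_draw = torch.randn(3)
set_seed_sketch(2)
second_draw = torch.randn(3)

# A different seed gives a different sequence
set_seed_sketch(3)
third_draw = torch.randn(3)

print(torch.equal(first_draw, second_draw))  # True
```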

In [None]:
seed = 2
set_seed(seed)
cartpole_env.reset(seed=seed)

### Train

With the seed set, it's time to create a new agent and train it.

In [None]:
pgagent = PGAgent(policy_args)
train_agent(pgagent, cartpole_env, batch_size=1000, num_updates=100)

### Surrogate Loss vs. Cumulative Reward

The two plots show our surrogate loss and the actual RL objective (cumulative reward) across the training run. Notice how these metrics differ: while their general shapes are similar, their scales and detailed patterns vary significantly. This illustrates our earlier theoretical point—these two objectives aren't identical, but they share the same gradients.

A crucial difference from supervised learning is that the surrogate loss doesn't consistently decrease, despite our use of gradient descent. As the policy improves, the distribution of experiences shifts, changing the experience weights. However, the optimizer treats these weights as constants that don't depend on the policy parameters. As a result, the loss isn't as reliable a metric in RL as it would be in supervised learning.

For this reason, we'll focus primarily on tracking and plotting the average cumulative reward in subsequent sections.

In [None]:
def train_agent(agent, environment, batch_size, num_updates):

    tracker = Tracker()

    # tqdm renders a progress bar during training
    for step in tqdm(range(num_updates)):

        trajectories = sample_trajectories(environment, agent, batch_size)
        experience_data = compile_trajectories(trajectories, sum_of_rewards)
        loss = agent.update(experience_data)

        average_cumulative_reward = np.mean([t.total_reward for t in trajectories])
        tracker.log({"Average Cumulative Reward": average_cumulative_reward})

    tracker.plot_all()

    environment.close()
    return agent

# Reducing Variance

Above, we used a seed which I knew to perform well. However, in practice we don't have this luxury. To demonstrate how performance varies with randomness, we'll train 5 agents using different seeds [0,1,2,3,4]. To isolate the effect of randomness during training specifically, we'll fix the policy initialisation seed (42) so all agents start with identical parameters, while varying only the training seed between runs.


In [None]:
# This experiment takes about 4 minutes to run.

for seed in range(5):
    # Constant policy initialisation
    set_seed(42)
    pgagent = PGAgent(policy_args)

    # Seeded training
    print(f"Seed {seed}")
    set_seed(seed)
    cartpole_env.reset(seed=seed)
    train_agent(pgagent, cartpole_env, batch_size=1000, num_updates=100)

As you can see, both the maximum cumulative reward and time taken to reach it vary significantly between seeds. This high variance stems from the Monte Carlo approximation used in the surrogate loss function. When only a few trajectories are sampled per gradient update, different training runs naturally encounter varied experiences, leading to inconsistent learning outcomes. The most straightforward approach to decrease this variance is simply to sample more trajectories per update. However, doing so is often expensive—especially in real-world environments where each sample requires physical action. As a result, the field has prioritized developing methods that make better use of limited samples.
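
The effect of batch size on a Monte Carlo estimate can be sketched without any RL at all. In this toy simulation each sample is a Gaussian stand-in for one trajectory's contribution to the gradient (an assumption made purely for illustration), and we measure how much the batch average fluctuates.

```python
import numpy as np

rng = np.random.default_rng(0)

def spread_of_batch_means(batch_size: int, num_batches: int = 2000) -> float:
    # Each sample is a noisy stand-in for a single-trajectory gradient estimate
    samples = rng.normal(loc=1.0, scale=5.0, size=(num_batches, batch_size))
    # Standard deviation of the batch averages across many independent batches
    return float(samples.mean(axis=1).std())

spread_small = spread_of_batch_means(batch_size=10)
spread_large = spread_of_batch_means(batch_size=1000)

print(spread_small, spread_large)
```

The spread shrinks roughly with the square root of the batch size, so 100 times more samples buys only about 10 times less noise. This is why simply sampling more quickly becomes expensive.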

In the last part of this notebook, we'll introduce a simple but very powerful variance reduction technique called reward-to-go, which has become a standard component in virtually all modern RL algorithms.

## Reward-to-go

In the current loss, the log probability of the policy action at timestep $t$ is weighted by the sum-of-rewards across *all* timesteps. However, this approach includes rewards that occurred before taking the action at time $t$, which couldn't possibly have been influenced by this action due to causality: actions can only affect future rewards, not past ones. If we want the weighting to reflect how good *that specific* action was, we should only consider rewards that could have resulted from it. Therefore, the weight should be only the sum of rewards from timestep $t$ onward, which is known as the reward-to-go.
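
Written out, with $w_t$ as a label introduced here for the weight on timestep $t$ (the symbol is an assumption of this example, not notation from earlier in the notebook), the two weightings are:

$$
w_t^{\text{sum}} = \sum_{t'=0}^{T-1} r(s_{t'}, a_{t'}),
\qquad
w_t^{\text{rtg}} = \sum_{t'=t}^{T-1} r(s_{t'}, a_{t'})
$$

For example, a three-step trajectory with rewards $(1, 1, 1)$ gets sum-of-rewards weights $(3, 3, 3)$ but reward-to-go weights $(3, 2, 1)$.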



### Unbiased Proof

So far, we've given an intuitive explanation of why reward-to-go is a good idea, but to rigorously justify it, we need to prove that using reward-to-go weights in the surrogate loss will still maximise the RL objective. To do this, we need to show that the expectations of the gradient estimators are equal. Formally,

$$
\mathbb E_{\tau\sim p_\theta}\left[ \sum_{t=0}^{T-1}  \nabla_\theta \log \pi_\theta(a_{t}|s_{t}) \sum_{t'=0}^{T-1} r(s_{t'}, a_{t'})\right] =
\mathbb E_{\tau\sim p_\theta}\left[ \sum_{t=0}^{T-1}  \nabla_\theta \log \pi_\theta(a_{t}|s_{t}) \sum_{t'=t}^{T-1} r(s_{t'}, a_{t'})\right]
$$

To prove this equality, we need to show that the contribution of terms where $t'< t$ to the left-hand side expectation is zero.

First, rearrange the left-hand side to get:
$$
\mathbb E_{\tau\sim p_\theta}\left[
  \sum_{t=0}^{T-1} \sum_{t'=0}^{T-1}
    \nabla_\theta \log \pi_\theta(a_{t}|s_{t}) r(s_{t'}, a_{t'})
\right]
$$

Apply linearity of expectations:
$$
=\sum_{t=0}^{T-1} \sum_{t'=0}^{T-1}
\mathbb E_{s_t,a_t,s_{t'},a_{t'}}\left[
  \nabla_\theta \log \pi_\theta(a_{t}|s_{t}) r(s_{t'}, a_{t'})
\right]
$$
Note that we've changed to writing the expectations over only the relevant state-action pairs.

For all summands, we decompose the expectation by conditioning on $s_t$:
$$
\mathbb E_{s_t,a_t,s_{t'},a_{t'}}\Big[\nabla_\theta \log \pi_\theta(a_{t}|s_{t}) r(s_{t'}, a_{t'})\Big]
=
\mathbb E_{s_t}\bigg[\mathbb E_{a_t,s_{t'},a_{t'}|s_t}\Big[\nabla_\theta \log \pi_\theta(a_{t}|s_{t}) r(s_{t'}, a_{t'})\big|s_t\Big]\bigg]
$$

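Now consider only the summands for which $t'< t$. The action $a_t$ is sampled from $\pi_\theta(\cdot|s_t)$, which depends only on $s_t$, so given $s_t$ it is conditionally independent of the earlier pair $(s_{t'}, a_{t'})$ and hence of $r(s_{t'},a_{t'})$. The inner expectation therefore factorises into a product. This step only holds for $t'< t$!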

$$
=
\mathbb E_{s_t}\bigg[
\mathbb E_{a_t|s_t}\Big[\nabla_\theta \log \pi_\theta(a_{t}|s_{t}) \big|s_t\Big]
\mathbb E_{s_{t'},a_{t'}|s_t}\Big[r(s_{t'}, a_{t'})\big|s_t\Big]
\bigg]
$$

The next step is to prove that the first inner expectation, the expected gradient of the log-probability, is zero. To do this, we apply the log-probability gradient trick from the derivation of the original surrogate loss in reverse (for continuous actions, replace the sums with integrals).
$$
\begin{align}
  \mathbb E_{a_t|s_t}[\nabla_\theta \log \pi_\theta(a_t|s_t)|s_t]
&= \sum_{a_t} \pi_\theta(a_t|s_t)\nabla_\theta \log \pi_\theta(a_t|s_t)\\
&= \sum_{a_t} \pi_\theta(a_t|s_t)\frac{\nabla_\theta \pi_\theta(a_t|s_t)}{\pi_\theta(a_t|s_t)}\\
&= \sum_{a_t} \nabla_\theta \pi_\theta(a_t|s_t)\\
&= \nabla_\theta \sum_{a_t} \pi_\theta(a_t|s_t)
\end{align}
$$

This sum is equal to $1$, since it is the total probability. Therefore, the gradient is $0$.
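
We can check this identity numerically for a small categorical policy. The sketch below uses arbitrary toy logits as the parameters $\theta$ and computes $\sum_{a} \pi_\theta(a)\nabla_\theta \log \pi_\theta(a)$ with autograd.

```python
import torch

# Arbitrary toy logits standing in for the policy parameters
logits = torch.tensor([0.3, -1.2, 2.0], requires_grad=True)
probs = torch.softmax(logits, dim=0).detach()

expected_score = torch.zeros(3)
for a in range(3):
    # Gradient of log pi(a) with respect to the logits
    log_prob_a = torch.log_softmax(logits, dim=0)[a]
    (score_a,) = torch.autograd.grad(log_prob_a, logits)
    # Weight each gradient by the probability of taking that action
    expected_score += probs[a] * score_a

print(expected_score)  # every entry is zero up to floating-point error
```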

This shows that the terms where $t' < t$ contribute zero to the gradient expectation. Since the terms where $t' \geq t$ are identical in both estimators, the reward-to-go estimator has the same expected gradient as the sum-of-rewards estimator.



### Reduced Variance

In the previous section, we proved that the terms removed in the reward-to-go estimator (where $t' < t$) have an expectation of zero. However, these terms still have non-zero variance. Since these past rewards cannot be causally influenced by the current action, they contribute only statistical noise to the gradient estimate without providing useful signal. By removing these zero-expectation terms, the reward-to-go estimator maintains the same expected gradient value while eliminating a source of variance.
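
Both claims can be illustrated with a small simulation. This is a sketch on a made-up problem, not CartPole: the policy ignores the state and picks action 0 with probability 0.5 at every step, and the reward is 1 whenever action 0 is taken. We compare per-trajectory gradient samples (with respect to the logit of action 0) under the two weightings.

```python
import numpy as np

rng = np.random.default_rng(0)
T, N = 10, 20_000                        # horizon and number of sampled trajectories
p = 0.5                                  # probability of action 0 under the toy policy

took_action_0 = rng.random((N, T)) < p   # True where action 0 was sampled
rewards = took_action_0.astype(float)    # reward 1 for action 0, else 0
score = took_action_0 - p                # d/d(logit_0) of log pi(a_t) for a two-way softmax

# Sum-of-rewards weight: the same total for every timestep of a trajectory
total = rewards.sum(axis=1, keepdims=True)
# Reward-to-go weight: reversed cumulative sum along the time axis
rtg = np.flip(np.cumsum(np.flip(rewards, axis=1), axis=1), axis=1)

grad_full = (score * total).sum(axis=1)  # one gradient sample per trajectory
grad_rtg = (score * rtg).sum(axis=1)

print("means:    ", grad_full.mean(), grad_rtg.mean())
print("variances:", grad_full.var(), grad_rtg.var())
```

In this toy setting the two sample means agree closely while the reward-to-go samples have a noticeably smaller variance.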

### Implementation

To exemplify the improvements in performance we get from using reward-to-go, let's implement it and run the same seeded experiments as before.

First we need a `calculate_weights` function. We can implement this manually using the simple dynamic programming technique of working from the last timestep backwards.

In [None]:
def reward_to_go(rewards: torch.Tensor) -> torch.Tensor:
    T = len(rewards)
    rtg = torch.zeros_like(rewards)

    # Base case:
    rtg[T-1] = rewards[T-1]

    # Work backwards:
    for t in range(T-2, -1, -1):
        rtg[t] = rewards[t] + rtg[t+1]

    return rtg

To implement reward-to-go using PyTorch functions and avoid the overhead of Python for loops, we can use `cumsum`. Unfortunately, `cumsum` accumulates from the first element rather than the last. Therefore we need to flip the array before and afterwards.

In [None]:
def reward_to_go(rewards: torch.Tensor) -> torch.Tensor:
    rewards_reversed = torch.flip(rewards, [0])
    rtg_reversed = torch.cumsum(rewards_reversed, 0)
    rtg = torch.flip(rtg_reversed, [0])
    return rtg

Let's test this implementation with a simple example:

In [None]:
rewards = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0])
reward_to_go(rewards)
# Expected result for reward-to-go:
# [1+2+3+4+5, 2+3+4+5, 3+4+5, 4+5, 5]
# [15.0, 14.0, 12.0, 9.0, 5.0]
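
It is also worth confirming that the loop version and the `cumsum` version agree. The sketch below repeats both definitions under distinct names (`reward_to_go_loop` and `reward_to_go_cumsum`, chosen here so that neither overwrites the other) and compares them on the same example.

```python
import torch

def reward_to_go_loop(rewards: torch.Tensor) -> torch.Tensor:
    # Dynamic programming version: work backwards from the last timestep
    T = len(rewards)
    rtg = torch.zeros_like(rewards)
    rtg[T-1] = rewards[T-1]
    for t in range(T-2, -1, -1):
        rtg[t] = rewards[t] + rtg[t+1]
    return rtg

def reward_to_go_cumsum(rewards: torch.Tensor) -> torch.Tensor:
    # Vectorised version: flip, accumulate, flip back
    return torch.flip(torch.cumsum(torch.flip(rewards, [0]), 0), [0])

rewards = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0])
loop_result = reward_to_go_loop(rewards)
cumsum_result = reward_to_go_cumsum(rewards)

print(cumsum_result.tolist())  # [15.0, 14.0, 12.0, 9.0, 5.0]
print(torch.allclose(loop_result, cumsum_result))  # True
```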

Now update the training function:

In [None]:
def train_agent_rtg(agent, environment, batch_size, num_updates):

    tracker = Tracker()

    for step in tqdm(range(num_updates)):

        trajectories = sample_trajectories(environment, agent, batch_size)
        experience_data = compile_trajectories(trajectories, reward_to_go) # REWARD TO GO
        loss = agent.update(experience_data)

        average_cumulative_reward = np.mean([t.total_reward for t in trajectories])
        tracker.log({"Average Cumulative Reward": average_cumulative_reward})

    tracker.plot_all()

    environment.close()
    return agent

Test for variance:

In [None]:
for seed in range(5):
    # Constant policy initialisation
    set_seed(42)
    pgagent = PGAgent(policy_args)

    # Seeded training
    print(f"Seed {seed}")
    set_seed(seed)
    cartpole_env.reset(seed=seed)
    train_agent_rtg(pgagent, cartpole_env, batch_size=1000, num_updates=100)

Woah! That's a huge improvement in performance, well worth the long proof. Now all of the seeds we tested reach 500 cumulative reward, which means we've effectively solved the CartPole environment (reaching its maximum possible reward). It's worth noting that CartPole is considered one of the simpler RL benchmark environments. For more complex environments with higher-dimensional spaces, longer time horizons, and sparse rewards, even reward-to-go, while powerful, isn't enough on its own. We'll need additional variance reduction techniques to achieve stable learning in these scenarios.

# Next Steps

Well done for getting through this introduction to deep RL! In the next parts of the course, we will explore more advanced variance reduction methods which we'll use to improve the policy gradients algorithm:
- Discounting
- Baselines
- Actor-Critic algorithms
- Generalised Advantage Estimation (GAE)

These techniques form the backbone of modern deep RL algorithms and will enable us to tackle increasingly complex environments beyond CartPole.

**Part 2: Discounting**

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1UULTQYnymQOpa7nuaw6mDXnvWRV9R_2y?usp=sharing)

# Feedback and Contact Information

If you found this notebook useful, please consider:
- Sharing it with friends and colleagues who might benefit
- Starring the repository on GitHub
- Connecting with me on LinkedIn

[![GitHub](https://img.shields.io/badge/Star_Repository-gold?logo=github&logoColor=white)](https://github.com/xycoord/deep-rl-course)
[![LinkedIn](https://img.shields.io/badge/LinkedIn-Connect-blue)](https://linkedin.com/in/logan-thomson-01a4942ab)

If you found anything confusing, I'd be happy to answer any questions:

[![GitHub Issues](https://img.shields.io/badge/Submit_Issue-red?logo=github)](https://github.com/xycoord/deep-rl-course/issues)
[![GitHub Discussions](https://img.shields.io/badge/Join_Discussion-green?logo=github)](https://github.com/xycoord/deep-rl-course/discussions)

I'm continuously working to improve this material and extend it with additional topics. Your feedback helps make this resource better for everyone!