# Reinforce

Tutorial: https://huggingface.co/learn/deep-rl-course/unit4/introduction

In [12]:
import torch

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

# Environment

First we will create the cartpole environment.

The observation_space of cartpole is a 4-dimensional float vector,
and the action_space is a discrete space with 2 possible actions (left or right).

In [13]:
import gym

env = gym.make("CartPole-v1")
observation_space_shape = env.observation_space.shape
action_space_size = env.action_space.n  # type: ignore
print("State size:", observation_space_shape)
print("Action size:", action_space_size)
state = env.reset()
print(f"Example state: {state}")
action_return = env.step(1)
print(f"Action return: {action_return}")

State size: (4,)
Action size: 2
Example state: (array([-8.2978457e-03, -1.8829281e-02, -2.8209276e-02, -3.4900859e-05],
      dtype=float32), {})
Action return: (array([-0.00867443,  0.17668563, -0.02820997, -0.30148304], dtype=float32), 1.0, False, False, {})



`np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)



# Model

This is the policy network, in the paper represented by $\pi_{\theta}(s_t)$

Meaning the policy $\pi$ given the parameters $\theta$ (which in this code
represents the weights and biases of self.input, self.hidden and self.output) when
doing a forward pass with the state $s$ at time $t$ as input.

The network is very simple feed forward network, with relu activation functions and a softmax output.

The output of the forward method is what the paper calls $\pi_{\theta}(a_i | s_t)$, which is a PDF due to the `softmax`.

The action method is a translation from a numpy state vector into an int action, using the forward pass of the network and the REINFORCE score function.


In [14]:
from typing import NewType
import numpy.typing as npt
import numpy as np

# Lets make some types to make type annotation easier
State = NewType("State", npt.NDArray[np.float64])
Action = NewType("Action", int)
Reward = NewType("Reward", float)

In [15]:
from typing import List, Tuple
from torch import nn


class Policy(nn.Module):
    """A classic policy network is one which takes in a state
    and returns a probability distribution over the action space"""

    def __init__(
        self, state_size: int, action_size: int, hidden_sizes: List[int]
    ) -> None:
        """
        This is a very simple feed forward network
        with an input of size state_size, and output of size action_size
        and ReLU activations between the layers
        """
        super().__init__()
        assert len(hidden_sizes) > 0, "Need at least one hidden layer"
        network = [nn.Linear(state_size, hidden_sizes[0]), nn.ReLU()]
        for i in range(len(hidden_sizes) - 1):
            network.append(nn.Linear(hidden_sizes[i], hidden_sizes[i + 1]))
            network.append(nn.ReLU())
        network.append(nn.Linear(hidden_sizes[-1], action_size))
        network.append(nn.Softmax())
        self.network = nn.Sequential(*network)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Takes a state tensor and returns a probability distribution along the action space"""
        return self.network(state)

    def act(self, state: State) -> Tuple[Action, float]:
        """Same as forward, instead of returning the entire distribution, we
        return the maximum probability action
        along with the log probability of that action
        """
        # First we got to convert out of numpy and into pytorch
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)

        # Now we can run the forward pass, whos output is a probability distribution
        # along the action space
        pdf = self.forward(state_tensor).cpu()

        # Now we want to get the action that corresponds to the highest probability
        action_idx = np.argmax(pdf)

        # We return the action and the log probability of the action
        return Action(action_idx.item()), np.log(pdf[action_idx])

# Training - REINFORCE

This is the training loop for the REINFORCE algorithm.

Training is done by assembling a sample of trajectories, which are lists of tuples of (state, action, reward).

The algorithm is as follows:

1. Start with policy model $\pi_{\theta}$
2. repeat:
    1. Generate an episode $S_0, A_0, r_1, ..., S_{T-1}, A_{T-1}, r_{T-1}$ following $\pi_{\theta}$
    2. for t from T-1 to 0:
        1. $G_t = \sum_{k=t}^{T-1} \gamma^{k-t} r_k$
    3. $L(\theta) = \frac{1}{T} \sum_{t=0}^{T-1} G_t \log \pi_{\theta}(a_t | s_t)$
    4. Optimize $\pi_{\theta}$ using $\nabla_{\theta} L(\theta)$

In [16]:
from dataclasses import dataclass


# SAR stands for State, Action, Reward
@dataclass
class SAR:
    state: State
    action: Action
    reward: Reward
    log_prob: float


# A list of SAR representing a single episode
Trajectory = NewType("Trajectory", List[SAR])
# A list of just the rewards from a single episode
RewardTrajectory = NewType("RewardTrajectory", List[Reward])

In [17]:
def collect_episode(policy: Policy) -> Tuple[Trajectory, Reward]:
    """Returns the trajectory and the sum of all rewards."""
    state, _ = env.reset()
    done = False
    trajectory = []
    while not done:
        action, log_prob = policy.act(state)
        state, reward, done, _, _ = env.step(action)
        trajectory.append(
            SAR(
                state=State(state),
                action=action,
                reward=Reward(reward),
                log_prob=log_prob,
            )
        )
    return Trajectory(trajectory), Reward(sum(sar.reward for sar in trajectory))

This represents the formula $R(\tau)$ in the tutorial. It's a simple reward decay formula.

In [18]:
import pytest


def cumulative_discounted_reward(
    trajectory: RewardTrajectory, gamma: float = 0.5
) -> Reward:
    if len(trajectory) == 0:
        raise ValueError("Trajectory needs at least one item.")
    if len(trajectory) == 1:
        return 0.0
    out = trajectory[1]
    if len(trajectory) == 2:
        return out
    for i in range(2, len(trajectory)):
        out += gamma * trajectory[i]
        gamma *= gamma
    return out


# Its important to test equations like this!
@pytest.mark.parametrize(
    "test_input,expected",
    [([0], 0), ([1, 1], 1), ([1, 1, 1], 1.5), ([1, 1, 1, 1], 1.75)],
)
def test_cumulative_discounted_reward(
    test_input: RewardTrajectory, expected: float
) -> None:
    assert cumulative_discounted_reward(test_input, gamma=0.5) == expected

# Run Tests

In [19]:
import ipytest

ipytest.autoconfig()

ipytest.run("-vv")

platform darwin -- Python 3.10.6, pytest-8.1.1, pluggy-1.4.0 -- /Users/ryan.peach/Library/Caches/pypoetry/virtualenvs/continuing-education-vJKa4-To-py3.10/bin/python
cachedir: .pytest_cache
rootdir: /Users/ryan.peach/Documents/ryanpeach/continuing_education
configfile: pyproject.toml
plugins: anyio-4.3.0
[1mcollecting ... [0mcollected 8 items

t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_return[test_input0-0] [32mPASSED[0m[32m          [ 12%][0m
t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_return[test_input1-1] [32mPASSED[0m[32m          [ 25%][0m
t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_return[test_input2-1.5] [32mPASSED[0m[32m        [ 37%][0m
t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_return[test_input3-1.75] [32mPASSED[0m[32m       [ 50%][0m
t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_discounted_reward[test_input0-0] [32mPASSED[0m[32m [ 62%][0m
t_e4ca5c3e2c49403986246215c689d707.py::test_cumulative_disco

<ExitCode.OK: 0>