
# Reinforcement Learning with Gymnasium

Reinforcement Learning (RL) is one of the three main machine learning paradigms, alongside supervised and unsupervised learning. Unlike the other two, RL focuses on training an agent to interact with its environment by making decisions that maximize cumulative rewards. Through trial and error, the agent learns the optimal actions to take in different situations.

A notable extension of this approach is Reinforcement Learning from Human Feedback (RLHF), where human preference judgments are used to shape the reward signal, steering the agent toward more aligned and effective decision-making.

RL has a wide range of applications, from self-driving cars and automated trading to game-playing AI and robotic control. When combined with deep neural networks, it becomes Deep Reinforcement Learning, enabling breakthroughs in complex problem-solving.

In this code-along, we’ll dive into Gymnasium, an open-source Python library for developing and benchmarking RL algorithms. I’ll guide you through setting it up, exploring different RL environments, and implementing a simple agent to apply an RL algorithm in Python.

Let’s get started! 🚀

## What is Gymnasium?

[Gymnasium](https://gymnasium.farama.org/) is an open-source Python library designed to support the development and evaluation of reinforcement learning (RL) algorithms. It provides a robust framework that simplifies RL research and experimentation by offering:

- A diverse range of environments, from simple games to complex real-world simulations.
- Intuitive APIs and wrappers for seamless interaction with environments.
- Flexibility to create custom environments while leveraging the standardized API framework.

With Gymnasium, developers can easily build and test RL algorithms using API calls to:

- Send the agent’s chosen actions to the environment.
- Retrieve the environment’s state and reward after each action.
- Train the RL model efficiently.
- Evaluate the model’s performance in different scenarios.

This structured approach makes Gymnasium a powerful tool for both beginners and experienced researchers in RL.

Because this code-along was recorded at a fixed point in time, we'll pin specific versions of the required dependencies so the code stays reproducible.

In [289]:
!pip install torch==2.3.1 gymnasium==1.1.1



In [290]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as distributions
import numpy as np
import gymnasium as gym  

## Task 1: Setting up a Gymnasium Environment

A [Gymnasium Environment](https://gymnasium.farama.org/api/env/) is a controlled setting where an RL agent interacts, learns, and makes decisions to achieve a goal. Environments provide a structured way to model various real-world and simulated scenarios, making them essential for developing and testing reinforcement learning (RL) algorithms.

For this code-along we'll use the [CartPole-v1](https://gymnasium.farama.org/environments/classic_control/cart_pole/) environment. Our goal is to develop a simple neural network that is able to keep the inverted pendulum upright by controlling the left-to-right motion of the cart on which it stands.

An episode ends if one of the following conditions occurs:

1. Termination: Pole Angle is greater than ±12°
2. Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
3. Truncation: Episode length is greater than 500 (200 for v0)
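
The three conditions above can be sketched as a small plain-Python check. This is only an illustration, not Gymnasium's own implementation: the constant names and the `episode_status` helper are hypothetical, the observation is assumed to be ordered as `[cart position, cart velocity, pole angle in radians, pole angular velocity]`, and reaching 500 steps is treated as truncation.

```python
import math

# Thresholds taken from the list above; the names are hypothetical, chosen for this sketch.
X_THRESHOLD = 2.4                    # cart position limit
THETA_THRESHOLD = math.radians(12)   # pole angle limit, roughly 0.2095 rad
MAX_EPISODE_STEPS = 500              # truncation limit for CartPole-v1 (assumed: step 500 truncates)


def episode_status(observation, step_count):
    """Return (terminated, truncated) for a CartPole-style observation."""
    cart_position, _, pole_angle, _ = observation
    terminated = abs(cart_position) > X_THRESHOLD or abs(pole_angle) > THETA_THRESHOLD
    truncated = step_count >= MAX_EPISODE_STEPS
    return terminated, truncated


print(episode_status([0.03, 0.03, 0.04, -0.04], 10))  # pole nearly upright
print(episode_status([0.0, 0.0, 0.25, 0.0], 10))      # pole tilted past 12 degrees
```

In practice `env.step` reports both flags for us, so a check like this is only useful for building intuition about when an episode ends.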

We'll specify `render_mode="rgb_array"` to be able to visualize the state using matplotlib later on. 

![Cartpole](cartpole.png)


In [291]:
for i in gym.envs.registry.keys():
    print(i)

CartPole-v0
CartPole-v1
MountainCar-v0
MountainCarContinuous-v0
Pendulum-v1
Acrobot-v1
phys2d/CartPole-v0
phys2d/CartPole-v1
phys2d/Pendulum-v0
LunarLander-v3
LunarLanderContinuous-v3
BipedalWalker-v3
BipedalWalkerHardcore-v3
CarRacing-v3
Blackjack-v1
FrozenLake-v1
FrozenLake8x8-v1
CliffWalking-v0
Taxi-v3
tabular/Blackjack-v0
tabular/CliffWalking-v0
Reacher-v2
Reacher-v4
Reacher-v5
Pusher-v2
Pusher-v4
Pusher-v5
InvertedPendulum-v2
InvertedPendulum-v4
InvertedPendulum-v5
InvertedDoublePendulum-v2
InvertedDoublePendulum-v4
InvertedDoublePendulum-v5
HalfCheetah-v2
HalfCheetah-v3
HalfCheetah-v4
HalfCheetah-v5
Hopper-v2
Hopper-v3
Hopper-v4
Hopper-v5
Swimmer-v2
Swimmer-v3
Swimmer-v4
Swimmer-v5
Walker2d-v2
Walker2d-v3
Walker2d-v4
Walker2d-v5
Ant-v2
Ant-v3
Ant-v4
Ant-v5
Humanoid-v2
Humanoid-v3
Humanoid-v4
Humanoid-v5
HumanoidStandup-v2
HumanoidStandup-v4
HumanoidStandup-v5
GymV21Environment-v0
GymV26Environment-v0


In [292]:
env = gym.make('CartPole-v1', render_mode="rgb_array")

With the environment created, we can explore some of its attributes, such as the observation space and the action space.

In [293]:
env.observation_space

Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
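
The `Box` above lists a lower bound, an upper bound, a shape, and a dtype. The two velocity components are unbounded, while the position and angle bounds are wider than the termination limits. A quick sketch (with the bound values copied by hand from the output above) makes the angle bound easier to read by converting it to degrees:

```python
import math

# Bounds copied by hand from the printed observation space above.
position_bound = 4.8
angle_bound_rad = 0.41887903

angle_bound_deg = math.degrees(angle_bound_rad)
print(f"Pole angle bound: {angle_bound_deg:.1f} degrees")
print(f"Ratio to the 12 degree termination limit: {angle_bound_deg / 12:.1f}")
print(f"Ratio of the position bound to the 2.4 termination limit: {position_bound / 2.4:.1f}")
```

Both ratios come out at about 2, so the observable range is twice the range in which an episode keeps running.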

In [294]:
observation, info = env.reset()
print(observation)
print(info)

[ 0.03427567  0.03290458  0.0417716  -0.04361821]
{}


In [295]:
env.action_space

Discrete(2)

In [296]:
env.action_space.n

2

In [297]:
env.action_space.sample()

1

## Task 2: Create a Neural Network

For our policy, we'll create a simple neural network with two hidden linear layers, each followed by a ReLU activation. The first layer has an input dimension equal to the dimension of the observation space. The hidden dimension is a free choice (we use 64), but the larger this number, the bigger the chance of overfitting.

Lastly, the output of the second hidden layer is mapped to an output layer whose dimension equals the size of the action space. This is then passed through a [softmax](https://pytorch.org/docs/stable/generated/torch.nn.Softmax.html) operator, which results in a set of probabilities that sum to 1.

In [340]:
input_dim = env.observation_space.shape[0]
hidden_dim = 64
output_dim = env.action_space.n

def create_policy():
    return nn.Sequential(
        nn.Linear(input_dim, hidden_dim).double(),
        nn.ReLU(),
        nn.Linear(hidden_dim, hidden_dim).double(),
        nn.ReLU(),
        nn.Linear(hidden_dim, output_dim).double(),
        nn.Softmax(dim=-1)  # normalize over the action dimension (avoids the implicit-dim warning)
    )
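
To see what the final softmax step does, here is a small NumPy sketch of the same operation outside of PyTorch. The `softmax` helper and the `logits` values are made up for illustration; they are not outputs of the policy above.

```python
import numpy as np


def softmax(logits):
    """Numerically stable softmax over a 1-D array of raw scores."""
    shifted = logits - np.max(logits)  # subtracting the max avoids overflow in exp
    exp_scores = np.exp(shifted)
    return exp_scores / exp_scores.sum()


# Hypothetical raw outputs of the final linear layer, one per action.
logits = np.array([-0.1, 0.3])
probabilities = softmax(logits)
print(probabilities, probabilities.sum())
```

The larger raw score receives the larger probability, and the two values always sum to 1 regardless of the scale of the inputs.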

Let's see how we could use this policy with a state from the environment. First, check out the state.

In [299]:
state, info = env.reset()
state

array([-0.02814596,  0.03694265,  0.04702631,  0.01847005], dtype=float32)

As described in the [docs](https://gymnasium.farama.org/environments/classic_control/cart_pole/#observation-space), the state object maps to `[Cart Position, Cart Velocity, Pole Angle, Pole Angular Velocity]`. If we convert this array into a tensor, we can pass it to the policy instance we just defined.

In [300]:
policy = create_policy()
action_probabilities = policy(torch.tensor(state, dtype=torch.float64))
action_probabilities

tensor([0.4685, 0.5315], dtype=torch.float64, grad_fn=<SoftmaxBackward0>)

We can verify that the softmax output sums to 1. Try removing the softmax layer to see what the raw network outputs look like.

In [301]:
torch.sum(action_probabilities)

tensor(1.0000, dtype=torch.float64, grad_fn=<SumBackward0>)

We can now sample an action from the action space using these probabilities.

In [302]:
# Sample from the action space using the converted numpy array
env.action_space.sample(probability=action_probabilities.detach().cpu().numpy())

0
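
Sampling with probabilities means that a single draw is random, but over many draws each action shows up about as often as its probability suggests. The sketch below illustrates this with NumPy's random generator instead of `env.action_space.sample`; the probabilities are rounded from the policy output shown earlier and the seed is arbitrary.

```python
import numpy as np

rng = np.random.default_rng(seed=0)  # arbitrary seed, fixed for repeatability

# Probabilities rounded from the policy output shown earlier.
probabilities = np.array([0.4685, 0.5315])

# Draw many actions and compare the empirical frequencies with the probabilities
samples = rng.choice(len(probabilities), size=10_000, p=probabilities)
frequencies = np.bincount(samples, minlength=2) / samples.size
print(frequencies)
```

This randomness is what lets the agent explore: even the currently less likely action is tried regularly.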

## Task 3: Define forward pass function

Now that we have our policy set up, we can start looking into training it using Gymnasium. The first step is to define a function that runs a single episode of the environment using our policy.

In [303]:
def run_environment(env, policy, render=False):
    observation, _ = env.reset()
    
    observations = [observation]
    actions = []
    action_probabilities = []
    rewards = []
    frames = []
    
    if render:
        frames.append(env.render())

    while True:
        action_probs = policy(torch.tensor(observation, dtype=torch.float64))

        # https://pytorch.org/docs/stable/distributions.html 
        dist = distributions.Categorical(action_probs)
        action = dist.sample()
        log_prob_action = dist.log_prob(action)
        
        # `env.action_space.sample(probability=...)` could also draw the action, but it
        # does not return the log-probability of the chosen action, which we need for training.
        # action = env.action_space.sample(probability=action_probs)

        # Take the chosen action and observe the next state and reward
        next_observation, reward, terminated, truncated, _ = env.step(action.item())

        observations.append(next_observation)
        actions.append(action.item())
        action_probabilities.append(log_prob_action.flatten())
        rewards.append(reward)
        
        if render:
            frames.append(env.render())

        observation = next_observation

        if terminated or truncated:
            break
    
    return observations, actions, torch.cat(action_probabilities), rewards, frames if render else None

Let's give this a go and try to visualize the episode!

In [317]:
observations, actions, action_probabilities, rewards, frames = run_environment(env, policy, render=True)

In [305]:
print(len(observations), len(actions), len(action_probabilities), len(rewards))
print(actions)
print(action_probabilities)
print(rewards)

41 40 40 40
[1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 0, 1, 1, 1]
tensor([-0.6322, -0.6365, -0.7519, -0.7525, -0.7579, -0.6340, -0.7577, -0.7569,
        -0.6365, -0.6321, -0.7546, -0.6316, -0.6367, -0.7469, -0.7515, -0.7589,
        -0.6353, -0.6317, -0.6414, -0.7460, -0.7470, -0.6345, -0.7458, -0.7536,
        -0.7555, -0.7461, -0.6502, -0.6401, -0.7573, -0.6382, -0.7581, -0.6371,
        -0.6329, -0.7446, -0.7553, -0.6344, -0.7531, -0.6324, -0.6394, -0.6506],
       dtype=torch.float64, grad_fn=<CatBackward0>)
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
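
Note that the tensor printed above holds log-probabilities (the result of `dist.log_prob`), which is why all values are negative. Exponentiating them recovers the probability the policy assigned to each chosen action. A short sketch, with the first three values copied by hand from the output above:

```python
import math

# (action, log-probability) pairs copied from the first three steps of the output above.
logged = [(1, -0.6322), (1, -0.6365), (0, -0.7519)]

for action, log_prob in logged:
    print(f"action {action}: probability {math.exp(log_prob):.3f}")
```

The untrained policy assigns roughly 0.53 to action 1 and 0.47 to action 0 here, so it is still close to a coin flip.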


In [306]:
# Prompt: Create a matplotlib animation of all the rendered frames

import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import display, HTML

def display_renders(renders):
    # Create a figure and axis
    fig, ax = plt.subplots()

    # Initialize the first frame
    frame = renders[0]
    im = ax.imshow(frame)

    # Update function for animation
    def update(frame):
        im.set_array(frame)
        return [im]

    # Create the animation
    ani = animation.FuncAnimation(fig, update, frames=renders, blit=True)
    plt.close(fig)

    display(HTML(ani.to_jshtml()))

In [307]:
display_renders(frames)

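Next, we compute the discounted reward for every step of the episode: the sum of the rewards from that step onward, with each later reward scaled by one more factor of the discount factor. Discounting makes immediate rewards count more than distant ones, and we additionally normalize the result to zero mean and unit variance, which tends to make learning more stable and efficient.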

In [331]:
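def calculate_discounted_reward(rewards, discount_factor):
    # Use a float array explicitly so integer rewards would not be truncated
    discounted_rewards = np.zeros(len(rewards), dtype=np.float64)
    reward_acc = 0.0

    # Walk backwards so each step accumulates the discounted sum of later rewards
    for i in reversed(range(len(rewards))):
        reward_acc = rewards[i] + reward_acc * discount_factor
        discounted_rewards[i] = reward_acc
    
    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float64)
    # Normalize to zero mean and unit variance; the small epsilon guards against a zero std
    normalized_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-8)
    return normalized_rewards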
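
To make the backward loop in `calculate_discounted_reward` concrete, here is a small worked example in plain Python. The `discounted_returns` helper is a hypothetical stand-in that leaves out the normalization step, and the discount factor of 0.5 is deliberately small so the numbers are easy to follow by hand.

```python
def discounted_returns(rewards, discount_factor):
    """Backward recursion G_t = r_t + discount_factor * G_{t+1}, without normalization."""
    returns = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + discount_factor * running
        returns[t] = running
    return returns


# Three steps with reward 1 each and a discount factor of 0.5
example_returns = discounted_returns([1.0, 1.0, 1.0], 0.5)
print(example_returns)  # [1.75, 1.5, 1.0]
```

The last step only sees its own reward (1.0), the middle step adds half of that (1.5), and the first step adds half of the middle value (1.75). Earlier actions therefore get credit for the rewards that follow them.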


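To train our neural network, we also need a loss function. In reinforcement learning, there is no 'ground truth' to compare our results against. Instead, we aim to maximize our expected rewards. Because the optimizer minimizes, we weight the log-probability of each action taken by its normalized discounted reward, sum the results, and negate the sum. This is the objective used by the REINFORCE policy-gradient method.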

In [309]:
def calculate_loss(rewards, action_probabilities):
    # `action_probabilities` holds the log-probabilities of the actions that were taken
    loss = -(rewards * action_probabilities).sum()
    return loss
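
A tiny numeric example can show why minimizing this loss improves the policy. The sketch below re-implements the formula from `calculate_loss` in plain Python (the `policy_gradient_loss` name and the two-step episode are hypothetical) and compares the loss before and after shifting probability toward the action with the higher reward.

```python
import math


def policy_gradient_loss(returns, log_probs):
    """Negative sum of return-weighted log-probabilities (plain-Python version)."""
    return -sum(g * lp for g, lp in zip(returns, log_probs))


# Hypothetical two-step episode: the first action has an above-average
# normalized reward, the second a below-average one.
returns = [1.0, -1.0]

# Both chosen actions had probability 0.5
before = policy_gradient_loss(returns, [math.log(0.5), math.log(0.5)])
# The first action becomes more likely (0.7), the second less likely (0.3)
after = policy_gradient_loss(returns, [math.log(0.7), math.log(0.3)])

print(before, after)
```

The loss drops when good actions become more likely and poor actions less likely, so gradient descent on it nudges the policy in that direction.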

Now we can put all this together in a single function that we use for every episode.

In [310]:
def run_epoch(env, policy, optimizer, discount_factor, render=False):
    policy.train()
    
    observations, actions, action_probabilities, rewards, frames = run_environment(env, policy, render=render)
    
    discounted_rewards = calculate_discounted_reward(rewards, discount_factor)
    
    loss = calculate_loss(discounted_rewards, action_probabilities)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    policy.eval()

    return np.sum(rewards), frames

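Finally, we train the policy for up to `epochs` episodes and stop early once the mean reward over the last `n_trials` episodes reaches `goal`: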

In [341]:
epochs = 500
discount_factor = 0.99
n_trials = 25
goal = 400
learning_rate = 0.01

epoch_rewards = []
epoch_frames = []

policy = create_policy()
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)

for epoch in range(epochs):
    render = epoch % 10 == 0
    epoch_reward, frames = run_epoch(
        env,
        policy,
        optimizer,
        discount_factor,
        render
    )

    epoch_rewards.append(epoch_reward)
    mean_epoch_reward = np.mean(epoch_rewards[-n_trials:])
    
    if render:
        epoch_frames.append(frames)

    if epoch % 100 == 0:
        print(f'| Epoch: {epoch} | Mean Rewards: {mean_epoch_reward:.1f} |')

    if mean_epoch_reward >= goal:
        if not render:
            _, _, _, _, frames = run_environment(env, policy, render=True)
            epoch_frames.append(frames)
        print(f'Reached reward threshold in {epoch} epochs')
        break

| Epoch:   0 | Mean Rewards:  35.0 |
| Epoch: 100 | Mean Rewards: 208.7 |
Reached reward threshold in 144 epochs


In [342]:
import plotly.express as px

fig = px.line(
    x=list(range(len(epoch_rewards))),
    y=epoch_rewards,
    labels={'x': 'Epoch', 'y': 'Reward'},
    title='Epoch Rewards Over Time',
)

fig.show()

In [343]:
display_renders(epoch_frames[-1])