In [24]:
# Implementation of the Cross-Entropy Method to solve Gymnasium's CartPole-v1 environment

import numpy as np
import typing as tt
from dataclasses import dataclass

# Gymnasium (the maintained successor of OpenAI Gym)
import gymnasium as gym
# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim

In [25]:
# Number of neurons in the hidden layer of the NN
HIDDEN_SIZE = 128
# Number of complete episodes collected per training batch
BATCH_SIZE = 16
# Reward percentile used as the cut-off: episodes whose total reward falls
# below this percentile of their batch are dropped before training the NN
PERCENTILE = 70

In [26]:
# Policy network: maps an observation to one raw score per action
class Net(nn.Module):
    """
    Single-hidden-layer perceptron used as the agent's policy.

    The output is one unnormalised score (logit) per action; no softmax is
    applied inside the network.

    Args:
        obs_size (int): Number of values in one observation
        hidden_size (int): Number of neurons in the hidden layer
        n_actions (int): Number of scores produced (one per action)
    """

    def __init__(self, obs_size: int, hidden_size: int, n_actions: int):
        super().__init__()
        # Layers are created in forward order so parameter initialisation
        # consumes the RNG in the same sequence as the layers are applied.
        layers = [
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor):
        """Return the raw action scores for the observations in ``x``."""
        scores = self.net(x)
        return scores

In [27]:
# Data struct for a single step: an observation and the action selected for it
@dataclass
class EpisodeStep:
    """One recorded step of an episode."""
    # Observation the action was selected for
    observation: np.ndarray
    # Action that was selected for that observation
    action: int

# Data struct for one complete episode: its total reward and its list of steps
@dataclass
class Episode:
    """One complete episode."""
    # Sum of the rewards received over the episode
    reward: float
    # Steps recorded during the episode, in the order they were taken
    steps: tt.List[EpisodeStep]

In [28]:
def iterate_batches(env: gym.Env, net: Net, batch_size: int) -> tt.Generator[tt.List[Episode], None, None]:
    """
    Plays the environment using the current state of the NN and yields
    batches of completed episodes, indefinitely.

    At every step the NN's output is turned into a probability distribution
    with softmax and the action is sampled from it. The generator never
    terminates on its own; the caller decides when to stop iterating.

    Args:
        env (gym.Env): Gymnasium environment to play
        net (Net): Neural Network used to choose actions
        batch_size (int): Number of completed episodes per yielded batch

    Yields:
        tt.List[Episode]: ``batch_size`` completed episodes, each with its
        total reward and its (observation, action) steps.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1 (the batch would
            never fill and the generator would loop forever).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1, got %r" % (batch_size,))

    # Initialize
    batch: tt.List[Episode] = []
    episode_reward = 0.0
    episode_steps: tt.List[EpisodeStep] = []
    # Reset the environment
    obs, _ = env.reset()
    # Softmax to convert the NN's output into a probability distribution
    sm = nn.Softmax(dim=1)
    while True:
        # Convert observation to a PyTorch tensor
        obs_v = torch.tensor(obs, dtype=torch.float32)
        # Single forward pass (batch of one) to get the action probabilities.
        # Rollouts need no gradients, so skip building the autograd graph.
        # The no_grad block deliberately covers only this call: it must never
        # span the `yield` below, or gradients would stay disabled in the
        # caller's training step.
        with torch.no_grad():
            act_probs_v = sm(net(obs_v.unsqueeze(0)))
        act_probs = act_probs_v.numpy()[0]
        # Sample the next action according to the probabilities
        action = int(np.random.choice(len(act_probs), p=act_probs))
        # Step the environment based on the action
        next_obs, reward, is_done, is_trunc, _ = env.step(action)
        # Add to the reward tally
        episode_reward += float(reward)
        # Record the observation the action was chosen for (not next_obs)
        step = EpisodeStep(observation=obs, action=action)
        episode_steps.append(step)
        # If the episode ended, store it and start a fresh one
        if is_done or is_trunc:
            e = Episode(reward=episode_reward, steps=episode_steps)
            batch.append(e)
            episode_reward = 0.0
            episode_steps = []
            next_obs, _ = env.reset()
            # Hand a full batch to the caller; the NN may be updated by the
            # caller before the next episode is played
            if len(batch) == batch_size:
                yield batch
                batch = []
        obs = next_obs

In [29]:
def filter_batch(batch: tt.List[Episode], percentile: float) -> \
        tt.Tuple[torch.FloatTensor, torch.LongTensor, float, float]:
    """
    Keeps only the episodes whose reward is not below the given percentile
    of the batch and flattens their steps into training tensors.

    Args:
        batch (tt.List[Episode]): List of episodes to train the RL agent on.
        percentile (float): Percentile of the batch rewards used as the
            threshold for keeping episodes.

    Returns:
        A tuple ``(observations, actions, reward_bound, reward_mean)``:
        the stacked observations and the matching actions of the kept
        episodes, the reward threshold that was applied, and the mean reward
        over the whole (unfiltered) batch.
    """
    episode_rewards = [ep.reward for ep in batch]
    # Threshold and average are computed over every episode in the batch
    cutoff = float(np.percentile(episode_rewards, percentile))
    mean_reward = float(np.mean(episode_rewards))

    # Steps of every episode that is not strictly below the threshold,
    # in batch order and then in step order
    elite_steps = [
        step
        for ep in batch
        if not ep.reward < cutoff
        for step in ep.steps
    ]
    obs_rows = [step.observation for step in elite_steps]
    chosen_actions = [step.action for step in elite_steps]

    # One row per step; actions line up with the rows by index
    obs_tensor = torch.FloatTensor(np.vstack(obs_rows))
    act_tensor = torch.LongTensor(chosen_actions)
    return obs_tensor, act_tensor, cutoff, mean_reward

In [30]:
if __name__ == "__main__":
    # Step size for the Adam optimizer
    learning_rate = 0.01
    # Training stops once the mean reward of a batch exceeds this value
    solved_reward = 495

    # Create the Cartpole environment
    env = gym.make("CartPole-v1")
    # try/finally so the environment is always released, including when
    # training raises or the cell is interrupted. The close call lives inside
    # the __main__ guard because `env` only exists when the guard is true.
    try:
        assert env.observation_space.shape is not None
        # Get the number of items that will be in each observation
        obs_size = env.observation_space.shape[0]
        assert isinstance(env.action_space, gym.spaces.Discrete)
        # Get the number of actions possible
        n_actions = int(env.action_space.n)

        # Build the NN
        net = Net(obs_size=obs_size, hidden_size=HIDDEN_SIZE, n_actions=n_actions)
        print(net)
        # Set up the objective function to optimize; CrossEntropyLoss takes
        # the raw action scores, so no softmax is applied before it
        objective = nn.CrossEntropyLoss()
        # Set up the optimizer with learning rate
        optimizer = optim.Adam(params=net.parameters(), lr=learning_rate)

        # iterate_batches never ends on its own; the loop exits via `break`
        for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
            obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
            # Zero-out the gradients
            optimizer.zero_grad()
            # Run observations through NN
            action_scores_v = net(obs_v)
            # Calculate the loss against the actions the kept episodes took
            loss_v = objective(action_scores_v, acts_v)
            # Back-propagation
            loss_v.backward()
            # Gradient-descent
            optimizer.step()
            # Print progress
            print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (
                iter_no, loss_v.item(), reward_m, reward_b))
            # Check if training is good enough
            if reward_m > solved_reward:
                print("Solved!")
                break
    finally:
        env.close()


Net(
  (net): Sequential(
    (0): Linear(in_features=4, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=2, bias=True)
  )
)
0: loss=0.697, reward_mean=19.2, rw_bound=21.0
1: loss=0.693, reward_mean=26.4, rw_bound=30.5
2: loss=0.678, reward_mean=25.8, rw_bound=28.5
3: loss=0.662, reward_mean=27.8, rw_bound=29.5
4: loss=0.653, reward_mean=38.2, rw_bound=45.5
5: loss=0.636, reward_mean=32.3, rw_bound=38.0
6: loss=0.648, reward_mean=59.1, rw_bound=67.0
7: loss=0.624, reward_mean=42.6, rw_bound=55.0
8: loss=0.605, reward_mean=49.4, rw_bound=55.0
9: loss=0.607, reward_mean=60.8, rw_bound=71.0
10: loss=0.623, reward_mean=74.4, rw_bound=92.5
11: loss=0.595, reward_mean=64.4, rw_bound=75.5
12: loss=0.602, reward_mean=79.4, rw_bound=88.5
13: loss=0.588, reward_mean=71.4, rw_bound=73.5
14: loss=0.585, reward_mean=74.9, rw_bound=88.5
15: loss=0.582, reward_mean=69.4, rw_bound=76.5
16: loss=0.595, reward_mean=85.9, rw_bound=91.5
17: loss=0.584, reward_mea

In [33]:
# Create a new environment with render_mode set to 'human'
env_render = gym.make("CartPole-v1", render_mode="human")

# Run the trained agent for one episode. try/finally guarantees the render
# window is closed even if the loop raises or the cell is interrupted.
try:
    obs, _ = env_render.reset()
    done = truncated = False
    while not (done or truncated):
        # Turn the observation into a PyTorch Tensor
        obs_v = torch.tensor(obs, dtype=torch.float32)
        # Inference only: no autograd graph is needed
        with torch.no_grad():
            action_scores_v = net(obs_v.unsqueeze(0))
        # Greedy action. Softmax preserves the ordering of the scores, so the
        # argmax is taken on the raw scores directly.
        action = int(torch.argmax(action_scores_v, dim=1).item())

        # Step the environment on the action
        obs, _, done, truncated, _ = env_render.step(action)
finally:
    env_render.close()