## Import all the required libraries

In [1]:
import numpy as np
import os

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gymnasium as gym

# Video capture
import skvideo.io

## Create the environment
* Take note that two environments are created, one for training and one for evaluation.
* Technically, you just only need 1 environment, since there are no differences in the environment.

In [2]:
env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id, render_mode='rgb_array')

# Create the evaluation env
eval_env = gym.make(env_id, render_mode='rgb_array')

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

In [3]:
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # Get a random observation

_____OBSERVATION SPACE_____ 

The State Space is:  4
Sample observation [ 1.8610468e+00 -7.1385665e+37  2.0359427e-01  1.5766993e+38]


In [4]:
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action


 _____ACTION SPACE_____ 

The Action Space is:  2
Action Space Sample 1


## Create the neural network
* Note the input and output size of the network should match the environment observations (known as the state space here) and actions

In [5]:
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        
        # h_size : Number of hidden layer nodes
        # s_size : State space size (input)
        # a_size : action space size (output)
        
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

In [6]:
debug_policy = Policy(s_size, a_size, 64)
debug_policy.act(env.reset()[0])

(0, tensor([-0.6793], grad_fn=<SqueezeBackward1>))

In [7]:
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()[0]
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)

        for t in range(n_steps)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()
        ## eps is the smallest representable float, which is
        # added to the standard deviation of the returns to avoid numerical instabilities
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch prefers gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))

    return scores

## Set hyperparameters

In [8]:
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

## Setup the optimizer and start training

In [9]:
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

In [10]:
scores = reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)

Episode 100	Average Score: 37.80
Episode 200	Average Score: 375.88
Episode 300	Average Score: 798.57
Episode 400	Average Score: 946.64
Episode 500	Average Score: 990.64
Episode 600	Average Score: 965.03
Episode 700	Average Score: 997.06
Episode 800	Average Score: 1000.00
Episode 900	Average Score: 957.76
Episode 1000	Average Score: 1000.00


## Define a function to evaluate the agent and calculate the mean reward

In [11]:
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """
    Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
    :param env: The evaluation environment
    :param n_eval_episodes: Number of episode to evaluate the agent
    :param policy: The Reinforce agent
    """
    episode_rewards = []
    for episode in range(n_eval_episodes):
        state = env.reset()[0]
        step = 0
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            action, _ = policy.act(state)
            new_state, reward, done, info, _ = env.step(action)
            total_rewards_ep += reward

            if done:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward

In [12]:
evaluate_agent(
    eval_env, cartpole_hyperparameters["max_t"], cartpole_hyperparameters["n_evaluation_episodes"], cartpole_policy
)

(1000.0, 0.0)

## Define a function to generate a video of the policy

In [16]:
def record_video(env, policy, out_directory, max_steps=1000, fps=30):
    """
    Generate a replay video of the agent
    :param env
    :param Qtable: Qtable of our agent
    :param out_directory
    :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
    """
    images = []
    done = False
    state = env.reset()[0]
    img = env.render()
    images.append(img)
    for step in range(max_steps):
        # Take the action (index) that have the maximum expected future reward given that state
        action, _ = policy.act(state)
        state, reward, done, info, _ = env.step(action)  # We directly put next_state = state for recording logic
        img = env.render()
        images.append(img)
    #imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
    skvideo.io.vwrite(out_directory, 
              np.asarray(images),inputdict={"-r":"30"}, outputdict={"-r" : "30", "-pix_fmt": "yuv420p"})

In [17]:
record_video(env, cartpole_policy, "replay.mp4")