# Inverted Pendulum
**Objective**: To balance the pole (inverted pendulum) on top of the cart <br>

**Actions**: The agent takes a 1D vector for actions. The action space is continuous: a single value `action` in `[-3, 3]` that represents the force applied to the cart, where the magnitude is the amount of force and the sign is its direction. <br>

**Approach**: We use PyTorch to implement REINFORCE from scratch and train a neural network policy to master the Inverted Pendulum task.

In [2]:
from __future__ import annotations

import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from torch.distributions.normal import Normal

import gymnasium as gym

plt.rcParams["figure.figsize"] = (10, 5)

## Policy Network
We start by building a policy that the agent will learn using REINFORCE. A policy is a mapping from the current environment observation to a probability distribution over the actions to be taken. The policy used in this tutorial is parameterised by a neural network. It consists of two linear layers shared between the predicted mean and the predicted standard deviation, followed by two separate single linear layers: one estimates the mean and the other the standard deviation. `nn.Tanh` is used as the non-linearity between the hidden layers. The network below estimates the mean and standard deviation of a normal distribution from which an action is sampled, so the policy is expected to learn weights that output an appropriate mean and standard deviation for the current observation.

In [3]:
class Policy_Network(nn.Module):
    """Parameterised Policy Network"""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initialises a neural network that estimates the mean and standard deviation of a normal distribution from which
        an action is sampled.
        
        Args:
            obs_space_dims: dimension of the observation space
            action_space_dims: dimension of the action space
        """
        super().__init__()

        hidden_space1 = 16
        hidden_space2 = 32

        # shared network
        self.shared_net = nn.Sequential(
            nn.Linear(obs_space_dims, hidden_space1),
            nn.Tanh(),
            nn.Linear(hidden_space1, hidden_space2),
            nn.Tanh()
        )

        # policy mean specific linear layer
        self.policy_mean_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )

        # policy std specific linear layer
        self.policy_stddev_net = nn.Sequential(
            nn.Linear(hidden_space2, action_space_dims)
        )
    
    def forward(self, x: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
        """Conditioned on the observation, returns the mean and standard deviation of a normal distribution from which
        an action is sampled.
        
        Args:
            x: observation from the environment
        
        Returns:
            action_means: predicted mean of the normal distribution
            action_stddevs: predicted standard deviation of the normal distribution
        """
        shared_features = self.shared_net(x.float())

        action_means = self.policy_mean_net(shared_features)
        action_stddevs = torch.log(
            1 + torch.exp(self.policy_stddev_net(shared_features))
        )

        return action_means, action_stddevs
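
The expression `torch.log(1 + torch.exp(...))` in `forward` is the softplus function, which maps any real number to a positive value so that it can serve as a standard deviation. As a small sketch (the input values below are made up for illustration and are not from the tutorial), PyTorch's built-in `torch.nn.functional.softplus` computes the same quantity and stays finite for large inputs where the hand-written form overflows:

```python
import torch
import torch.nn.functional as F

# Illustrative raw outputs of a std-dev head (made-up values)
raw = torch.tensor([-2.0, 0.0, 3.0])

manual = torch.log(1 + torch.exp(raw))  # form used in forward()
builtin = F.softplus(raw)               # PyTorch's built-in softplus

same = torch.allclose(manual, builtin)    # the two forms agree here
all_positive = bool((builtin > 0).all())  # softplus output is always positive

# For a large input, exp() overflows float32 and the manual form returns inf,
# while the built-in switches to a linear approximation above its threshold.
large = torch.tensor([100.0])
manual_large = torch.log(1 + torch.exp(large))
builtin_large = F.softplus(large)
```

Whether the overflow matters in practice depends on how large the pre-activation values become during training; the built-in is simply the safer drop-in.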

## Building an agent
REINFORCE stands for "REward Increment = Nonnegative Factor × Offset Reinforcement × Characteristic Eligibility".
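
In its usual episodic form, the algorithm weights the gradient of each action's log-probability by the discounted return that followed it. The notation here is an assumption for illustration and does not appear in the tutorial: $\pi_\theta$ is the policy with parameters $\theta$, $r_k$ is the reward at step $k$, $\gamma$ is the discount factor and $T$ is the episode length.

$$
G_t = \sum_{k=t}^{T-1} \gamma^{\,k-t} r_k, \qquad
\nabla_\theta J(\theta) \approx \sum_{t=0}^{T-1} G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t)
$$

Minimising the loss $-\sum_t G_t \log \pi_\theta(a_t \mid s_t)$ by gradient descent follows this estimate, which matches the loss accumulated in the agent's `update` method.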

In [4]:
class REINFORCE:
    """REINFORCE algorithm"""

    def __init__(self, obs_space_dims: int, action_space_dims: int):
        """Initialises an agent that learns a policy via REINFORCE algorithm.
        
        Args:
            obs_space_dims: dimension of the observation space
            action_space_dims: dimension of the action space
        """

        # Hyperparameters
        self.learning_rate = 1e-4  # learning rate for policy optimisation
        self.gamma = 0.99  # discount factor
        self.eps = 1e-6  # small number for mathematical stability

        self.probs = []  # stores log-probabilities of the sampled actions
        self.rewards = []  # stores the corresponding rewards

        self.net = Policy_Network(obs_space_dims, action_space_dims)
        self.optimiser = torch.optim.AdamW(self.net.parameters(), lr=self.learning_rate)

    def sample_action(self, state: np.ndarray) -> np.ndarray:
        """Returns an action, conditioned on the policy and observation.
        
        Args:
            state: observation from the environment

        Returns:
            action: action to be performed
        """
        state = torch.tensor(np.array([state]))
        action_means, action_stddevs = self.net(state)

        # create a normal distribution from the predicted mean and standard deviation and sample an action
        distrib = Normal(action_means[0] + self.eps, action_stddevs[0] + self.eps)
        action = distrib.sample()
        prob = distrib.log_prob(action)

        action = action.numpy()

        self.probs.append(prob)

        return action
    
    def update(self):
        """Updates the policy network's weights"""
        running_g = 0
        gs = []

        # discounted return (backwards) - [::-1] will return an array in reverse
        for R in self.rewards[::-1]:
            running_g = R + self.gamma * running_g
            gs.insert(0, running_g)
        
        deltas = torch.tensor(gs)

        loss = 0
        # minimise -1 * log-probability * discounted return
        for log_prob, delta in zip(self.probs, deltas):
            loss += log_prob.mean() * delta * (-1)

        # update the policy network
        self.optimiser.zero_grad()
        loss.backward()
        self.optimiser.step()

        # empty/zero out all episode-centric/related variables
        self.probs = []
        self.rewards = []
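
The backward pass over the rewards in `update` can be checked in isolation. The following is a minimal sketch, assuming a standalone helper named `discounted_returns` (a hypothetical name, not part of the tutorial) and a toy reward list chosen so that the arithmetic is easy to follow by hand:

```python
def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step t of one episode."""
    returns = []
    running = 0.0
    # Walk the episode backwards so each return reuses the one after it
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()  # restore chronological order
    return returns


# Toy episode (illustrative values): three steps, reward 1 each, gamma = 0.5
example = discounted_returns([1.0, 1.0, 1.0], gamma=0.5)
# example == [1.75, 1.5, 1.0]
```

Appending and reversing once at the end gives the same list as `gs.insert(0, running_g)` while avoiding a list shift on every step, which may matter for long episodes.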

### Training procedure
    for seed in random seeds
        reinitialise agent 
        for episode in range of max number of episodes
            until episode is done
                sample action based on current observation
                take action and receive reward and next observation
                store action taken, its probability, and the observed reward
            update the policy

Note: Deep RL is fairly brittle with respect to the random seed in many common use cases, so it is important to test several seeds.
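
One way to make runs with different seeds reproducible is to seed every random number generator involved before reinitialising the agent. The helper below is a sketch under that assumption; `set_seed` is a hypothetical name and is not part of the tutorial. It covers Python, NumPy and PyTorch, and the environment itself would additionally be seeded through Gymnasium's `env.reset(seed=seed)`.

```python
import random

import numpy as np
import torch


def set_seed(seed: int) -> None:
    """Seed the Python, NumPy and PyTorch random number generators."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


# The same seed should reproduce the same random draws
set_seed(1)
first = torch.randn(3)
set_seed(1)
second = torch.randn(3)
```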

In [None]:
# create and wrap the environment
env = gym.make("InvertedPendulum-v4")
wrapped_env = gym.wrappers.RecordEpisodeStatistics(env, 50)  # records episode-reward

total_num_episodes = int(5e3)  # total number of episodes
# observation 