<a href="https://colab.research.google.com/github/ymiftah/machine_learning/blob/master/gym_cartpole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The CartPole

The cartpole stabilization is a classic problem in control systems. It also serves as a good topical introduction to reinforcement learning.

This notebook contains some of my experimentations with openai/gym

## Install dependancies to visualise OpenAI Gym results on colab

Credits to [Paul Steven Conyngham, Star-AI](https://star-ai.github.io/Rendering-OpenAi-Gym-in-Colaboratory/) for the code in this section.

We recommend running this notebook on Google Colab.


In [0]:
# Rendering Dependancies
!pip install -qq gym pyvirtualdisplay
!apt-get install -y -qq xvfb python-opengl ffmpeg

!apt-get update -qq
!apt-get install -qq cmake
!pip install --upgrade -qq setuptools
!pip install -qq ez_setup

## Imports and helper function
# Install TensorFlow
try:
  # %tensorflow_version only exists in Colab.
  %tensorflow_version 2.x
except Exception:
  pass

import tensorflow as tf

import gym
from gym import logger as gymlogger
from gym.wrappers import Monitor
gymlogger.set_level(40) #error only
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML

from IPython import display as ipythondisplay

from pyvirtualdisplay import Display
display = Display(visible=0, size=(1400, 900))
display.start()

"""
Utility functions to enable video recording of gym environment and displaying it
To enable video, just do "env = wrap_env(env)""
"""

def show_video(folder):
    mp4list = glob.glob(folder+'/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{0}" type="video/mp4" />
                 </video>'''.format(encoded.decode('ascii'))))
    else: 
        print("Could not find video")
    

def wrap_env(env, folder='./video'):
    env = Monitor(env, folder, force=True)
    return env

def play(env, agent):
    observation = env.reset()
    reward = 0
    counter = 0
    done = False

    while True:
        counter += 1
        env.render()
        
        #your agent goes here
        action = agent.act(observation, reward, done) 
            
        observation, reward, done, info = env.step(action)

        if done:
            print('Total steps taken : ', counter)
            print(info)
            break;
                
    env.close()

## Cart Pole

The cartpole problem is a classical problem of control theory.
The goal is to find the right controls for the cart such that the inverted pendulum stays in a stable upright position.

<center> <img src=https://upload.wikimedia.org/wikipedia/commons/thumb/0/00/Cart-pendulum.svg/300px-Cart-pendulum.svg.png> </center>

We remind below the equations ruling the dynamics of the cartpole system (frictionless), and which are implemented on OpenAI CartPole environment.

$$ \ddot{x} = \frac{F+m_p l\ [\sin(\theta)\ \dot{\theta}^2- \cos(\theta) \ddot{\theta}]}{M} $$

$$
\ddot{\theta}=
\frac{g\sin(\theta) - \frac{F}{M}\cos(\theta) - \frac{m_p}{M}l \cos(\theta)\sin(\theta)\dot{\theta}^2}{l [\frac{4}{3} - \frac{m_p}{M} \ \cos^2\theta]}
$$

We stress that contrary to the above picture, the angle in the gym environment is positive when the ball is on the right.

Let us see how fares a random agent

In [0]:
class RandomAgent(object):
    def act(self, observation, reward, done):
        return np.random.randint(0,2) # [0,2) is [0,1]

env = wrap_env(gym.make("CartPole-v1"), folder='./video')
env.theta_threshold_radians = 1
agent = RandomAgent()

play(env, agent)

show_video(folder='./video')

As expected the pole falls quite quickly

## Classic control

Let us check how good old control theory performs on this task.

The equations of dynamics of the cartpole are known. We can linearize these equations around the unstable equilibrium position at $\theta = 0$, and $x=0$.

We define $\mathbf{X}$ as

$$
\mathbf{X}
=
\begin{bmatrix} x \\ \dot{x} \\  
\theta \\ \dot{\theta} \end{bmatrix}
$$.

We have the following linearized system:

$$
\mathbf{\dot{X}}
=
\begin{bmatrix}\dot{x} \\ \ddot{x} \\  
\dot{\theta} \\ \ddot{\theta}
\end{bmatrix}
=
\begin{bmatrix}
0 & 1 & 0 & 0 \\
0 & 0 & - \frac{m_p}{\frac{4}{3}M - m_p}g & 0 \\  
0 & 0 & 0 & 1 \\
0 & 0 & \frac{M}{\frac{4}{3}M - m_p}\frac{g}{l} & 0 
\end{bmatrix}
\begin{bmatrix} x \\ \dot{x} \\  
\theta \\ \dot{\theta} \end{bmatrix}
+
\begin{bmatrix}
0 \\ \frac{m_p}{M} \frac{F}{\frac{4}{3}M - m_p} + \frac{F}{M}\\  
0 \\ -\frac{1}{\frac{4}{3}M - m_p}\frac{F}{l}
\end{bmatrix}u
$$

Introducing the matrices $A$ and $B$, and $u$ the direction of the force (control variable), we have the following form:

$$
\mathbf{\dot{X}} = A \mathbf{X} + bu
$$



In [0]:
env = gym.make("CartPole-v1")
m_p = env.masspole
M = env.total_mass
bla = (1/(4./3 * M - m_p))
F = env.force_mag
l = env.length
g = env.gravity

A = np.array([[0, 1, 0, 0],
              [0, 0, -m_p*bla*g, 0],
              [0, 0, 0, 1],
              [0, 0, M*g/l*bla, 0]])
B = np.array([0, F*bla*m_p/M + F/M, 0, - F/l*bla])

## Linear Feedback

We will find a linear feedback control $R$ such that $u = - \mathbf{r^TX}$. then

$$
\mathbf{\dot{X}} = \mathbf{(A-br^T)X}
$$

Can we find the vector r such that the eigen values of the matrix $\mathbf{(A-br^T)}$ all have a negative real part ?

We will use the scipy "place_poles" function to select the gain vector $\mathbf{r}$ such that the poles are around -1.
" Around " -1 because the rank of B is 1, so the multiplicity of the poles must be no greater than 1 for the place_poles algorithm to function.

In [0]:
from scipy.signal import place_poles

r = place_poles(A,B.reshape(4,1),[-1.1,-1.02,-1.05,-1.01]).gain_matrix.flatten()
print(r)

In [0]:
env = wrap_env(gym.make("CartPole-v1"), folder='./video')

class ClassicControlAgent(object):
    def __init__(self, gain):
        self.gain = gain
    def act(self, observation, reward, done):
        x, dx, theta, dtheta = observation

        u = - np.dot(self.gain, np.array(observation))
        sign = u > 0
        return int(sign)

agent = ClassicControlAgent(r)

play(env, agent)

show_video(folder='./video')

This looks much better : the pole is held perfectly upright, with a limitation that the pole may still go off bounds (this could be corrected with an improved controller).

Of course in this case our agent knows exactly the dynamics of the problem, in a way we have hardcoded the agent with the right control logic.

What if the dynamics are unknown or too complex to approach by a classical method ?

We will try to have an agent learn the right control logic through experience, instead of telling the agent how.

## Cross Entropy Learning

Our agent will now learn which action to take through learning from experience.

First we will apply the cross entropy method (implementation from [Max Lapan](https://github.com/PacktPublishing/Deep-Reinforcement-Learning-Hands-On) )

The method is as follows:
- First the agent is a neural network which input is the current observation, and output the probability of taking action 0 or 1.
- The agent plays BATCH_SIZE of games, and keeps the best performing plays to train the network.

The idea is that the agent will progressively learn the replicate the best plays.

In [0]:
import torch
import torch.nn as nn
import torch.optim as optim

from collections import namedtuple

HIDDEN_SIZE = 128
BATCH_SIZE = 32
PERCENTILE = 70

# We define a two-layer neural network of dense layers
class Net(nn.Module):
    def __init__(self, obs_size, hidden_size, n_actions):
        super(Net, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_size, hidden_size),
            nn.ELU(),
            nn.Linear(hidden_size, hidden_size//2),
            nn.ELU(),
            nn.Linear(hidden_size//2, n_actions)
        )

    def forward(self, x):
        return self.net(x)


class NetAgent(object):
    def __init__(self, net):
        self.nn = net
    
    def act(self, obs, reward, done):
        assert isinstance(observation, np.ndarray) and observation.ndim == 1, 'unsupported observation type for now.'
        
        sm = nn.Softmax(dim=1)
        obs_v = torch.FloatTensor([obs])
        act_probs_v = sm(self.nn(obs_v))
        act_probs = act_probs_v.data.numpy().flatten()
        # We choose the action according to the model output distribution
        action = np.random.choice(len(act_probs), p=act_probs)
        return action

Episode = namedtuple('Episode', field_names=['reward', 'steps'])
EpisodeStep = namedtuple('EpisodeStep', field_names=['observation', 'action'])

# Helper function to create a batch of 
def iterate_batches(env, net, batch_size):
    batch = []
    episode_reward = 0.0
    episode_steps = []
    obs = env.reset()
    sm = nn.Softmax(dim=1)
    while True:
        # Get the action from the observation
        obs_v = torch.FloatTensor([obs])
        act_probs_v = sm(net(obs_v))
        act_probs = act_probs_v.data.numpy()[0]
        action = np.random.choice(len(act_probs), p=act_probs)

        # Take action and record
        next_obs, reward, is_done, _ = env.step(action)
        episode_reward += reward
        episode_steps.append(EpisodeStep(observation=obs, action=action))
        if is_done:
            # Add the full episode and total reward
            batch.append(Episode(reward=episode_reward, steps=episode_steps))

            #reset the episode records and start a new game
            episode_reward = 0.0
            episode_steps = []
            next_obs = env.reset()

            # We played batch_size number of games, yield the result
            if len(batch) == batch_size:
                yield batch
                batch = []
        
        # Not done, we keep on playing
        obs = next_obs

# Helper function to filter from the batch of episode those which
# total reward is in the Xth percentile
def filter_batch(batch, percentile):
    rewards = list(map(lambda s: s.reward, batch))
    reward_bound = np.percentile(rewards, percentile)
    reward_mean = float(np.mean(rewards))

    train_obs = []
    train_act = []
    for example in batch:
        if example.reward < reward_bound:
            continue
        train_obs.extend(map(lambda step: step.observation, example.steps))
        train_act.extend(map(lambda step: step.action, example.steps))

    # make then torch tensors
    train_obs_v = torch.FloatTensor(train_obs)
    train_act_v = torch.LongTensor(train_act)
    return train_obs_v, train_act_v, reward_bound, reward_mean

In [0]:
env = gym.make("CartPole-v1")
env._max_episode_steps = 500
# env = gym.wrappers.Monitor(env, directory="mon", force=True)
obs_size = env.observation_space.shape[0]
n_actions = env.action_space.n

# Define our network, objective and optimizer
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.01)

def CrossEntropyTraining(env, net):
    # Logs of the mean total reward of a batch to monitor learning progress
    log_mean = [0]*10

    for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
        obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)

        optimizer.zero_grad()
        action_scores_v = net(obs_v)
        loss_v = objective(action_scores_v, acts_v)
        loss_v.backward()
        optimizer.step()
        print("\r %d: loss=%.3f, reward_mean=%.1f, reward_bound=%.1f" % (
            iter_no, loss_v.item(), reward_m, reward_b), end="", flush=True)
        log_mean.append(reward_m)

        if iter_no > 100 or sum(log_mean[-10:])/5 >= 490:
            print("\n Solved!")
            break
    return log_mean

log_mean = CrossEntropyTraining(env, net)
# writer.close()
plt.plot(log_mean)

In [0]:
env = wrap_env(gym.make("CartPole-v1"), folder='./video')
env._max_episode_steps = 500
observation = env.reset()
reward = 0
counter = 0
done = False

while True:
    counter += 1
    env.render()
    #your agent goes here
    sm = nn.Softmax(dim=1)
    obs_v = torch.FloatTensor([observation])
    act_probs_v = sm(net(obs_v))
    act_probs = act_probs_v.data.numpy()[0]
    action = np.random.choice(len(act_probs), p=act_probs)
        
    observation, reward, done, info = env.step(action)

    if done:
        print('Total steps taken : ', counter)
        break;
            
env.close()

show_video(folder='./video')


Compared to the feedback controller method we see that our pole is much less stable with this simple algorithm, but it does learn to keep the pole up !

We will try to have the agent keep the pole at the central position. For this we
will slightly modify our rewards to add a penalty for deviating from the central position.

In [0]:
class MyRewardWrapper(gym.RewardWrapper):
    def __init__(self, env):
        super(MyRewardWrapper, self).__init__(env)

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        observation, reward, done, info = self.env.step(action)
        reward += np.exp(-10 * np.abs(np.linalg.norm(observation)))
        return observation, reward, done, info

env = MyRewardWrapper(gym.make("CartPole-v1"))
env._max_episode_steps = 1000
env.reset()

In [0]:
# Define our network, objective and optimizer
net = Net(obs_size, HIDDEN_SIZE, n_actions)
objective = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=net.parameters(), lr=0.01)

log_mean = CrossEntropyTraining(env, net)
# writer.close()
plt.plot(log_mean)

In [0]:
env = wrap_env(gym.make("CartPole-v1"), folder='./video')
env._max_episode_steps = 500
observation = env.reset()
reward = 0
counter = 0
done = False

while True:
    counter += 1
    env.render()
    #your agent goes here
    sm = nn.Softmax(dim=1)
    obs_v = torch.FloatTensor([observation])
    act_probs_v = sm(net(obs_v))
    act_probs = act_probs_v.data.numpy()[0]
    action = np.random.choice(len(act_probs), p=act_probs)
        
    observation, reward, done, info = env.step(action)

    if done:
        print('Total steps taken : ', counter)
        break;
            
env.close()

show_video(folder='./video')

Pretty, neat, it does seem to bring the pole at the $0$ position as much as possible