# Install Dependencies

If you are running on Google Colab, you need to install the necessary dependencies before beginning the exercise. 

**Note:** Don't forget to restart the environment with the button in the output. Some packages are already imported and need to be re-imported after installing the dependencies.

In [None]:
!apt-get update
!apt-get install -y xvfb python-opengl ffmpeg cmake

!pip uninstall -y pyarrow
!pip install ray[rllib] gym==0.21 pyvirtualdisplay bs4 ez_setup pygame
!pip install --upgrade setuptools

print("Successfully installed all the dependencies!")


# Import libraries and define virtual display to display video in Colab

Utility functions that enable recording and displaying a video of the environment in Google Colab.

In [None]:
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay

from pyvirtualdisplay import Display
from gym.wrappers import Monitor

"""
Utility functions to enable video recording of gym environment and displaying it
To enable video, just do "env = wrap_env(env)""
"""

def show_video():
  mp4list = glob.glob('video/*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
  else: 
    print("Could not find video")

def wrap_env(env):
  env = Monitor(env, './video', force=True)
  return env

display = Display(visible=0, size=(1400, 900))
display.start()

# Define CartPoleRightLeft environment

In this section we define the custom CartPoleRightLeft environment and register it with rllib.

#### Inspection Tool

We can use this code snippet to see the original implementation of environment functions to make necessary changes. (This can also be done by looking at the source code on Github, etc but using this tool we're sure about the version we are extending.)

In [None]:
# Helper code snippet to look into original implementation of different functions in the original env class
from gym.envs.classic_control.cartpole import CartPoleEnv

import inspect
env = CartPoleEnv()
lines = inspect.getsource(env.reset)
print(lines)

#### CartPoleRightLeft

In this cell, we create the custom environment for the CartPoleRightLeft enviroment.

We want the agent to balance the pole on the left or right side of the screen depending on a state variable provided by the environment. We name this variable `directional_goal`. To add this new state variable we perform the following changes in the environment:

*   Add the new variable to the `self.observation_space` attribute. (*lines 103-106*) 
*   Add the variable to the `reset` function. (*lines 199-202*)
*   Use the variable in the `step` function to calculate reward. Detailed below.

*(Holding Ctrl and pressing M L (one by one) switches on/off line numbers in the cells containing code.)*

The next step is to define a new reward function that takes the `direction_goal` variable into account. To achieve this we assume that the agent receives maximum reward when it is half its width away from the edge of the screen. We use the inspection tool above along very basic algebra to calculate the distances and the position of maximum reward. (*line 108-116*)

This is illustrated in the image below:

![picture](https://drive.google.com/uc?export=view&id=11TaxSwO4I7DIcvRbBs6BpMkwoHbblyoZ
)

Finally, we modify the `step` function and add this positional reward to the existing reward. The reward is setup in a way that the agent recieves a reward of `1` if it is balancing the pole in one of the edges. Moreover, the reward linearly decreases the further the agent is from the respective edge. The scaling factor is setup in a way that the agent gets a reward of `0.5` when it is in the middle position. (*lines 118-135, 167, 176-181*)

Note that the reward must be designed in way that that the agent still has incentive to balance the pole while also being incentivized to move towards one of the edges.

In [None]:
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
from gym.envs.classic_control.cartpole import CartPoleEnv

import math
from typing import Optional, Union

import numpy as np

import gym
from gym import logger, spaces
from gym.error import DependencyNotInstalled

class CartPoleRightLeft(CartPoleEnv):
    """
    ### Description

    This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson in
    ["Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problem"](https://ieeexplore.ieee.org/document/6313077).
    A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.
    The pendulum is placed upright on the cart and the goal is to balance the pole by applying forces
     in the left and right direction on the cart.

    ### Action Space

    The action is a `ndarray` with shape `(1,)` which can take values `{0, 1}` indicating the direction
     of the fixed force the cart is pushed with.

    | Num | Action                 |
    |-----|------------------------|
    | 0   | Push cart to the left  |
    | 1   | Push cart to the right |

    **Note**: The velocity that is reduced or increased by the applied force is not fixed and it depends on the angle
     the pole is pointing. The center of gravity of the pole varies the amount of energy needed to move the cart underneath it

    ### Observation Space

    The observation is a `ndarray` with shape `(5,)` with the values corresponding to the following positions and velocities:

    | Num | Observation           | Min                 | Max               |
    |-----|-----------------------|---------------------|-------------------|
    | 0   | Cart Position         | -4.8                | 4.8               |
    | 1   | Cart Velocity         | -Inf                | Inf               |
    | 2   | Pole Angle            | ~ -0.418 rad (-24°) | ~ 0.418 rad (24°) |
    | 3   | Pole Angular Velocity | -Inf                | Inf               |
    | 4   | Direction Goal        | 0 (Left)            | 1 (Right)         |

    **Note:** While the ranges above denote the possible values for observation space of each element,
        it is not reflective of the allowed values of the state space in an unterminated episode. Particularly:
    -  The cart x-position (index 0) can be take values between `(-4.8, 4.8)`, but the episode terminates
       if the cart leaves the `(-2.4, 2.4)` range.
    -  The pole angle can be observed between  `(-.418, .418)` radians (or **±24°**), but the episode terminates
       if the pole angle is not in the range `(-.2617, .2617)` (or **±15°**)

    ### Rewards

    Since the goal is to keep the pole upright for as long as possible, a reward of `+1` for every step taken,
    including the termination step, is allotted. The threshold for rewards is 475 for v1.

    ### Starting State

    All observations except the direction goal are assigned a uniformly random value in `(-0.05, 0.05)`.
    The direction goal is randomly chosen between 0 and 1.

    ### Episode Termination

    The episode terminates if any one of the following occurs:
    1. Pole Angle is greater than ±12°
    2. Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
    3. Episode length is greater than 500 (200 for v0)

    ### Arguments

    ```
    gym.make('CartPole-v1')
    ```

    No additional arguments are currently supported.
    """
    def __init__(self, render_mode: Optional[str] = None):
        super(CartPoleRightLeft, self).__init__()

        self.screen_width = 600
        self.screen_height = 400

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array(
            [
                self.x_threshold * 2,
                np.finfo(np.float32).max,
                self.theta_threshold_radians * 2,
                np.finfo(np.float32).max,
            ],
            dtype=np.float32,
        )

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(
            np.append(-high, 0), 
            np.append(high, 1), dtype=np.float32
            )

        # width of cart not scaled to screen
        self.cart_width = 50.0
        world_width = self.x_threshold * 2
        scale = self.screen_width / world_width
        self.unscaled_cart_width = self.cart_width / scale

        # absolute position of the maximum reward position
        self.maximum_reward_position = \
          self.x_threshold - self.unscaled_cart_width / 2

        # positional_reward_scaling_factor
        self.reward_scaling_factor = \
          self.x_threshold * 2 - (self.unscaled_cart_width / 2)

    def calculate_positional_reward(self, x, direction_goal):
      """
      Method that takes in the x position of the cart alongside the 
      direction goal and calculates the positional reward of the agent.

      The positional reward is setup in a way that the agent gets a less total
      reward if the cart is far from the maximum reward position.
      """
      if direction_goal == 0:
        maximum_reward_position = -self.maximum_reward_position
      else:
        maximum_reward_position = self.maximum_reward_position

      return (-abs(x - maximum_reward_position)) / self.reward_scaling_factor

    def step(self, action):
        err_msg = f"{action!r} ({type(action)}) invalid"
        assert self.action_space.contains(action), err_msg
        assert self.state is not None, "Call reset before using step method."
        x, x_dot, theta, theta_dot, direction_goal = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (
            force + self.polemass_length * theta_dot**2 * sintheta
        ) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == "euler":
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot, direction_goal)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not done:
            reward = 1.0 + self.calculate_positional_reward(x, direction_goal)
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0 + self.calculate_positional_reward(x, direction_goal)
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned done = True. You "
                    "should always call 'reset()' once you receive 'done = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state, dtype=np.float32), reward, done, {}

    def reset(
        self,
    ):
        direction_goal = np.random.randint(2)
        self.state = np.append(
            self.np_random.uniform(low=-0.05, high=0.05, size=(4,)), 
            direction_goal
            )
        self.steps_beyond_done = None
        return np.array(self.state, dtype=np.float32)

#### Register CartPoleRightLeft as an Rllib environment

In [None]:
from ray import tune
from ray.rllib.models import ModelCatalog
from ray.tune.registry import register_env

env_name = 'CartPoleRightLeft'
register_env(env_name, lambda config: CartPoleRightLeft())

# Test CartPoleRightLeft and virtual display by performing random actions

In [None]:
# env = wrap_env(CartPoleRightLeft())
# env.reset()
# for _ in range(20):
#     action = env.action_space.sample()
#     observation, reward, done, info = env.step(action)

#     env.render()

#     # print(observation, reward, done)
#     if done:
#         env.reset()
# env.close()
# show_video()

# Training

We use the `ray.tune` API to train the agent with different algorithms. We try out two well-known algorithms which work well on continous state space and discrete action space. The different config variables for the two algorithms can be seen below.

We train the algorithms with limiting the training iterations and limiting the timesteps to compare the two algorithms efficiency as well as their performance.

#### PPO

https://docs.ray.io/en/releases-1.3.0/rllib-algorithms.html#proximal-policy-optimization-ppo

In [None]:
from ray import tune
from ray.rllib.agents.ppo import DEFAULT_CONFIG as PPO_DEFAULT

ppo_config = PPO_DEFAULT.copy()
ppo_config['framework'] = 'torch'
# ppo_config['evaluation_interval'] = 10
ppo_config['env'] = 'CartPoleRightLeft'
ppo_config['num_workers'] = 1
ppo_config['num_sgd_iter'] = tune.grid_search([15, 30, 45, 60])
ppo_config['sgd_minibatch_size'] = tune.grid_search([500])
ppo_config['model']['fcnet_hiddens'] = [50, 50]
ppo_config['evaluation_num_workers'] = 0
ppo_config['evaluation_config']['render_env'] = False
ppo_config['num_gpus'] = 0
ppo_config['evaluation_duration'] = 1000
ppo_config['evaluation_duration_unit'] = 'timesteps'
ppo_config['horizon'] = 1000

ppo_agent = "PPO"

#### DQN

https://docs.ray.io/en/releases-1.3.0/rllib-algorithms.html#deep-q-networks-dqn-rainbow-parametric-dqn

In [None]:
from ray.rllib.agents.dqn import DEFAULT_CONFIG as DQN_DEFAULT

dqn_config = DQN_DEFAULT.copy()
dqn_config['framework'] = 'torch'
# dqn_config['evaluation_interval'] = 10
dqn_config['env'] = 'CartPoleRightLeft'
dqn_config['num_workers'] = 1
dqn_config['model']['fcnet_hiddens'] = [50, 50]
dqn_config['evaluation_config']['render_env'] = False
dqn_config['lr'] = tune.grid_search([0.01, 0.001, 0.0001])
dqn_config['num_gpus'] = 0
dqn_config['evaluation_duration'] = 1000
dqn_config['evaluation_duration_unit'] = 'timesteps'
dqn_config['horizon'] = 1000

dqn_agent = "DQN"

#### Train with ray tune

Change the `trainer_config` and `trainer_algorithm` variables based on the algorithm you want to try out.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import gym
import ray
from ray import tune
from ray.tune.logger import pretty_print

# Select algorithm to use.
trainer_config = ppo_config
trainer_algorithm = ppo_agent

ray.shutdown()
ray.init(ignore_reinit_error=True)
register_env(env_name, lambda config: CartPoleRightLeft())

analysis = tune.run(
    trainer_algorithm,
    stop={
        # "episode_reward_mean": 500,
        # "training_iteration": 50,
        "timesteps_total": 50_000,
        # "episodes_total": 1000,
        },
    config=trainer_config,
    checkpoint_at_end=True
)

#### Train agent for `N_ITER` iterations manually. (Don't use) 

In [None]:
# N_ITER = 3
# for i in range(N_ITER):
#     result = agent.train()
#     print(result)

# Evaluation

For the evaluation we compare the performance of the algorithms in `500` timesteps. We analyze the `episode_len_mean` and the `episode_reward_mean`.

Based on the evaluations it seems that the PPO agent is performing better and more stable. However, not that the `episode_reward_max` is higher for the DQN. At the same time, the `episode_reward_min` is lower for the DQN algorithm. This points to the DQN algorithm being more unstable and trying to go towards the edge of the screen faster.

To reach more concrete conclusions we will need to perform more extensive evaluations and hyper-parameter tunings that were out of the scope of this task because of time constraints. We will also need to refine the definition of success for this task. In the current reward and evaluation setup we can't define an exact criteria for the success of the task but we can compare different methods together. A good criteria might be balancing the pole for `T` time steps in a small boundary around the maximum reward point.

Finally, in the current defined environment the agent still gets positive rewards if it balances the pole in the middle of the screen. It might be worth designing a more complex reward function that penalizes the agent for balancing in the middle. (However, this is also challenging as it might motivate the agent to just drop the pole to not get negative rewards.)

#### Rllib Evaluation

We can use the `analysis` object to get the best checkpoint of the trained agent based on different metrics. Here we use the `episode_len_mean`. Note that since we have changed the reward function, the total reward of the agent will be lower than the original environment even if the pole is balanced for the same number of timesteps.

#### Load agent from checkpoint

We use the ExperimentAnalysis class to access the best configs and checkpoints for each algorithm and in total.

In [None]:
import ray
from ray.tune.registry import register_env
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.agents.dqn import DQNTrainer

from ray.tune import ExperimentAnalysis
analysis = ExperimentAnalysis("~/ray_results/")

best_trial = trial=analysis.get_best_trial(metric="episode_reward_mean", mode="max")

best_config = analysis.get_best_config(metric="episode_reward_mean", mode="max")
best_config['evaluation_duration'] = 10000
best_config['evaluation_duration_unit'] = 'timesteps'
best_config['evaluation_num_workers'] = 1

best_checkpoint = analysis.get_best_checkpoint(
    metric="episode_reward_mean", mode="max", trial=best_trial
)

best_checkpoint

In [None]:

checkpoint_path = best_checkpoint

ray.shutdown()
ray.init(ignore_reinit_error=True)
register_env('CartPoleRightLeft', lambda config: CartPoleRightLeft())

# Load agent from checkpoint
agent_trainer = PPOTrainer

test_agent = agent_trainer(best_config, 'CartPoleRightLeft')
test_agent.restore(checkpoint_path)

#### Rllib Evaluation

In [None]:
test_agent.evaluate()

#### Run and render trained agent

In [None]:
# Run trained agent in environment for final evaluation

env = wrap_env(CartPoleRightLeft())
state = env.reset()
done = False
cumulative_reward = 0

print(state)
step_count = 0
while not done and step_count < 3000:
    action = test_agent.compute_action(state)
    state, reward, done, _ = env.step(action)
    step_count += 1

    env.render()
    cumulative_reward += reward

print(cumulative_reward)
env.close()
show_video()