<a href="https://colab.research.google.com/github/isaacchunn/cartpole-balancing/blob/main/cartpole_balancing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Import progress bar and other dependencies
!pip install tqdm
!pip3 install ipywidgets --user

Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Downloading jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jedi
Successfully installed jedi-0.19.1


In [2]:
import gym
from gym import logger as gymlogger
from gym.wrappers import RecordVideo
gymlogger.set_level(40) #error only
import tensorflow as tf
import numpy as np
import random
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math
import glob
import io
import base64
from IPython.display import HTML
from IPython import display as ipythondisplay
from tqdm.notebook import trange


# Render Video Function
---

In [3]:
def show_video():
  mp4list = glob.glob('video/*.mp4')
  if len(mp4list) > 0:
    mp4 = mp4list[0]
    video = io.open(mp4, 'r+b').read()
    encoded = base64.b64encode(video)
    ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
  else:
    print("Could not find video")

  and should_run_async(code)


# Loading CartPole Environment
---

In [4]:
env = gym.make("CartPole-v1")

In [5]:
#Print our observation space
print("Observation Space", env.observation_space)
print("Sample Observation", env.observation_space.sample()) #Display a random observation

Observation Space Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
Sample Observation [-1.7094618e+00 -1.5777194e+38 -3.1643775e-01  1.3519563e+38]


In [6]:
#Print action space
print("Action Space Shape", env.action_space.n)
print("Action Space Sample", env.action_space.sample())

Action Space Shape 2
Action Space Sample 1


## Initializing Hyperameters
---


Definition of each hyperparameters :
1. learning_rate =
2. discount =
3. epsilon =
4. num_episodes =
5.

## Initializing Q-Table
---



In [7]:
#state_space = env.observation_space.shape
#print("There are ", state_space, " possible states")

#action_space = env.action_space.n
#print("There are ", action_space, " possible actions")

In [8]:
#Training parameters
learning_rate = 0.2
gamma = 0.95
num_episodes = 50000

#Environment parameters
observation_size = [40,40,50,50]
np_array_win_size = np.array([0.25, 0.25, 0.05, 0.5])

#Exploration parameters
max_epsilon = 1.0
min_epsilon = 0.05
epsilon_decay_rate = 0.0005

rewardArr = []

In [9]:
q_table = np.zeros((observation_size + [env.action_space.n]))
q_table.shape

(40, 40, 50, 50, 2)

## Discretize the state space
---

In [10]:
def discretize_state(state):
    discrete_state = state/np_array_win_size + np.array([15,10,1,10])
    return tuple(discrete_state.astype(int))

In [11]:
print("Action Space Shape : ", env.action_space.n)
print("Observation Space : ", env.observation_space)

Action Space Shape :  2
Observation Space :  Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)


In [12]:
#Epsilon greedy policy (acting policy)
def epsilon_greedy_policy(q_table, state, epsilon):
    random_d = np.random.random()
    if(random_d > epsilon):
        action = np.argmax(q_table[state])
    else:
        action = env.action_space.sample()
    return action

## Task 1 - Training the Agent
---


In [13]:

def train_agent(env, num_episodes, learning_rate, gamma, max_epsilon, epsilon_decay_rate, q_table):
  total_reward = 0

  for episode in trange(num_episodes):
    #Reset the environment
    state = env.reset()
    #Find out the curr discrete state
    curr_discrete_state = discretize_state(state[0])
    #Calculate the new epsilon
    epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-epsilon_decay_rate * episode)

    done = False
    episode_reward = 0
    episode_length = 0

    if episode % 2500 == 0:
      print(f"Episode Count : {episode}")

    #Run till all the episodes are complete
    while not done:
        #Determine our policy based on epsilon greedy
        action = epsilon_greedy_policy(q_table, curr_discrete_state, epsilon)

        #Take the chosen action and observe the new state
        new_state, reward, done, info = env.step(action)
        episode_length += 1
        episode_reward += reward
        #Find the new discrete state given the new state calculated from action
        new_discrete_state = discretize_state(new_state)

        #Setup some rewards
        if done:
          if episode_reward >= 195:
            reward = 1
          else:
            reward = -1
        else:
          reward = 0.1

        #Update our q table based on the formula
        q_table[curr_discrete_state + (action,)] += learning_rate * (reward + gamma * np.max(q_table[new_discrete_state]) - q_table[curr_discrete_state + (action,)])

        if done:
          break
        #update the current state to the new state
        curr_discrete_state = new_discrete_state

    total_reward += episode_reward
    rewardArr.append(episode_reward)

    if episode % 2500 == 0:
      print(f"Epsilon : {epsilon}")
      mean_reward = total_reward / 2500
      total_reward = 0
      print(f"Mean Reward : {mean_reward}")

  return q_table, curr_discrete_state, new_discrete_state


In [None]:
#Train the table
q_table_cart, cd, nd = train_agent(env, num_episodes, learning_rate, gamma, max_epsilon, epsilon_decay_rate, q_table)

#q_table_cart
cd

  0%|          | 0/50000 [00:00<?, ?it/s]

Episode Count : 0
Epsilon : 1.0
Mean Reward : 0.0192


  if not isinstance(terminated, (bool, np.bool8)):


Episode Count : 2500
Epsilon : 0.32217955701718054
Mean Reward : 53.3648
Episode Count : 5000
Epsilon : 0.12798074869270387
Mean Reward : 114.4224
Episode Count : 7500
Epsilon : 0.07234185856320866
Mean Reward : 130.8548
Episode Count : 10000
Epsilon : 0.056401049649131195
Mean Reward : 134.4096
Episode Count : 12500
Epsilon : 0.05183393142941633
Mean Reward : 140.6324
Episode Count : 15000
Epsilon : 0.050525430151640446
Mean Reward : 133.3796


In [None]:
q_table_cart[(15, 10, -3, 8)]

In [None]:
env = RecordVideo(gym.make("CartPole-v1", render_mode="human"), "./video")
observation = env.reset()
discrete_state = discretize_state(observation[0])
while not done:
    env.render()
    #your agent goes here
    action = np.argmax(q_table[discrete_state])
    new_state, reward, done, info = env.step(action)
    discrete_state = discretize_state(new_state)
    if done:
      observation = env.reset()
      discrete_state = discretize_state(observation[0])
    print(done)
show_video()


## Task 2 - Effectiveness of Agent
---

**Episode End**
1. The episode ends if any one of the following occurs:
2. Termination: Pole Angle is greater than ±12°
3. Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
4. Truncation: Episode length is greater than 500.


In [None]:

def evaluate_agent(env, q_table, n_eval_episodes = 100, max_steps = 500):
  episode_rewards = []
  for episode in trange(n_eval_episodes):
    #Reset the environment
    state = env.reset()
    #Find out the curr discrete state
    curr_discrete_state = discretize_state(state[0])
    done = False
    total_rewards_ep = 0

    for step in range(max_steps) :
      action = np.argmax(q_table[curr_discrete_state])
      #Take the chosen action and observe the new state
      new_state, reward, done, info = env.step(action)
      total_rewards_ep += reward

      if done:
        break

      #update the current state to the new state
      curr_discrete_state = discretize_state(new_state)

    #append the total rewards / episode to episode_rewards array
    episode_rewards.append(total_rewards_ep)

  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)
  env.close()
  return mean_reward, std_reward


In [None]:
mean_reward, std_reward = evaluate_agent(env, q_table)


In [None]:
print(mean_reward,std_reward)

In [None]:
#Set the initial value of episode_reward to 0
episode_reward = 0

#Creating instance of RecordVideo
env = RecordVideo(gym.make("CartPole-v1", render_mode="human"),"./video")

#Reset env and obtain initial observation and discrete state
observation = env.reset()
discrete_state = discretize_state(observation[0])

done = False
while not done:
    env.render()

    #Determining action to take based on q_table values and select action with highest value
    action = np.argmax(q_table[discrete_state])

    #take chosen action in env and obtain new state, reward, and done flag
    new_env = env.step(action)
    new_state = new_env[0]
    reward = new_env[1]
    done = new_env[2]

    #Add reward for the current step to the episode reward variable
    episode_reward += reward

    #Convert new state to its discrete equivalent and update discrete_state
    new_discrete_state = discretize_state(new_state)
    discrete_state = new_discrete_state

    #if episode is complete, reset env and obtain initial observation and discrete state
    if done:
        observation = env.reset()
        discrete_state = discretize_state(observation[0])

env.close()
show_video()
print(episode_reward)

In [None]:
# def train_agent(env, NUM_EPISODES, LEARNING_RATE, DISCOUNT, max_epsilon, epsilon_decay_rate, q_table):
#   total_reward = 0
#   rewardsArr = []

#   for episode in range(NUM_EPISODES):
#     #reset the environment and initialize the state
#     discrete_state = discretize_state(env.reset()[0])
#     done = False
#     episode_reward = 0

#     #Run all the episodes until done flag is set to True
#     while not done:

#         #Using Q-Table, we choose the best action to take for current state
#         action = np.argmax(q_table[discrete_state])

#         #obtain the new state, reward and done flag after taking the best action
#         updated_env = env.step(action)
#         updated_state, reward, done = updated_env
#         episode_reward += reward

#         #We truncate the training if episodes length is greater than 500
#         if episode_reward > 500:
#           done = True

#         #discretize the new state and update discrete_state
#         discrete_state = discretize_state(updated_state)

#         #if episode is completed, we reset the environment and reinitialize the observation and discrete state
#         if done:
#           observation = env.reset()
#           discrete_state = discretize_state(observation[0])

#     rewardsArr.append(episode_reward)

#     env.close()
#     return q_table, rewardArr

# q_table, rewards = train_agent(env, NUM_EPISODES, LEARNING_RATE, DISCOUNT, max_epsilon, epsilon_decay_rate, q_table)