In [54]:
import gym
import math
import time
import numpy as np
from IPython.display import Image
from sklearn.preprocessing import KBinsDiscretizer
from typing import Tuple
np.random.seed(0)

In [15]:
?env.env

[1;31mType:[0m        CartPoleEnv
[1;31mString form:[0m <CartPoleEnv<CartPole-v1>>
[1;31mFile:[0m        c:\users\taha\anaconda3\lib\site-packages\gym\envs\classic_control\cartpole.py
[1;31mDocstring:[0m  
Description:
    A pole is attached by an un-actuated joint to a cart, which moves along
    a frictionless track. The pendulum starts upright, and the goal is to
    prevent it from falling over by increasing and reducing the cart's
    velocity.

Source:
    This environment corresponds to the version of the cart-pole problem
    described by Barto, Sutton, and Anderson

Observation:
    Type: Box(4)
    Num     Observation               Min                     Max
    0       Cart Position             -4.8                    4.8
    1       Cart Velocity             -Inf                    Inf
    2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
    3       Pole Angular Velocity     -Inf                    Inf

Actions:
    Type: Discrete(2)
   

In this notebook all algorithms that I reference are from the textbook: Reinforcement Learning: An Introduction by Sutton and Barto.

https://github.com/openai/gym/wiki/CartPole-v0

This link has all the information regarding the specifics of the CartPole environment, such as the action space, how rewards are disitributed, and what it means to solve this problem.

Solved requirements: "Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials." 

![title](C:\Users\Taha\Documents\Work\RL Code & Practice\ql.png)

## Q-Learning

We will first attempt to solve this problem with the Q-learning algorithm, which is an off-policy temporal difference control algorithm.

In [140]:
# Create the environment with openai gym and set a seed to work in
env = gym.make('CartPole-v1')
env.seed(0)

[0]

In [174]:
state_space = (1, 1, 6 , 12)
lower_bounds = [env.observation_space.low[0], -0.5, env.observation_space.low[2], -math.radians(50)]
upper_bounds = [env.observation_space.high[0], 0.5, env.observation_space.high[2], math.radians(50)]
def discrete(s):
    tiles = []
    for i in range(len(s)):
        scaling = (s[i] + abs(lower_bounds[i])) / (upper_bounds[i] - lower_bounds[i])
        scaled_s = int(round((state_space[i] - 1) * scaling))
        scaled_s = min(state_space[i] - 1, max(0, scaled_s))
        tiles.append(scaled_s)
    return tuple(tiles)

In [194]:
state_space = (6 , 12)
lower_bounds = [env.observation_space.low[2], -math.radians(50)]
upper_bounds = [env.observation_space.high[2], math.radians(50)]

def discretizer(_, __, angle, pole_velocity):
    est = KBinsDiscretizer(n_bins=state_space, encode='ordinal', strategy='uniform')
    est.fit([lower_bounds, upper_bounds])
    return tuple(map(int,est.transform([[angle, pole_velocity]])[0]))

In [195]:
Q_table = np.zeros(state_space + (env.action_space.n,))
Q_table.shape

(6, 12, 2)

In [196]:
tic = time.time()
Q_table = np.zeros(state_space + (env.action_space.n,))
reward_list = []
total_reward = 0
success = False
for i in range(1, 10001):
    current, done = discretizer(*env.reset()), False
    while not done:
        lr = max(0.01, min(1.0, 1.0 - math.log10((i + 1) / 25)))
        epsilon =  max(0.05, min(1, 1.0 - math.log10((i  + 1) / 25)))
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(Q_table[current])
        observation, reward, done, info = env.step(action) 
        next_state = discretizer(*observation)
        Q_table[current][action] += lr*(reward + np.max(Q_table[next_state]) - Q_table[current][action])
        current = next_state
        total_reward += reward
#        if i % 100:
#            env.render()
    if i % 100 == 0:
        print(total_reward/100)
        reward_list.append(total_reward/100)
        if reward_list[-1] >= 195.0:
            toc = time.time()
            success = True
            print(f'Took {toc-tic:.2f} seconds and achieved average reward of {reward_list[-1]} in {(reward_list.index(reward_list[-1]) + 1)*100} episodes')
            env.close()
            break
        total_reward = 0
toc = time.time()
if not success:
    print(f'Took {toc-tic:.2f} seconds and failed')
    print(f'Max average reward achieved: {np.max(reward_list):.2f}')
env.close()
print(Q_table)

28.6
199.21
Took 8.62 seconds and achieved average reward of 199.21 in 200 episodes
[[[  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]
  [  0.           0.        ]]

 [[104.20008834 111.27741248]
  [ 71.48163902 106.87327771]
  [ 67.94355213 106.42540857]
  [ 41.62215394 100.44442793]
  [ 43.61451518  95.54375652]
  [ 24.56737402  79.58280282]
  [  0.83133755  80.05350206]
  [ 31.59545701  36.66123333]
  [ 47.23839918   0.        ]
  [ 23.01910275   0.        ]
  [ 34.14212814   2.43113237]
  [ 26.           0.        ]]

 [[303.19040285 183.04920616]
  [305.91212328 220.7093263 ]
  [325.84101826 244.74638333]
  [330.08185611 264.04146952]
  [328.81075064 303.85478055]
  [332.21646468 323.29891961]
  [328.09428

## Sarsa

In [2]:
env = gym.make('CartPole-v1')
env.seed(0)

[0]

In [46]:
tic = time.time()
reward_list = []
Q_table = np.zeros(n_bins + (env.action_space.n,))
policy = np.zeros(n_bins)
total_reward = 0
for i in range(1, 10001):
    current, done = test(env.reset()), False
    while not done:
        lr = max(0.01, min(1.0, 1.0 - math.log10((i + 1) / 25)))
        epsilon =  max(0.05, min(1, 1.0 - math.log10((i + 1) / 25)))
        action = int(policy[current])
        observation, reward, done, info = env.step(action)
        next_state = test(observation)
        Q_table[current][action] += lr * (reward + Q_table[next_state][int(policy[next_state])] - Q_table[current][action])
        current = next_state
        if np.random.random() < epsilon:
            policy[current] = env.action_space.sample()
        else:
            policy[current] = np.argmax(Q_table[current])
        total_reward += reward
    if i % 100 == 0:
        reward_list.append(total_reward/100)
        print(reward_list[-1])
        if reward_list[-1] >= 195.0:
            toc = time.time()
            print(f'Took {toc-tic:.2f} seconds and achieved average reward of {reward_list[-1]} in {(reward_list.index(reward_list[-1]))*100} episodes')
        total_reward = 0
toc = time.time()
if not success:
    print(f'Took {toc-tic:.2f} seconds and failed')
    print(f'Max average reward achieved: {np.max(reward_list):.2f}')
env.close()

32.57
40.56
48.98
59.25
60.62
55.46
63.61
55.73
64.43
59.26
55.03
61.05
58.78
61.38
59.42
62.46
58.86
66.6
60.44
54.8
59.68
52.68
55.56
65.77
55.28
60.1
66.38
62.31
61.0
55.61
53.92
55.56
58.76
66.56
58.04
67.33
56.27


KeyboardInterrupt: 

## Double Q Learning

In [93]:
tic = time.time()
Q_table_1 = np.zeros(n_bins + (env.action_space.n,))
Q_table_2 = np.zeros(n_bins + (env.action_space.n,))
reward_list = []
total_reward = 0
success = False
for i in range(1, 10001):
    current, done = discretizer(*env.reset()), False
    while not done:
        lr = max(0.01, min(1.0, 1.0 - math.log10((i + 1) / 25)))
        epsilon =  max(0.05, min(1, 1.0 - math.log10((i  + 1) / 25)))
        action = np.argmax(Q_table_1[current] + Q_table_2[current])
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        observation, reward, done, info = env.step(action) 
        next_state = discretizer(*observation)
        if np.random.random() <= 0.5:
            Q_table_1[current][action] += lr*(reward + Q_table_2[next_state][np.argmax(Q_table_1[next_state])] - Q_table[current][action])
        else:
            Q_table_2[current][action] += lr*(reward + Q_table_1[next_state][np.argmax(Q_table_2[next_state])] - Q_table[current][action])
        current = next_state
        total_reward += reward
#        if i % 100:
#            env.render()
    if i % 100 == 0:
#        print(total_reward/100)
        reward_list.append(total_reward/100)
        if reward_list[-1] >= 195.0:
            toc = time.time()
            success = True
            print(f'Took {toc-tic:.2f} seconds and achieved average reward of {reward_list[-1]} in {(reward_list.index(reward_list[-1]) + 1)*100} episodes')
            env.close()
            break
        total_reward = 0
toc = time.time()
if not success:
    print(f'Took {toc-tic:.2f} seconds and failed')
    print(f'Max average reward achieved: {np.max(reward_list):.2f}')
env.close()

Took 173.85 seconds and failed
Max average reward achieved: 46.95


In [237]:
est = KBinsDiscretizer(n_bins=(3,10), encode='ordinal', strategy='uniform')
est.fit([[2, 9], [6,10]])
t = tuple(map(int,est.transform([[6,9]])[0]))

In [238]:
t

(2, 0)