# Cartpole Q Learning

Docs:  
https://github.com/openai/gym/wiki/CartPole-v0

In [1]:
import gym
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import math

# Play around with the environment

In [3]:
import time

env = gym.make('CartPole-v0')
for i_episode in range(1):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        print(action)
        print(time.sleep(0.1))
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t+1))
            break
env.close()

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[ 0.04849368 -0.00552053 -0.0049169  -0.02934312]
0
None
[ 0.04838327 -0.20057162 -0.00550376  0.26178443]
1
None
[ 0.04437184 -0.00537154 -0.00026807 -0.03262936]
1
None
[ 0.04426441  0.18975425 -0.00092066 -0.32539685]
0
None
[ 0.0480595  -0.00535458 -0.0074286  -0.0330044 ]
1
None
[ 0.0479524   0.18987311 -0.00808868 -0.32802184]
1
None
[ 0.05174987  0.38510928 -0.01464912 -0.62324456]
1
None
[ 0.05945205  0.58043267 -0.02711401 -0.9205049 ]
1
None
[ 0.07106071  0.77591036 -0.04552411 -1.2215842 ]
1
None
[ 0.08657891  0.97158839 -0.06995579 -1.52817661]
1
None
[ 0.10601068  1.16748105 -0.10051933 -1.84184713]
1
None
[ 0.1293603   1.3635583  -0.13735627 -2.16398131]
0
None
[ 0.15663147  1.1700215  -0.18063589 -1.91666624]
0
None
Episode finished after 13 timesteps


In [None]:
print(env.action_space)
print(env.observation_space)
print(env.observation_space.high)
print(env.observation_space.low)

# Discretisation

In [29]:
x = np.array([-0.1, 0.1, 2.4, 3.0, 4.6])
bins = np.array([0.0, 1.0, 2.0, 3.0, 4.0])
out = np.digitize(x, bins)
print("bins:", bins)
print("x:", x)
print("out:", out)

bins: [0. 1. 2. 3. 4.]
x: [-0.1  0.1  2.4  3.   4.6]
out: [0 1 3 4 5]


# The Tools

In [5]:
class Epsilon(object):
    def __init__(self, start=1.0, end=0.01, update_increment=0.01):
        self.start = start
        self.end = end
        self.update_increment = update_increment
        self._value = self.start
        self.isTraining = True
    
    def increment(self, count=1):
        self._value = max(self.end, self._value - self.update_increment*count)
        return self
        
    def value(self):
        if not self.isTraining:
            return 0.0
        else:
            return self._value
"""
Instantiate object with epsilon starting at 1.0 (100% exploration), final value 0.01 (1% exploration), 
each time we call increment it'll go down by 0.01. 
If eps.isTraining is set to True then it'll return 0.0 (zero exploration)
"""
eps = Epsilon(start=1.0, end=0.01, update_increment=0.01)
print(eps.value())
print("Incrementing 3 times")
print(eps.increment().value())
print(eps.increment().value())
print(eps.increment().value())
print("Increment 99 times and the lowest it goes to is 0.01")
print(eps.increment(99).value())
print("Set training = False")
eps.isTraining = False
print(eps.increment().value())
print("Set training = True")
eps.isTraining = True
print(eps.increment().value())

1.0
Incrementing 3 times
0.99
0.98
0.97
Increment 99 times and the lowest it goes to is 0.01
0.01
Set training = False
0.0
Set training = True
0.01


In [6]:
class QTable():
    def __init__(self, num_actions=4):
        self.num_actions = num_actions
        self.Q = {}
    
    """Q(s, a): get the Q value of (s, a) pair"""
    def get_Q(self, s, a):
        self._check(s, a)
        return self.Q[s][a]
    
    def _check(self, s, a):
        if not s in self.Q:
            self.Q[s] = [0]*self.num_actions
    
    """max Q(s): get the max of all Q value of state s"""
    def get_max_Q(self, s):
        self._check(s, 0)
        return np.max(self.Q[s])
    
    """Q(s, a) = q: update the q value of (s, a) pair"""
    def set_Q(self, s, a, q):
        self._check(s, a)
        self.Q[s][a] = q
    
    """argmax_a Q(s, a): get the action which has the highest Q in state s"""
    def get_max_a_for_Q(self, s):
        self._check(s, 0)
        return np.argmax(self.Q[s])
    
    def __str__(self):
        output = []
        for s in self.Q:
            output.append(s.__str__() + ": " + self.Q[s].__str__())
        output.sort()
        return "\n".join(output)

# Exercise: Q Learner for Cartpole
### Suggested progression
- Get agent training loop working with random action
- Convert env state space into discretised states
- Get agent action selection to use epsilon-greedy
- Get agent to store and update Q values

In [11]:
class Agent():
    def __init__(self):
        self.env = gym.make('CartPole-v0')
        self.Q = QTable(num_actions=2)
        self.epsilon = Epsilon(start=1.0, end=0.05, update_increment=0.002)
        
    def getAction(self, s):
        action = self.env.action_space.sample()
        return action
    
    def train(self, episodes=100):
        pass
                
    
    def run(self):
        self.env = gym.make('CartPole-v0')
        self.epsilon.isTraining = False
        s = self.env.reset()
        steps = 0
        while True:
            self.env.render()
            action = self.getAction(s)
            s_1, reward, done, info = self.env.step(action)
            steps += 1
            if done:
                print("Episode finished successfully after {} timesteps".format(steps))
                break
        self.env.close()

agent = Agent()
# agent.train(episodes=2)
agent.run()



[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m
Episode finished successfully after 22 timesteps


# Solution

In [None]:
class Agent():
    def __init__(self):
        self.env = gym.make('CartPole-v0')
        self.alpha = 0.1
        self.gamma = 0.99
        self.Q = QTable(num_actions=2)
        self.epsilon = Epsilon(start=1.0, end=0.05, update_increment=0.002)
        
        # get initial state, divide continuous states into discrete bins
#         self.bins = [np.linspace(env.observation_space.low[i], env.observation_space.high[i], 7) for i in range(4)]
        self.bins = []
        self.bins.append(np.linspace(-2.4, 2.4, 5))
        self.bins.append(np.linspace(-0.5, 0.5, 5))
        self.bins.append(np.linspace(-41.8, 41.8, 5))
        self.bins.append(np.linspace(-math.radians(50), math.radians(50), 5))
        
        self.episode_durations = []
        
    def get_state(self, s):
        return tuple([np.asscalar(np.digitize(s[i], self.bins[i])) for i in range(4)])
    
    def getAction(self, s):
        if np.random.rand() >= self.epsilon.value():
            action = self.Q.get_max_a_for_Q(s)
        else:
            action = self.env.action_space.sample()
        self.epsilon.increment(1)
        return action
    
    def train(self, episodes=100):
        self.epsilon.isTraining = True
        # run for 100 episodes:"
        for i in range(episodes):
            s = self.get_state(self.env.reset())
            steps = 0
            while True:
                action = self.getAction(s)
                
                s_1, reward, done, info = self.env.step(action)
                s_1 = self.get_state(s_1)
                
                q = self.Q.get_Q(s, action)
                max_q_s_1 = self.Q.get_max_Q(s_1)
                if done and steps < 199:
                    max_q_s_1 = -100
                
                q = q + self.alpha * (reward + self.gamma * max_q_s_1 - q)
                self.Q.set_Q(s, action, q)
                s = s_1
                
                steps += 1
                if done:
#                     print("Training episode finished after {} timesteps".format(steps))
                    break
            self.episode_durations.append(steps)
#             self.epsilon.increment(1)
                
    
    def run(self):
        self.env = gym.make('CartPole-v0')
        self.epsilon.isTraining = False
        s = self.get_state(self.env.reset())
        steps = 0
        while True:
            self.env.render()
            action = self.getAction(s)
            s_1, reward, done, info = self.env.step(action)
            s_1 = self.get_state(s_1)
            s = s_1
            steps += 1
            if done:
                print("Episode finished successfully after {} timesteps".format(steps))
                break
        self.env.close()

agent = Agent()
agent.train(episodes=2)
# agent.run()
plt.figure(2)
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(agent.episode_durations)


In [None]:
for i in range(1):
    agent.run()