# Reinforcement Q-learning Cartpole Discretised
The following code uses the q-table learning on a continous state space of the cartpole environment by using an observation wrapper. This discretises the 4-dimensional continuous state-space into dicretised space without losing any information. The objective is to balance the cartpole for as long as possible. 

In [2]:
# import packages
import gym
import numpy as np
import random 

In [3]:
# Create the taxi environment 
env = gym.make("CartPole-v0")

In [4]:
print(env.observation_space)
print(env.action_space)

Box(4,)
Discrete(2)


In [5]:
print(env.observation_space.high)
print(env.observation_space.low)

[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]


The observation is is a 4-dimentional continuous space of (cart position, cart velocity, pole angle, pole velocity at tip) and the actions are (0,1) to move the cart left and right. To allow the use of a q-table, it is required to conver this continous observation space into discretised space. 

In [5]:
class DiscretizedObservationWrapper(gym.ObservationWrapper):
    """This wrapper converts a Box observation into a single integer.
    """
    def __init__(self, env, n_bins=10, low=None, high=None):
        super().__init__(env)
        assert isinstance(env.observation_space, gym.spaces.Box)

        low = self.observation_space.low if low is None else low
        high = self.observation_space.high if high is None else high

        self.n_bins = n_bins
        self.val_bins = [np.linspace(l, h, n_bins + 1) for l, h in
                         zip(low.flatten(), high.flatten())]
        self.observation_space = gym.spaces.Discrete((n_bins + 2) ** (low.flatten().shape[0]))

    def _convert_to_one_number(self, digits):
        return sum([d * ((self.n_bins + 1) ** i) for i, d in enumerate(digits)])

    def observation(self, observation):
        digits = [np.digitize([x], bins)[0]
                  for x, bins in zip(observation.flatten(), self.val_bins)]
        return self._convert_to_one_number(digits)

# create the new environment
env = DiscretizedObservationWrapper(
    env, 
    n_bins=8, 
    low=np.array([-2.4, -2.0, -0.42, -3.5]), 
    high=np.array([2.4, 2.0, 0.42, 3.5])
)

In [7]:
# check the new action space and observation space
print(env.action_space)
print(env.observation_space)

Discrete(2)
Discrete(10000)


In [10]:
# create an empty table of zeros 
q_table = np.zeros([env.observation_space.n, env.action_space.n])
print('The q-table is of size (observation_space_size, action_space_size): ',(len(q_table), len(q_table[0])))

The q-table is of size (observation_space_size, action_space_size):  (10000, 2)


In [11]:
# Hyperparameters
alpha = 0.5
gamma = 0.99
epsilon = 0.1

In [12]:
# initialise reward accumulation
avg_rewards = 0
N = 0

In [17]:
# Loop for 15000 of episodes to train
for i in range(1,1001):
    
    # reset environment 
    state = env.reset()
    done = False
    total_rewards = 0

    
    # loop until the agent exits the environment (terminal point/drop off passenger)
    while not done:
        
        # epsilon greedy explore and exploit
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state])
        
        # take the action
        next_state, reward, done, info = env.step(action)
        max_q_next = np.max(q_table[next_state])
        
        # update the q-value for state
        q_table[state, action] = (1-alpha)*q_table[state,action] + alpha*(reward + gamma*max_q_next)
        
        # set the current state as next state
        state = next_state
        
        # accumulate rewards per episode
        total_rewards += reward
        
    # averaging total rewards
    N += 1
    avg_rewards = avg_rewards + 1.0/(N) * (total_rewards - avg_rewards)
        
    # print out some average rewards every 1000 epsiodes
    if (i % 1000) == 0:
        print("Run: " + str(i) + " average score of past 1000 episodes: " + str(avg_rewards))
        N = 0
        avg_rewards = 0

print('Done Training!')   

Run: 1000 average score of past 1000 episodes: 112.78200000000001
Done Training!


In [18]:
# function to test the result of the q_table for n_episodes
def test(q_table, n_episodes):
    
    # store average rewards
    avg_rewards = 0
    
    for i in range(1, n_episodes+1):

        state = env.reset()
        done = False 
        total_rewards = 0
        
        # until done 
        while not done:
            
            # take an action in the max q_table
            action = np.argmax(q_table[state])
            state, reward, done, info = env.step(action)
            
            # acculmulate rewards
            total_rewards += reward
        
        avg_rewards = avg_rewards + 1/(i) * (total_rewards - avg_rewards)

    print("After " + str(n_episodes) + " the average score is " + str(avg_rewards))
            
# run test for 5000 episodes
test(q_table, 5000)

After 5000 the average score is 182.21959999999933
