In [1]:
import gym
import random
import math
import numpy as np

# CartPole

In [2]:
# Create the CartPole-v0 environment; the cells below share this `env` object.
env = gym.make('CartPole-v0')

#### Possible actions:
- left , right

In [3]:
# Number of discrete actions the agent can choose from.
env.action_space.n

2

#### Environment (observation) space:
- position of the cart
- velocity of the cart
- angle of the pole to the vertical
- angular velocity of the pole

> low : give lower bounds of 4 values that make up the observation space (individual state variable cannot have values below these)

> high : give upper bounds of 4 values that make up the observation space

In [9]:
# Lower bound of each component of the observation vector.
env.observation_space.low

array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32)

## Discretize the state space:
> so we can apply Q-learning to a bounded, finite state space
> (this reduces the dimensionality of the Q-value computation)

- position of cart (Left , Right)
> 1: ignoring this variable completely in our state space
- velocity of cart
> 1: ignoring this variable completely in our state space
- angle of the pole to the vertical
- angular velocity in which the pole is moving

In [11]:
# Number of discrete buckets per state dimension, in observation order.
# The two leading 1s collapse the first two dimensions into a single bucket
# each (i.e. those state variables are ignored); the remaining two dimensions
# are split into 6 and 3 discrete intervals respectively.
NUM_BUCKETS  = ( 1 , 1 , 6 , 3 )

In [12]:
# Size of the action axis of the Q-table, taken from the environment.
NUM_ACTION   = env.action_space.n 

In [14]:
# One (low, high) pair per observation component, in observation order.
STATE_BOUNDS = [
    (low, high)
    for low, high in zip(env.observation_space.low, env.observation_space.high)
]

### redefine some of bounds in order to further limit our state space

In [18]:
# Override the bounds at index 1 and index 3 with narrow, finite ranges so the
# bucketization works on a limited interval (index 3 is given in radians).
STATE_BOUNDS[1] = [-0.5, 0.5]
STATE_BOUNDS[3] = [math.radians(-50), math.radians(50)]

STATE_BOUNDS

[(-4.8, 4.8),
 [-0.5, 0.5],
 (-0.41887903, 0.41887903),
 [-0.8726646259971648, 0.8726646259971648]]

### initialize Q_Table :

#### num_states × num_actions = (1,1,6,3)×2

In [27]:
# Zero-initialised Q-table: one axis per state dimension plus a final action axis.
q_table = np.zeros((*NUM_BUCKETS, NUM_ACTION))
print("shape of Q_Table : {}\n".format(q_table.shape))
print(q_table)

# The first 4 dimensions index the discretized state;
# the last dimension indexes the action taken in that state.

shape of Q_Table : (1, 1, 6, 3, 2)

[[[[[0. 0.]
    [0. 0.]
    [0. 0.]]

   [[0. 0.]
    [0. 0.]
    [0. 0.]]

   [[0. 0.]
    [0. 0.]
    [0. 0.]]

   [[0. 0.]
    [0. 0.]
    [0. 0.]]

   [[0. 0.]
    [0. 0.]
    [0. 0.]]

   [[0. 0.]
    [0. 0.]
    [0. 0.]]]]]


### we initially want to explore the state space in order to fill in the Q-values in the Q_Table
- the exploration rate starts high and decays over time, but never drops below a minimum of 0.01

In [28]:
# Floor for the exploration rate: the decay helper never returns less than this.
EXPLORATION_RATE_MIN = 0.01

In [29]:
# Floor for the learning rate: the decay helper never returns less than this.
LEARNING_RATE_MIN =  0.1

### Helper functions to decay the exploration rate and the learning rate over time
- not too fast
- so that we don't miss any maxima

In [31]:
def get_explore_rete(t):
    """Return the exploration rate (epsilon) for step/episode index ``t``.

    The raw schedule is ``1 - log10((t + 1) / 25)``. It is capped at 1 from
    above and at ``EXPLORATION_RATE_MIN`` from below, so the rate stays at 1
    while ``t + 1 <= 25`` and then decays logarithmically down to the floor.
    """
    decayed = 1.0 - math.log10((t + 1) / 25)
    capped = min(1, decayed)
    return max(EXPLORATION_RATE_MIN, capped)

In [32]:
def get_learning_rete(t):
    """Return the learning rate (alpha) for step/episode index ``t``.

    The raw schedule is ``1 - log10((t + 1) / 25)``. It is capped at 0.5 from
    above and at ``LEARNING_RATE_MIN`` from below.
    """
    decayed = 1.0 - math.log10((t + 1) / 25)
    capped = min(0.5, decayed)
    return max(LEARNING_RATE_MIN, capped)

- explore the sample space at random based on the explore_rate
- or we can choose to stick with the known and perform that action that get us to the highest Q_Value

In [33]:
def select_action_function(state, explore_rate):
    """Pick an action for ``state`` using an epsilon-greedy rule.

    Args:
        state: index into ``q_table`` selecting the row of action values
            (the simulation passes a tuple of bucket indices).
        explore_rate: probability threshold; a uniform random draw below it
            triggers exploration.

    Returns:
        A random action sampled from ``env.action_space`` when exploring,
        otherwise the index of the largest Q-value stored for ``state``.
    """
    # Explore: ignore the table and sample a random action.
    if random.random() < explore_rate:
        return env.action_space.sample()

    # Exploit: take the action with the highest known Q-value.
    return np.argmax(q_table[state])

## Discretize:
- helper func. input  : continuous state
- helper func. output : discretized (bucketized) version

##### iterate through all 4 state variables one at a time

- if the state is beyond the lower bounds , set it to the smallest bucket
- if the state is beyond the upper bounds , set it to the largest bucket

In [38]:
def state_to_bucket(state, state_bounds=None, num_buckets=None):
    """Convert a continuous state vector into a tuple of bucket indices.

    Each component is mapped independently:

    * at or below its lower bound  -> bucket ``0``
    * at or above its upper bound  -> the last bucket (``n - 1``)
    * otherwise the value is mapped linearly from ``[lower, upper]`` onto
      ``[0, n - 1]`` and rounded to the nearest integer.

    A dimension with a single bucket always maps to index ``0``, which
    effectively ignores that state variable.

    Args:
        state: sequence of continuous state values.
        state_bounds: optional sequence of ``(lower, upper)`` pairs, one per
            state component. Defaults to the module-level ``STATE_BOUNDS``.
        num_buckets: optional sequence with the number of buckets per state
            component. Defaults to the module-level ``NUM_BUCKETS``.

    Returns:
        A tuple of integer bucket indices, one per component of ``state``,
        usable directly as an index into the Q-table.
    """
    # Fall back to the notebook-wide configuration when no override is given,
    # so existing one-argument calls behave exactly as before.
    if state_bounds is None:
        state_bounds = STATE_BOUNDS
    if num_buckets is None:
        num_buckets = NUM_BUCKETS

    bucket_indices = []

    for i, value in enumerate(state):
        lower, upper = state_bounds[i][0], state_bounds[i][1]
        last_bucket = num_buckets[i] - 1

        if value <= lower:
            # At/below the lower bound: clamp to the smallest bucket.
            bucket_index = 0
        elif value >= upper:
            # At/above the upper bound: clamp to the largest bucket.
            bucket_index = last_bucket
        else:
            # Linear map of [lower, upper] onto [0, last_bucket], then round.
            bound_width = upper - lower
            offset = last_bucket * lower / bound_width
            scaling = last_bucket / bound_width
            bucket_index = int(round(scaling * value - offset))

        bucket_indices.append(bucket_index)

    return tuple(bucket_indices)

### An episode ends when:
- the pole can no longer be balanced on the cart (it tilts more than 12 degrees from the vertical)
- the cart moves more than 2.4 units to either side
- the maximum number of discrete time steps for an episode is reached

##### we'll have every episode run for a maximum of 250 time intervals
- render the environment at each time step

In [46]:
def simulation(num_episodes=1000, max_steps=250, solved_step=199, streak_to_end=120):
    """Train the global ``q_table`` with tabular Q-learning on ``env``.

    For every episode the environment is reset, and at each time step an
    action is chosen with ``select_action_function``, applied with
    ``env.step``, and the Q-value of the (previous state, action) pair is
    moved towards ``reward + discount_factor * max_a Q(new_state, a)``.
    The environment is rendered and a block of diagnostics is printed on
    every step.

    Args:
        num_episodes: maximum number of episodes to run.
        max_steps: maximum number of time steps per episode.
        solved_step: an episode counts towards the streak when it finishes at
            a step index ``t >= solved_step``; any shorter episode resets the
            streak to 0.
        streak_to_end: training stops early once the streak exceeds this
            value.

    Returns:
        None. The function updates ``q_table`` in place.
    """
    # Both rates start at their schedule value for index 0 and are refreshed
    # at the end of every episode.
    learning_rate = get_learning_rete(0)
    explore_rate = get_explore_rete(0)

    # gamma: future rewards are almost as important as immediate rewards
    discount_factor = 0.99
    # number of consecutive episodes in which the pole stayed up long enough
    num_streaks = 0

    for episod in range(num_episodes):
        observ = env.reset()

        # discretize the initial observation
        state_0 = state_to_bucket(observ)

        for t in range(max_steps):
            env.render()

            action = select_action_function(state_0, explore_rate)

            observ, reward, done, _ = env.step(action)

            # discretize the new observation
            state = state_to_bucket(observ)

            # value of the best action available from the new state;
            # q_table is an array, so it must be indexed, not called
            best_q = np.max(q_table[state])

            # temporal-difference update of Q(state_0, action)
            q_table[state_0 + (action,)] += learning_rate * (
                reward + discount_factor * best_q - q_table[state_0 + (action,)]
            )

            state_0 = state

            print("\nEpisode = %d" % episod)
            print("t = %d" % t)
            print("Action: %d" % action)
            print("State: %s" % str(state))
            print("Reward: %f" % reward)
            print("Best Q: %f" % best_q)
            print("Explore rate: %f" % explore_rate)
            print("Learning rate %f" % learning_rate)
            print("Streaks: %d" % num_streaks)

            print("")

            if done:
                print("Episode %d finished after %f time steps" % (episod, t))

                # a long enough episode extends the streak, anything else resets it
                if t >= solved_step:
                    num_streaks += 1
                else:
                    num_streaks = 0

                break

        # stop training once the agent has been successful often enough in a row
        if num_streaks > streak_to_end:
            break

        explore_rate = get_explore_rete(episod)
        learning_rate = get_learning_rete(episod)

In [42]:
# Run the training loop; it renders the environment and prints diagnostics at every step.
simulation()

-1.3979400086720375