# QLearning Examples

In [1]:
import gym  # Using OpenAI Gym to create our environments
import numpy as np
import math 
import random

In [2]:
# Variant without a rendering window:
# env = gym.make("CartPole-v0")
env = gym.make("CartPole-v0", render_mode= "human") # Use this one to visualize the cart while it runs

  logger.warn(


In [3]:
env.action_space.n # 2 actions - move cart left or move cart right

2

In [4]:
# 4 variables represent the current state: cart position, cart velocity, pole angle, and pole angular velocity
env.observation_space

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)

In [5]:
# Lower bounds of the 4 values that make up the observation space
env.observation_space.low

array([-4.8000002e+00, -3.4028235e+38, -4.1887903e-01, -3.4028235e+38],
      dtype=float32)

In [6]:
# Upper bounds of the 4 values that make up the observation space
env.observation_space.high

array([4.8000002e+00, 3.4028235e+38, 4.1887903e-01, 3.4028235e+38],
      dtype=float32)

In [7]:
# The state space is continuous and therefore effectively infinite. Discretize it so that we can apply tabular
# Q-learning to a bounded space.
NUM_BUCKETS = (1, 1, 6, 3)
# Each of the four variables that define the current state is discretized into buckets.
# The first variable is the cart position. Giving it a single bucket means every position maps to the same
# bucket, so this variable is ignored completely in our state space. This is a technique we can use to reduce
# the dimensionality of our Q-value computation.
# The second variable is the cart velocity. By reducing the number of buckets for this variable to 1, we are
# ignoring this state variable as well in our Q-value computation.
# We specified two 1s in our number of buckets, so the state space is reduced along two dimensions. This makes
# learning much faster because the Q-table is smaller.
# The remaining two state variables are the angular position of the pole w.r.t. the vertical and the angular
# velocity of the pole. We use 6 buckets (6 discrete values) for the angular position and 3 buckets for the
# angular velocity.

In [8]:
# Number of discrete actions available in every state; this sizes the last axis of the Q-table
NUM_ACTIONS = env.action_space.n

In [9]:
# One (low, high) pair per state variable, in the same order as the observation vector
STATE_BOUNDS = list(zip(env.observation_space.low, env.observation_space.high))

In [10]:
# Further limit the state space by redefining the bounds for cart velocity (index 1) and pole angular velocity
# (index 3), whose default bounds are effectively unbounded (about +/-3.4e38).
# We now have a state space that is tractable to work with on our local machine.
# NOTE: The CartPole environment already limits the other two variables for us: the episode ends when the pole
# tilts too far from the vertical or the cart moves more than 2.4 units from the center.
# NOTE(review): this note originally quoted 15 degrees; the Gym CartPole documentation lists +/-12 degrees --
# confirm against the installed version.
STATE_BOUNDS[1] = [-0.5, 0.5] 
STATE_BOUNDS[3] = [-math.radians(50), math.radians(50)]

In [11]:
STATE_BOUNDS

[(-4.8, 4.8),
 [-0.5, 0.5],
 (-0.41887903, 0.41887903),
 [-0.8726646259971648, 0.8726646259971648]]

In [12]:
# Q-table shape is NUM_BUCKETS + (NUM_ACTIONS,) = (1, 1, 6, 3, 2): one Q-value per (state bucket, action) pair,
# all initialized to zero
q_table = np.zeros(NUM_BUCKETS + (NUM_ACTIONS,))

In [13]:
# The first four dimensions index the discretized state and the last dimension indexes the action we can
# take in each state
q_table.shape

(1, 1, 6, 3, 2)

In [14]:
q_table

array([[[[[0., 0.],
          [0., 0.],
          [0., 0.]],

         [[0., 0.],
          [0., 0.],
          [0., 0.]],

         [[0., 0.],
          [0., 0.],
          [0., 0.]],

         [[0., 0.],
          [0., 0.],
          [0., 0.]],

         [[0., 0.],
          [0., 0.],
          [0., 0.]],

         [[0., 0.],
          [0., 0.],
          [0., 0.]]]]])

In [15]:
# We initially want to explore the state space in order to fill up our q_table, then explore less over time.
# These constants are the floors that the decaying exploration and learning rates never drop below.
EXPLORE_RATE_MIN = 0.01 # Minimum exploration rate
LEARNING_RATE_MIN = 0.1 # Minimum learning rate

In [16]:
# Decay the exploration rate over time, but not too fast: we want to explore less as we become more sure of
# getting the balance right.
def get_explore_rate(t):
    """Return the exploration rate for index ``t``.

    The rate follows ``1 - log10((t + 1) / 25)``, capped at 1 and floored at
    ``EXPLORE_RATE_MIN``. It therefore stays at 1 while ``t <= 24`` and then
    decays logarithmically until it reaches the floor.
    """
    decayed = 1.0 - math.log10((t + 1) / 25)
    capped = min(1, decayed)
    return max(EXPLORE_RATE_MIN, capped)

In [17]:
# Decay the learning rate as well, so that late updates are small enough not to overshoot a maximum.
def get_learning_rate(t):
    """Return the learning rate for index ``t``.

    The rate follows ``1 - log10((t + 1) / 25)``, capped at 0.5 and floored at
    ``LEARNING_RATE_MIN``.
    """
    decayed = 1.0 - math.log10((t + 1) / 25)
    capped = min(0.5, decayed)
    return max(LEARNING_RATE_MIN, capped)

In [18]:
# Epsilon-greedy action selection for the current (discretized) state.
def select_action(state, explore_rate):
    """Pick the next action for ``state``.

    With probability ``explore_rate`` a random action is sampled from
    ``env.action_space``; otherwise the action with the highest Q-value in
    ``q_table[state]`` is returned.
    """
    # Explore: sample the action space at random
    if random.random() < explore_rate:
        return env.action_space.sample()

    # Exploit: take the action whose Q-value is largest in this state
    return np.argmax(q_table[state])

In [19]:
# Takes in continuous state information and returns a discretized, bucketized version.
def state_to_bucket(state):
    """Map a continuous observation to a tuple of bucket indices.

    For each state variable ``i`` the value is compared with
    ``STATE_BOUNDS[i]``: values at or below the lower bound go to bucket 0,
    values at or above the upper bound go to the last bucket
    (``NUM_BUCKETS[i] - 1``), and values in between are mapped linearly onto
    ``0 .. NUM_BUCKETS[i] - 1`` and rounded to the nearest index.
    """
    buckets = []

    # Handle the state variables one at a time
    for dim, value in enumerate(state):
        low, high = STATE_BOUNDS[dim]
        last_bucket = NUM_BUCKETS[dim] - 1

        if value <= low:
            # At or below the lower bound: clamp to the smallest bucket
            idx = 0
        elif value >= high:
            # At or above the upper bound: clamp to the largest bucket
            idx = last_bucket
        else:
            # Scale the value linearly into [0, last_bucket] and round
            span = high - low
            offset = last_bucket * low / span
            scaling = last_bucket / span
            idx = int(round(scaling * value - offset))

        buckets.append(idx)

    return tuple(buckets)


## Q-learning (temporal difference) Formula Recap
$Q^{new}(s_{k}, a_{k}) = Q^{old}(s_{k}, a_{k}) + \alpha\left(r_{k} + \gamma \max_{a} Q(s_{k+1}, a) - Q^{old}(s_{k}, a_{k})\right)$

In [20]:
# We want to balance the pole for as long as possible
def simulate():
    """Train the global ``q_table`` on ``env`` with tabular Q-learning.

    Runs up to 1000 episodes of at most 250 steps each. At every step an
    action is chosen with ``select_action``, the environment is advanced, and
    the Q-value of the (state, action) pair just taken is updated with the
    temporal-difference rule. Training stops early once more than 120
    consecutive episodes have each lasted at least 200 steps.

    Side effects: steps and renders ``env``, updates ``q_table`` in place and
    prints per-step diagnostics. Returns ``None``.
    """
    # Get initial learning and explore rates
    learning_rate = get_learning_rate(0)
    explore_rate = get_explore_rate(0)

    discount_factor = 0.99  # Future rewards are almost as important as immediate rewards
    num_streaks = 0  # Consecutive episodes that lasted at least 200 steps (t >= 199)

    # An episode ends when the environment reports termination (pole tilted too far or cart too far from the
    # center) or truncation (time limit reached). The inner loop additionally caps every episode at 250 steps.
    for episode in range(1000):

        # reset() returns (observation, info); only the observation is needed
        observ, _info = env.reset()

        state_k = state_to_bucket(observ)

        for t in range(250):

            env.render()

            action_k = select_action(state_k, explore_rate)

            observ, reward_k, terminated, truncated, info = env.step(action_k)

            state_kplus1 = state_to_bucket(observ)  # Discretize the observations into buckets

            best_q = np.amax(q_table[state_kplus1])  # Best Q-value reachable from the next state

            # Q-learning update: move Q(s_k, a_k) towards r_k + gamma * max_a Q(s_{k+1}, a)
            # NOTE(review): the target bootstraps from best_q even on a terminating step; consider using a
            # target of reward_k alone when `terminated` is true.
            td_target = reward_k + discount_factor * best_q
            q_table[state_k + (action_k,)] += learning_rate * (td_target - q_table[state_k + (action_k,)])

            state_k = state_kplus1

            print("\nEpisode = %d" % episode)
            print("t = %d" % t)
            print("Action: %d" % action_k)
            print("State: %s" % str(state_kplus1))
            print("Reward: %f" % reward_k)
            print("Best Q: %f" % best_q)
            print("Explore rate: %f" % explore_rate)
            print("Learning rate: %f" % learning_rate)
            print("Streaks: %d" % num_streaks)

            print("")

            done = truncated or terminated

            if done:
                print("Episode %d finished after %f time steps" % (episode, t))

                # t is zero-based, so t >= 199 means the pole stayed up for at least 200 steps
                if t >= 199:
                    num_streaks += 1
                else:
                    num_streaks = 0
                break

        if num_streaks > 120:
            break

        # Decay both schedules for the next episode. The learning rate must come from get_learning_rate
        # (capped at 0.5, floored at LEARNING_RATE_MIN), not from the exploration schedule.
        explore_rate = get_explore_rate(episode)
        learning_rate = get_learning_rate(episode)


In [21]:
simulate()

  if not isinstance(terminated, (bool, np.bool8)):



Episode = 0
t = 0
Action: 1
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 0.000000
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 1
Action: 0
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 0.500000
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 2
Action: 0
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 0.747500
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 3
Action: 1
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 1.243763
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 4
Action: 0
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 1.365662
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 5
Action: 1
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 1.797884
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episode = 0
t = 6
Action: 0
State: (0, 0, 2, 1)
Reward: 1.000000
Best Q: 2.072784
Explore rate: 1.000000
Learning rate: 0.500000
Streaks: 0


Episo

: 