In [1]:
import gym
import matplotlib.pyplot as plt
import numpy as np
import sys
import random
import math 

from gym import spaces, logger
from gym.utils import seeding

In [2]:
class CustomCartPoleEnv(gym.Env):
    """
    Description:
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity.
        
        This is just cut and pasted from the OpenAI Gym CartPole-v1 environment,
        with a simple modification to "sparsify" the reward structure (see Reward
        section below).

    Source:
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson

    Observation:
        Type: Box(4)
        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -Inf                    Inf
        2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
        3       Pole Angular Velocity     -Inf                    Inf

    Actions:
        Type: Discrete(2)
        Num   Action
        0     Push cart to the left
        1     Push cart to the right

        Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it

    Reward:
        Reward is 0 for every step taken, -1 if it falls over

    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]

    Episode Termination:
        Pole Angle is more than 12 degrees.
        Cart Position is more than 2.4 (center of the cart reaches the edge of
        the display).
        Episode length is greater than 200.
        Solved Requirements:
        Considered solved when the average return is greater than or equal to
        195.0 over 100 consecutive trials.
    """

    metadata = {
        'render.modes': ['human', 'rgb_array'],
        'video.frames_per_second': 50
    }

    def __init__(self):
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = (self.masspole + self.masscart)
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = (self.masspole * self.length)
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = 'euler'

        # Angle at which to fail the episode
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Angle limit set to 2 * theta_threshold_radians so failing observation
        # is still within bounds.
        high = np.array([self.x_threshold * 2,
                         np.finfo(np.float32).max,
                         self.theta_threshold_radians * 2,
                         np.finfo(np.float32).max],
                        dtype=np.float32)

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)

        self.seed()
        self.viewer = None
        self.state = None

        self.steps_beyond_done = None

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def step(self, action):
        err_msg = "%r (%s) invalid" % (action, type(action))
        assert self.action_space.contains(action), err_msg

        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action == 1 else -self.force_mag
        costheta = math.cos(theta)
        sintheta = math.sin(theta)

        # For the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass))
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass

        if self.kinematics_integrator == 'euler':
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )

        if not done:
            reward = 0.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = -1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned done = True. You "
                    "should always call 'reset()' once you receive 'done = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_done += 1
            reward = 0.0

        return np.array(self.state), reward, done, {}

    def reset(self):
        self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))
        self.steps_beyond_done = None
        return np.array(self.state)

    def render(self, mode='human'):
        screen_width = 600
        screen_height = 400

        world_width = self.x_threshold * 2
        scale = screen_width/world_width
        carty = 100  # TOP OF CART
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_width, screen_height)
            l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
            axleoffset = cartheight / 4.0
            cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.carttrans = rendering.Transform()
            cart.add_attr(self.carttrans)
            self.viewer.add_geom(cart)
            l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(.8, .6, .4)
            self.poletrans = rendering.Transform(translation=(0, axleoffset))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.carttrans)
            self.viewer.add_geom(pole)
            self.axle = rendering.make_circle(polewidth/2)
            self.axle.add_attr(self.poletrans)
            self.axle.add_attr(self.carttrans)
            self.axle.set_color(.5, .5, .8)
            self.viewer.add_geom(self.axle)
            self.track = rendering.Line((0, carty), (screen_width, carty))
            self.track.set_color(0, 0, 0)
            self.viewer.add_geom(self.track)

            self._pole_geom = pole

        if self.state is None:
            return None

        # Edit the pole polygon vertex
        pole = self._pole_geom
        l, r, t, b = -polewidth / 2, polewidth / 2, polelen - polewidth / 2, -polewidth / 2
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        x = self.state
        cartx = x[0] * scale + screen_width / 2.0  # MIDDLE OF CART
        self.carttrans.set_translation(cartx, carty)
        self.poletrans.set_rotation(-x[2])

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None


In [4]:
# random behavior
env = CustomCartPoleEnv()
env.reset()

for _ in range(5):
    env.reset()
    done = False
    step_counter = 0
    sum_rewards = 0
    for _ in range(50):
        env.render()
        state, reward, done, info = env.step(env.action_space.sample()) # take a random action
        
env.close()




In [40]:
# # collect some statistics on states visited under random policy
# # used for subsequent discretization of state space for tabular Q learning
# nmc = 50000
# saved_states = []

# for i in range(nmc):
#     env.reset()
#     done = False
#     step_counter = 0
#     sum_rewards = 0
#     while not done:
#         state, reward, done, info = env.step(env.action_space.sample()) # take a random action
#         step_counter += 1
#         sum_rewards += reward
#         saved_states.append(state)


# n_points = 4
# q = np.linspace(0,1,n_points*2)
# cutpoints = np.quantile(saved_states, q, axis=0)
# cutpoints = np.insert(cutpoints, n_points, 0, 0) 

# q = np.linspace(0.5/n_points, 1, n_points)
# cut_pos = np.quantile(np.abs(saved_states), q, axis=0)
# cutpoints = np.concatenate((np.flip(-cut_pos,0), cut_pos), axis=0)
# cutpoints = np.insert(cutpoints, n_points, 0, 0) 

In [5]:
cutpoints = np.array([[-0.05,0,0.05], [-0.38, 0, 0.38], [-0.06, 0, 0.06], [-0.55, 0, 0.55]]).transpose()

print(cutpoints)

[[-0.05 -0.38 -0.06 -0.55]
 [ 0.    0.    0.    0.  ]
 [ 0.05  0.38  0.06  0.55]]


In [6]:
# A simple cartpole agent that implements Q learning via a tabular method.
# In the constructor, cutpoints is a n_cuts x 4 array telling the agent how
# to discretize the continuous 4D state space.


class CartPoleAgent():
    def __init__(self, env, cutpoints, epsilon = 0.1):
        # cutpoints: an n_buckets x 4 array of cutpoints for
        # the discrete state-space representation
        n_buckets = cutpoints.shape[0] + 1
        self.Q = np.zeros((n_buckets, n_buckets, n_buckets, n_buckets, 2))
        self.action_space = env.action_space
        self.cutpoints = cutpoints
        self.n_buckets = n_buckets
        
    def discretize_state(self, state_real):
        state_bucket = [np.digitize(state_real[i], self.cutpoints[:,i]) for i in range(4)]
        return tuple(state_bucket)
        
    def choose_action(self, state, eps = 0.05):
    # epsilon-greedy action choice
        if(random.random() < eps):
            action = self.action_space.sample()
        else:
            state_bucket = self.discretize_state(state)
            q_vec = self.Q[(*state_bucket),...]
            if(q_vec[0] == q_vec[1]):
                action = self.action_space.sample()
            else:
                action = np.argmax(self.Q[(*state_bucket),...])
        return action
    
    def show_n_episodes(self, env, n, verbose=True):
        for i in range(n):
            state = env.reset()
            done = False
            step_counter = 0
            sum_rewards = 0
            while not done and step_counter < 500:
                env.render(mode='human')
                action = self.choose_action(state, eps=0) # greedy policy
                state, reward, done, info = env.step(action)
                sum_rewards += reward
                step_counter += 1
            if verbose:
                print(step_counter)
        env.close()
    
class CartPoleSarsaLambdaLearner(CartPoleAgent):
    
    def __init__(self, env, cutpoints, epsilon = 0.1):
        CartPoleAgent.__init__(self, env, cutpoints, epsilon)
        self.Z = np.zeros((self.n_buckets, self.n_buckets, self.n_buckets, self.n_buckets, 2))
        
    
    def update_Q(self, SARSA, alpha=0.001, gamma=1.0, lam = 0.99):
        current_state, action, reward, next_state, next_action = SARSA
        
        # discretize
        current_state_bucket = self.discretize_state(current_state)
        next_state_bucket = self.discretize_state(next_state)
        
        # reward target
        current_Q = self.Q[(*current_state_bucket),action]
        next_Q = self.Q[(*next_state_bucket),next_action]
        delta = reward + gamma*next_Q - current_Q
        
        # update eligibility trace
        self.Z[(*current_state_bucket),action] += 1
        
        # update all states and decay eligibility trace
        self.Q += (alpha*delta)*self.Z
        self.Z = gamma*lam*self.Z
                
            
class CartPoleQLearner(CartPoleAgent):   
    
    def __init__(self, env, cutpoints, epsilon = 0.1):
        CartPoleAgent.__init__(self, env, cutpoints, epsilon)
        
    def update_Q(self, step, alpha, gamma=1.0, method='Q'):
        state, action, reward, next_state = step
        state_bucket = self.discretize_state(state)
        next_state_bucket = self.discretize_state(next_state)
        current_Q = self.Q[(*state_bucket),action]
        best_Qp = np.amax(self.Q[(*next_state_bucket),...])  # Q-value of best action for successor state
        delta = reward + gamma*best_Qp - current_Q
        self.Q[(*state_bucket),action] = current_Q + alpha*delta
             
            
class CartPoleMCLearner(CartPoleAgent):   
    
    def __init__(self, env, cutpoints, epsilon = 0.1):
        CartPoleAgent.__init__(self, env, cutpoints, epsilon)
        self.C = np.zeros((self.n_buckets, self.n_buckets, self.n_buckets, self.n_buckets, 2))
        
    def update_Q(self, episode, alpha=0.001, gamma=1.0, method='Q'):
        total_rewards = 0.0
        # Simple Monte Carlo control
        # work backwards to sum rewards and attribute those to each visited (state, action) pair in the episode
        for state, action, reward, next_state in reversed(episode):
            total_rewards = gamma*total_rewards + reward
            state_bucket = self.discretize_state(state)
            self.C[(*state_bucket),action] += 1
            current_Q = self.Q[(*state_bucket),action]
            current_C = self.C[(*state_bucket),action]
            step_size =  max(alpha, 1.0/current_C)
            self.Q[(*state_bucket),action] += step_size*(total_rewards - current_Q)


In [13]:
###
# SARSA(lambda) learning
###

env = CustomCartPoleEnv()
env.reset()
agent = CartPoleSarsaLambdaLearner(env, cutpoints)

alpha_decay = 0.99
alpha_min = 0.01
alpha = 0.2
lam = 0.9
gam = 0.95


# run some steps
num_episodes = 100

for i in range(num_episodes):
    eps = max(0.1, 1.0/(1.0+i))
    state = env.reset()
    done = False
    step_counter = 0
    sum_rewards = 0
    
    # reset the eligibility trace for this episode
    agent.Z[...] = 0.0
    
    # choose initial action according to epsilon-greedy version of the policy
    action = agent.choose_action(state, eps=eps)
    
    while not done and step_counter < 500:
        
        # we know SA or SARSA from previous step
        # now get RSA (reward, next_state, next_action)
        next_state, reward, done, info = env.step(action)
        next_action = agent.choose_action(next_state, eps=eps)
        
        SARSA = [state, action, reward, next_state, next_action]
        agent.update_Q(SARSA, alpha = alpha, gamma = gam, lam=lam)
        step_counter += 1
        sum_rewards += reward
        state = next_state
        action = next_action
        
    # decay alpha
    alpha = max(alpha_min, alpha*alpha_decay)


In [14]:
agent.show_n_episodes(env, 3)

230
500
500


In [115]:
###
# Q learning
###

env = CustomCartPoleEnv()
env.reset()
agent = CartPoleQLearner(env, cutpoints)

alpha_decay = 0.99
alpha_min = 0.01
alpha = 0.2
gam = 0.95


# run some steps
num_episodes = 100

for i in range(num_episodes):
    eps = max(0.1, 1.0/(1.0+i))
    state = env.reset()
    done = False
    step_counter = 0
    sum_rewards = 0
    
    while not done and step_counter < 500:
        
        action = agent.choose_action(state, eps=eps)
        next_state, reward, done, info = env.step(action)
    
        agent.update_Q([state, action, reward, next_state], alpha = alpha, gamma = gam)
        step_counter += 1
        sum_rewards += reward
        state = next_state
        
    # decay alpha
    alpha = max(alpha_min, alpha*alpha_decay)

print(alpha)
env.close()

0.0732064682546459


In [116]:
agent.show_n_episodes(env, 3)

84
163
163


In [111]:
###
# Monte Carlo learning
###

env = CustomCartPoleEnv()
env.reset()
agent = CartPoleMCLearner(env, cutpoints)

alpha_decay = 0.99
alpha_min = 0.01
alpha = 0.2
lam = 0.8
gam = 0.99


# run some steps
num_episodes = 100

for i in range(num_episodes):
    eps = max(0.1, 1.0/(1.0+i))
    state = env.reset()
    done = False
    step_counter = 0
    sum_rewards = 0
    episode = []
    
    while not done and step_counter < 500:
        
        action = agent.choose_action(state, eps=eps)
        next_state, reward, done, info = env.step(action)
        step_counter += 1
        sum_rewards += reward
        episode.append([state, action, reward, next_state])
        state = next_state
        
        
    # decay alpha
    agent.update_Q(episode, alpha = alpha, gamma = gam)
    alpha = max(alpha_min, alpha*alpha_decay)

print(alpha)
env.close()

0.0732064682546459


In [112]:
agent.show_n_episodes(env, 3)

500
466
401
