In [52]:
import time
import random

In [53]:
class Env():
    def __init__(self, length, height):
        # define the height and length of the map
        self.length = length
        self.height = height
        # define the agent's start position
        self.x = 0
        self.y = 0

    def render(self, frames=50):
        for i in range(self.height):
            if i == 0:  # cliff is in the line 0
                line = ['S'] + ['x'] * (self.length - 2) + ['T']  # 'S':start, 'T':terminal, 'x':the cliff
            else:
                line = ['.'] * self.length
            if self.x == i:
                line[self.y] = 'o'  # mark the agent's position as 'o'
            print(''.join(line))
        print('\033[' + str(self.height + 1) + 'A')  # printer go back to top-left 
        time.sleep(1.0 / frames)

    def step(self, action):
        """4 legal actions, 0:up, 1:down, 2:left, 3:right"""
        change = [[0, 1], [0, -1], [-1, 0], [1, 0]]
        self.x = min(self.height - 1, max(0, self.x + change[action][0]))
        self.y = min(self.length - 1, max(0, self.y + change[action][1]))

        states = [self.x, self.y]
        reward = -1
        terminal = False
        if self.x == 0:  # if agent is on the cliff line "SxxxxxT"
            if self.y > 0:  # if agent is not on the start position 
                terminal = True
                if self.y != self.length - 1:  # if agent falls
                    reward = -100
        return reward, states, terminal

    def reset(self):
        self.x = 0
        self.y = 0

In [54]:
class Q_table():
    def __init__(self, length, height, actions=4, alpha=0.1, gamma=0.9):
        self.table = [0] * actions * length * height  # initialize all Q(s,a) to zero
        self.actions = actions
        self.length = length
        self.height = height
        self.alpha = alpha
        self.gamma = gamma

    def _index(self, a, x, y):
        """Return the index of Q([x,y], a) in Q_table."""
        return a * self.height * self.length + x * self.length + y

    def _epsilon(self):
        return 0.1
        # version for better convergence:
        # """At the beginning epsilon is 0.2, after 300 episodes decades to 0.05, and eventually go to 0."""
        # return 20. / (num_episode + 100)

    def take_action(self, x, y, num_episode):
        """epsilon-greedy action selection"""
        if random.random() < self._epsilon():
            return int(random.random() * 4)
        else:
            actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
            return actions_value.index(max(actions_value))

    def max_q(self, x, y):
        actions_value = [self.table[self._index(a, x, y)] for a in range(self.actions)]
        return max(actions_value)

    def update(self, a, s0, s1, r, is_terminated):
        # both s0, s1 have the form [x,y]
        q_predict = self.table[self._index(a, s0[0], s0[1])]
        if not is_terminated:
            q_target = r + self.gamma * self.max_q(s1[0], s1[1])
        else:
            q_target = r
        self.table[self._index(a, s0[0], s0[1])] += self.alpha * (q_target - q_predict)

In [55]:
def cliff_walk():
    env = Env(length=12, height=4)
    table = Q_table(length=12, height=4)
    for num_episode in range(3000):
        # within the whole learning process
        episodic_reward = 0
        is_terminated = False
        s0 = [0, 0]
        while not is_terminated:
            # within one episode
            action = table.take_action(s0[0], s0[1], num_episode)
            r, s1, is_terminated = env.step(action)
            table.update(action, s0, s1, r, is_terminated)
            episodic_reward += r
            # env.render(frames=100)
            s0 = s1
        if num_episode % 20 == 0:
            print("Episode: {}, Score: {}".format(num_episode, episodic_reward))
        env.reset()

In [56]:
cliff_walk()

Episode: 0, Score: -100
Episode: 20, Score: -176
Episode: 40, Score: -177
Episode: 60, Score: -156
Episode: 80, Score: -49
Episode: 100, Score: -68
Episode: 120, Score: -81
Episode: 140, Score: -41
Episode: 160, Score: -36
Episode: 180, Score: -32
Episode: 200, Score: -122
Episode: 220, Score: -29
Episode: 240, Score: -126
Episode: 260, Score: -15
Episode: 280, Score: -112
Episode: 300, Score: -23
Episode: 320, Score: -17
Episode: 340, Score: -15
Episode: 360, Score: -109
Episode: 380, Score: -13
Episode: 400, Score: -100
Episode: 420, Score: -19
Episode: 440, Score: -13
Episode: 460, Score: -13
Episode: 480, Score: -13
Episode: 500, Score: -15
Episode: 520, Score: -13
Episode: 540, Score: -13
Episode: 560, Score: -15
Episode: 580, Score: -15
Episode: 600, Score: -15
Episode: 620, Score: -102
Episode: 640, Score: -13
Episode: 660, Score: -13
Episode: 680, Score: -14
Episode: 700, Score: -106
Episode: 720, Score: -103
Episode: 740, Score: -13
Episode: 760, Score: -13
Episode: 780, Score

In [57]:
import numpy as np

import gym
from gym import wrappers

off_policy = True  # if True use off-policy q-learning update, if False, use on-policy SARSA update

n_states = 40
iter_max = 5000

initial_lr = 1.0  # Learning rate
min_lr = 0.003
gamma = 1.0
t_max = 10000
eps = 0.1


def run_episode(env, policy=None, render=False):
    obs, _ = env.reset()
    total_reward = 0
    step_idx = 0
    for _ in range(t_max):
        if render:
            env.render()
        if policy is None:
            action = env.action_space.sample()
        else:
            a, b = obs_to_state(env, obs)
            action = policy[a][b]
        obs, reward, done, _, _ = env.step(action)
        total_reward += gamma ** step_idx * reward
        step_idx += 1
        if done:
            break
    return total_reward


def obs_to_state(env, obs):
    """ Maps an observation to state """
    # we quantify the continous state space into discrete space
    env_low = env.observation_space.low
    env_high = env.observation_space.high
    env_dx = (env_high - env_low) / n_states
    a = int((obs[0] - env_low[0]) / env_dx[0])
    b = int((obs[1] - env_low[1]) / env_dx[1])
    a = a if a < n_states else n_states - 1
    b = b if b < n_states else n_states - 1
    return a, b


if __name__ == '__main__':
    env_name = 'MountainCar-v0'
    # env = gym.make(env_name, render_mode="human")
    env = gym.make(env_name)
    # env.seed(0)
    np.random.seed(0)
    if off_policy == True:
        print('----- using Q Learning -----')
    else:
        print('------ using SARSA Learning ---')

    q_table = np.zeros((n_states, n_states, 3))
    for i in range(iter_max):
        obs, _ = env.reset()
        total_reward = 0
        ## eta: learning rate is decreased at each step
        eta = max(min_lr, initial_lr * (0.85 ** (i // 100)))
        for j in range(t_max):
            a, b = obs_to_state(env, obs)
            if np.random.uniform(0, 1) < eps:
                action = np.random.choice(env.action_space.n)
            else:
                action = np.argmax(q_table[a][b])
            obs, reward, done, _, _ = env.step(action)
            total_reward += reward
            # update q table
            a_, b_ = obs_to_state(env, obs)
            if off_policy == True:
                # use q-learning update (off-policy learning)
                q_table[a][b][action] = q_table[a][b][action] + eta * (
                        reward + gamma * np.max(q_table[a_][b_]) - q_table[a][b][action])
            else:
                # use SARSA update (on-policy learning)
                # epsilon-greedy policy on Q again
                if np.random.uniform(0, 1) < eps:
                    action_ = np.random.choice(env.action_space.n)
                else:
                    action_ = np.argmax(q_table[a_][b_])
                q_table[a][b][action] = q_table[a][b][action] + eta * (
                        reward + gamma * q_table[a_][b_][action_] - q_table[a][b][action])
            if done:
                break
        if i % 200 == 0:
            print('Iteration #%d -- Total reward = %d.' % (i + 1, total_reward))
    solution_policy = np.argmax(q_table, axis=2)
    solution_policy_scores = [run_episode(env, solution_policy, False) for _ in range(100)]
    print("Average score of solution = ", np.mean(solution_policy_scores))
    # Animate it
    for _ in range(2):
        run_episode(env, solution_policy, True)
    env.close()

----- using Q Learning -----
Iteration #1 -- Total reward = -10000.
Iteration #201 -- Total reward = -529.
Iteration #401 -- Total reward = -214.
Iteration #601 -- Total reward = -216.
Iteration #801 -- Total reward = -157.
Iteration #1001 -- Total reward = -184.
Iteration #1201 -- Total reward = -170.
Iteration #1401 -- Total reward = -125.
Iteration #1601 -- Total reward = -177.
Iteration #1801 -- Total reward = -127.
Iteration #2001 -- Total reward = -155.
Iteration #2201 -- Total reward = -161.
Iteration #2401 -- Total reward = -120.
Iteration #2601 -- Total reward = -131.
Iteration #2801 -- Total reward = -120.
Iteration #3001 -- Total reward = -158.
Iteration #3201 -- Total reward = -126.
Iteration #3401 -- Total reward = -154.
Iteration #3601 -- Total reward = -140.
Iteration #3801 -- Total reward = -190.
Iteration #4001 -- Total reward = -123.
Iteration #4201 -- Total reward = -150.
Iteration #4401 -- Total reward = -115.
Iteration #4601 -- Total reward = -159.
Iteration #4801 