In [None]:
import collections
import gym
import random
import numpy as np
import operator
import pprint

<h1 style="text-align: center;">Monte-Carlo Learning</h1>

In [None]:
def every_visit_monte_carlo(iterations, alpha, gamma, exercise_name, policy):
    """Estimate state values with every-visit, constant-alpha Monte Carlo.

    For each of ``iterations`` episodes a full trajectory is generated via
    ``generate_full_episode(exercise_name, policy)``. The trajectory is then
    walked backwards so the discounted return can be accumulated
    incrementally, and every visited state is moved towards that return.

    Args:
        iterations: Number of episodes to sample.
        alpha: Step size of the incremental update.
        gamma: Discount factor applied to the return.
        exercise_name: Environment id forwarded to ``generate_full_episode``.
        policy: Policy callable forwarded to ``generate_full_episode``.

    Returns:
        dict mapping each visited state to its estimated value. States that
        were never seen before start from 0.
    """
    values = {}
    for _ in range(iterations):
        trajectory = generate_full_episode(exercise_name, policy)
        discounted_return = 0
        # Backwards pass: G_t = reward_t + gamma * G_{t+1}
        for step in reversed(trajectory):
            visited_state, step_reward = step[0], step[2]
            discounted_return = gamma * discounted_return + step_reward
            previous_estimate = values.get(visited_state, 0)
            values[visited_state] = previous_estimate + alpha * (discounted_return - previous_estimate)
    return values

<h1 style="text-align: center;">Temporal-Difference Learning</h1>

In [None]:
def temporal_difference_zero(iterations, initial_state_value, alpha, gamma, exercise_name, policy):
    """Estimate state values with tabular TD(0).

    Each episode is obtained from ``generate_full_episode(exercise_name,
    policy)`` and processed as consecutive (step, next step) pairs. Every
    step is indexed as ``step[0]`` = state and ``step[2]`` = reward. For each
    pair the value of the current state is moved towards the one-step
    bootstrap target::

        V(s) <- V(s) + alpha * (r + gamma * V(s') - V(s))

    Args:
        iterations: Number of episodes to sample.
        initial_state_value: Value assumed for states not seen before.
        alpha: Step size.
        gamma: Discount factor.
        exercise_name: Environment id forwarded to ``generate_full_episode``.
        policy: Policy callable forwarded to ``generate_full_episode``.

    Returns:
        dict mapping each encountered state (including the final state of
        every episode) to its estimated value.
    """
    val_map = {}
    for _ in range(iterations):
        episode = generate_full_episode(exercise_name, policy)
        for step, next_step in zip(episode, episode[1:]):
            state, reward = step[0], step[2]
            next_state = next_step[0]

            # v(s_t) and v(s_t+1)
            state_value = val_map.get(state, initial_state_value)
            next_state_value = val_map.get(next_state, initial_state_value)

            # The TD error must subtract the current estimate; without that
            # term the update only ever adds to V(s) and never converges.
            td_error = reward + gamma * next_state_value - state_value
            val_map[state] = state_value + alpha * td_error

        # The last state of an episode is never the source of an update, so
        # register it explicitly. setdefault keeps an already learned value
        # (e.g. when the episode ended in a state that was also updated).
        if episode:
            val_map.setdefault(episode[-1][0], initial_state_value)
    return val_map


<h1 style="text-align: center;">TD(λ)</h1>

In [None]:
def temporal_difference_lambda(iterations, initial_state_value, alpha, gamma, exercise_name, policy, lambd=0.5):
    """Estimate state values with backward-view TD(lambda), accumulating traces.

    Each episode is obtained from ``generate_full_episode(exercise_name,
    policy)`` and processed as consecutive (step, next step) pairs, with
    ``step[0]`` = state and ``step[2]`` = reward. For every transition the
    one-step TD error is computed once and then distributed over all states
    visited so far in the episode, weighted by their eligibility trace::

        delta = r + gamma * V(s') - V(s)
        e(s) <- e(s) + 1
        for every traced state x:
            V(x) <- V(x) + alpha * delta * e(x)
            e(x) <- gamma * lambd * e(x)

    Args:
        iterations: Number of episodes to sample.
        initial_state_value: Value assumed for states not seen before.
        alpha: Step size.
        gamma: Discount factor.
        exercise_name: Environment id forwarded to ``generate_full_episode``.
        policy: Policy callable forwarded to ``generate_full_episode``.
        lambd: Trace-decay parameter.

    Returns:
        dict mapping each encountered state (including the final state of
        every episode) to its estimated value.
    """
    val_map = {}
    for _ in range(iterations):
        episode = generate_full_episode(exercise_name, policy)
        # Eligibility traces describe the current episode only, so they start
        # empty (i.e. zero) for every episode.
        e_map = {}
        for step, next_step in zip(episode, episode[1:]):
            state, reward = step[0], step[2]
            next_state = next_step[0]

            state_value = val_map.setdefault(state, initial_state_value)
            next_state_value = val_map.get(next_state, initial_state_value)
            delta = reward + gamma * next_state_value - state_value

            # Accumulating trace for the state just visited.
            e_map[state] = e_map.get(state, 0.0) + 1

            # Every traced state was inserted into val_map when it was
            # visited, so the in-place update below cannot miss a key.
            # Only existing keys are reassigned while iterating.
            for key, trace in e_map.items():
                val_map[key] += alpha * delta * trace
                e_map[key] = gamma * lambd * trace

        # Make sure the final state of the episode shows up in the result.
        if episode:
            val_map.setdefault(episode[-1][0], initial_state_value)
    return val_map

In [None]:
# random policy
def random_policy(env, state):
    """Return an action sampled from ``env.action_space``; ``state`` is ignored."""
    sampled_action = env.action_space.sample()
    return sampled_action

In [None]:
def generate_full_episode(exercise, policy):
    """Run one episode of a gym environment and record its trajectory.

    A fresh environment is created with ``gym.make(exercise)`` and stepped
    with actions chosen by ``policy(env, state)`` until the environment
    reports ``done``.

    Args:
        exercise: Environment id passed to ``gym.make``.
        policy: Callable ``policy(env, state)`` returning the next action.

    Returns:
        list of ``[state, action, reward]`` entries, one per step taken,
        followed by a final ``[last_state, -1, 0]`` entry for the state in
        which the episode ended (action -1, reward 0).
    """
    env = gym.make(exercise)
    try:
        state = env.reset()
        episode = []
        done = False
        while not done:
            action = policy(env, state)
            # perform the action
            new_state, reward, done, _ = env.step(action)
            episode.append([state, action, reward])
            state = new_state
        # append termination state to episode
        episode.append([state, -1, 0])
        return episode
    finally:
        # One environment is created per episode; release it even if the
        # policy or the environment raises.
        env.close()

<h1 style="text-align: center;">OpenAI</h1>

In [None]:
# Hyperparameters shared by every policy-evaluation run below
episodes = 1_000            # number of sampled episodes per run
alpha = 0.01                # step size of the incremental value updates
gamma = 0.9                 # discount factor
initialization_value = 0    # starting value for unseen states (TD methods)
lambd = 0.5                 # trace decay used by TD(λ)

<h2 style="text-align: center;">FrozenLake-v0</h2>

The agent controls the movement of a character in a grid world. Some tiles of the grid are walkable, and others lead to the agent falling into the water. Additionally, the movement direction of the agent is uncertain and only partially depends on the chosen direction. The agent is rewarded for finding a walkable path to a goal tile.

*Winter is here. You and your friends were tossing around a frisbee at the park when you made a wild throw that left the frisbee out in the middle of the lake. The water is mostly frozen, but there are a few holes where the ice has melted. If you step into one of those holes, you'll fall into the freezing water. At this time, there's an international frisbee shortage, so it's absolutely imperative that you navigate across the lake and retrieve the disc. However, the ice is slippery, so you won't always move in the direction you intend.*

*The surface is described using a grid like the following:*


```
SFFF       (S: starting point, safe)
FHFH       (F: frozen surface, safe)
FFFH       (H: hole, fall to your doom)
HFFG       (G: goal, where the frisbee is located)
```

The episode ends when you reach the goal or fall in a hole. You receive a reward of 1 if you reach the goal, and zero otherwise.

Source: https://gym.openai.com/envs/FrozenLake-v0/

<img src="https://images.squarespace-cdn.com/content/v1/584219d403596e3099e0ee9b/1582183856858-QTMYV4HGGOA8BJPN83CA/frozen_lake.jpg?format=500w" style="width: 400px;"/>

In [None]:
# optimal FrozenLake policy

# Action codes used by the FrozenLake script below.
NOOP = -1
LEFT = 0
DOWN = 1
RIGHT = 2
UP = 3

# Scripted action per grid cell (row-major index 0..15 of the 4x4 map).
# NOOP marks the cells that end the episode (holes 5, 7, 11, 12 and goal 15),
# where the policy is never asked for an action.
FROZEN_LAKE_ACTION_SCRIPT = {
    0: DOWN,
    1: RIGHT,
    2: DOWN,
    3: LEFT,
    4: DOWN,
    5: NOOP,
    6: DOWN,
    7: NOOP,
    8: RIGHT,
    9: DOWN,
    10: DOWN,
    11: NOOP,
    12: NOOP,
    13: RIGHT,
    14: RIGHT,
    15: NOOP
}

# Kept for backward compatibility. The policy itself no longer reads this
# generic name, because any later cell that rebinds `action_script` would
# otherwise silently change (or break) the FrozenLake policy.
action_script = FROZEN_LAKE_ACTION_SCRIPT


def optimal_policy(env, state):
    """Return the scripted FrozenLake action for ``state``; ``env`` is unused.

    Raises:
        KeyError: if ``state`` is not one of the 16 scripted cells.
    """
    return FROZEN_LAKE_ACTION_SCRIPT[state]

In [None]:
exercise = 'FrozenLake-v0'

# FrozenLake-v0 is a 4x4 grid whose states are the row-major cell indices.
FROZEN_LAKE_GRID_SHAPE = (4, 4)


def print_frozen_lake_values(title, val_map, default=0):
    """Print ``title`` followed by the state values laid out as the 4x4 grid.

    States missing from ``val_map`` (never visited during evaluation) are
    shown with ``default`` so the grid can always be built; reshaping only
    the visited states fails as soon as a single cell was never reached.
    """
    n_rows, n_cols = FROZEN_LAKE_GRID_SHAPE
    grid = np.array(
        [val_map.get(cell, default) for cell in range(n_rows * n_cols)]
    ).reshape(n_rows, n_cols)
    print(title)
    print(f'{grid}\n')


# Monte Carlo
val_map = every_visit_monte_carlo(episodes, alpha, gamma, exercise, random_policy)
print_frozen_lake_values('MC - Random policy', val_map)

val_map = every_visit_monte_carlo(episodes, alpha, gamma, exercise, optimal_policy)
print_frozen_lake_values('MC - Optimal policy', val_map)
print('\n')



# TD(0)
val_map = temporal_difference_zero(episodes, initialization_value, alpha, gamma, exercise, random_policy)
print_frozen_lake_values('TD(0) - Random policy', val_map, default=initialization_value)

val_map = temporal_difference_zero(episodes, initialization_value, alpha, gamma, exercise, optimal_policy)
print_frozen_lake_values('TD(0) - Optimal policy', val_map, default=initialization_value)
print('\n')



# TD(λ)
val_map = temporal_difference_lambda(episodes, initialization_value, alpha, gamma, exercise, random_policy, lambd)
print_frozen_lake_values('TD(λ) - Random policy', val_map, default=initialization_value)

val_map = temporal_difference_lambda(episodes, initialization_value, alpha, gamma, exercise, optimal_policy, lambd)
print_frozen_lake_values('TD(λ) - Optimal policy', val_map, default=initialization_value)


<h1 style="text-align: center;">Taxi-v3</h1>

This task was introduced in [Dietterich2000] to illustrate some issues in hierarchical reinforcement learning. There are 4 locations (labeled by different letters) and your job is to pick up the passenger at one location and drop him off in another. You receive +20 points for a successful dropoff, and lose 1 point for every timestep it takes. There is also a 10 point penalty for illegal pick-up and drop-off actions.

Source: https://gym.openai.com/envs/Taxi-v3/

<img src="https://storage.googleapis.com/lds-media/images/Reinforcement_Learning_Taxi_Env.width-1200.png" style="width: 400px;"/>

In [None]:
# Improved Taxi v3 policy with random movement and targeted pick up and drop off

# see https://github.com/openai/gym/blob/master/gym/envs/toy_text/taxi.py (l 134 ff.)
x_factor = 100
y_factor = 20

# Calculates the positional part of the state
def compute_positional_state_part(x, y):
    """Return the share of the state number contributed by taxi position (x, y)."""
    return x * x_factor + y * y_factor

## Position
POSITION_R = compute_positional_state_part(0, 0)
POSITION_G = compute_positional_state_part(0, 4)
POSITION_Y = compute_positional_state_part(4, 0)
POSITION_B = compute_positional_state_part(4, 3)

## Passenger locations
PASSENGER_AT_R = 0
PASSENGER_AT_G = 1
PASSENGER_AT_Y = 2
PASSENGER_AT_B = 3
PASSENGER_IN_TAXI = 4

## Destinations
DEST_R = 0
DEST_G = 1
DEST_Y = 2
DEST_B = 3

## Actions
SOUTH = 0
NORTH = 1
EAST = 2
WEST = 3
PICK_UP = 4
DROP_OFF = 5

# state = position part + passenger location * 4 + destination
TAXI_ACTION_SCRIPT = {
    POSITION_R + PASSENGER_AT_R * 4 + DEST_R: PICK_UP,  # State: 0
    POSITION_R + PASSENGER_AT_R * 4 + DEST_G: PICK_UP,  # State: 1
    POSITION_R + PASSENGER_AT_R * 4 + DEST_Y: PICK_UP,  # State: 2
    POSITION_R + PASSENGER_AT_R * 4 + DEST_B: PICK_UP,  # State: 3
    POSITION_G + PASSENGER_AT_G * 4 + DEST_R: PICK_UP,  # State: 84
    POSITION_G + PASSENGER_AT_G * 4 + DEST_G: PICK_UP,  # State: 85
    POSITION_G + PASSENGER_AT_G * 4 + DEST_Y: PICK_UP,  # State: 86
    POSITION_G + PASSENGER_AT_G * 4 + DEST_B: PICK_UP,  # State: 87
    POSITION_Y + PASSENGER_AT_Y * 4 + DEST_R: PICK_UP,  # State: 408
    POSITION_Y + PASSENGER_AT_Y * 4 + DEST_G: PICK_UP,  # State: 409
    POSITION_Y + PASSENGER_AT_Y * 4 + DEST_Y: PICK_UP,  # State: 410
    POSITION_Y + PASSENGER_AT_Y * 4 + DEST_B: PICK_UP,  # State: 411
    POSITION_B + PASSENGER_AT_B * 4 + DEST_R: PICK_UP,  # State: 472
    POSITION_B + PASSENGER_AT_B * 4 + DEST_G: PICK_UP,  # State: 473
    POSITION_B + PASSENGER_AT_B * 4 + DEST_Y: PICK_UP,  # State: 474
    POSITION_B + PASSENGER_AT_B * 4 + DEST_B: PICK_UP,  # State: 475
    POSITION_R + PASSENGER_IN_TAXI * 4 + DEST_R: DROP_OFF,  # State: 16
    POSITION_G + PASSENGER_IN_TAXI * 4 + DEST_G: DROP_OFF,  # State: 97
    POSITION_Y + PASSENGER_IN_TAXI * 4 + DEST_Y: DROP_OFF,  # State: 418
    POSITION_B + PASSENGER_IN_TAXI * 4 + DEST_B: DROP_OFF  # State: 479
}

# Kept for backward compatibility. The policy reads the uniquely named table
# above so that another cell rebinding `action_script` cannot change it.
action_script = TAXI_ACTION_SCRIPT


def improved_policy(env, state):
    """Return the scripted pick-up/drop-off action, else a random move.

    States listed in ``TAXI_ACTION_SCRIPT`` get their scripted action. Every
    other state gets a uniformly chosen movement action. The random draw only
    happens when it is actually needed. ``env`` is unused.
    """
    scripted_action = TAXI_ACTION_SCRIPT.get(state)
    if scripted_action is not None:
        return scripted_action
    return random.choice([SOUTH, NORTH, EAST, WEST])

Executes the three policy evaluation algorithms for a random and an improved policy in the Taxi-v3 environment and stores the calculated state values. The calculation may take a while due to the high number of episodes and steps per episode.

In [None]:
exercise = 'Taxi-v3'

# Both policies are evaluated by each algorithm, random first, then improved.
_taxi_policies = (random_policy, improved_policy)

# Monte Carlo
val_map_mc_random, val_map_mc_improved = [
    every_visit_monte_carlo(episodes, alpha, gamma, exercise, taxi_policy)
    for taxi_policy in _taxi_policies
]

# TD(0)
val_map_td0_random, val_map_td0_improved = [
    temporal_difference_zero(episodes, initialization_value, alpha, gamma, exercise, taxi_policy)
    for taxi_policy in _taxi_policies
]

# TD(λ)
val_map_tdl_random, val_map_tdl_improved = [
    temporal_difference_lambda(episodes, initialization_value, alpha, gamma, exercise, taxi_policy, lambd)
    for taxi_policy in _taxi_policies
]

<h3 style="text-align: center;">Comparison of value-functions ... </h3>

The following functions print the values of selected Taxi-v3 states under both policies, as estimated by each of the three evaluation algorithms. They require that the cells above have been executed.

In [None]:
# Environment instance used only for rendering states in the reports below.
env = gym.make(exercise)

def render(state):
    """Render ``state`` by writing it into the environment and drawing it.

    The state is assigned on ``env.unwrapped`` so that it reaches the base
    environment no matter how many wrappers ``gym.make`` put around it.
    """
    env.unwrapped.s = state
    env.render()
    
def print_state_values(state):
    """Render ``state`` and print its value under every policy/algorithm pair.

    Reads the six ``val_map_*`` dictionaries produced by the evaluation cell;
    states missing from a dictionary are reported as 0.
    """
    print(f'State #{state}')
    render(state)
    print(f'Policy   | Algorithm | Value-Function')
    print(f'---------+-----------+---------------')
    print(f'Random   | MC        | {val_map_mc_random.get(state, 0)}')
    print(f'Random   | TD(0)     | {val_map_td0_random.get(state, 0)}')
    print(f'Random   | TD(λ)     | {val_map_tdl_random.get(state, 0)}')
    print(f'---------+-----------+---------------')
    print(f'Improved | MC        | {val_map_mc_improved.get(state, 0)}')
    print(f'Improved | TD(0)     | {val_map_td0_improved.get(state, 0)}')
    print(f'Improved | TD(λ)     | {val_map_tdl_improved.get(state, 0)}')
    print(f'\n')

<h4 style="text-align: center;">... for all states</h4>

In [None]:
# Walk over every state index reported by the environment (env.nS of them).
for i in range(env.nS):
    print_state_values(state=i)

<h4 style="text-align: center;">... for a specific state</h4>

In [None]:
# 97 is listed in the Taxi action script as the drop-off state at location G.
state = 97
print_state_values(state=state)

<h4 style="text-align: center;">... for the best states</h4>

In [None]:
# best states
# For each value map, pick the state with the highest estimate (the first one
# in dictionary order on ties), print it with its value and render it.

print('- Random policy (best) -\n')
mc_random_best = max(val_map_mc_random, key=val_map_mc_random.get)
print(f'MC   : #{mc_random_best}')
print(f'value: {val_map_mc_random[mc_random_best]}')
render(mc_random_best)

td0_random_best = max(val_map_td0_random, key=val_map_td0_random.get)
print(f'TD(0): #{td0_random_best}')
print(f'value: {val_map_td0_random[td0_random_best]}')
render(td0_random_best)

tdl_random_best = max(val_map_tdl_random, key=val_map_tdl_random.get)
print(f'TD(λ): #{tdl_random_best}')
print(f'value: {val_map_tdl_random[tdl_random_best]}')
render(tdl_random_best)


print('- Improved random policy (best) -\n')
mc_improved_best = max(val_map_mc_improved, key=val_map_mc_improved.get)
print(f'MC   : #{mc_improved_best}')
print(f'value: {val_map_mc_improved[mc_improved_best]}')
render(mc_improved_best)

td0_improved_best = max(val_map_td0_improved, key=val_map_td0_improved.get)
print(f'TD(0): #{td0_improved_best}')
print(f'value: {val_map_td0_improved[td0_improved_best]}')
render(td0_improved_best)

tdl_improved_best = max(val_map_tdl_improved, key=val_map_tdl_improved.get)
print(f'TD(λ): #{tdl_improved_best}')
print(f'value: {val_map_tdl_improved[tdl_improved_best]}')
render(tdl_improved_best)

<h4 style="text-align: center;">... for the worst states</h4>

In [None]:
# worst states
# For each value map, pick the state with the lowest estimate (the first one
# in dictionary order on ties), print it with its value and render it.

print('- Random policy (worst) -\n')
mc_random_worst = min(val_map_mc_random, key=val_map_mc_random.get)
print(f'MC   : #{mc_random_worst}')
print(f'value: {val_map_mc_random[mc_random_worst]}')
render(mc_random_worst)

td0_random_worst = min(val_map_td0_random, key=val_map_td0_random.get)
print(f'TD(0): #{td0_random_worst}')
print(f'value: {val_map_td0_random[td0_random_worst]}')
render(td0_random_worst)

tdl_random_worst = min(val_map_tdl_random, key=val_map_tdl_random.get)
print(f'TD(λ): #{tdl_random_worst}')
print(f'value: {val_map_tdl_random[tdl_random_worst]}')
render(tdl_random_worst)


print('- Improved random policy (worst) -\n')
mc_improved_worst = min(val_map_mc_improved, key=val_map_mc_improved.get)
print(f'MC   : #{mc_improved_worst}')
print(f'value: {val_map_mc_improved[mc_improved_worst]}')
render(mc_improved_worst)

td0_improved_worst = min(val_map_td0_improved, key=val_map_td0_improved.get)
print(f'TD(0): #{td0_improved_worst}')
print(f'value: {val_map_td0_improved[td0_improved_worst]}')
render(td0_improved_worst)

tdl_improved_worst = min(val_map_tdl_improved, key=val_map_tdl_improved.get)
print(f'TD(λ): #{tdl_improved_worst}')
print(f'value: {val_map_tdl_improved[tdl_improved_worst]}')
render(tdl_improved_worst)