In [None]:
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import numpy as np
import pickle
import math

import sys
import os

from ModelFreeRL_Support.helper import *
from ModelFreeRL_Support.dp_algorithms import *
from ece4078.gym_simple_gridworlds.envs.grid_env import GridEnv
from ece4078.gym_simple_gridworlds.envs.grid_2dplot import *

from collections import namedtuple, defaultdict
import matplotlib.gridspec as gridspec

from IPython.display import display, HTML

# 1. The Grid World environment

Recall the grid in which our robot lives

![GridWorldExample.png](https://i.postimg.cc/5tMM5vqf/Grid-World-Example.png)

- The states $s \in \mathcal{S}$ correspond to locations in the grid. Each location has also a cell index associated to it, e.g., cell index 4 is associated to location (row=1,col=0)
- The robot can move up, down, left, or right. Actions correpond to unit increments or decrements in the specified direction.
    - Up : (-1,0)
    - Down: (1,0)
    - Left: (0,-1)
    - Right: (0, 1)
- Each action is represented by a number. Action (Up) is represented by 0, (Down) by 1, (Left) by 2 and, finally, (Right) by 3. No actions are available at a terminal state
- Discount factor $\gamma = 0.9$ (class attribute ``gamma=0.9``)
- Stochastic transition matrix (class attribute ``noise=0.2``)
- Rewards are only obtained at terminal states (class attribute ``living_reward=-0.04``)

### Known Model

Recall also the **optimal policy** we found using policy-iteration

![example_policy.png](https://i.postimg.cc/pLjHnkj0/example-policy.png)

since the dynamics of our grid world environment are known, we obtained the state-value function $v_\pi(s)$ associated to this policy using ``policy_evalution(.)`` 

We have defined the class ``GridEnv`` to represent our Grid World MDP.

In [None]:
# Create a Grid World instance
grid_world = GridEnv(gamma=0.9, noise=0.2, living_reward=-0.04)

# Get policy shown in image
policy_pi = encode_policy(grid_world)

# Compute value-function using dynamic programming
v_pi = policy_evaluation(grid_world, policy_pi)

plot_value_function(grid_world, v_pi)

# 2. Monte Carlo Methods

## 2.1 Monte-Carlo Policy Evaluation

### What if the Transition and Reward Function are Unknown?

Let's first define the helper method ``generate_episode(.)``. It samples an episode i.e., a sequence of ($s, a, r, s'$) tuples, from a given policy

In [None]:
Sample = namedtuple('Sample', ['state', 'action', 'reward', 'next_state'])

def generate_episode(grid_env, policy):
    """
    Generate an episode of experiences in environment under a given policy
    :param grid_env (GridEnv): Environment
    :param policy (dict of probabilites): Policy used to sample actions
    
    :return List(Sample) Complete episode
    """
    episode = []

    # Reset the environment to a random initial state
    state = grid_env.reset()

    # Set flag to indicate whether episode has ended
    done = False

    while not done:
        # Get actions available at current state
        all_actions = list(policy_pi[state].keys())
        # Get action probabilities
        all_probabilities = np.array(list(policy[state].values()))
        # Sample an action from policy
        action = np.random.choice(all_actions, 1, p=all_probabilities)[0]
        
        next_state, reward, done, info = grid_env.step(action)
        episode.append(Sample(state, action, reward, next_state))
        state = next_state

    return episode

Now, under the assumption that $\mathcal{T}(s,a,s')$ and $\mathcal{R}(s,a)$ are unknown, let's use the algorithm shown below to get an estimate $\hat{v}_\pi(s)$ of the true state-value function $v_\pi(s)$

![MCPolicyEvaluation.png](https://i.postimg.cc/6pXj5P6D/MCPolicy-Evaluation.png)

In [None]:
def monte_carlo_first_visit_policy_evaluation(grid_env, policy, true_v, n_episodes=1):
    """
    Compute estimate of state-value function for a given policy
    :param grid_env (GridEnv): Environment
    :param policy (dict of probabilites): Policy to be evaluated
    :param true_v (dict of floats): True state-value function. Used to compute prediciton error
    :param n_episodes (int): Number of episodes to use for prediction
    
    :return List(float): Prediction error after each episode
    :return dict(float): Predicted state-value function
    
    """
    all_states = grid_env.get_states()
    
    # Counter of visits for all states
    state_visits = {s:0 for s in all_states}
    # Cummulative return for each state
    state_returns = {s:0 for s in all_states}
    # Predicted state-value function
    pred_v = {s:0 for s in all_states}
    
    # Variable used for plotting
    list_errors = []
    
    for i in range(n_episodes):
        # Generate episode
        episode = generate_episode(grid_env, policy)
        # Create auxiliary variable to keep of first state visits
        visited = {s: False for s in all_states}
        # Return for current episode
        g = 0
        # Variable used to keep track of prediction error for this episode
        error = 0
        # Starting from last sampled observation in episode
        for obs in episode[::-1]:
            # Get visite state
            s = obs.state
            # Compute return for this state
            g = g*grid_env.gamma + obs.reward    
            # If this is the first time visiting this state
            if not visited[s]:
                # Increment the visit counter
                state_visits[s] += 1
                # Add return
                state_returns[s] += g
                # Compute mean return
                pred_v[s] = state_returns[s]/float(state_visits[s])
                # Set the state as visited
                visited[s] = True
                # Compute error
                error += np.abs(true_v[s] - pred_v[s])
        
        list_errors.append(error)
    
    return list_errors, pred_v

Let's now try the algorithm and compare its out to the true value-state function. 

**Interaction**:
Run the algorithm multiple times and observe what happens when the number of episodes increases.

In [None]:
#change n_episodes to see what happens
errors, predicted_v = monte_carlo_first_visit_policy_evaluation(grid_world, policy_pi, v_pi, n_episodes=100)

fig = plt.figure(constrained_layout=True)
spec = gridspec.GridSpec(ncols=2, nrows=2, figure=fig)
f_ax1 = fig.add_subplot(spec[0, 0])
f_ax2 = fig.add_subplot(spec[0, 1])
f_ax3 = fig.add_subplot(spec[1, :])

#Plot true value function
plot_value_function(grid_world, v_pi, f_ax1)
f_ax1.set_title("True state-value function")

plot_value_function(grid_world, predicted_v, f_ax2)
f_ax2.set_title("Predicted state-value function")

f_ax3.plot(errors)
f_ax3.set_title("Predicted Error (sum of abs. differences)")
f_ax3.set_xlabel("Num. episodes")

In a model-free setting where our state-value and action-value estimates depend on the actions chosen by the agent, how can we guarantee that the all actions will continue to be selected?

## 2.2 $\epsilon$-Greedy Policies

We can use an $\epsilon$-greedy policy. This type of policy are formally defined as:

\begin{equation*}
\begin{aligned}
    \pi(a|s) = 
    \begin{cases}
        1 - \epsilon + \frac{\epsilon}{|\mathcal{A}|},&  \text{if } a^* = \arg\max_{a \in \mathcal{A}} q_\pi(s,a)\\
        \frac{\epsilon}{|\mathcal{A}|}, & \text{otherwise}
    \end{cases}
\end{aligned}
\end{equation*}

Let's see how the agent behaves when it follows an $\epsilon$-greedy policy.

In [None]:
def get_egreedy_action(grid_env, state, q_value, epsilon):
    """
    Select action to execute at a given state under an epsilon-greedy policy
    :param grid_env (GridEnv): Grid world environment
    :param state (int): Location in grid for which next action is going to be choosen
    :param q_value (dict): Action-value function 
    :param epsilon (float): Randomness threshold used to choose action
    """
    
    rand_n = np.random.random()
    if rand_n <= epsilon:
        return grid_env.action_space.sample()
    else:
        actions = list(q_value[state].keys())
        return actions[np.argmax(list(q_value[state].values()))]

In [None]:
# We set noise to zero. Randomness in agent behaviour is only due to e-greedy policy
grid_world = GridEnv(noise=0, living_reward=-0.04, gamma=0.9)

# Get policy shown in section 1
policy_pi = encode_policy(grid_world)

# Compute value-function using dynamic programming
v_pi = policy_evaluation(grid_world, policy_pi)

# Use value-function to compute q-values
q_pi = grid_world.get_q_values(v_pi)

# Start episode
cur_state = grid_world.idx_cur_state
s_x, s_y = get_state_to_plot(grid_world)

# We can visualize our grid world using the render() function
fig, ax = grid_world.render()
agent, = ax.plot([], [], 'o', color='b', linewidth=6)
reward_text = ax.text(0.02, 0.95, '', transform=ax.transAxes)

done = False
cumulative_reward = 0
path_to_plot = []

v_epsilon = 0.8

while not done:
    action = get_egreedy_action(grid_world, cur_state, q_pi, v_epsilon)
    cur_state, cur_reward, done, _ = grid_world.step(int(action))
    n_x, n_y = get_state_to_plot(grid_world)
    cumulative_reward += cur_reward
    path_to_plot.append([cumulative_reward, n_x, n_y])

def init():
    agent.set_data([s_x + 0.5], [s_y + 0.5])
    reward_text.set_text('')
    return agent, reward_text

def animate(i):
    if i < len(path_to_plot):
        r, n_x, n_y = path_to_plot[i]
        agent.set_data([n_x + 0.5], [n_y + 0.5])
        reward_text.set_text('Cumulative reward: %.2f' % r)
    return agent, reward_text

ani = animation.FuncAnimation(fig, animate, frames=len(path_to_plot), blit=False, interval=500, init_func=init,
                              repeat=False)

plt.close('all') 
display(HTML(f"<div align=\"center\">{ani.to_jshtml()}</div>"))

# 3. Temporal Difference Methods

# 3.1 TD Policy Evaluation
Estimate $\hat{v}_\pi(s)$ of the true state-value function $v_\pi(s)$

![TDPolicyEvaluation.png](https://i.postimg.cc/c4yywX4c/TDPolicy-Evaluation.png)

In [None]:
def temporal_learning_policy_evaluation(grid_env, policy, true_v, alpha=0.1, n_episodes=1):
    """
    Compute estimate of state-value function for a given policy
    :param grid_env (GridEnv): Environment
    :param policy (dict of probabilites): Policy to be evaluated
    :param true_v (dict of floats): True state-value function. Used to compute prediciton error
    :param alpha (float): step-size
    :param n_episodes (int): Number of episodes to use for prediction
    
    :return List(float): Prediction error after each episode
    :return dict(float): Predicted state-value function
    
    """
    all_states = grid_env.get_states()
    
    # Predicted state-value function
    pred_v = {s:0 for s in all_states}
    
    # Variable used for plotting
    list_errors = []
    
    for i in range(n_episodes):
        # Generate episode
        episode = generate_episode(grid_env, policy)
        
        # Variable used to keep track of prediction error for this episode
        error = 0
        
        # Starting from the first sampled observation in episode
        for obs in episode:
            pred_v[obs.state] += alpha * (obs.reward + grid_env.gamma * pred_v[obs.next_state] - pred_v[obs.state]) 
            error += np.abs(true_v[obs.state] - pred_v[obs.state])       
        
        list_errors.append(error)
    
    return list_errors, pred_v

Let's now try the algorithm and compare its output to the true value-state function. 

In [None]:
grid_world = GridEnv(gamma=0.9, noise=0.2, living_reward=-0.04)
policy_pi = encode_policy(grid_world)
v_pi = policy_evaluation(grid_world, policy_pi)

#change n_episodes to see what happens
errors, predicted_v = temporal_learning_policy_evaluation(grid_world, policy_pi, v_pi, alpha=0.1, n_episodes=200)

fig = plt.figure(constrained_layout=True)
spec = gridspec.GridSpec(ncols=2, nrows=2, figure=fig)
f_ax1 = fig.add_subplot(spec[0, 0])
f_ax2 = fig.add_subplot(spec[0, 1])
f_ax3 = fig.add_subplot(spec[1, :])

#Plot true value function
plot_value_function(grid_world, v_pi, f_ax1)
f_ax1.set_title("True state-value function")

plot_value_function(grid_world, predicted_v, f_ax2)
f_ax2.set_title("Predicted state-value function")

f_ax3.plot(errors)
f_ax3.set_title("Predicted Error (sum of abs. differences)")
f_ax3.set_xlabel("Num. episodes")

## 3.2 Q-Learning

We have seen how to evaluate a policy without a model. Let's now find an *approximately* optimal policy using the off-policy control method Q-learning.

To help during the learning, we have added a lambda function that iteratively decreases epsilon. Our agent will strongly explore the environment at first to then swicth into exploitation mode

In [None]:
min_epsilon=0.001
max_epsilon=1.0
epsilon_decay = 80.0
epsilon_by_episode = lambda ep_idx: (min_epsilon 
         + (max_epsilon - min_epsilon) 
         * math.exp (-1 * ep_idx/epsilon_decay))

fig, ax = plt.subplots(figsize=(4, 4))

ax.plot([epsilon_by_episode(i) for i in range(500)])
ax.set_xlabel("Num. episodes")
ax.set_ylabel("Epsilon")

Here is our implementation of the q-learning algorithm shown below

![q-learning.png](https://i.postimg.cc/8z70Yv5C/q-learning.png)

**Complete the missing steps**:
- Choose an action using an $\epsilon$-greedy policy (use the function ``get_egreedy_action(.)`` we tested in Section 3.1)
- Update our q-function using a greedy (max) policy (use ``q_function[cur_state][action]`` to index our q-function)

**Keep in Mind**: Correspondance between the mathematical notation and implemented code

|                 |                            |                  |
| :-------------- | -------------------------: | ---------------: |
|                 | **Variable/Attribute**     | **Type**         | 
| $\epsilon$      | `epsilon_by_episode`       | `float`          |
| $\alpha$        | `alpha`                    | `float`          | 
| $\gamma$        | `grid_world.gamma`         | `float`          | 
| $\hat{q}(s, a)$ | `q_function[idx_s][idx_a]` | `dict` of `dict` | 
| $s$             | `cur_state`                | `int`            | 
| $r$             | `reward`                   | `int`            |
| $s^{\prime}$    | `next_state`               | `int`            |

In [None]:
def q_learning(grid_env, alpha=0.1, min_epsilon=0.01, max_epsilon=1.0, 
               epsilon_decay = 80.0, n_episodes=500):
    """
    This function computes an approximately optimal policy using q-learning
    
    :param grid_env (GridEnv): MDP environment
    :param alpha (float): step-size
    :param epsilon (float): value used during e-greedy action selection
    :return: (dict) State-values for all non-terminal states
    """
        
    # This lambda function iteratively decreases epsilon
    epsilon_by_episode = lambda ep_idx: min_epsilon + (max_epsilon - min_epsilon) * math.exp (-1 * ep_idx/epsilon_decay)
    
    # Obtain list of all states in environment
    states = grid_env.get_states()
    actions = grid_env.get_actions()
    q_function = defaultdict(lambda: defaultdict(float))
    
    # Initialize q_function arbitrarily
    for s in states:
        for a in actions:
            q_function[s][a] = 0
    
    
    for i_episode in range(1, n_episodes+1):
        cur_state = grid_env.reset()
        done = False
        epsilon = epsilon_by_episode(i_episode)
        
        while not done:
            #TODO 1: Complete off-policy action selection (e-greedy)-----------
            action = None
            #ENDTODO ----------------------------------------------------------
            
            next_state, reward, done,_ = grid_env.step(action)
            q_next_state = list(q_function[next_state].values())
            
            #TODO 2: Complete update of q-function -----------------------------
            q_function[cur_state][action] += 0
            #ENDTODO ------------------------------------------------------------
            
            cur_state=next_state
    
    return decode_policy(grid_env, q_function)

Let's now test our implementation and compare our free-model policy with the one we obtained in the last lecture using value iteration

In [None]:
grid_world = GridEnv(gamma=0.9, noise=0.2, living_reward=-0.04)
policy_pi = encode_policy(grid_world)
v_pi = policy_evaluation(grid_world, policy_pi)

q_learning_policy = q_learning(grid_world)

# DISPLAYING COMPARISON PLOT

fig = plt.figure(figsize=(8, 8), constrained_layout=True)
spec = gridspec.GridSpec(ncols=2, nrows=2, figure=fig)
f_ax1 = fig.add_subplot(spec[0, 0])
f_ax2 = fig.add_subplot(spec[0, 1])
f_ax3 = fig.add_subplot(spec[1, 0])
f_ax4 = fig.add_subplot(spec[1, 1])

#Plot policy obtained using value-iteration value function
grid_world = GridEnv(gamma=0.9, noise=0.2, living_reward=-0.04)
value_function, optimal_policy = value_iteration(grid_world)

plot_policy(grid_world, optimal_policy, f_ax1)
f_ax1.set_title("Policy - Value Iteration")

plot_policy(grid_world, q_learning_policy, f_ax2)
f_ax2.set_title("Policy - Q-learning")

# Compute value function for q_learning policy
q_policy_state_values = policy_evaluation(grid_world, encode_policy(grid_world, q_learning_policy))

plot_value_function(grid_world, value_function, f_ax3)
f_ax3.set_title("Value Function - Value Iteration")

plot_value_function(grid_world, q_policy_state_values, f_ax4)
f_ax4.set_title("Value Function - Q-learning")