# TP2 INFO8003
The idea behind this notebook is to get familiar with the state-action value function $Q(s,a)$ and learning this function using model-based and model-free approaches for small state-action spaces.

## Environment description 

Jack is searching for treasures on an island represented by a 5x5 grid, where each cell corresponds to a state. Jack starts at the bottom-left corner, and his goal is to navigate the grid to collect treasures. There are two types of treasures: a small one at (2, 2) that rewards him with 1 gold each time he collects it, and a large one at (4, 4) that rewards him with 10 gold each time. Walls are present at (1, 1), (2, 1), and (2, 3), and Jack cannot pass through these cells (they are not considered as state therefore).

Beware of cliffs at (4, 1), (4, 2), and (4, 3), as falling off a cliff will cost Jack 100 gold. Additionally, some cells are slippery (the blue ones shown in the figure below), increasing the likelihood of falling into a cliff.

Since Jack has consumed some rum, he has an 80% chance of performing the intended action and a 20% chance of performing a random action. He can move right, left, up, or down. The environment will display Jack’s position and the layout of the grid.

<p align="center"> 
    <img src="GridEnv.png" alt="GridEnv visualization">
    </p>

In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

class GridEnv(gym.Env):
    def __init__(self):
        super(GridEnv, self).__init__()
        self.grid_size = 5
        self.start_state = (4, 0)
        self.small_treasure_state = (2, 2)
        self.big_treasure_state = (4, 4)
        self.wall_states = [(1, 1), (2, 1), (2, 3)]
        self.cliff_states = [(4, 1), (4, 2), (4, 3)]
        self.slippery_states = [(3, 1), (3, 2), (3, 3)]

        # Actions: 0=Right, 1=Left, 2=Up, 3=Down
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Discrete(self.grid_size * self.grid_size-len(self.wall_states))

        self.state = self.encode_state(self.start_state)

    def encode_state(self, state_tuple):
        x, y = state_tuple
        return x  * self.grid_size + y

    def decode_state(self, state_int):
        x = state_int // self.grid_size
        y = state_int % self.grid_size
        return (x, y)

    def step(self, action):
        x, y = self.decode_state(self.state)
        new_x, new_y = x, y

        # Check if Jack takes an action at random or not
        w = np.random.uniform()
        if w < 0.2:
            action = self.action_space.sample()

        if action == 0:  # right
            new_y = min(y + 1, self.grid_size - 1)
        elif action == 1:  # left
            new_y = max(y - 1, 0)
        elif action == 2:  # up
            new_x = max(x - 1, 0)
        elif action == 3:  # down
            new_x = min(x + 1, self.grid_size - 1)

        # Check if the new position is a wall
        if (new_x, new_y) in self.wall_states:
            self.state = self.encode_state((x, y))
        else:
            self.state = self.encode_state((new_x, new_y))

        if (new_x, new_y) == self.big_treasure_state:
            reward = 10
        elif (new_x, new_y) in self.cliff_states:
            reward = -100
        elif (new_x, new_y) == self.small_treasure_state:
            reward = 1
        else:
            reward = 0

        done = False
        truncated = False

        return self.state, reward, done, truncated, {}

    def reset(self):
        self.state = self.encode_state(self.start_state)
        return self.state, {}

    def render(self, mode='human'):
        grid = np.zeros((self.grid_size, self.grid_size), dtype=str)
        grid[:] = '.'
        grid[self.big_treasure_state] = 'T'
        grid[self.small_treasure_state] = 't'
        for cliff in self.cliff_states:
            grid[cliff] = 'C'
        for wall in self.wall_states:
            grid[wall] = 'W'
        x, y = self.decode_state(self.state)
        grid[x, y] = 'J'
        print("\n".join(["".join(row) for row in grid]))
        print()

env = GridEnv()
state = env.reset()
env.render()

for _ in range(10):
    action = env.action_space.sample()  # Random action
    state, reward, done, truncated, _ = env.step(action)
    print(f"Action: {action}, Reward: {reward}")
    env.render()

.....
.W...
.WtW.
.....
JCCCT

Action: 3, Reward: 0
.....
.W...
.WtW.
.....
JCCCT

Action: 2, Reward: 0
.....
.W...
.WtW.
J....
.CCCT

Action: 1, Reward: 0
.....
.W...
.WtW.
J....
.CCCT

Action: 2, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 1, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 0, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 3, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 1, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 1, Reward: 0
.....
.W...
JWtW.
.....
.CCCT

Action: 1, Reward: 0
.....
.W...
.WtW.
J....
.CCCT



In [2]:
import numpy as np
env = GridEnv()

def simulate_policy(env, policy, steps=10):
    """
    Simulate a policy in the given environment over a specified number of steps.

    Parameters:
    env (Env): The environment in which the policy is to be simulated.
    policy (function): A function that takes a state as input and returns an action.
    steps (int): The number of steps to simulate the policy for.

    Returns:
    list: A list representing the trajectory, i.e. (s_0, a_0, r_0, s_1, a_1, r_1,..., a_{t-1}, r_{t-1}, s_t).
    """
    state, _ = env.reset()
    trajectory = [state]
    for _ in range(steps):
        action = policy(state)
        next_state, reward, _, _, _ = env.step(action)
        trajectory.append(action)
        trajectory.append(reward)
        trajectory.append(next_state)
        state = next_state

    return trajectory

def random_policy(state):
    """
    Policy taking an action at random in [0, 4[.

    Parameters:
    state (int): the current state.

    Returns:
    action (int): a random action.
    """
    action = np.random.randint(4)
    return action

trajectory =  simulate_policy(env, random_policy, 100000)
print("trajectory:", trajectory)

trajectory: [20, 3, 0, 20, 1, 0, 20, 3, 0, 20, 1, 0, 20, 1, 0, 20, 0, -100, 21, 0, -100, 22, 1, -100, 21, 3, -100, 21, 3, -100, 21, 1, 0, 20, 3, 0, 20, 2, 0, 15, 3, 0, 20, 0, 0, 20, 3, 0, 20, 0, -100, 21, 1, 0, 20, 2, 0, 15, 1, 0, 15, 1, 0, 15, 3, 0, 20, 3, 0, 20, 0, -100, 21, 0, -100, 22, 2, 0, 17, 3, -100, 22, 0, -100, 21, 0, -100, 22, 1, -100, 21, 1, 0, 20, 3, 0, 20, 1, 0, 20, 2, -100, 21, 1, 0, 20, 3, 0, 20, 1, 0, 20, 2, 0, 15, 2, 0, 10, 1, 0, 10, 2, 0, 5, 0, 0, 5, 0, 0, 5, 0, 0, 5, 3, 0, 10, 0, 0, 5, 1, 0, 5, 1, 0, 5, 0, 0, 5, 2, 0, 0, 0, 0, 1, 2, 0, 1, 1, 0, 0, 3, 0, 5, 1, 0, 5, 3, 0, 5, 0, 0, 5, 3, 0, 10, 2, 0, 5, 1, 0, 5, 3, 0, 5, 2, 0, 0, 2, 0, 0, 3, 0, 5, 0, 0, 5, 2, 0, 5, 1, 0, 5, 1, 0, 5, 3, 0, 10, 0, 0, 10, 0, 0, 10, 1, 0, 10, 2, 0, 5, 3, 0, 10, 3, 0, 15, 0, 0, 15, 2, 0, 20, 3, 0, 20, 3, 0, 20, 1, 0, 20, 2, 0, 15, 3, 0, 20, 3, 0, 20, 1, 0, 20, 0, -100, 21, 1, 0, 20, 1, 0, 20, 3, -100, 21, 3, -100, 21, 0, -100, 22, 1, -100, 21, 1, 0, 20, 0, -100, 21, 2, 0, 16, 2, 0, 16, 1, 

## Question 1
Implement a routine which estimates $r(s,a)$ and $p(s'|s,a)$ from a given trajectory $h_t = (s_0, a_0, r_0, s_1, a_1, r_1, \ldots, a_{t-1}, r_{t-1}, s_t)$ produced by a random policy. Justify why your routine converges towards the true values for a growing trajectory ? 

Advice: take a trajectory large enough to estimate as close as possible the true mdp structure.

In [None]:
import numpy as np
from collections import defaultdict

def estimate_mdp_from_trajectory(trajectory):
    """
    Estimate the reward function r(s, a) and transition probabilities p(s'|s, a) from a given trajectory.
    Hint: use defaultdict structure.

    Parameters:
    trajectory (list): A list representing the trajectory, i.e. (s_0, a_0, r_0, s_1, a_1, r_1,..., a_{t-1}, r_{t-1}, s_t).

    Returns:
    estimated_rewards (dict): Estimated reward function with the structure {(s, a): mean_reward, ...}
    estimated_transitions (dict): Estimated transition probabilities with the strcuture {(s, a): {s_prime: probability, ...}, ...}.
    """
    rewards_sum = defaultdict(float)
    rewards_count = defaultdict(int)
    transition_count = defaultdict(lambda: defaultdict(int))

    # Trajectory pattern: [s0, a0, r0, s1, a1, r1, s2, ...]
    for i in range(0, len(trajectory) - 1, 3):
        s = trajectory[i]
        a = trajectory[i+1]
        r = trajectory[i+2]
        s_next = trajectory[i+3]
        rewards_sum[(s, a)] += r
        rewards_count[(s, a)] += 1
        transition_count[(s, a)][s_next] += 1

    estimated_rewards = {}
    estimated_transitions = {}
    for key in rewards_sum:
        estimated_rewards[key] = rewards_sum[key] / rewards_count[key]
        total = sum(transition_count[key].values())
        estimated_transitions[key] = {s_next: count/total for s_next, count in transition_count[key].items()}

    estimated_rewards = dict(sorted(estimated_rewards.items()))
    estimated_transitions = dict(sorted(estimated_transitions.items()))

    return estimated_rewards, estimated_transitions

# Testing function by setting `steps` large enough for estimates to converge
steps = 1_000_000
estimate_mdp_from_trajectory(simulate_policy(env, random_policy, steps))


({(0, 0): 0.0,
  (0, 1): 0.0,
  (0, 2): 0.0,
  (0, 3): 0.0,
  (1, 0): 0.0,
  (1, 1): 0.0,
  (1, 2): 0.0,
  (1, 3): 0.0,
  (2, 0): 0.0,
  (2, 1): 0.0,
  (2, 2): 0.0,
  (2, 3): 0.0,
  (3, 0): 0.0,
  (3, 1): 0.0,
  (3, 2): 0.0,
  (3, 3): 0.0,
  (4, 0): 0.0,
  (4, 1): 0.0,
  (4, 2): 0.0,
  (4, 3): 0.0,
  (5, 0): 0.0,
  (5, 1): 0.0,
  (5, 2): 0.0,
  (5, 3): 0.0,
  (7, 0): 0.04870584120321791,
  (7, 1): 0.04910325613790702,
  (7, 2): 0.05008347245409015,
  (7, 3): 0.847850531960903,
  (8, 0): 0.0,
  (8, 1): 0.0,
  (8, 2): 0.0,
  (8, 3): 0.0,
  (9, 0): 0.0,
  (9, 1): 0.0,
  (9, 2): 0.0,
  (9, 3): 0.0,
  (10, 0): 0.0,
  (10, 1): 0.0,
  (10, 2): 0.0,
  (10, 3): 0.0,
  (12, 0): 0.0,
  (12, 1): 0.0,
  (12, 2): 0.0,
  (12, 3): 0.0,
  (14, 0): 0.0,
  (14, 1): 0.0,
  (14, 2): 0.0,
  (14, 3): 0.0,
  (15, 0): 0.0,
  (15, 1): 0.0,
  (15, 2): 0.0,
  (15, 3): 0.0,
  (16, 0): -4.984984984984985,
  (16, 1): -5.251181515841064,
  (16, 2): -5.305383022774327,
  (16, 3): -84.79501816294758,
  (17, 0): -4.8982

> **Answer:** The routine converges towards the true values for a growing trajectory because as the trajectory grows, we are collecting more and more samples for each state-action pair, which make the averaging of the reward more and more accurate (*law of large numbers*).
>
> Since the random policy makes that all reachable state-action pairs are visited at least once and often enough for a long enough trajectory, the estimates become more and more accurate.

## Question 2
Compute $\widehat{Q}$ by using $\widehat{r}(s,a)$ and $\widehat{p}(s'|s,a)$ that estimate the MDP structure computed in question 1.  Explain the influence of the length of the trajectory on the quality of the approximation $\hat{Q}$. Derive $\widehat{\mu}^*$ from $\widehat{Q}$.

Display $V_N^{\widehat{\mu}^*}$ for each state $s$. Supposing you estimate the true $Q_N$ functions, determine the smallest N such that the bound $V^{\mu^*}-V^{\mu^*_N}$ is smaller than 0.1 ? Justify.

> **Answer:** *(Essentially the same as for previous question)* With more steps, we collect a larger sample for each state-action pair. By the law of large numbers, the estimates for the immediate reward $\hat{r}(s,a)$ and the transition probabilities $\hat{p}(s\prime |s,a)$ converge to their true values. This results in a more accurate computation of $\hat{Q}(s,a)$ using the Bellman updates. 
> 
> In contrast, a short trajectory can under-sample some state-action pairs, leading to higher variance and bias in $\hat{Q}$.

In [16]:
def estimated_q_n(estimated_rewards, estimated_transitions, gamma=0.95, N=500):
    """
    Compute the Q-values using the estimated rewards and transition probabilities.

    Parameters:
    estimated_rewards (dict): Estimated reward function.
    estimated_transitions (dict): Estimated transition probabilities.
    gamma (float): Discount factor.
    iterations (int): Number of iterations for convergence.

    Returns:
    dict: Estimated Q_N-values with the structure  {(s, a): value, ...}.
    """
    estimated_q_n = defaultdict(lambda: 0)

    for _ in range(N):
        new_q = defaultdict(lambda: 0)  # new dict for updated Q-values

        for (s, a) in estimated_rewards:
            """
            Modus operandi for each state-action pair:
                1. Get the reward for the current state-action pair
                2. Get the transition probabilities for the current state-action pair
                3. Initialize expected future reward
                4. Iterate over all possible next states and their probabilities
                    5. Find the best Q-value for the next state over all possible actions
                    6. Update the expected future reward
                7. Update the Q-value for the current state-action pair
            """
            r = estimated_rewards[(s, a)]   # 1.
            transitions = estimated_transitions.get((s, a), {}) # 2.
            expected_future = 0 # 3.

            for s_next, prob in transitions.items():    # 4.
                best_next = max([estimated_q_n[(s_next, a_next)] for a_next in range(4)])   # 5.
                expected_future += prob * best_next  # 6.

            new_q[(s, a)] = r + gamma * expected_future # 7.

        # Update Q-values
        estimated_q_n = new_q

    return dict(estimated_q_n)

def derive_optimal_policy(q_values):
    """
    Derive the optimal policy from Q-values.

    Parameters:
    q_values (dict): Q-values.

    Returns:
    dict: Optimal policy mapping states to actions with the structure  {s: a, ...}.
    """
    policy = {}
    # Get all states in the Q-val
    states = set(s for (s, a) in q_values.keys())
    for s in states:
        best_action = None
        best_val = -float('inf')
        for a in range(4):
            val = q_values.get((s, a), 0)
            if val > best_val:
                best_val = val
                best_action = a
        policy[s] = best_action
    return policy


def compute_value_function(q_values, policy):
    """
    Compute the value function from Q-values and a policy.

    Parameters:
    q_values (dict): Q-values.
    policy (dict): Policy mapping states to actions.

    Returns:
    dict: Value function for each state with the structure  {s: value, ...}.
    """
    value_function = {}

    for s, a in policy.items():
        value_function[s] = q_values.get((s, a), 0)

    return value_function

estimated_rewards, estimated_transitions = estimate_mdp_from_trajectory(simulate_policy(env, random_policy, 1000000))
q_values = estimated_q_n(estimated_rewards, estimated_transitions)

optimal_policy = derive_optimal_policy(q_values)
value_function = compute_value_function(q_values, optimal_policy)

print("Estimated Q-values:", q_values)
print("Optimal Policy:", optimal_policy)
print("Value Function:", value_function)

Estimated Q-values: {(0, 0): 54.997666329134, (0, 1): 52.25351348794192, (0, 2): 52.263957181474815, (0, 3): 49.688495873316455, (1, 0): 58.593721754639134, (1, 1): 52.96735197582985, (1, 2): 55.672319394085875, (1, 3): 55.688341434246325, (2, 0): 62.41042027103755, (2, 1): 56.57445214652311, (2, 2): 59.434894738990465, (2, 3): 62.09453251244978, (3, 0): 66.29134550147558, (3, 1): 60.22777117050706, (3, 2): 63.1865153414451, (3, 3): 66.20311716199839, (4, 0): 66.90430386994862, (4, 1): 63.79950296337557, (4, 2): 66.92710670384875, (4, 3): 70.40219840388995, (5, 0): 49.06162156273529, (5, 1): 49.026936339091925, (5, 2): 51.61355969309112, (5, 3): 46.73021469595781, (7, 0): 65.85738376000057, (7, 1): 62.466534139069566, (7, 2): 59.83922678321903, (7, 3): 60.40742770733627, (8, 0): 70.27942607282235, (8, 1): 63.234291796937434, (8, 2): 63.54197291743211, (8, 3): 66.58862232499546, (9, 0): 71.08405461842058, (9, 1): 67.48608202723466, (9, 2): 67.54732836264, (9, 3): 75.04572828359107, (10,

> **Answer:** Using the error bound for discounted value functions:
> 
> $||V^{µ^*} - V^{µ^*_N}||_\infty \leq \frac{2 \gamma^N B_r}{(1 - \gamma)^2}$
> 
> Determine the smallest $N$ such that the bound $V^{µ^*} - V^{µ^*_N} \leq 0.1$.

In [11]:
gamma = 0.95
B_r = 10  # constant bounding the reward function; suppose 10
N = 0
error_bound = 2 * gamma**N * B_r / ((1 - gamma)**2)
while error_bound > 0.1:
    N += 1
    error_bound = 2 * gamma**N * B_r / ((1 - gamma)**2)

print("Bound:", N)

Bound: 221


Display your optimal policy using the following function. Describe your result, is it coherent ?

In [10]:
from IPython.display import display, Math

def display_policy_as_latex_table(policy, grid_size):
    """
    Display the optimal policy as a LaTeX table with arrows.

    Parameters:
    policy (dict): Optimal policy mapping states to actions.
    grid_size (int): The size of the grid (assuming a square grid).

    Returns:
    None: Displays the LaTeX table in the notebook.
    """
    action_symbols = {
        0: r'\rightarrow',
        1: r'\leftarrow',
        2: r'\uparrow',
        3: r'\downarrow'
    }

    latex_table = "\\begin{array}{" + "c" * grid_size + "}\n"

    for i in range(grid_size):
        row = []
        for j in range(grid_size):
            state = i * grid_size + j
            action = policy.get(state, None)
            if action is not None:
                row.append(action_symbols[action])
            else:
                row.append("")

        latex_table += " & ".join(row) + " \\\\\n"

    latex_table += "\\end{array}"

    display(Math(latex_table))

display_policy_as_latex_table(optimal_policy, grid_size=5)

<IPython.core.display.Math object>

> **Answer**: results indeed seem coherent as we clearly see a trend toward reaching the bottom right cell, while evidently avoiding the three $-100$ ones. Also, as expected, we can't move through walls.

## Question 3

Sample a trajectory from a random uniform policy with a finite, large enough time horizon $T$. Motivate your choice of $T$. Implement a routine which iteratively updates $\widehat{Q}(s_k,a_k)$ using the Q-learning algorithm from this trajectory. Run your routine with a constant learning rate $\alpha = 0.05$. Derive directly $\widehat{\mu}^*$ from $\widehat{Q}$ and display this optimal policy. Display $V_N^{\widehat{\mu}^*}$ for each initial state $s$.

In [None]:
def q_learning_update(trajectory, alpha=0.05, gamma=0.95):
    """
    Compute the Q-values using the Q-learning algorithm.

    Parameters:
    estimated_rewards (dict): Estimated reward function.
    estimated_transitions (dict): Estimated transition probabilities.
    gamma (float): Discount factor.
    iterations (int): Number of iterations for convergence.

    Returns:
    dict: Estimated Q-values with the structure  {(s, a): value, ...}.
    """
    q_values = defaultdict(lambda: 0)

    # Process trajectory: [s0, a0, r0, s1, a1, r1, s2, ...]
    for i in range(0, len(trajectory) - 1, 3):
        s = trajectory[i]
        a = trajectory[i+1]
        r = trajectory[i+2]
        s_next = trajectory[i+3]
        best_next = max([q_values[(s_next, a_next)] for a_next in range(4)])
        q_values[(s, a)] = q_values[(s, a)] + alpha * (r + gamma * best_next - q_values[(s, a)])

    return dict(q_values)


env = GridEnv()
# Choose T large enough to cover many transitions.
# Motivation: Q-learning is model-free (i.e., it learns from experience). Hence, the more experience, the better.
T = 100_000
trajectory = simulate_policy(env, random_policy, T)
q_values = q_learning_update(trajectory)

optimal_policy = derive_optimal_policy(q_values)

display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)


<IPython.core.display.Math object>

value function {0: 70.25671916562904, 1: 74.3272351120088, 2: 80.18752516782561, 3: 85.86260629809891, 4: 91.1092183997245, 5: 65.50249465204058, 7: 85.92052356041404, 8: 91.53344714794952, 9: 98.32898334101262, 10: 67.39273808754936, 12: 82.83783677386535, 14: 107.5682629705798, 15: 73.78115287721866, 16: 81.19770471987222, 17: 85.23323279055731, 18: 99.23360888582559, 19: 115.01322031057641, 20: 63.09612519218288, 21: 62.197017218949796, 22: 70.3925734038888, 23: 103.47153999377268, 24: 111.24402868142266}


## Question 4

Implement an intelligent agent which learns $Q$ with Q-learning using an $\epsilon$-greedy policy, i.e. a policy that is greedy with a probability of $(1-\epsilon)$ and random otherwise.

Run the three following experimental protocols and display for each of them the derived optimal policy and their value function. Discuss the results.

The first experimental protocol is the following. The agent trains over $100$ episodes having $1000$ transitions. An episode always starts from the initial state. The learning rate $\alpha$ is equal to $0.05$ and the exploration rate $\epsilon$ is equal to $0.5$. The values of $\alpha$ and $\epsilon$ are both constant over time. The function $\hat{Q}$ is updated after every transition. The transitions are used only once for updating $\hat{Q}$ i.e. there are not stored in a replay buffer.

In [None]:
def epsilon_greedy_policy(q_values, state, epsilon):
    """
    Selects an action using the epsilon-greedy policy.

    Parameters:
    q_values (dict): A dictionary where keys are tuples of (state, action) and values are the estimated Q-values.
    state (tuple): The current state for which the action is to be selected.
    epsilon (float): The probability of selecting a random action (exploration). Should be between 0 and 1.

    Returns:
    int: The action selected according to the epsilon-greedy policy. With probability epsilon, a random action is chosen.
         With probability 1-epsilon, the action with the highest Q-value for the given state is chosen.
    """
    if np.random.rand() < epsilon:
        return np.random.randint(4)
    else:
        return np.argmax(q_values[(state, a_prime)] for a_prime in range(4))

def q_learning(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the first protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    num_transitions = 1_000

    for _ in range(episodes):
        s, _ = env.reset()
        for _ in range(num_transitions):
            a = epsilon_greedy_policy(q_values, s, epsilon)
            s_next, r, _, _, _ = env.step(a)
            best_next = max([q_values[(s_next, a)] for a in range(4)])
            q_values[(s, a)] = q_values[(s, a)] + alpha * (r + gamma * best_next - q_values[(s, a)])
            s = s_next

    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)


<IPython.core.display.Math object>

value function {0: 60.25425319847443, 1: 84.13369228319175, 2: 91.14530551610966, 3: 97.14485004817786, 4: 104.1072601923253, 5: 11.257538570529688, 7: 95.6739133091974, 8: 103.6840662606859, 9: 112.31187038721542, 10: 12.528771980665846, 12: 91.07856190534719, 14: 122.8542878856119, 15: 66.03790559221258, 16: 86.81942875958869, 17: 109.27587745561975, 18: 123.52452992365485, 19: 134.69603019987366, 20: 36.09636450500747, 21: 60.70091528982468, 22: 76.61888655593901, 23: 124.5692167869473, 24: 138.55371129512417}


The second experimental protocol differs from the first one only by the learning rate which is such that $\alpha_0 = 0.05$ and $\forall t > 0, \alpha_t = 0.8\alpha_{t-1}$.

In [None]:
def q_learning_2(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the second protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    num_transitions = 1_000

    current_alpha = alpha
    for _ in range(episodes):
        s, _ = env.reset()
        for _ in range(num_transitions):
            a = epsilon_greedy_policy(q_values, s, epsilon)
            s_next, r, _, _, _ = env.step(a)
            best_next = max([q_values[(s_next, a)] for a in range(4)])
            q_values[(s, a)] = q_values[(s, a)] + current_alpha * (r + gamma * best_next - q_values[(s, a)])
            s = s_next
            current_alpha *= 0.8

    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)

<IPython.core.display.Math object>

value function {0: 65.08300473111258, 1: 80.54293256759945, 2: 88.90829893475446, 3: 96.4748573144164, 4: 102.79510664246034, 5: 12.185538806513023, 7: 93.58073144206523, 8: 101.11588804335618, 9: 108.28086903055352, 10: 13.16184752915477, 12: 89.20319313686656, 14: 113.41731005374226, 15: 62.20157251508887, 16: 76.86642493415792, 17: 100.1913018293673, 18: 100.88921199854528, 19: 116.37393962224894, 20: 22.346648214753685, 21: 54.65815374220102, 22: 78.43935692980544, 23: 94.05500193437734, 24: 114.11497192908396}


In the following, a replay buffer is defined as a data structure of previously seen transitions i.e. $(s_t, a_t, r_t, s_{t+1})$. The third experimental protocol is identical from the first one except that the one-step system transitions are stored in a replay buffer and that at each time-step, the function $\hat{Q}$ is updated ten times by drawing ten transitions at random from the replay buffer.


In [15]:
def q_learning_3(env, episodes=100, alpha=0.05, epsilon=0.5, gamma=0.95):
    """
    Perform Q-learning to learn the optimal Q-values with the third protocol.

    Parameters:
    env (Env): The environment in which Q-learning is performed.
    episodes (int): The number of episodes to run Q-learning.
    alpha (float): The learning rate for updating Q-values.
    epsilon (float): The probability of selecting a random action (exploration) in epsilon-greedy policy.
    gamma (float): The discount factor for future rewards.

    Returns:
    dict: A dictionary containing the learned Q-values for state-action pairs.
    """
    q_values = defaultdict(lambda: 0)
    num_transitions = 1_000

    replay_buffer = []
    for _ in range(episodes):
        s, _ = env.reset()
        for _ in range(num_transitions):
            a = epsilon_greedy_policy(q_values, s, epsilon)
            s_next, reward, _, _, _ = env.step(a)
            replay_buffer.append((s, a, reward, s_next))

            # Update Q-values 10x through random sampling of replay_buffer
            for _ in range(10):
                s_sample, a_sample, r_sample, s_next_sample = replay_buffer[np.random.randint(len(replay_buffer))]
                best_next = max([q_values[(s_next_sample, a)] for a in range(4)])
                q_values[(s_sample, a_sample)] = q_values[(s_sample, a_sample)] + alpha * (r_sample + gamma * best_next - q_values[(s_sample, a_sample)])
            s = s_next

    return q_values

env = GridEnv()
q_values = q_learning(env)
optimal_policy = derive_optimal_policy(q_values)
display_policy_as_latex_table(optimal_policy, grid_size=5)

value_function = compute_value_function(q_values, optimal_policy)
print("value function", value_function)

<IPython.core.display.Math object>

value function {0: 62.442842680507994, 1: 85.26084824885662, 2: 97.92748532960624, 3: 102.56611519862085, 4: 108.86801229743699, 5: 12.460781162109303, 7: 100.4871935304105, 8: 107.30296922747208, 9: 115.65956748456436, 10: 10.686516186162008, 12: 93.23169562840219, 14: 122.63692018604529, 15: 67.78769957703048, 16: 77.21981008906258, 17: 95.77169153707179, 18: 116.97037099902793, 19: 128.815112775002, 20: 22.408870874008336, 21: 61.981539728965494, 22: 69.06188631302355, 23: 114.03167320158428, 24: 129.65429550721953}


> **Answer (Discuss the results):** The three protocols gives similar high‐level results, yet with subtle differences highlighting how learning parameters affect convergence and policy quality:
>
> - **1st Method (Constant $\alpha$ and $\varepsilon$):**  
>   The agent updates Q-values after every transition using a constant step size ($\alpha = 0.05$) and a high exploration rate ($\varepsilon = 0.5$). This produces a reasonable policy that tends to move right along the top rows before turning down, and the value function shows higher rewards in states likely to reach the big treasure. However, since every update is weighted equally, there may be more variance in the early stages of learning.
> 
> - **2nd Method (Decaying $\alpha$):**  
>   Here, the learning rate starts at $0.05$ but decays by a factor of $0.8$ with each transition. A decaying $\alpha$ tends to stabilize learning over time because the updates become smaller as we get more experience. This can reduce oscillations in Q-values and leads to a slightly different optimal policy (*for example, one of the arrows in the bottom row changes direction*) and value estimates that are a bit more conservative.
> 
> - **3rd Method (Replay Buffer with Multiple Updates):**  
>   By storing transitions and performing extra updates per time step (sampling from the replay buffer), the agent makes better use of its experience. This tends to smooth the learning process and can lead to improved sample efficiency. The optimal policy derived here shows minor changes (such as an extra downward move in the top row) compared to the first two methods. The value function is in a similar range, which indicates that the overall strategy is similar, but the replay mechanism might help in reinforcing rare but important transitions (like those avoiding cliffs, i.e., going up on the $+1$ cell).