# Q-Learning
This is very similar to TD(0), but it estimates action values Q(s, a) instead of state values, and its update target takes the maximum over the actions available at the next state.

In [2]:
from utils import compress_state, generate_extreme_value_state_image_from_q_table
from collections import defaultdict
import numpy as np
import minari
from IPython.display import HTML
import uuid


def q_learning_offline(dataset_id, gamma=0.99, alpha=0.1, num_actions=7):
    """
    Estimates Q(s, a) using Q-learning from an offline dataset (tabular, no function approximation),
    and tracks the episode and timestep where each state was first seen.

    Parameters
    ----------
    dataset_id : str
        Minari dataset ID containing offline trajectories.

    gamma : float
        Discount factor.

    alpha : float
        Learning rate.

    num_actions : int
        Number of discrete actions in the environment. Actions are assumed to be
        the integers 0 .. num_actions - 1.

    Methodology
    -----------
    1. Load the dataset using Minari.
    2. Initialize Q-values for each (state, action) pair to zero.
    3. For each episode in the dataset:
        - Iterate through each timestep.
        - Compress the observation dictionary into a hashable state key.
        - Update Q-values using the Q-learning update rule. Transitions flagged
          as terminated use the reward alone as target (no bootstrapping).
        - Track the first occurrence of each state.
    4. Print statistics about the Q-value function estimates.
    5. Return the Q-value function and the state locations.

    Returns
    -------
    Q : collections.defaultdict
        Maps (state, action) to Q-values; missing pairs read as 0.0.

    state_locations : dict
        A dictionary mapping state keys to (episode_index, timestep) of first occurrence.
    """
    dataset: minari.Dataset = minari.load_dataset(dataset_id)
    Q = defaultdict(float)
    state_locations: dict = {}  # maps state keys to (episode_index, timestep) of first occurrence

    for episode_idx, episode in enumerate(dataset.iterate_episodes()):
        observations = episode.observations
        rewards = episode.rewards
        actions = episode.actions
        terminations = episode.terminations
        num_steps = len(actions)  # actions and rewards both have length T; observations T + 1

        for t in range(num_steps):
            obs_t = {k: v[t] for k, v in observations.items()}  # observation at timestep t
            obs_tp1 = {k: v[t + 1] for k, v in observations.items()}  # observation at timestep t+1
            reward = rewards[t]
            action = actions[t]

            s_t = compress_state(obs_t)  # State at time t, s_t
            s_tp1 = compress_state(obs_tp1)  # State at time t+1, s_t+1

            if s_t not in state_locations:
                state_locations[s_t] = (episode_idx, t)

            # Q-learning TD target: max over next state's actions.
            # NOTE: indexing the defaultdict also creates zero-valued entries for
            # every (s_tp1, a); the lookup is kept for terminal transitions too so
            # the set of keys in the returned table is unchanged.
            max_q_next = max(Q[(s_tp1, a)] for a in range(num_actions))
            if terminations[t]:
                # No future return after a terminal transition: target is the reward only.
                max_q_next = 0.0

            # Q-learning update (off-policy)
            Q[(s_t, action)] += alpha * (reward + gamma * max_q_next - Q[(s_t, action)])

        # Record the final state of the episode, which never appears as s_t above.
        # Guarded so a zero-length episode neither raises NameError nor reuses the
        # previous episode's last state.
        if num_steps > 0 and s_tp1 not in state_locations:
            state_locations[s_tp1] = (episode_idx, num_steps)

    # Print Q-value stats
    q_values = np.array(list(Q.values()))
    print("Q-value function statistics:")
    print(f"  Count:       {len(q_values)}")
    if len(q_values) > 0:  # np.min / np.max raise on an empty array
        print(f"  Min value:   {np.min(q_values):.4f}")
        print(f"  Max value:   {np.max(q_values):.4f}")
        print(f"  Mean value:  {np.mean(q_values):.4f}")
        print(f"  Std dev:     {np.std(q_values):.4f}")

    return Q, state_locations


In [3]:
# Notebook cell: render the state with the highest Q-value and show the image inline.
dataset_id = "minigrid/BabyAI-Pickup/optimal-fullobs-v0"
output_path = "./minigrid/BabyAI-Pickup/optimal-fullobs-v0/q_learning/highest_value_function.png"

# Helper comes from utils (not shown here); presumably it runs q_fn_generator on
# the dataset and writes the rendered state to output_path — confirm in utils.
generate_extreme_value_state_image_from_q_table(
    dataset_id=dataset_id,
    output_path=output_path,
    q_fn_generator=q_learning_offline,
    highest=True
) # Generate image of highest Q-values

# === Display in notebook ===
# A fresh random query string makes the image URL differ on every run, so a
# previously cached copy of the PNG is not shown in place of the new one.
cache_buster = uuid.uuid4().hex
# Must stay the last expression of the cell so the notebook renders it.
HTML(f'<img src="{output_path}?v={cache_buster}" width="400">')

Q-value function statistics:
  Count:       365258
  Min value:   0.0000
  Max value:   0.0997
  Mean value:  0.0003
  Std dev:     0.0048
Selected state: ((2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 5, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 4, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 6, 5, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 6, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 10, 0, 2, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0,

  from pkg_resources import resource_stream, resource_exists


In [None]:
# Notebook cell: render the state with the lowest Q-value and show the image inline.
dataset_id = "minigrid/BabyAI-Pickup/optimal-fullobs-v0"
output_path = "./minigrid/BabyAI-Pickup/optimal-fullobs-v0/q_learning/lowest_value_function.png"

# Helper comes from utils (not shown here); presumably it runs q_fn_generator on
# the dataset and writes the rendered state to output_path — confirm in utils.
generate_extreme_value_state_image_from_q_table(
    dataset_id=dataset_id,
    output_path=output_path,
    q_fn_generator=q_learning_offline,
    highest=False
) # Generate image of lowest Q-values

# === Display in notebook ===
# A fresh random query string makes the image URL differ on every run, so a
# previously cached copy of the PNG is not shown in place of the new one.
cache_buster = uuid.uuid4().hex
# Must stay the last expression of the cell so the notebook renders it.
HTML(f'<img src="{output_path}?v={cache_buster}" width="400">')

Q-value function statistics:
  Count:       365258
  Min value:   0.0000
  Max value:   0.0997
  Mean value:  0.0003
  Std dev:     0.0048
Selected state: ((2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 5, 2, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 7, 4, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 4, 3, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 6, 5, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 2, 5, 0, 2, 5, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 

- **Highest Q-value image**: the state s where some action a has the highest Q-value — the agent can pick up the object.


- **Lowest Q-value image**: the state s where some action a has the lowest Q-value — the agent cannot pick up the object.

## How do we create an optimal policy?

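Q-learning is an off-policy algorithm. That means the policy used to collect the data does not have to be the policy being learned, so you don't need to follow any particular policy while collecting the data. Once the Q-values have been estimated, you can derive a policy from them.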

In [2]:
import numpy as np
from collections import defaultdict

def extract_greedy_policy(Q: dict, num_actions: int, epsilon: float = 0.0) -> dict:
    """
    Extracts a greedy (optionally epsilon-greedy) policy π(s) from a tabular Q-function.

    For each state s, the greedy action is selected as:
        π(s) = argmax_a Q(s, a)

    The returned policy assigns probability (1 - epsilon) + epsilon/num_actions to the best action,
    and epsilon/num_actions to all other actions. With the default epsilon of 0.0 the policy
    is purely greedy (probability 1 on the best action).

    Actions that have no entry in Q for a given state are treated as having a Q-value of 0.0.
    Ties are resolved in favour of the lowest action index.

    Parameters
    ----------
    Q : dict
        A dictionary mapping (state, action) pairs to Q-values. Actions must be
        integer indices in the range 0 .. num_actions - 1.
    num_actions : int
        The total number of discrete actions in the environment.
    epsilon : float
        Probability of taking a random action (exploration rate), in [0, 1].

    Returns
    -------
    policy : dict
        A dictionary mapping state keys to action probability vectors (np.ndarray).

    Raises
    ------
    ValueError
        If epsilon is not within [0, 1]; the result would not be a probability distribution.
    """
    if not 0.0 <= epsilon <= 1.0:
        raise ValueError(f"epsilon must be in [0, 1], got {epsilon!r}")

    # Collect the per-state Q-value rows; unseen actions keep the 0.0 default.
    state_action_values = defaultdict(lambda: [0.0] * num_actions)
    for (state, action), q_value in Q.items():
        state_action_values[state][action] = q_value

    policy = {}
    for state, q_values in state_action_values.items():
        best_action = int(np.argmax(q_values))  # greedy action (first index on ties)
        prob_vector = np.full(num_actions, epsilon / num_actions)
        prob_vector[best_action] += 1.0 - epsilon
        policy[state] = prob_vector

    return policy


But the approach above is only interesting if we are dealing with offline training. In online training, we would sometimes prefer to explore the state space over exploiting the results we already know are profitable.

This is known as the exploration–exploitation trade-off. We handle it with an epsilon-greedy approach: with probability epsilon the agent explores by picking an action uniformly at random (which may be suboptimal), and otherwise it exploits by taking the action with the highest Q-value.

In [14]:
def extract_epsilon_greedy_policy(Q: dict, num_actions: int, epsilon=0.1) -> dict:
    """
    Builds an epsilon-greedy policy π(a|s) from a tabular Q-function.

    Only the (state, action) pairs present in Q are considered when looking for
    the greedy action of a state. When several actions share the highest
    Q-value, the one encountered first while iterating Q is kept.

    With n = num_actions and a constant epsilon, every state s gets:

        π(a|s) = {
            1 - ε + ε/n,  if a = argmax_a Q(s, a)
            ε/n,          if a ≠ argmax_a Q(s, a)
        }

    Parameters
    ----------
    Q : dict(state, action) -> float
        Dictionary mapping (state, action) to Q-values.
    num_actions : int
        Total number of discrete actions in the environment.
    epsilon : float
        Probability of taking a random action (exploration rate).

    Returns
    -------
    pi : dict(state) -> np.ndarray
        Dictionary mapping states to action probability vectors as a probability distribution.
    """
    # Pass 1: remember, per state, the best (action, Q-value) pair seen so far.
    greedy = {}
    for (s, a), value in Q.items():
        incumbent = greedy.get(s)
        # Strict comparison: a later action only wins if it is strictly better.
        if incumbent is None or value > incumbent[1]:
            greedy[s] = (a, value)

    # Pass 2: spread epsilon uniformly, then give the remaining mass to the greedy action.
    pi = {}
    for s, (a_star, _) in greedy.items():
        probs = np.full(num_actions, epsilon / num_actions)
        probs[a_star] += 1.0 - epsilon
        pi[s] = probs

    return pi  # state -> action probability vector


In [13]:
def extract_epsilon_greedy_policy_episode(Q: dict, num_actions: int, episode: int,
                                   epsilon_start=1.0, epsilon_min=0.01, epsilon_decay=0.995) -> dict:
    """
    Builds an epsilon-greedy policy π(a|s) from a tabular Q-function, using an
    exploration rate that shrinks geometrically with the episode number:

        epsilon = max(epsilon_min, epsilon_start * epsilon_decay ** episode)

    Only the (state, action) pairs present in Q are considered when looking for
    the greedy action of a state; on ties the action encountered first is kept.

    Parameters
    ----------
    Q : dict(state, action) -> float
        Dictionary mapping (state, action) to Q-values.
    num_actions : int
        Total number of discrete actions in the environment.
    episode : int
        Current episode number, used to decay epsilon.
    epsilon_start : float
        Initial epsilon value.
    epsilon_min : float
        Minimum value for epsilon.
    epsilon_decay : float
        Multiplicative decay factor per episode.

    Returns
    -------
    pi : dict(state) -> np.ndarray
        Dictionary mapping states to action probability vectors as a probability distribution.
    """
    # Geometric decay, floored at epsilon_min.
    decayed = epsilon_start * (epsilon_decay ** episode)
    epsilon = decayed if decayed > epsilon_min else epsilon_min

    # Per state, keep the best (action, Q-value) pair seen so far.
    greedy = {}
    for (s, a), value in Q.items():
        incumbent = greedy.get(s)
        if incumbent is None or value > incumbent[1]:
            greedy[s] = (a, value)

    # Uniform epsilon mass over all actions, remainder on the greedy action.
    pi = {}
    for s, (a_star, _) in greedy.items():
        probs = np.full(num_actions, epsilon / num_actions)
        probs[a_star] += 1.0 - epsilon
        pi[s] = probs

    return pi
