<a href="https://colab.research.google.com/github/mohamedyosef101/rl-algorithms/blob/main/Q-Learning/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Q-Learning Algorithm
At first, I assume that you have alread read my article titled "[Reinforcement Learning: All the Basics](https://mohamedyosef101.github.io/publication/rl101)" or you know what is RL and Q-learning.


In [1]:
# libraries we will use
import numpy as np
import random

I'll use `numpy` for its ability to work with matices and arrays. While `random` is simply used to generate random number to tell us when to explore and when to exploit (a simple solution for [exploration/exploitation trade-off](https://mohamedyosef101.github.io/publication/rl101/#13-the-explorationexploitation-trade-off)).

## Step 0. Set up the environment

We wil use a basic 5 by 5 grid world as our environment. Our agent should reach the goal in point (4, 4) without goint to any of the obstacles in the grid.

In [2]:
# Define the grid world
grid_size = (5, 5)
goal_state = (4, 4)
start_state = (0, 0)
obstacles = [(1, 1), (2, 2), (3, 3)]
actions = ['up', 'down', 'right', 'left']
action_space = [0, 1, 2, 3]
# I have turned actions from str to int for easier calculations

Now, let's show you how our grid world looks like...

In [3]:
def print_grid(state, goal_state, obstacles):
  """Function to print the grid with the agent's position.
  """
  grid = np.zeros(grid_size, dtype=str)
  grid[:] = '.'
  grid[goal_state] = 'G'
  for obs in obstacles:
    grid[obs] = 'X'
  grid[state] = 'A'
  for row in grid:
    print(' '.join(row))
  print()

In [4]:
print_grid((0, 0), goal_state, obstacles)

A . . . .
. X . . .
. . X . .
. . . X .
. . . . G



## Step 1. Environment dynamics
Here, we will say how the environment works and responses to the agent. I tried to make this code similar to what you can get in [Gymnasium](https://gymnasium.farama.org/api/env/) so you can also work with Gym enviroments if you need (and you will need it, trust me!).

In [5]:
def take_step(state, action, max_steps, step_count):
    """
    Takes the current state and the agent's action and returns
    the next state, reward, and tell if the process terminated or truncated,
    also it might give you some more info represented in a dictionary.

    Args:
        state (tuple): Current position of the agent (row, col).
        action (str): One of ['up', 'down', 'left', 'right'].
        max_steps (int): The max number of steps taken in each episode
        it is recommended to use the same max_steps in training
        step_count (int): count how many steps has the agent taken so far
        I put it here just to determin if the process truncated or not.

    Returns:
        next_state (tuple): New position of the agent.
        reward (float): Reward received for this action.
        terminated (bool): Whether the goal reached or obstacle hit.
        truncated (bool): Whether the episode was truncated (e.g., due to a steps limit).
        info (dict): Additional information (optional, can be empty for now).
    """

    terminated = False
    truncated = False
    info = {}

    # --- Calculate the potential next state based on the action ---
    x = 0
    y = 0
    x, y = state
    if action == 0: # up
        x = max(0, x - 1)
    elif action == 1: # down
        x = min(grid_size[0] - 1, x + 1)
    elif action == 2: # right
        y = min(grid_size[0] - 1, y + 1)
    elif action == 3: # left
        y = max(0, y - 1)
    next_state = (x, y)

    # --- Check for Obstacles and Goal ---
    if next_state in obstacles: # Obstacle state
        terminated = True
        reward = -100.0
    elif next_state == goal_state:
        terminated = True
        reward = 100.0
    else:
      reward = -1.0

    # --- Check for Truncation (max steps reached) ---
    if step_count >= max_steps and not terminated:
        truncated = True

    return next_state, reward, terminated, truncated, info

I have create the reward system so that the agent gets a reward of 100 in case of reaching the goal and -100 in case of reaching an obstacle while -1 otherwise.

## Step 2. Initialize Q-Table

Q-learning is a model-free, off-policy reinforcement learning algorithm. So, rather than learning a [value estimate](https://mohamedyosef101.github.io/publication/rl101/#14-two-approaches-for-solving-rl-problems), we learn action-value or the quality of the action, $Q(s,a)$, and to get the expected future reward of starting in state $s_t$ and taking action $a_t$ by using a [Temporal Difference](https://mohamedyosef101.github.io/publication/rl101/#24-temporal-difference-learning) (TD) learning approach and [Bellman equation](https://mohamedyosef101.github.io/publication/rl101/#21-the-bellman-equations) to optimize the value function:

$$
Q(s_t, a_t) \gets Q(s_t, a_t) + \alpha [R_{t+1} +
\gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t)]
$$

where $\alpha$ is the learning rate, $\gamma$ is the discount factor.

In [6]:
def Qtable0(grid_size, action_space):
  Qtable = np.zeros((grid_size[0], grid_size[1], len(action_space)))
  return Qtable

# create it for our environment
grid_Qtable = Qtable0(grid_size, action_space)

## Step 3. Epsilon-Greedy Policy

Based on the [exploration/exploitation trade-off](https://mohamedyosef101.github.io/publication/rl101/#13-the-explorationexploitation-trade-off), the agent doesn't know when to explore and when to exploit. Therefore, Q-learning often uses an epsilon-greedy policy (ϵ-greedy) in which the agent exploits '1-ϵ' times by taking the best action according to the agent information and explore ϵ times by taking a random action.

The greedy policy which we will use to exploit is used to select the action with the best Q-value in the Q-table.

$$
\text{greedy policy : } \\
\pi^*(s) = \arg \max_a Q^*(s,a)
$$

In [7]:
def greedy_policy(Qtable, state):
  action = np.argmax(Qtable[state][:])
  return action

In [8]:
def epsilon_greedy_policy(Qtable, state, epsilon):
  random_num = random.uniform(0, 1)
  if random_num > epsilon:
    action = greedy_policy(Qtable, state)
  else:
    action = random.choice(action_space)
  return action

## Step 4. Define parameters

In [9]:
alpha = 0.1   # Learning rate
gamma = 0.99  # Dicount factor
epsilon = 0.1   # Exploration rate
max_steps = 25  # max number of steps per episode

num_episodes = 1000  # Number of training episodes
max_epsilon = 1.0
min_epsilon = 0.05
decay_rate = 0.005

eval_episodes = 100   # Number of evaluation episodes

As the training continues, we progressively **reduce the epsilon value since we will need less and less exploration and more exploitation**.

$$
\epsilon = \epsilon_{\min} + (\epsilon_{\max} - \epsilon_\min)\ \times \ e^{- \text{ decay rate} \times \text{episode}}
$$

## Step 5. Start the training

---
**Algorithm**: Q-Learning

---

**Input**: num_episodes (positive int), learning rate $\alpha$ (small positive fraction), Discount factor $\gamma$ (a fraction close to 1),
 $\epsilon_\min$ (small positive fraction), $\epsilon_\max$ (fraction close to 1), decay_rate (very small positive fraction), max_steps (positive int), Qtable (array of q-values).

 <br>

**Output**: value function $Q$ ($\approx q_\pi$ if num_episodes is large enough)

<br>

Initialize Q arbitrarily (e.g., $Q(s,a) = 0$ for all $s \in \mathcal S$ and $a \in \mathcal A(s)$, and $Q(\text{terminal-state}, \cdot) = 0)$

* **for** $i \gets 1$ **to** num_episodes **do** <br>
  * $\epsilon \gets \epsilon_{\min} + (\epsilon_{\max} - \epsilon_\min)\ \times \ e^{- \text{ decay rate} \times \text{episode}}$
  * Observe $s_0$
  * $t \text{ (or step)} \gets 0$
  * **repeat**
    * Choose action $a_t$ using policy derived from Q (e.g., epsilon-greedy, greedy policy)
    * Take action $a_t$ and observe $r_{t+1}, s_{t+1}$
    * $Q_{\text{new}}(s_t, a_t) \gets Q(s_t, a_t) + \alpha [r_{t+1} + \gamma \max_a Q(s_{t+1}, a) - Q(s_t, a_t)]$
    * $t \gets t+1$
  * **until** $s_t$ is terminal;
* **EndFor**
* **return** $Q$
---

In [10]:
def train(num_episodes, alpha, gamma, max_epsilon,
          min_epsilon, decay_rate, max_steps, Qtable):
  for episode in range(num_episodes):

    # reduce epsilon
    epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp( - decay_rate)

    # reset the environment
    step = 0
    state = start_state
    terminated = False
    truncated = False

    for step in range(max_steps):
      action = epsilon_greedy_policy(Qtable, state, epsilon)
      new_state, reward, terminated, truncated, info = take_step(state, action, max_steps, step)

      # update Qtable
      Qtable[state][action] = Qtable[state][action] + alpha * (
          reward + gamma * np.max(Qtable[new_state]) - Qtable[state][action]
      )

      if terminated or truncated:
        break
      state = new_state
      step += 1
    episode += 1
  return Qtable

In [11]:
%%time
Qtable = train(num_episodes, alpha, gamma, max_epsilon,
          min_epsilon, decay_rate, max_steps, grid_Qtable)

CPU times: user 131 ms, sys: 1.01 ms, total: 132 ms
Wall time: 141 ms


# Step 6. Evaluate the result

In [12]:
def eval_agent(max_steps, eval_episdoes, Qtable):
  episode_rewards = []
  for episode in range(eval_episodes):
    state = start_state
    step = 0
    truncated = False
    terminated = False
    total_rewards_ep = 0

    for step in range(max_steps):
      action = greedy_policy(Qtable, state)
      new_state, reward, terminated, truncated, info = take_step(state, action, max_steps, step)
      total_rewards_ep += reward
      if terminated or truncated:
        break
      state = new_state
    episode_rewards.append(total_rewards_ep)
    episode += 1
  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)
  return mean_reward, std_reward

In [13]:
mean_reward, std_reward = eval_agent(max_steps, eval_episodes, Qtable)
print(f"Mean reward = {mean_reward:.2f} +/- {std_reward:.2f}")

Mean reward = 93.00 +/- 0.00


This result means that our agent consistently reaches the goal within 7 moves or steps (`100 - 7 = 93`) and `+\- 0.00` mean that there is no standard divation which means there is no variation in the agent's performance across episodes. This consistent performance implies deterministic behavior, leading to the same outcome each time the environment is navigated.