# Demo: Reinforcement Learning Gym


We will learn how to use [*Gymnasium*](https://gymnasium.farama.org/), an open framework for evaluating reinforcement learning (RL) algorithms on different environments. We will train a simple Q-learning agent for the [Taxi environment](https://gymnasium.farama.org/environments/toy_text/taxi/). You can find more tutorials and information on how to [train an RL agent](https://gymnasium.farama.org/introduction/train_agent/) on the Gymnasium website.

## Taxi environment

<img src='./img/taxi_env.png' height=300></img>

The RL agent has to navigate a grid world, pick up a passenger at one of 4 possible locations and drop the passenger off at one of 4 possible locations.

**Action space (6):**
  - Move up, down, left, right
  - Pickup passenger
  - Drop-off passenger
  
**States (500):**
  - Taxi position (25, on a 5 × 5 grid)
  - Passenger location (5: one of the 4 locations, or inside the taxi)
  - Drop-off location (4)
  
**Rewards:**
  - -1 per step (if no other rewards)
  - +20 for delivering passenger
  - -10 for wrong pickup/drop-off actions
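
Gymnasium's Taxi documentation describes the observation as a single integer that packs the three state components as `((taxi_row * 5 + taxi_col) * 5 + passenger_location) * 4 + destination`. The sketch below reimplements that packing in plain Python to make the count of 500 concrete. The function names `encode_state` and `decode_state` are introduced here for illustration and are not part of the Gymnasium API.

```python
def encode_state(taxi_row: int, taxi_col: int, passenger: int, destination: int) -> int:
    """Pack the four components into one integer in [0, 500)."""
    return ((taxi_row * 5 + taxi_col) * 5 + passenger) * 4 + destination


def decode_state(state: int) -> tuple[int, int, int, int]:
    """Invert encode_state by peeling off the components in reverse order."""
    state, destination = divmod(state, 4)
    state, passenger = divmod(state, 5)
    taxi_row, taxi_col = divmod(state, 5)
    return taxi_row, taxi_col, passenger, destination


state = encode_state(taxi_row=3, taxi_col=1, passenger=2, destination=0)
print(state)                     # 328
print(decode_state(state))       # (3, 1, 2, 0)
print(encode_state(4, 4, 4, 3))  # 499, the largest state index
```

Because every observation is one integer, it can be used directly as a key of the Q-table.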

In [None]:
from collections import defaultdict
import gymnasium as gym
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from tqdm import tqdm
from matplotlib.animation import ArtistAnimation
from IPython.display import HTML

from taxi_gym_utils import *

matplotlib.rcParams['animation.embed_limit'] = 1000

### Taxi agent

We first have to create the taxi agent. We will use the famous Q-Learning algorithm:


$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]
$$

$Q(s, a)$ &emsp; Q-value \
$a$ &emsp; action \
$s$ &emsp; state \
$r$ &emsp; reward \
$\alpha \in [0, 1]$ &emsp; learning rate \
$\gamma \in [0, 1]$ &emsp; discount factor \
$\max_{a'} Q(s', a')$ &emsp; maximum Q-value for all actions $a'$ in the next state $s'$
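
As a worked instance of the update rule, the following sketch applies a single step with made-up numbers ($\alpha = 0.1$, $\gamma = 0.95$, a step reward of $-1$). The Q-values are hypothetical and not taken from a training run.

```python
alpha = 0.1   # learning rate
gamma = 0.95  # discount factor

q_sa = 0.0                # current estimate Q(s, a)
reward = -1.0             # step penalty in the Taxi environment
next_q = [0.0, 2.0, 0.5]  # hypothetical Q(s', a') for the actions in s'

td_target = reward + gamma * max(next_q)  # r + gamma * max_a' Q(s', a')
td_error = td_target - q_sa               # the term in square brackets
q_sa_new = q_sa + alpha * td_error        # updated estimate of Q(s, a)

print(round(td_target, 4), round(q_sa_new, 4))  # 0.9 0.09
```

The estimate moves only a fraction $\alpha$ of the way towards the target, which is why many visits to the same state-action pair are needed before the Q-values settle.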

**Goal**: find the *optimal policy* $\pi^*$ that, in every state $s$, takes the action $a$ that *maximizes* $Q$.

To learn the optimal policy, the RL agent has to *exploit* actions it already knows to be good while also *exploring* the environment by taking actions with unknown outcomes. This is called the *exploration-exploitation dilemma*. In the algorithm, the willingness of the RL agent to explore is controlled by the parameter $\epsilon$: it defines the probability that the agent takes a random action (exploration). With probability $1 - \epsilon$, the agent takes the action with the highest estimated Q-value (exploitation).
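
A small simulation can make the role of $\epsilon$ concrete. The sketch below uses only the standard library and a hypothetical row of Q-values for 6 actions. Note that a uniformly random draw can also land on the greedy action, so with 6 actions the greedy action is selected with probability $1 - \epsilon + \epsilon / 6$ overall.

```python
import random


def epsilon_greedy(q_row: list[float], epsilon: float, rng: random.Random) -> int:
    """Pick a uniformly random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_row))
    return max(range(len(q_row)), key=q_row.__getitem__)


rng = random.Random(0)
q_row = [0.0, 1.5, -0.3, 0.2, 0.0, 0.7]  # hypothetical Q-values, greedy action is 1
epsilon = 0.1
n_draws = 100_000

# share of draws in which the greedy action (index 1) was selected
greedy_share = sum(epsilon_greedy(q_row, epsilon, rng) == 1 for _ in range(n_draws)) / n_draws
expected = 1 - epsilon + epsilon / len(q_row)
print(f"observed {greedy_share:.3f}, expected {expected:.3f}")
```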

In [None]:
class TaxiAgent:
    def __init__(
        self,
        env: gym.Env,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            env: The training environment
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The amount subtracted from epsilon after each episode
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.env = env
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def get_action(self, obs: int) -> int:
        """
        Returns the best action with probability (1 - epsilon),
        otherwise a random action with probability epsilon to ensure exploration.
        Taxi observations are a single integer state index.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        # with probability (1 - epsilon) act greedily (exploit)
        else:
            return int(np.argmax(self.q_values[obs]))

    def update(
        self,
        obs: int,
        action: int,
        reward: float,
        terminated: bool,
        next_obs: int,
    ):
        """Updates the Q-value of an action with one Q-learning step."""
        # a terminal next state has no future return, so its value is not bootstrapped
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Linearly reduces epsilon by epsilon_decay, but never below final_epsilon."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

Initialize environment and agent:

In [None]:
# === hyperparameters ===
learning_rate = 0.01
n_episodes = 10000
max_steps = 200
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1
# =======================

# initialize environment
env = gym.make("Taxi-v3", max_episode_steps=max_steps, render_mode="rgb_array")
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=n_episodes)
env.reset()

# create agent
agent = TaxiAgent(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)
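
With these hyperparameters, $\epsilon$ falls linearly from 1.0 and is clipped at `final_epsilon`. The sketch below replays the same rule as `decay_epsilon` outside the agent to show when the floor is reached. `epsilon_schedule` is a helper name introduced here for illustration.

```python
def epsilon_schedule(start: float, decay: float, final: float, n_episodes: int) -> list[float]:
    """Return the epsilon used in each episode under linear decay with a floor."""
    values = []
    epsilon = start
    for _ in range(n_episodes):
        values.append(epsilon)
        epsilon = max(final, epsilon - decay)
    return values


# same values as in the hyperparameter cell
n_episodes = 10000
start_epsilon = 1.0
final_epsilon = 0.1
epsilon_decay = start_epsilon / (n_episodes / 2)

schedule = epsilon_schedule(start_epsilon, epsilon_decay, final_epsilon, n_episodes)
for episode in (0, 2500, 5000, n_episodes - 1):
    print(episode, round(schedule[episode], 3))
```

The floor of 0.1 is reached after about 4500 episodes, so more than half of the training run uses 10% random actions.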

Run training loop:

In [None]:
obs, info = env.reset() # reset environment
frames_first_ep = []
frames_last_ep = []
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False

    # play one episode
    while not done:
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        if episode == 0:
            frame = env.env.render()
            frames_first_ep.append(frame)
        elif episode == n_episodes - 1:
            frame = env.env.render()
            frames_last_ep.append(frame)

        # update the agent
        agent.update(obs, action, reward, terminated, next_obs)

        # stop if goal is achieved or max_steps is reached
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()
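
The loop ends an episode on either `terminated` or `truncated`, but only `terminated` is passed to `agent.update`. The sketch below illustrates why, assuming the usual Gymnasium convention that truncation marks a time limit rather than a true terminal state: the TD target should drop the bootstrap term only when the passenger was actually delivered. `td_target` is a helper introduced here for illustration, and the Q-values are hypothetical.

```python
def td_target(reward: float, next_q: list[float], terminated: bool, gamma: float = 0.95) -> float:
    """Q-learning target: drop the bootstrap term only for true terminal states."""
    future_q = 0.0 if terminated else max(next_q)
    return reward + gamma * future_q


next_q = [1.0, 2.0]  # hypothetical Q-values of the next state

# successful drop-off: the episode terminated, nothing follows the +20 reward
print(td_target(20.0, next_q, terminated=True))  # 20.0

# ordinary step, or a step cut off by the time limit: the next state still has value
print(round(td_target(-1.0, next_q, terminated=False), 4))  # 0.9
```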

In [None]:
plot_training_statistics(env, agent)

Performance without training:

In [None]:
%matplotlib notebook
fig, ax = plt.subplots(1, 1)
fig.set_frameon(False)
ax.set_axis_off()
fig.tight_layout()
ims = [[ax.imshow(f)] for f in frames_first_ep]
ani = ArtistAnimation(fig, ims, interval=100, blit=True)
HTML(ani.to_jshtml())

Performance after 10000 episodes:

In [None]:
%matplotlib notebook
fig, ax = plt.subplots(1, 1)
fig.set_frameon(False)
ax.set_axis_off()
fig.tight_layout()
ims = [[ax.imshow(f)] for f in frames_last_ep]
ani = ArtistAnimation(fig, ims, interval=100, blit=True)
HTML(ani.to_jshtml())