# Reinforcement Learning - an introduction (Part 2)

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/paolodeangelis/Sistemi_a_combustione/blob/main/4.2-Reinforcement_Learning_P2.ipynb)

# **1. Setup**


### **Install Packages**

In [1]:
# Install necessary packages
!apt install swig cmake ffmpeg xvfb python3-opengl
!pip install stable-baselines3==2.0.0a5 gymnasium[box2d] huggingface_sb3 pyvirtualdisplay imageio[ffmpeg]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
python3-opengl is already the newest version (3.1.5+dfsg-1).
swig is already the newest version (4.0.2-1ubuntu1).
cmake is already the newest version (3.22.1-1ubuntu1.22.04.1).
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.2).
0 upgraded, 0 newly installed, 0 to remove and 15 not upgraded.


The Next Cell will force the notebook runtime to restart. This is to ensure all the new libraries installed will be used.

In [None]:
import os
os.kill(os.getpid(), 9)

### **Start Virtual Display**

In [1]:
from pyvirtualdisplay import Display
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

<pyvirtualdisplay.display.Display at 0x7866a9b7d7b0>

# Model 2: [Frozen Lake](https://gymnasium.farama.org/environments/toy_text/frozen_lake/)

### Step1: Environment and Parameter Setup
load libraries

In [2]:
from pathlib import Path
from typing import NamedTuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import gymnasium as gym
from gymnasium.envs.toy_text.frozen_lake import generate_random_map

In this section, we define the hyperparameters and configuration parameters using a `NamedTuple` called `Params`. These parameters include the total number of episodes, learning rate, discounting rate (gamma), exploration probability (epsilon), map size, seed for reproducibility, slippery environment flag, number of runs, action size, state size, probability of tiles being frozen, and the folder where plots will be saved.

We also set the random number generator (`rng`) seed and create the FrozenLake environment using Gym. The environment's description is generated with random frozen tiles based on the specified probability (`proba_frozen`). The action and state sizes are determined based on the environment's properties.

In [3]:
# Define hyperparameters and configuration parameters using a NamedTuple
class Params(NamedTuple):
    total_episodes: int  # Total episodes
    learning_rate: float  # Learning rate
    gamma: float  # Discounting rate
    epsilon: float  # Exploration probability
    map_size: int  # Number of tiles on one side of the squared environment
    seed: int  # Define a seed for reproducible results
    is_slippery: bool  # Slippery environment flag
    n_runs: int  # Number of runs
    action_size: int  # Number of possible actions
    state_size: int  # Number of possible states
    proba_frozen: float  # Probability that a tile is frozen
    savefig_folder: Path  # Root folder to save plots

# Initialize hyperparameters and configuration parameters
params = Params(
    total_episodes=5000,
    learning_rate=0.5,
    gamma=0.95,
    epsilon=0.1,
    map_size=6,
    seed=123,
    is_slippery=False,
    n_runs=4,
    action_size=None,
    state_size=None,
    proba_frozen=0.9,
    savefig_folder=Path("../../_static/img/tutorials/"),
)

# Set the seed
rng = np.random.default_rng(params.seed)

# Create the figure folder if it doesn't exist
params.savefig_folder.mkdir(parents=True, exist_ok=True)

# Create the FrozenLake environment with specified parameters
env = gym.make(
    "FrozenLake-v1",
    is_slippery=params.is_slippery,
    render_mode="rgb_array",
    desc=generate_random_map(
        size=params.map_size, p=params.proba_frozen, seed=params.seed
    ),
)

params = params._replace(action_size=env.action_space.n)
params = params._replace(state_size=env.observation_space.n)


### Step 2: Q-Learning Agent Definition

In this section, we define the Q-learning agent class (`Qlearning`). The agent is initialized with learning rate, discounting rate (gamma), state size, and action size as parameters. The Q-table is initialized with zeros.

The `update` method implements the Q-learning update equation to update Q-values based on the difference between the current estimate and actual rewards. This equation is expressed as:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \cdot \left( R(s, a) + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a) \right)
$$

Where:
- $ Q(s, a) $ is the current estimate of the Q-value for state 's' and action 'a.'
- $ \alpha $ is the learning rate.
- $ R(s, a) $ is the immediate reward received after taking action 'a' in state 's.'
- $ \gamma $ is the discounting rate.
- $ \max_{a'} Q(s', a') $ is the maximum Q-value among possible actions in the next state 's'.

The `reset_qtable` method initializes the Q-table with zeros.

In [4]:
class Qlearning:
    def __init__(self, learning_rate, gamma, state_size, action_size):
        """
        Initialize the Q-learning agent.

        Args:
            learning_rate (float): The learning rate.
            gamma (float): The discounting rate.
            state_size (int): Number of possible states.
            action_size (int): Number of possible actions.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.reset_qtable()

    def update(self, state, action, reward, new_state):
        """
        Update the Q-table using the Q-learning algorithm.

        Args:
            state (int): Current state.
            action (int): Chosen action.
            reward (float): Reward received.
            new_state (int): New state after taking action.

        Returns:
            float: Updated Q-value.
        """
        delta = (
            reward
            + self.gamma * np.max(self.qtable[new_state, :])
            - self.qtable[state, action]
        )
        q_update = self.qtable[state, action] + self.learning_rate * delta
        return q_update

    def reset_qtable(self):
        """Reset the Q-table."""
        self.qtable = np.zeros((self.state_size, self.action_size))

### Step 3: Epsilon-Greedy Exploration Strategy

In this section, we define the Epsilon-Greedy exploration strategy class (`EpsilonGreedy`). The strategy is initialized with an exploration probability `epsilon`. The `choose_action` method implements the Epsilon-Greedy strategy to select an action based on whether exploration or exploitation should be performed.

Exploration: With probability `epsilon`, a random action is chosen.
Exploitation: With probability `1 - epsilon`, the action with the highest Q-value in the current state is selected. If there are multiple actions with the same maximum Q-value, one is chosen randomly.

In [5]:
class EpsilonGreedy:
    def __init__(self, epsilon):
        """
        Initialize the Epsilon-Greedy exploration strategy.

        Args:
            epsilon (float): Exploration probability.
        """
        self.epsilon = epsilon

    def choose_action(self, action_space, state, qtable):
        """
        Choose an action based on the Epsilon-Greedy strategy.

        Args:
            action_space: Action space of the environment.
            state (int): Current state.
            qtable: Q-table of the agent.

        Returns:
            int: Chosen action.
        """
        explor_exploit_tradeoff = rng.uniform(0, 1)

        if explor_exploit_tradeoff < self.epsilon:


            action = action_space.sample()
        else:
            if np.all(qtable[state, :]) == qtable[state, 0]:
                action = action_space.sample()
            else:
                action = np.argmax(qtable[state, :])
        return action

### Step 4: Training Loop

In this section, we execute the training loop. The outer loop (`run`) runs multiple times to account for stochasticity. Within each run, we reset the Q-table using `learner.reset_qtable()`.

The inner loop (`episode`) represents each training episode. We reset the environment, initialize step count and total rewards, and enter the episode loop. In the episode loop, we select actions using the Epsilon-Greedy strategy, perform actions in the environment, and update the Q-table using the Q-learning update equation.

We log the rewards, steps, and Q-tables for analysis. The `tqdm` progress bar is used to visualize the progress of training episodes within each run.

### Section 5: Analysis and Visualization

In this section, you can perform analysis and visualization of training results using the logged data (e.g., rewards, Q-tables).

In [6]:
learner = Qlearning(
    learning_rate=params.learning_rate,
    gamma=params.gamma,
    state_size=params.state_size,
    action_size=params.action_size,
)
explorer = EpsilonGreedy(
    epsilon=params.epsilon,
)

# Initialize arrays to store rewards, steps, episodes, and Q-tables
rewards = np.zeros((params.total_episodes, params.n_runs))
steps = np.zeros((params.total_episodes, params.n_runs))
episodes = np.arange(params.total_episodes)
qtables = np.zeros((params.n_runs, params.state_size, params.action_size))
all_states = []
all_actions = []

# Training loop
for run in range(params.n_runs):  # Run multiple times for stochasticity
    learner.reset_qtable()  # Reset Q-table between runs

    for episode in tqdm(
        episodes, desc=f"Run {run}/{params.n_runs} - Episodes", leave=False
    ):
        state = env.reset(seed=params.seed)[0]  # Reset the environment
        step = 0
        done = False
        total_rewards = 0

        while not done:
            action = explorer.choose_action(
                action_space=env.action_space, state=state, qtable=learner.qtable
            )

            all_states.append(state)
            all_actions.append(action)

            new_state, reward, terminated, truncated, info = env.step(action)

            done = terminated or truncated

            learner.qtable[state, action] = learner.update(
                state, action, reward, new_state
            )

            total_rewards += reward
            step += 1

            state = new_state

        rewards[episode, run] = total_rewards
        steps[episode, run] = step
    qtables[run, :, :] = learner.qtable



### Visualization

In [7]:
import gymnasium as gym
from gymnasium.utils.save_video import save_video
env = gym.make(
    "FrozenLake-v1",
    is_slippery=params.is_slippery,
    render_mode="rgb_array_list",
    desc=generate_random_map(
        size=params.map_size, p=params.proba_frozen, seed=params.seed
    ),
)
_ = env.reset()
step_starting_index = 0
episode_index = 0
explorer.epsilon = 0.0
for step_index in range(199):
    action = explorer.choose_action(
        action_space=env.action_space, state=state, qtable=learner.qtable
    )

    # Log all states and actions
    all_states.append(state)
    all_actions.append(action)

    learner.qtable[state, action] = learner.update(
        state, action, reward, new_state
    )
    # Take the action (a) and observe the outcome state(s') and reward (r)
    new_state, reward, terminated, truncated, info = env.step(action)
    # Our new state is state
    state = new_state
    _, _, terminated, truncated, _ = env.step(action)

    if terminated or truncated:
        save_video(
            env.render(),
            "videos",
            fps=env.metadata["render_fps"],
            step_starting_index=step_starting_index,
            episode_index=episode_index
        )
        step_starting_index = step_index + 1
        episode_index += 1
        env.reset()
env.close()


Moviepy - Building video /content/videos/rl-video-episode-0.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-0.mp4
Moviepy - Building video /content/videos/rl-video-episode-1.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-1.mp4



                                                   

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-1.mp4




Moviepy - Building video /content/videos/rl-video-episode-8.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-8.mp4





Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-8.mp4
Moviepy - Building video /content/videos/rl-video-episode-27.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-27.mp4



                                                   

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-27.mp4




In [8]:
from IPython.display import HTML
from base64 import b64encode
mp4 = open('/content/videos/rl-video-episode-0.mp4','rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

# Model 3: [Blackjack](https://gymnasium.farama.org/environments/toy_text/blackjack/)

### Step1: Environment and Parameter Setup
load libraries

In [12]:
from pathlib import Path
from typing import NamedTuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import gymnasium as gym

In this section, we define the hyperparameters and configuration parameters using a `NamedTuple` called `Params`. These parameters include the total number of episodes, learning rate, discounting rate (gamma), exploration probability (epsilon), map size, seed for reproducibility, slippery environment flag, number of runs, action size, state size, probability of tiles being frozen, and the folder where plots will be saved.

We also set the random number generator (`rng`) seed and create the FrozenLake environment using Gym. The environment's description is generated with random frozen tiles based on the specified probability (`proba_frozen`). The action and state sizes are determined based on the environment's properties.

In [33]:
# Define hyperparameters and configuration parameters using a NamedTuple
class Params(NamedTuple):
    total_episodes: int  # Total episodes
    learning_rate: float  # Learning rate
    seed: int  # Define a seed for reproducible results
    n_runs: int  # Number of runs
    action_size: int  # Number of possible actions
    state_size: int  # Number of possible states
    proba_frozen: float  # Probability that a tile is frozen
    savefig_folder: Path  # Root folder to save plots
    n_episodes : int
    start_epsilon : float
    epsilon_decay : float
    final_epsilon : float

# Initialize hyperparameters and configuration parameters
start_epsilon = 1.0
n_episodes = 100_000
params = Params(
    total_episodes=5000,
    learning_rate = 0.01,
    n_episodes = n_episodes,
    start_epsilon = start_epsilon,
    epsilon_decay = start_epsilon / (n_episodes / 2),  # reduce the exploration over time
    final_epsilon = 0.1,
    seed=123,
    n_runs=4,
    action_size=None,
    state_size=None,
    proba_frozen=0.9,
    savefig_folder=Path("../../_static/img/tutorials/"),
)

# Set the seed
rng = np.random.default_rng(params.seed)

# Create the figure folder if it doesn't exist
params.savefig_folder.mkdir(parents=True, exist_ok=True)

# Create the FrozenLake environment with specified parameters
env = gym.make('Blackjack-v1',render_mode="rgb_array", natural=False, sab=False)

params = params._replace(action_size=env.action_space.n)
params = params._replace(state_size=env.observation_space)


### Step 2: Q-Learning Agent Definition

In this section, we define the Q-learning agent class (`Qlearning`). The agent is initialized with learning rate, discounting rate (gamma), state size, and action size as parameters. The Q-table is initialized with zeros.

The `update` method implements the Q-learning update equation to update Q-values based on the difference between the current estimate and actual rewards. This equation is expressed as:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \cdot \left( R(s, a) + \gamma \cdot \max_{a'} Q(s', a') - Q(s, a) \right)
$$

Where:
- $ Q(s, a) $ is the current estimate of the Q-value for state 's' and action 'a.'
- $ \alpha $ is the learning rate.
- $ R(s, a) $ is the immediate reward received after taking action 'a' in state 's.'
- $ \gamma $ is the discounting rate.
- $ \max_{a'} Q(s', a') $ is the maximum Q-value among possible actions in the next state 's'.

The `reset_qtable` method initializes the Q-table with zeros.

In [34]:
from collections import defaultdict


class BlackjackAgent:
    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The decay for epsilon
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def get_action(self, obs: tuple[int, int, bool]) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return env.action_space.sample()

        # with probability (1 - epsilon) act greedily (exploit)
        else:
            return int(np.argmax(self.q_values[obs]))

    def update(
        self,
        obs: tuple[int, int, bool],
        action: int,
        reward: float,
        terminated: bool,
        next_obs: tuple[int, int, bool],
    ):
        """Updates the Q-value of an action."""
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [35]:
agent = BlackjackAgent(
    learning_rate=params.learning_rate,
    initial_epsilon=params.start_epsilon,
    epsilon_decay=params.epsilon_decay,
    final_epsilon=params.final_epsilon,
)

### Step 4: Training Loop

In this section, we execute the training loop. The outer loop (`run`) runs multiple times to account for stochasticity. Within each run, we reset the Q-table using `learner.reset_qtable()`.

The inner loop (`episode`) represents each training episode. We reset the environment, initialize step count and total rewards, and enter the episode loop. In the episode loop, we select actions using the Epsilon-Greedy strategy, perform actions in the environment, and update the Q-table using the Q-learning update equation.

We log the rewards, steps, and Q-tables for analysis. The `tqdm` progress bar is used to visualize the progress of training episodes within each run.


In [36]:
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False

    # play one episode
    while not done:
        action = agent.get_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)

        # update the agent
        agent.update(obs, action, reward, terminated, next_obs)

        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()

100%|██████████| 100000/100000 [00:25<00:00, 3958.85it/s]


### Visualization

In [37]:
import gymnasium as gym
from gymnasium.utils.save_video import save_video
env = gym.make('Blackjack-v1',render_mode="rgb_array_list", natural=False, sab=False)

state, info = env.reset()
step_starting_index = 0
episode_index = 0
explorer.epsilon = 0.0
for step_index in range(199):
    action = agent.get_action(state)
    # Take the action (a) and observe the outcome state(s') and reward (r)
    new_state, reward, terminated, truncated, info = env.step(action)

    # update the agent
    agent.update(state, action, reward, terminated, new_state)

    # Our new state is state
    state = new_state
    _, _, terminated, truncated, _ = env.step(action)

    if terminated or truncated:
        save_video(
            env.render(),
            "videos",
            fps=env.metadata["render_fps"],
            step_starting_index=step_starting_index,
            episode_index=episode_index
        )
        step_starting_index = step_index + 1
        episode_index += 1
        state, info = env.reset()
env.close()


Moviepy - Building video /content/videos/rl-video-episode-0.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-0.mp4





Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-0.mp4
Moviepy - Building video /content/videos/rl-video-episode-1.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-1.mp4



                                                  

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-1.mp4




Moviepy - Building video /content/videos/rl-video-episode-8.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-8.mp4



                                                  

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-8.mp4




Moviepy - Building video /content/videos/rl-video-episode-27.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-27.mp4



                                                  

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-27.mp4




Moviepy - Building video /content/videos/rl-video-episode-64.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-64.mp4



                                                  

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-64.mp4




Moviepy - Building video /content/videos/rl-video-episode-125.mp4.
Moviepy - Writing video /content/videos/rl-video-episode-125.mp4



                                                  

Moviepy - Done !
Moviepy - video ready /content/videos/rl-video-episode-125.mp4




In [42]:
from IPython.display import HTML
from base64 import b64encode
mp4 = open('/content/videos/rl-video-episode-64.mp4','rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)