# Installations / Préparations

## install

In [1]:
!pip install importlib-metadata==4.13.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting importlib-metadata==4.13.0
  Downloading importlib_metadata-4.13.0-py3-none-any.whl (23 kB)
Installing collected packages: importlib-metadata
  Attempting uninstall: importlib-metadata
    Found existing installation: importlib-metadata 6.0.0
    Uninstalling importlib-metadata-6.0.0:
      Successfully uninstalled importlib-metadata-6.0.0
Successfully installed importlib-metadata-4.13.0


In [2]:
!pip install git+https://github.com/osigaud/bbrl_gym

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/osigaud/bbrl_gym
  Cloning https://github.com/osigaud/bbrl_gym to /tmp/pip-req-build-qgy962aj
  Running command git clone --filter=blob:none --quiet https://github.com/osigaud/bbrl_gym /tmp/pip-req-build-qgy962aj
  Resolved https://github.com/osigaud/bbrl_gym to commit 5557075ecd7d4171ac0c21be3c69a94bcae655a9
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Installing backend dependencies ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting mazemdp>=0.7.3
  Downloading mazemdp-0.7.3-py3-none-any.whl (15 kB)
Collecting swig
  Downloading swig-4.1.1-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m21.4 MB/s[0m eta [36m0:00:00[0m
Collecting gym==0.21.0
  D

## import

In [3]:
import os
from typing import Tuple, List

import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

from mazemdp.toolbox import egreedy, egreedy_loc
from mazemdp.toolbox import softmax, sample_categorical
from mazemdp.maze_plotter import show_videos
from mazemdp.mdp import Mdp
from bbrl_gym.envs.maze_mdp import MazeMDPEnv

# For visualization: VIDEO_FPS is exported through the environment (presumably
# read by the maze plotter when recording — TODO confirm) and the output folder
# is created if it does not exist yet.
os.environ["VIDEO_FPS"] = "5"
os.makedirs("./videos", exist_ok=True)

from IPython.display import Video

import gym
import bbrl_gym

Matplotlib backend: module://ipykernel.pylab.backend_inline


## settings

In [4]:
# Settings
NB_EPISODES = 100  # number of training episodes passed to the Q-learning functions
TIMEOUT = 100  # episode length passed to the Q-learning functions
MAZE_LARGEUR = 4  # maze width, passed as "width" when the environment is created
MAZE_HAUTEUR = 3  # maze height, passed as "height" when the environment is created
EPSILON = 0.02  # exploration rate of the epsilon-greedy Q-learning
TAU = 0.1  # temperature given to the softmax Q-learning

## création du maze

In [6]:
# Build the maze environment from the settings cell.
# "ratio" is presumably the proportion of wall cells — TODO confirm against bbrl_gym.
env = gym.make("MazeMDP-v0", kwargs={"width": MAZE_LARGEUR, "height": MAZE_HAUTEUR, "ratio": 0.25})
env.reset()

# Display the maze before any learning takes place (figure title: "The maze")
env.init_draw("The maze")

Output()

# Q-Learning 3D

## utils

In [9]:
def get_policy_from_q(Q: np.ndarray, but: int) -> np.ndarray:
    """Return the greedy policy for the goal `but`.

    Q is indexed as Q[state, goal, action]. The result contains, for every
    state, the index of the action with the largest value for that goal
    (the first such index when several actions tie).
    """
    goal_values = Q[:, but, :]
    return goal_values.argmax(axis=1)

In [8]:
def calcul_transition(mdp: MazeMDPEnv, but):
    """Build the deterministic transition matrix of the maze when `but` is the goal.

    The matrix has shape (nb_states, nb_actions, nb_states) and
    ``matrix[s, a, s']`` is 1.0 when taking action ``a`` in state ``s`` leads
    to ``s'``. Actions are numbered 0 = north (x - 1), 1 = south (x + 1),
    2 = east (y + 1), 3 = west (y - 1). A move that has no neighbouring state
    (maze border or wall) leaves the agent where it is.

    Only ``mdp.nb_states``, ``mdp.action_space.n``, ``mdp.coord_x`` and
    ``mdp.coord_y`` are read: the maze borders are deduced from the absence of
    a neighbouring state, not from notebook-level size constants, so the
    result is always consistent with the `mdp` that was passed in.

    Args:
        mdp: maze environment.
        but: index of the goal state; every action taken from it leads to the
            last state index.

    Returns:
        The transition matrix as a float numpy array. The row of the last
        state index is left at zero (unless it is `but` itself).
    """
    nb_states = mdp.nb_states
    nb_actions = mdp.action_space.n
    transition_matrix = np.zeros((nb_states, nb_actions, nb_states))

    # The last state index is not a maze cell to move from or into
    regular_states = range(nb_states - 1)

    # Coordinates -> state index, so a neighbour is found with a single lookup
    state_at = {(mdp.coord_x[s], mdp.coord_y[s]): s for s in regular_states}

    # Coordinate offset of each action, in action order: north, south, east, west
    moves = ((-1, 0), (1, 0), (0, 1), (0, -1))

    for s in regular_states:
        x, y = mdp.coord_x[s], mdp.coord_y[s]
        for a in range(nb_actions):
            # Default: the action has no effect and the agent stays in place
            s_prime = s
            if a < len(moves):
                dx, dy = moves[a]
                s_prime = state_at.get((x + dx, y + dy), s)
            transition_matrix[s, a, s_prime] = 1.0

    # The goal state is terminal: all its actions lead to the last state
    transition_matrix[but, :, :] = 0
    transition_matrix[but, :, -1] = 1

    return transition_matrix

In [10]:
# Compute the reward and transition matrices associated with one goal
def calcul_goal(mdp: MazeMDPEnv, but: int):
    """Return the (reward, transition) matrices of the maze when `but` is the goal.

    Args:
        mdp: maze environment (its `nb_states` and `action_space.n` are read).
        but: index of the goal state.

    Returns:
        new_r: array of shape (nb_states, nb_actions), equal to 1 for every
            action taken from the goal state and 0 everywhere else.
        new_P: transition matrix computed by `calcul_transition` for this goal.
    """
    # Reward is collected when acting from the goal state, whatever the action
    new_r = np.zeros((mdp.nb_states, mdp.action_space.n))
    new_r[but] = 1.0

    # Transition matrix in which `but` is the terminal state
    new_P = calcul_transition(mdp, but)

    return new_r, new_P

In [11]:
# Switch the MDP to the reward / transition matrices of a given goal
def maj_goal(mdp: MazeMDPEnv, but, r_list, P_list):
    """Install the matrices of goal `but` into the environment, in place.

    `r_list[but]` becomes the reward matrix of the wrapped MDP (`mdp.mdp.r`)
    and `P_list[but]` becomes the transition matrix of both the environment
    (`mdp.P`) and the wrapped MDP (`mdp.mdp.P`). Nothing is returned.
    """
    wrapped = mdp.mdp
    wrapped.r = r_list[but]

    goal_P = P_list[but]
    mdp.P = goal_P
    wrapped.P = goal_P

## epsilon-greedy

In [45]:
# --------------------------- Q-Learning epsilon-greedy version -------------------------------#

# Given an exploration rate epsilon, this goal-conditioned Q-learning computes the
# state-goal-action value function Q[state, goal, action] using an epsilon-greedy policy.
# alpha is the learning rate.


def q_learning_eps(
    mdp: MazeMDPEnv,
    epsilon: float,
    nb_episodes: int = 20,
    timeout: int = 50,
    alpha: float = 0.5,
    render: bool = True,
) -> np.ndarray:
    """Goal-conditioned tabular Q-learning with epsilon-greedy exploration.

    A goal is drawn uniformly at the start of every episode among the states
    0 .. nb_states-2, the environment is switched to the reward / transition
    matrices of that goal, and only the slice Q[:, goal, :] is updated during
    the episode.

    Args:
        mdp: maze environment. It is modified in place: its `timeout` is set
            and its reward / transition matrices are replaced for every
            episode (they keep the values of the last goal on return).
        epsilon: exploration rate of the epsilon-greedy policy.
        nb_episodes: number of training episodes.
        timeout: episode length.
        alpha: learning rate.
        render: when True, draw the value function and greedy policy at every
            step and once more at the end.

    Returns:
        Q, an array of shape (nb_states, nb_states, nb_actions) indexed as
        Q[state, goal, action].
    """
    # Goals are drawn among all states except the last index
    nb_goals = mdp.nb_states - 1

    # 3D table: states / goals / actions
    Q = np.zeros((mdp.nb_states, mdp.nb_states, mdp.action_space.n))

    # Pre-compute the reward and transition matrices of every candidate goal
    r_list = []
    P_list = []
    for goal in range(nb_goals):
        new_r, new_P = calcul_goal(mdp, goal)
        r_list.append(new_r)
        P_list.append(new_P)

    mdp.timeout = timeout  # episode length

    if render:
        mdp.init_draw("Q-learning e-greedy")

    # Goal shown in the final figure. The slice of index nb_states-1 is never
    # trained (goals are drawn below that index) and would always be displayed
    # as zeros, so it is only kept as a fallback when no episode is run.
    shown_goal = mdp.nb_states - 1

    for _ in tqdm(range(nb_episodes)):
        # Start from a state drawn uniformly, then draw the goal of the episode
        s = mdp.reset(uniform=True)
        but = np.random.randint(nb_goals)
        shown_goal = but

        # Install the reward / transition matrices of this goal
        maj_goal(mdp, but, r_list, P_list)

        done = mdp.mdp.done()

        while not done:
            if render:
                # Show the agent in the maze, with the current goal as terminal cell
                mdp.mdp.plotter.terminal_states = [but]
                mdp.draw_v_pi(Q[:, but, :], get_policy_from_q(Q, but))

            # Draw an action using an epsilon-greedy policy for the current goal
            a = egreedy(Q[:, but, :], s, epsilon)

            # Perform a step of the MDP
            [s_prime, r, done, _] = mdp.step(a)

            # Temporal-difference update restricted to the goal of the episode
            delta = r + mdp.gamma * np.max(Q[s_prime, but]) - Q[s, but, a]
            Q[s, but, a] += alpha * delta

            # Update the agent position
            s = s_prime

    if render:
        # Show the final values and policy for the last goal that was trained
        mdp.mdp.plotter.terminal_states = [shown_goal]
        mdp.draw_v_pi(
            Q[:, shown_goal, :],
            get_policy_from_q(Q, shown_goal),
            title="Q-learning e-greedy",
        )

    return Q

In [46]:
# Train the epsilon-greedy version with the values of the settings cell
Q = q_learning_eps(env, epsilon=EPSILON, nb_episodes=NB_EPISODES, timeout=TIMEOUT)

Output()

  0%|          | 0/100 [00:00<?, ?it/s]

## Visualisation epsilon-greedy

In [48]:
# Play back the recording of the epsilon-greedy training run. Per the cell
# output this converts videos/Q-learninge-greedy.avi (the file name appears to
# be the figure title without spaces — TODO confirm).
show_videos("videos/", "Q-learninge-greedy")

Converting videos/Q-learninge-greedy.avi


In [50]:
# Visualisation: values and greedy policy learned by the epsilon-greedy run for one goal
but = 3  # index of the goal state to display
print("état but : ", but)
env.mdp.plotter.terminal_states = [but]  # the plotter uses this goal as its terminal state
env.draw_v_pi(Q[:,but,:], get_policy_from_q(Q, but), title="Q-learning e-greedy"+str(but))

état but :  3


Output()

## Softmax

In [60]:
# --------------------------- Q-Learning softmax version ----------------------------#

# Given a temperature "tau", this goal-conditioned Q-learning computes the
# state-goal-action value function Q[state, goal, action] using a softmax policy.
# alpha is the learning rate.


def q_learning_soft(
    mdp: MazeMDPEnv,
    tau: float,
    nb_episodes: int = 20,
    timeout: int = 50,
    alpha: float = 0.5,
    render: bool = True,
) -> np.ndarray:
    """Goal-conditioned tabular Q-learning with softmax exploration.

    A goal is drawn uniformly at the start of every episode among the states
    0 .. nb_states-2, the environment is switched to the reward / transition
    matrices of that goal, and only the slice Q[:, goal, :] is updated during
    the episode. The temperature decreases with the episode index:
    ``tau / (1 + log(1 + episode))``.

    Args:
        mdp: maze environment. It is modified in place: its `timeout` is set
            and its reward / transition matrices are replaced for every
            episode (they keep the values of the last goal on return).
        tau: initial softmax temperature.
        nb_episodes: number of training episodes.
        timeout: episode length.
        alpha: learning rate.
        render: when True, draw the value function and greedy policy at every
            step and once more at the end.

    Returns:
        Q, an array of shape (nb_states, nb_states, nb_actions) indexed as
        Q[state, goal, action].
    """
    # Goals are drawn among all states except the last index
    nb_goals = mdp.nb_states - 1

    # 3D table: states / goals / actions
    Q = np.zeros((mdp.nb_states, mdp.nb_states, mdp.action_space.n))

    # Pre-compute the reward and transition matrices of every candidate goal
    r_list = []
    P_list = []
    for goal in range(nb_goals):
        new_r, new_P = calcul_goal(mdp, goal)
        r_list.append(new_r)
        P_list.append(new_P)

    mdp.timeout = timeout  # episode length

    if render:
        mdp.init_draw("Q-learning softmax")

    # Goal shown in the final figure. The slice of index nb_states-1 is never
    # trained (goals are drawn below that index) and would always be displayed
    # as zeros, so it is only kept as a fallback when no episode is run.
    shown_goal = mdp.nb_states - 1

    for episode in tqdm(range(nb_episodes)):
        # Start from a state drawn uniformly, then draw the goal of the episode
        x = mdp.reset(uniform=True)
        but = np.random.randint(0, nb_goals)
        shown_goal = but

        # Install the reward / transition matrices of this goal
        maj_goal(mdp, but, r_list, P_list)

        # Explore less and less as episodes go by
        t = tau / (1 + np.log(1 + episode))

        done = mdp.mdp.done()
        while not done:
            if render:
                # Show the agent in the maze, with the current goal as terminal cell
                mdp.mdp.plotter.terminal_states = [but]
                mdp.draw_v_pi(Q[:, but, :], get_policy_from_q(Q, but))

            # Draw an action from the softmax distribution for the current goal
            u = sample_categorical(softmax(Q[:, but, :], x, t))

            # Perform a step of the MDP
            [y, r, done, _] = mdp.step(u)

            # Temporal-difference update restricted to the goal of the episode
            delta = r + mdp.gamma * np.max(Q[y, but]) - Q[x, but, u]
            Q[x, but, u] = Q[x, but, u] + alpha * delta

            # Update the agent position
            x = y

    if render:
        # Show the final values and policy for the last goal that was trained
        mdp.mdp.plotter.terminal_states = [shown_goal]
        mdp.draw_v_pi(
            Q[:, shown_goal, :],
            get_policy_from_q(Q, shown_goal),
            title="Q-learning softmax",
        )

    return Q


In [61]:
# Train the softmax version with the values of the settings cell
q = q_learning_soft(env, tau=TAU, nb_episodes=NB_EPISODES, timeout=TIMEOUT)

Output()

  0%|          | 0/100 [00:00<?, ?it/s]

## Visualisation Softmax

In [62]:
# Play back the recording of the softmax training run. Per the cell output
# this converts videos/Q-learningsoftmax.avi (the file name appears to be the
# figure title without spaces — TODO confirm).
show_videos("videos/", "Q-learningsoftmax")

Converting videos/Q-learningsoftmax.avi


In [64]:
# Visualisation: values and greedy policy learned by the softmax run for one goal
but = 4  # index of the goal state to display
print("état but : ", but)
env.mdp.plotter.terminal_states = [but]  # the plotter uses this goal as its terminal state
# The title names the softmax run: `q` comes from q_learning_soft, so the
# e-greedy title used previously mislabelled this figure.
env.draw_v_pi(q[:,but,:], get_policy_from_q(q, but), title="Q-learning softmax"+str(but))

état but :  4


Output()