Ejemplo tomado de: https://www.learndatasci.com/tutorials/reinforcement-q-learning-scratch-python-openai-gym/

El diseño de una simulación de un taxi autónomo tiene como objetivo demostrar cómo las técnicas de aprendizaje por refuerzo (RL) pueden aplicarse para desarrollar un enfoque eficiente y seguro en este contexto. La tarea principal del taxi es recoger a un pasajero en una ubicación y dejarlo en otra, garantizando que se cumplan los objetivos de llegar al destino correcto en el menor tiempo posible, respetando la seguridad del pasajero y las normas de tráfico.

El problema se modela considerando aspectos clave como las recompensas, el espacio de estados y el espacio de acciones. Las recompensas guían al taxi, premiándolo por completar un viaje con éxito y penalizándolo por errores, como intentar dejar al pasajero en el lugar equivocado. El espacio de estados representa todas las situaciones posibles en las que el taxi puede encontrarse, mientras que el espacio de acciones define los movimientos y decisiones que el taxi puede tomar, como moverse en una dirección específica o recoger/dejar a un pasajero.

# Instalación de gym

In [1]:
!pip install cmake 'gym[atari]' scipy

Collecting ale-py~=0.7.5 (from gym[atari])
  Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (8.1 kB)
Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: ale-py
Successfully installed ale-py-0.7.5


# Se carga el ambiente de juego y se muestra

In [2]:
import gym

# Build the Taxi-v3 environment. `.env` takes the object wrapped by whatever
# gym.make() returns (presumably to drop the episode step limit — TODO confirm).
env = gym.make("Taxi-v3").env
env.reset()  # start an episode
env.render()  # draw the current grid

  deprecation(
  deprecation(
If you want to render in human mode, initialize the environment in this way: gym.make('EnvName', render_mode='human') and don't call the render method.
See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(


In [3]:
# Start a fresh episode from a new, random state and draw it.
env.reset()
env.render()

# Report the action space and the observation (state) space.
for template, space in (
    ("Action Space {}", env.action_space),
    ("State Space {}", env.observation_space),
):
    print(template.format(space))

  and should_run_async(code)


Action Space Discrete(6)
State Space Discrete(500)


In [4]:
# Encode (taxi row, taxi column, passenger index, destination index) into a state id.
state = env.encode(3, 1, 2, 0)
print("State:", state)

# Write the state on the unwrapped environment: `env` is still a gym wrapper,
# and assigning `env.s` would only create an attribute on that wrapper, so the
# rendered grid would not reflect the chosen state.
env.unwrapped.s = state
env.render()

State: 328


# Tabla de recompensas

In [5]:
# Transition table for state 328: a dict with one entry per action, each a list
# of 4-tuples. Field order assumed from the Gym toy-text convention:
# (probability, next_state, reward, done) — TODO confirm.
env.P[328]

{0: [(1.0, 428, -1, False)],
 1: [(1.0, 228, -1, False)],
 2: [(1.0, 348, -1, False)],
 3: [(1.0, 328, -1, False)],
 4: [(1.0, 328, -10, False)],
 5: [(1.0, 328, -10, False)]}

# Resolver el problema del entorno sin aprendizaje por refuerzo

In [6]:
# Random-policy baseline: starting from the illustration state, sample actions
# uniformly at random until the episode ends, counting steps and penalties.

# Set the state on the unwrapped environment; `env.s = 328` would only create
# an attribute on the gym wrapper and the episode would start elsewhere.
env.unwrapped.s = 328

epochs = 0
penalties, reward = 0, 0

frames = []  # one dict per step, used for the animation

done = False

while not done:
    action = env.action_space.sample()
    state, reward, done, info = env.step(action)

    # -10 is the penalty reward shown in the transition table for actions 4 and 5.
    if reward == -10:
        penalties += 1

    # Put each rendered frame into dict for animation
    frames.append({
        'frame': env.render(mode='ansi'),
        'state': state,
        'action': action,
        'reward': reward
        }
    )

    epochs += 1


print("Timesteps taken: {}".format(epochs))
print("Penalties incurred: {}".format(penalties))

  if not isinstance(terminated, (bool, np.bool8)):
See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(


Timesteps taken: 2858
Penalties incurred: 919


In [7]:
from IPython.display import clear_output
from time import sleep

def print_frames(frames):
    """Replay recorded frames one at a time in the cell output.

    Args:
        frames: Iterable of dicts with the keys 'frame', 'state', 'action'
            and 'reward'. For each one the previous output is cleared, the
            frame and its 1-based timestep, state, action and reward are
            printed, and playback pauses for 0.1 seconds.
    """
    for step, snapshot in enumerate(frames, start=1):
        clear_output(wait=True)
        print(snapshot['frame'])
        print(f"Timestep: {step}")
        print(f"State: {snapshot['state']}")
        print(f"Action: {snapshot['action']}")
        print(f"Reward: {snapshot['reward']}")
        sleep(.1)

# Replay the frames recorded by the random-agent loop.
print_frames(frames)

+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|[35m[34;1m[43mY[0m[0m[0m| : |B: |
+---------+
  (Dropoff)

Timestep: 2858
State: 410
Action: 5
Reward: 20


# Aprendizaje por refuerzo: Q-learning

### Entrenando al agente

In [8]:
import numpy as np
# Q-table: one row per state, one column per action, every entry starting at zero.
n_states, n_actions = env.observation_space.n, env.action_space.n
q_table = np.zeros((n_states, n_actions))

### Algoritmo para actualizar la tabla

In [9]:
%%time
"""Training the agent"""

import random
from IPython.display import clear_output

# Hyperparameters
alpha = 0.1  # learning rate: weight given to the new estimate in the update below
gamma = 0.6  # discount factor applied to the best next-state value
epsilon = 0.1  # probability of taking a random (exploratory) action

# For plotting metrics
# NOTE(review): these two lists are never appended to in this cell.
all_epochs = []
all_penalties = []

# Run 100,000 training episodes, each from a freshly reset environment.
for i in range(1, 100001):
    state = env.reset()

    epochs, penalties, reward, = 0, 0, 0
    done = False

    while not done:
        # Epsilon-greedy action selection.
        if random.uniform(0, 1) < epsilon:
            action = env.action_space.sample() # Explore action space
        else:
            action = np.argmax(q_table[state]) # Exploit learned values

        next_state, reward, done, info = env.step(action)

        old_value = q_table[state, action]
        next_max = np.max(q_table[next_state])

        # Q-learning update: blend the old value with the bootstrapped target
        # reward + gamma * max over actions of Q(next_state, action).
        new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
        q_table[state, action] = new_value

        if reward == -10:
            penalties += 1

        state = next_state
        epochs += 1

    # Refresh the progress line every 100 episodes.
    if i % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {i}")

print("Training finished.\n")

Episode: 100000
Training finished.

CPU times: user 1min 19s, sys: 7.22 s, total: 1min 26s
Wall time: 1min 28s


# Ver tabla

In [10]:
# Learned action values for state 328 (one entry per action).
q_table[328]

array([ -2.40442342,  -2.27325184,  -2.4099454 ,  -2.36184501,
        -9.61148211, -10.32447486])

# Evaluación del agente

In [None]:
"""Evaluate agent's performance after Q-learning"""

# The environment has no step limit (the random baseline above ran for
# thousands of steps), so a greedy policy that cycles between states would
# never reach `done`. Cap every episode so this cell cannot hang.
MAX_STEPS_PER_EPISODE = 1000

total_epochs, total_penalties = 0, 0
episodes = 100

for _ in range(episodes):
    state = env.reset()
    epochs, penalties, reward = 0, 0, 0

    done = False

    while not done and epochs < MAX_STEPS_PER_EPISODE:
        # Always exploit the learned values: no exploration during evaluation.
        action = np.argmax(q_table[state])
        state, reward, done, info = env.step(action)

        if reward == -10:
            penalties += 1

        epochs += 1

    total_penalties += penalties
    total_epochs += epochs

print(f"Results after {episodes} episodes:")
print(f"Average timesteps per episode: {total_epochs / episodes}")
print(f"Average penalties per episode: {total_penalties / episodes}")

Results after 100 episodes:
Average timesteps per episode: 13.11
Average penalties per episode: 0.0


In [11]:
import numpy as np

# Evaluate the greedy policy over several episodes and collect per-episode
# statistics (timesteps, penalties, accumulated reward).

# The environment has no step limit, so a greedy policy that cycles between
# states would never reach `done`. Cap every episode so this cell cannot hang.
MAX_STEPS_PER_EPISODE = 1000

total_epochs, total_penalties, total_rewards = 0, 0, 0
successes = 0
all_epochs = []
all_penalties = []
all_rewards = []
episodes = 100

for _ in range(episodes):
    state = env.reset()
    epochs, penalties, reward_sum = 0, 0, 0

    done = False

    while not done and epochs < MAX_STEPS_PER_EPISODE:
        # Always exploit the learned values: no exploration during evaluation.
        action = np.argmax(q_table[state])
        state, reward, done, info = env.step(action)

        reward_sum += reward

        if reward == -10:
            penalties += 1

        epochs += 1

    # An episode is successful when it finished (was not cut off by the step
    # cap) and no penalties were incurred.
    if done and penalties == 0:
        successes += 1

    total_penalties += penalties
    total_epochs += epochs
    total_rewards += reward_sum

    all_epochs.append(epochs)
    all_penalties.append(penalties)
    all_rewards.append(reward_sum)

# Compute metrics
avg_timesteps_per_episode = total_epochs / episodes
avg_penalties_per_episode = total_penalties / episodes
avg_rewards_per_episode = total_rewards / episodes
success_rate = successes / episodes * 100
std_epochs = np.std(all_epochs)
std_penalties = np.std(all_penalties)
max_reward = np.max(all_rewards)
min_reward = np.min(all_rewards)

# Print the results
print(f"Results after {episodes} episodes:")
print(f"Average timesteps per episode: {avg_timesteps_per_episode}")
print(f"Average penalties per episode: {avg_penalties_per_episode}")
print(f"Average rewards per episode: {avg_rewards_per_episode}")
print(f"Success rate: {success_rate}%")
print(f"Standard deviation of timesteps per episode: {std_epochs}")
print(f"Standard deviation of penalties per episode: {std_penalties}")
print(f"Maximum reward in an episode: {max_reward}")
print(f"Minimum reward in an episode: {min_reward}")


Results after 100 episodes:
Average timesteps per episode: 12.86
Average penalties per episode: 0.0
Average rewards per episode: 8.14
Success rate: 100.0%
Standard deviation of timesteps per episode: 2.959797290356216
Standard deviation of penalties per episode: 0.0
Maximum reward in an episode: 14
Minimum reward in an episode: 3


# Ejemplo 2


In [None]:
!pip install "gymnasium[accept-rom-license]"
!pip install autorom[accept-rom-license]
!AutoROM --accept-license



AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/usr/local/lib/python3.10/dist-packages/AutoROM/roms

Existing ROMs will be overwritten.
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/adventure.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/air_raid.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/alien.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/amidar.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/assault.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/asterix.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/asteroids.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/atlantis.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/atlantis2.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/backgammon.bin
Installed /usr/local/lib/python3.10/dist-packages/AutoROM/roms/bank_heist.bin
Inst

In [None]:
import gymnasium as gym
# NOTE(review): from here on `gym` is gymnasium (see the import above), not the
# gym package used for Taxi, and `env` no longer refers to the Taxi environment.
# The next cell builds this environment again with render_mode="rgb_array".
env = gym.make("ALE/Breakout-v5")

In [None]:
# Record a random Breakout agent: one rendered frame per step, stopping when
# the episode ends or after MAX_RANDOM_STEPS steps, whichever comes first.
MAX_RANDOM_STEPS = 1000

epochs = 0

frames = []  # for animation
done = False

env = gym.make("ALE/Breakout-v5", render_mode="rgb_array")
observation, info = env.reset()

while not done and epochs < MAX_RANDOM_STEPS:
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)

    # The episode is over once the environment reports either flag; stepping
    # past that point without a reset is not valid.
    done = terminated or truncated

    # Put each rendered frame into dict for animation
    frames.append(
        {
            "frame": env.render(),
            "state": observation,
            "action": action,
            "reward": reward,
        }
    )

    epochs += 1

  logger.warn(


In [None]:
from moviepy.editor import ImageSequenceClip
# !pip install moviepy - if you don’t have moviepy

def create_gif(frames: list, filename, fps=100):
    """
    Write a GIF animation from recorded environment frames.

    Args:
        frames: A list of dicts, each holding one rendered image array under
            the "frame" key; the other keys are ignored.
        filename: The output filename for the GIF animation.
        fps: The frames per second of the animation (default: 100).

    Raises:
        ValueError: If ``frames`` is empty, since there is nothing to encode.
    """
    # Fail early with a clear message instead of an obscure error from the
    # clip constructor.
    if len(frames) == 0:
        raise ValueError("frames is empty: nothing to write to the GIF")

    images = [frame["frame"] for frame in frames]

    clip = ImageSequenceClip(images, fps=fps)
    clip.write_gif(filename, fps=fps)

# Example usage: encode the recorded frames with the function's default fps.
create_gif(frames, "animation.gif") # saves the GIF locally


  from scipy.ndimage.filters import sobel



MoviePy - Building file animation.gif with imageio.




In [None]:
from moviepy.config import change_settings
# Point MoviePy at an ffmpeg binary.
# NOTE(review): the path is hardcoded and this cell sits after the GIF is
# written — confirm whether it is needed and whether it should run earlier.
change_settings({"FFMPEG_BINARY": "/usr/bin/ffmpeg"})