<a href="https://colab.research.google.com/github/tobiasMarion/Estacionamento/blob/main/minicurso_rl_eramia25.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aprendizado por Reforço: Como Ensinar Robôs a Maximizar Recompensas na Prática

Minicurso realizado no dia 12 de novembro durante o ERAMIA 2025 no Instituto de Informática da UFRGS.

Autor: Lucas N. Alegre


## Modelando Problemas com Gymnasium

Gymnasium é a versão mantida do OpenAI Gym pela Farama Foundation. É uma biblioteca Python para desenvolvimento e comparação de algoritmos de Reinforcement Learning.

### Instalação

```bash
pip install gymnasium
pip install gymnasium[all]  # Para ambientes adicionais
```

In [1]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

In [2]:
# @title Definindo um Agente Aleatório
class RandomAgent:
    """Baseline agent whose actions are sampled from the env's action space."""

    def __init__(self, env):
        # Keep a handle on the environment so its action space can be sampled.
        self.env = env

    def eval(self, obs):
        """Return ``env.action_space.sample()``; the observation is ignored."""
        action_space = self.env.action_space
        return action_space.sample()

In [None]:
## 1. Estrutura Básica de um Ambiente

# Criar ambiente
# https://gymnasium.farama.org/environments/toy_text/frozen_lake/
def make_env():
    """Create the 4x4 slippery FrozenLake environment used in this section.

    The environment is built with ``success_rate=1/3``,
    ``reward_schedule=(1, 0, 0)`` and ``render_mode="rgb_array"`` so that
    ``env.render()`` returns image frames. See the FrozenLake documentation
    linked above for the exact meaning of each option.

    :return: The environment returned by ``gym.make``.
    """
    frozen_lake_options = dict(
        desc=None,
        map_name="4x4",
        is_slippery=True,
        success_rate=1.0 / 3.0,
        reward_schedule=(1, 0, 0),
        render_mode="rgb_array",
    )
    return gym.make("FrozenLake-v1", **frozen_lake_options)

# Instantiate the environment defined above
env = make_env()

# Observation space: print the space itself and how many states it has
print(f"Observation space: {env.observation_space}")
print(f"NÃºmero de estados: {env.observation_space.n}")

# Action space: a blank line first, then the space, its size and a legend
print()
print(f"Action space: {env.action_space}")
print(f"NÃºmero de aÃ§Ãµes: {env.action_space.n}")
print("AÃ§Ãµes: 0=Esquerda, 1=Baixo, 2=Direita, 3=Cima")

In [None]:
# Reset the environment; the seed makes the first episode reproducible
observation, info = env.reset(seed=42)

agent = RandomAgent(env)

print(f"Observation: {observation}")
print(f"Info: {info}")

# Interaction loop: run at most 100 steps of a single episode
for _ in range(100):
    # Choose an action (random in this example)
    action = agent.eval(observation)

    # Apply the action to the environment
    observation, reward, terminated, truncated, info = env.step(action)

    # Stop as soon as the episode ends (goal/hole reached or time limit hit)
    if terminated or truncated:
        print(f"EpisÃ³dio terminou! Recompensa: {reward}")
        # Leave the environment in a freshly reset state for later cells
        observation, info = env.reset()
        break

# NOTE: `env` is deliberately NOT closed here. The animation cell further
# down calls `animate_agent(agent, env)` on this same environment, and an
# environment must not be used after `close()`.

In [None]:
# @title Animando o agente
from matplotlib import animation
from IPython.display import HTML

def animate_agent(agent, env, num_frames=100):
    """Roll out ``agent`` for one episode and return it as an inline animation.

    The left panel shows the rendered frames and the right panel shows the
    cumulative (undiscounted) return up to the current frame.

    :param agent: Object exposing ``eval(observation) -> action``.
    :param env: Environment whose ``render()`` returns an image array
        (e.g. created with ``render_mode="rgb_array"``).
    :param num_frames: Maximum number of environment steps to record.
    :return: ``IPython.display.HTML`` wrapping the JS/HTML animation.
    """
    s, info = env.reset()
    frames = [env.render()]
    returns = [0]
    for step in range(num_frames):
        a = agent.eval(s)
        s, r, terminated, truncated, info = env.step(a)
        frames.append(env.render())
        returns.append(r + returns[-1])
        if terminated or truncated:
            # `step` is zero-based, so the episode lasted `step + 1` steps.
            print(f'Game over! Your agent lasted {step + 1} steps.')
            # Stepping a finished episode without a reset is not supported
            # by the Gymnasium API, so stop recording here.
            break

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))
    im = axes[0].imshow(frames[0])
    axes[1].set_title('Cumulative returns', fontsize=20)
    axes[1].set_xlim(0, max(len(frames) - 1, 1))
    # Pad the y-range on both sides so that all-zero or negative returns
    # (e.g. dense negative rewards) remain visible instead of collapsing
    # the axis to (0, 0).
    lowest, highest = min(returns), max(returns)
    padding = 0.1 * (highest - lowest) or 1.0
    axes[1].set_ylim(lowest - padding, highest + padding)
    line, = axes[1].plot([], [], lw=2)

    def init():
        line.set_data([], [])
        im.set_data(frames[0])
        return [im, line]

    def animate(i):
        # Include the return of frame `i` so the curve matches the image.
        line.set_data(np.arange(i + 1), returns[:i + 1])
        im.set_data(frames[i])
        return [im, line]

    anim = animation.FuncAnimation(fig, animate, init_func=init,
                                   frames=len(frames), interval=50)
    # Close the figure so only the animation (not a static plot) is shown.
    plt.close(fig)
    return HTML(anim.to_jshtml())


def eval_agent(agent, env) -> float:
    """Run one episode with ``agent.eval`` and return its discounted return.

    Rewards are discounted with ``agent.gamma`` when the agent defines it.
    Agents without a ``gamma`` attribute (such as ``RandomAgent``) are
    evaluated with a discount of 1.0, i.e. the plain sum of rewards.

    :param agent: Object exposing ``eval(observation) -> action`` and,
        optionally, a ``gamma`` attribute.
    :param env: Environment following the Gymnasium ``reset``/``step`` API.
    :return: The (discounted) return of the episode.
    """
    discount_rate = getattr(agent, "gamma", 1.0)
    s, info = env.reset()
    total_reward = 0.0
    discount = 1.0  # discount_rate ** t for the current time step t
    done = False
    while not done:
        a = agent.eval(s)
        s, r, terminated, truncated, info = env.step(a)
        done = terminated or truncated
        total_reward += r * discount
        discount *= discount_rate
    return total_reward

def average_eval_agent(agent, env, num_episodes=10) -> float:
    """Return the mean of ``eval_agent(agent, env)`` over ``num_episodes`` runs."""
    episode_returns = (eval_agent(agent, env) for _ in range(num_episodes))
    return sum(episode_returns) / num_episodes

In [None]:
# Show one rollout of the random agent created above as an inline animation.
animate_agent(agent, env)

### Recursos Adicionais

- 📚 Documentação oficial: https://gymnasium.farama.org/
- 🎮 Lista de ambientes: https://gymnasium.farama.org/environments/
- 💻 GitHub: https://github.com/Farama-Foundation/Gymnasium

# Q-Learning Tabular


In [None]:
class QLearning:
    """Tabular Q-learning agent for discrete state and action spaces.

    The Q-table has one row per state and one column per action and is
    initialised to zeros. Exploration is epsilon-greedy, with epsilon
    (``exploration_rate``) decayed multiplicatively each time ``learn`` is
    called with ``done=True``.

    :param env: Training environment; only ``observation_space.n`` and
        ``action_space.n`` are read here.
    :param eval_env: Environment used for the periodic evaluations in ``train``.
    :param learning_rate: Step size applied to the TD error.
    :param gamma: Discount factor.
    :param exploration_rate: Initial epsilon.
    :param exploration_decay: Multiplicative decay applied to epsilon.
    :param min_exploration_rate: Lower bound for epsilon.
    """

    def __init__(self, env, eval_env, learning_rate=0.1, gamma=0.99, exploration_rate=1.0, exploration_decay=0.995, min_exploration_rate=0.01):
        self.state_size = env.observation_space.n
        self.action_size = env.action_space.n
        self.eval_env = eval_env
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.exploration_rate = exploration_rate
        self.exploration_decay = exploration_decay
        self.min_exploration_rate = min_exploration_rate
        self.q_table = np.zeros((self.state_size, self.action_size))

    def choose_action(self, state):
        """Epsilon-greedy action selection.

        With probability ``exploration_rate`` return a uniformly random
        action index, otherwise the greedy action from ``eval``.
        """
        if np.random.rand() < self.exploration_rate:
            return int(np.random.randint(self.action_size))
        return self.eval(state)

    def eval(self, state):
        """Greedy action selection: the argmax of the Q-table row of ``state``."""
        return int(np.argmax(self.q_table[state]))

    def learn(self, state, action, reward, next_state, done):
        """Apply one Q-learning update for the given transition.

        :param done: True when ``next_state`` is terminal. In that case the
            target does not bootstrap from ``next_state``, and epsilon is
            decayed.
        """
        best_next_action = np.argmax(self.q_table[next_state])

        # A terminal state has no future return, so only bootstrap otherwise.
        bootstrap = 0.0 if done else self.gamma * self.q_table[next_state][best_next_action]
        td_target = reward + bootstrap
        td_error = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.learning_rate * td_error

        if done:
            self.exploration_rate = max(self.min_exploration_rate, self.exploration_rate * self.exploration_decay)

    def train(self, env, num_steps):
        """Train for ``num_steps`` environment steps.

        Every 10,000 steps (including step 0) the greedy policy is evaluated
        with ``average_eval_agent`` on ``self.eval_env`` and the result is
        printed.

        :return: List with one average evaluation return per evaluation.
        """
        eval_rewards = []
        state, _ = env.reset()
        for step in range(num_steps):
            action = self.choose_action(state)

            next_state, reward, terminated, truncated, _ = env.step(action)

            # Only `terminated` marks a true terminal state; a time-limit
            # truncation must still bootstrap from `next_state`.
            self.learn(state, action, reward, next_state, terminated)

            state = next_state
            if terminated or truncated:
                state, _ = env.reset()

            if step % 10000 == 0:
                eval_reward = average_eval_agent(self, self.eval_env)
                eval_rewards.append(eval_reward)
                print(f"Step: {step}, Eval Reward: {eval_reward}, Exploration Rate: {self.exploration_rate:.4f}")

        return eval_rewards


In [None]:
# https://gymnasium.farama.org/environments/toy_text/frozen_lake/
def make_env(render_mode=None):
    """Create the 4x4 slippery FrozenLake environment with ``success_rate=0.9``.

    :param render_mode: Forwarded to ``gym.make``; ``None`` by default.
    :return: The environment returned by ``gym.make``.
    """
    frozen_lake_options = dict(
        desc=None,
        map_name="4x4",
        is_slippery=True,
        success_rate=0.9,
        reward_schedule=(1, 0, 0),
        render_mode=render_mode,
    )
    return gym.make("FrozenLake-v1", **frozen_lake_options)
# One environment for training and a separate, renderable one for evaluation.
env = make_env()
eval_env = make_env(render_mode="rgb_array")

In [None]:
# Train two agents that differ only in their learning rate (0.5 vs 0.1).
agent1 = QLearning(env, eval_env, learning_rate=0.5, exploration_decay=0.9999)
eval_rewards1 = agent1.train(env, num_steps=1_000_000)

agent2 = QLearning(env, eval_env, learning_rate=0.1, exploration_decay=0.9999)
eval_rewards2 = agent2.train(env, num_steps=1_000_000)

In [None]:
# Display the learned Q-table of agent1 (one row per state, one column per action).
agent1.q_table

In [None]:
# Animate a greedy rollout of agent1 on the renderable evaluation environment.
animate_agent(agent1, eval_env)

In [None]:
# Compare the evaluation curves of the two Q-learning agents on one figure.
plt.figure(figsize=(10, 6))
for curve, curve_label in ((eval_rewards1, 'Agent 1'), (eval_rewards2, 'Agent 2')):
    plt.plot(curve, label=curve_label)
plt.xlabel('Evaluation Step (x10,000)')
plt.ylabel('Evaluation Reward')
plt.title('Q-Learning: Evaluation Reward Curves')
plt.legend()
plt.show()

## Deep Q-Networks (DQN)


In [None]:
from typing import Optional, Type
from dataclasses import dataclass
import numpy as np
import torch as th
import torch.nn as nn
import gymnasium as gym
from gymnasium import spaces

### Replay Buffer

In [None]:
@dataclass
class ReplayBufferSamples:
    """
    Batch of transitions returned by the replay buffer's ``sample`` method.

    Each field holds one array with the sampled entries of the matching
    buffer; fields are positional, in the order declared below.
    """
    # Observations the actions were taken from ("states" in the theory)
    observations: np.ndarray
    # Observations reached after taking the actions
    next_observations: np.ndarray
    # Actions that were taken
    actions: np.ndarray
    # Rewards received for those actions
    rewards: np.ndarray
    # Termination flags of the transitions
    terminateds: np.ndarray

In [None]:
class ReplayBuffer:
    """
    A simple ring-buffer replay memory to store and sample transitions.

    Once ``buffer_size`` transitions have been stored, new transitions
    overwrite the oldest ones.

    :param buffer_size: Max number of transitions to store
    :param observation_space: Observation space of the env,
        contains information about the observation type and shape.
    :param action_space: Action space of the env,
        contains information about the number of actions.
    """

    def __init__(
        self,
        buffer_size: int,
        observation_space: "spaces.Box",
        action_space: "spaces.Discrete",
    ) -> None:
        # Current write position in the ring buffer
        self.current_idx = 0
        self.buffer_size = buffer_size
        # Boolean flag to know when the buffer has reached its maximal capacity
        self.is_full = False

        self.observation_space = observation_space
        self.action_space = action_space
        # Create the different buffers
        self.observations = np.zeros((buffer_size, *observation_space.shape), dtype=observation_space.dtype)
        self.next_observations = np.zeros((buffer_size, *observation_space.shape), dtype=observation_space.dtype)
        # The action is an integer
        action_dim = 1
        self.actions = np.zeros((buffer_size, action_dim), dtype=action_space.dtype)
        # One scalar reward and one termination flag per stored transition
        self.rewards = np.zeros((buffer_size,), dtype=np.float32)
        self.terminateds = np.zeros((buffer_size,), dtype=bool)

    def store_transition(
        self,
        obs: np.ndarray,
        next_obs: np.ndarray,
        action: int,
        reward: float,
        terminated: bool,
    ) -> None:
        """
        Store one transition in the buffer.

        :param obs: Current observation
        :param next_obs: Next observation
        :param action: Action taken for the current observation
        :param reward: Reward received after taking the action
        :param terminated: Whether it is the end of an episode or not
            (discarding episode truncation like timeout)
        """
        # Write the transition at the current position (values are copied
        # into the pre-allocated arrays)
        idx = self.current_idx
        self.observations[idx] = obs
        self.next_observations[idx] = next_obs
        self.actions[idx] = action
        self.rewards[idx] = reward
        self.terminateds[idx] = terminated

        # Advance the pointer; on reaching the end, wrap around to zero and
        # remember that every slot now holds a valid transition
        self.current_idx += 1
        if self.current_idx == self.buffer_size:
            self.is_full = True
            self.current_idx = 0

    def sample(self, batch_size: int) -> "ReplayBufferSamples":
        """
        Sample with replacement `batch_size` transitions from the buffer.

        :param batch_size: How many transitions to sample.
        :return: Samples from the replay buffer
        :raises ValueError: If no transition has been stored yet.
        """
        # Only indices that have been written may be sampled: all of them
        # once the ring buffer is full, otherwise [0, current_idx)
        upper_bound = self.buffer_size if self.is_full else self.current_idx
        if upper_bound == 0:
            raise ValueError("Cannot sample from an empty replay buffer")
        batch_indices = np.random.randint(0, upper_bound, size=batch_size)

        return ReplayBufferSamples(
            self.observations[batch_indices],
            self.next_observations[batch_indices],
            self.actions[batch_indices],
            self.rewards[batch_indices],
            self.terminateds[batch_indices],
        )

### Q Networks

In [None]:
class QNetwork(nn.Module):
    """
    Fully-connected Q-network for DQN: maps an observation to one Q-value
    per discrete action.

    :param observation_space: Observation space of the env; only the first
        entry of its ``shape`` is used (a 1-D space is assumed).
    :param action_space: Action space of the env; its ``n`` attribute gives
        the number of outputs.
    :param n_hidden_units: Number of units for each hidden layer.
    :param activation_fn: Activation function class (ReLU by default)
    """

    def __init__(
        self,
        observation_space: spaces.Box,
        action_space: spaces.Discrete,
        n_hidden_units: int = 64,
        activation_fn: Type[nn.Module] = nn.ReLU,
    ) -> None:
        super().__init__()
        # Input width: assumes a 1-D observation space
        obs_dim = observation_space.shape[0]
        # Output width: one Q-value per discrete action
        n_actions = int(action_space.n)

        # Two hidden layers of `n_hidden_units` units, each followed by the
        # activation, then a linear output layer without activation
        layers = [
            nn.Linear(obs_dim, n_hidden_units),
            activation_fn(),
            nn.Linear(n_hidden_units, n_hidden_units),
            activation_fn(),
            nn.Linear(n_hidden_units, n_actions),
        ]
        self.q_net = nn.Sequential(*layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """
        :param observations: A batch of observations (batch_size, obs_dim)
        :return: The Q-values of every action for the given observations
            (batch_size, n_actions)
        """
        q_values = self.q_net(observations)
        return q_values

### DQN

In [None]:
class DQNAgent:
    """
    A DQN agent implementation (with a Double DQN target).

    Mnih, V., Kavukcuoglu, K., Silver, D. et al. Human-level control through deep reinforcement learning. Nature 518, 529–533 (2015).
    https://doi.org/10.1038/nature14236

    :param env: Training environment.
    :param eval_env: Environment used for the periodic evaluations in ``train``.
    :param n_hidden_units: Hidden layer width of the Q-networks.
    :param learning_rate: Adam learning rate.
    :param gamma: Discount factor.
    :param buffer_size: Capacity of the replay buffer.
    :param batch_size: Number of transitions per gradient step.
    :param target_update_freq: Number of gradient steps between hard copies
        of the online network into the target network.
    :param device: ``"auto"`` (CUDA when available, else CPU) or any value
        accepted by ``torch.device``.
    """
    def __init__(
        self,
        env,
        eval_env,
        n_hidden_units=256,
        learning_rate=1e-3,
        gamma=0.99,
        buffer_size=200000,
        batch_size=256,
        target_update_freq=1000,
        device="auto",
    ):
        self.env = env
        self.eval_env = eval_env
        self.action_space = env.action_space
        self.observation_space = env.observation_space
        self.gamma = gamma
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        if device == "auto":
            device = "cuda" if th.cuda.is_available() else "cpu"
        self.device = th.device(device)

        # Move the networks to the *resolved* device: the raw "auto" string
        # is not a valid torch device.
        self.q_net = QNetwork(self.observation_space, self.action_space, n_hidden_units).to(self.device)
        self.target_q_net = QNetwork(self.observation_space, self.action_space, n_hidden_units).to(self.device)
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = th.optim.Adam(self.q_net.parameters(), lr=learning_rate)

        self.replay_buffer = ReplayBuffer(
            buffer_size=buffer_size,
            observation_space=self.observation_space,
            action_space=self.action_space,
        )
        # Number of gradient steps taken so far
        self.learn_step = 0

    def select_action(self, state, epsilon=0.05):
        """Epsilon-greedy action: random with probability ``epsilon``, else greedy."""
        if np.random.rand() < epsilon:
            return self.action_space.sample()
        return self.eval(state)

    def store_transition(self, obs, next_obs, action, reward, terminated):
        """Add one transition to the replay buffer."""
        self.replay_buffer.store_transition(obs, next_obs, action, reward, terminated)

    def train_step(self):
        """Do one gradient step on a sampled batch.

        Does nothing until the buffer holds at least ``batch_size``
        transitions. Every ``target_update_freq`` gradient steps the target
        network is overwritten with the online network's weights.
        """
        if self.replay_buffer.current_idx < self.batch_size and not self.replay_buffer.is_full:
            return

        samples = self.replay_buffer.sample(self.batch_size)
        obs = th.tensor(samples.observations, dtype=th.float32, device=self.device)
        next_obs = th.tensor(samples.next_observations, dtype=th.float32, device=self.device)
        actions = th.tensor(samples.actions, dtype=th.int64, device=self.device).reshape(-1, 1)
        rewards = th.tensor(samples.rewards, dtype=th.float32, device=self.device)
        dones = th.tensor(samples.terminateds, dtype=th.float32, device=self.device)

        # Q(s, a) of the actions that were actually taken
        q_values = self.q_net(obs).gather(1, actions).squeeze(1)
        with th.no_grad():
            # Double DQN target: the online network selects the next action,
            # the target network evaluates it
            next_actions = self.q_net(next_obs).argmax(dim=1)
            next_q_values = self.target_q_net(next_obs).gather(1, next_actions.unsqueeze(1)).squeeze(1)
            # No bootstrapping from terminal states (dones == 1)
            target = rewards + self.gamma * (1.0 - dones) * next_q_values

        loss = nn.functional.mse_loss(q_values, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.learn_step += 1
        # Update target network
        if self.learn_step % self.target_update_freq == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())

    def eval(self, state):
        """Greedy action: argmax of the online network's Q-values for ``state``."""
        state_tensor = th.tensor(state, dtype=th.float32, device=self.device).unsqueeze(0)
        with th.no_grad():
            q_values = self.q_net(state_tensor)
        return int(q_values.argmax().item())

    def train(self, num_steps, epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.999):
        """Interact with ``self.env`` for ``num_steps`` steps while learning.

        Epsilon is decayed multiplicatively after every step, down to
        ``epsilon_end``. Every 1000 steps (including step 0) the greedy
        policy is evaluated with ``average_eval_agent`` on ``self.eval_env``.

        :return: List with one average evaluation return per evaluation.
        """
        epsilon = epsilon_start
        eval_rewards = []

        state, _ = self.env.reset()
        for step in range(num_steps):

            action = self.select_action(state, epsilon)

            next_state, reward, terminated, truncated, _ = self.env.step(action)

            # Store `terminated` only: a truncated episode must still bootstrap
            self.store_transition(state, next_state, action, reward, terminated)

            self.train_step()

            state = next_state
            if terminated or truncated:
                state, _ = self.env.reset()

            if step % 1000 == 0:
                eval_reward = average_eval_agent(self, self.eval_env)
                eval_rewards.append(eval_reward)
                print(f"Step: {step}, Eval Reward: {eval_reward}")

            epsilon = max(epsilon_end, epsilon * epsilon_decay)

        return eval_rewards

## Training the Agent

In [None]:
# @title Ambiente Fetch com Ações Discretas e Observações Modificadas

import gymnasium_robotics
from gymnasium.core import ActionWrapper, ObservationWrapper
from gymnasium.spaces import Discrete

class FetchObservationWrapper(ObservationWrapper):
    """Flatten the dict observation into a single float32 vector.

    The wrapped environment is expected to return a dict with the keys
    ``"observation"`` and ``"desired_goal"``; both are concatenated, in that
    order, into one 1-D array.
    """

    def __init__(self, env):
        super().__init__(env)
        base_space = env.unwrapped.observation_space
        self.observation_space = gym.spaces.Box(
            low=np.concatenate((base_space['observation'].low, base_space['desired_goal'].low)),
            high=np.concatenate((base_space['observation'].high, base_space['desired_goal'].high)),
            dtype=np.float32
        )

    def observation(self, observation):
        """Concatenate observation and desired goal, cast to the space's dtype."""
        merged = np.hstack((observation["observation"], observation["desired_goal"]))
        # The declared space is float32; cast so that returned observations
        # actually match it regardless of the wrapped env's dtype.
        return merged.astype(self.observation_space.dtype, copy=False)


class FetchDiscreteManhattanAction(ActionWrapper):
    """Expose a small discrete action set on top of a 4-D continuous action.

    Actions 0-5 set +1/-1 on exactly one of the first three components (in
    the order +x, -x, +y, -y, +z, -z if the components are x, y, z, which is
    presumably the Fetch convention) with the fourth component fixed at -1.

    :param use_gripper: Append two actions that only set the fourth
        component to +1 and -1 respectively.
    :param use_null_action: Append an all-zero action.
    """

    def __init__(self, env, use_gripper=False, use_null_action=False):
        super().__init__(env)
        self.use_gripper = use_gripper
        self.use_null_action = use_null_action

        def command(c0, c1, c2, c3):
            # Build one continuous action vector in float32
            return np.array([c0, c1, c2, c3], dtype=np.float32)

        commands = [
            command(1.0, 0.0, 0.0, -1.0),
            command(-1.0, 0.0, 0.0, -1.0),
            command(0.0, 1.0, 0.0, -1.0),
            command(0.0, -1.0, 0.0, -1.0),
            command(0.0, 0.0, 1.0, -1.0),
            command(0.0, 0.0, -1.0, -1.0),
        ]
        if self.use_gripper:
            commands.append(command(0.0, 0.0, 0.0, 1.0))
            commands.append(command(0.0, 0.0, 0.0, -1.0))
        if self.use_null_action:
            commands.append(command(0.0, 0.0, 0.0, 0.0))

        # Discrete index -> continuous action, numbered in insertion order
        self.action_dict = dict(enumerate(commands))
        self.num_actions = len(commands)

        self.action_space = Discrete(self.num_actions)

    def action(self, action):
        """Translate a discrete action index into its continuous vector."""
        return self.action_dict[int(action)]

In [None]:
def make_env(render_mode=None):
    """Create ``FetchReachDense-v4`` with flat observations and discrete actions.

    The base environment is wrapped first with ``FetchObservationWrapper``
    and then with ``FetchDiscreteManhattanAction`` (no gripper actions, with
    the null action).

    :param render_mode: Forwarded to ``gym.make``; ``None`` by default.
    """
    base_env = gym.make("FetchReachDense-v4", render_mode=render_mode)
    flat_obs_env = FetchObservationWrapper(base_env)
    return FetchDiscreteManhattanAction(
        flat_obs_env,
        use_gripper=False,
        use_null_action=True,
    )

# One environment for training and a separate, renderable one for evaluation.
env = make_env()
eval_env = make_env(render_mode="rgb_array")

# DQN agent on CPU with a smaller network, buffer and batch than the defaults.
agent = DQNAgent(
    env=env,
    eval_env=eval_env,
    n_hidden_units=128,
    learning_rate=1e-3,
    buffer_size=50000,
    batch_size=64,
    target_update_freq=1000,
    device="cpu",
)

In [None]:
# Train the DQN agent for 50,000 environment steps; the call returns the
# list of periodic evaluation results.
agent.train(num_steps=50000)

In [None]:
# Animate a short (10-step) greedy rollout of the trained agent.
animate_agent(agent, eval_env, num_frames=10)