## Aprendizaje por refuerzo desde retroalimentación humana (RLHF)

- Imitación aprendizaje (IL): Algoritmo de aprendizaje por refuerzo donde el agente aprende a imitar el comportamiento de un experto.
- Aprendizaje inverso de reforzamiento (IRL): Algoritmo donde se infiere la función de recompensa que un experto está optimizando a partir de sus demostraciones.
- Aprendizaje por preferencias: Un enfoque donde se usan comparaciones de pares de trayectorias en lugar de recompensas numéricas para guiar el aprendizaje del agente.

### El aprendizaje por imitación (IL) 

Es un campo del aprendizaje automático en el cual un agente aprende a realizar tareas replicando el comportamiento de un experto. Este enfoque es especialmente útil en escenarios donde es difícil definir una función de recompensa precisa o cuando la exploración directa en el entorno es costosa o peligrosa.

**1 . Behavioral Cloning (BC)**
Behavioral Cloning (BC) es una técnica básica de aprendizaje por imitación que utiliza aprendizaje supervisado para entrenar al agente a partir de demostraciones de expertos.

**Proceso de BC:**

- Recopilación de datos: Se recopilan datos de demostraciones de un experto, que consisten en pares de estados y acciones (s, a).
- Entrenamiento del modelo: Se entrena un modelo de política (por ejemplo, una red neuronal) utilizando los pares de estados y acciones. El objetivo es minimizar la diferencia entre las acciones predichas por el modelo y las acciones del experto en los estados correspondientes.
- Ejecución de la política: El modelo entrenado se utiliza para predecir acciones en nuevos estados durante la ejecución.

**Ventajas y desventajas**:

* Ventajas: Simple de implementar y puede funcionar bien si las demostraciones del experto cubren adecuadamente el espacio de estados.
* Desventajas: Puede sufrir de errores de compounding, donde pequeños errores en la predicción de acciones pueden acumularse y llevar a un desempeño pobre. BC también depende en gran medida de la calidad y cobertura de las demostraciones del experto.

**2 . DAgger (Dataset Aggregation)**

DAgger (Dataset Aggregation) es una técnica que mejora BC al abordar el problema de los errores de compounding. DAgger es un enfoque iterativo que combina el aprendizaje supervisado con la retroalimentación en tiempo real del experto.

**Proceso de DAgger:**

- Recopilación de datos inicial: Se recopila un conjunto inicial de demostraciones del experto.
- Entrenamiento inicial: Se entrena una política inicial usando BC con los datos iniciales.
- Iteraciones de DAgger:
    * Ejecución de la política actual: La política entrenada se ejecuta en el entorno, generando nuevas trayectorias de estados.
    * Corrección del experto: El experto revisa las acciones tomadas por la política en estos nuevos estados y proporciona las acciones correctas.
    * Agregación de datos: Los nuevos pares de estados y acciones proporcionados por el experto se agregan al conjunto de datos de entrenamiento.
    * Reentrenamiento: La política se reentrena con el conjunto de datos ampliado.
 
**3 . Generative adversarial imitation learning (GAIL):**

GAIL combina aprendizaje por imitación con técnicas de aprendizaje generativo adversarial. En GAIL, un generador (la política del agente) y un discriminador (que distingue entre trayectorias generadas por el agente y trayectorias del experto) se entrenan conjuntamente. El objetivo del generador es producir trayectorias que sean indistinguibles de las trayectorias del experto, según el discriminador.

### Ejemplos

Aquí hay un ejemplo simple de cómo implementar Behavioral Cloning en PyTorch:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Definir la red neuronal
class BCModel(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(BCModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Datos de ejemplo (estados y acciones)
states = torch.rand((1000, 4))  # 1000 estados con 4 características cada uno
actions = torch.rand((1000, 2))  # 1000 acciones con 2 dimensiones cada una

# Crear el dataset y dataloader
dataset = TensorDataset(states, actions)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Inicializar el modelo, la pérdida y el optimizador
model = BCModel(input_dim=4, output_dim=2)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Entrenamiento
num_epochs = 50
for epoch in range(num_epochs):
    for batch_states, batch_actions in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_states)
        loss = criterion(outputs, batch_actions)
        loss.backward()
        optimizer.step()
    print(f"Epoca [{epoch+1}/{num_epochs}], Perdida: {loss.item():.4f}")

print("Entrenamiento completado")


Este código entrena un modelo de clonación de comportamiento utilizando un conjunto de datos de estados y acciones.

Aquí hay un ejemplo de cómo implementar DAgger en PyTorch:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

# Definir la red neuronal
class DAggerModel(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DAggerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Función simulada para obtener la acción del experto
def expert_policy(state):
    return state.sum(dim=-1, keepdim=True) * 0.5  # Acción simulada

# Datos iniciales de ejemplo
states = torch.rand((1000, 4))
actions = expert_policy(states)

# Crear el dataset inicial y dataloader
dataset = TensorDataset(states, actions)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Inicializar el modelo, la pérdida y el optimizador
model = DAggerModel(input_dim=4, output_dim=1)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Entrenamiento inicial con BC
num_epochs = 50
for epoch in range(num_epochs):
    for batch_states, batch_actions in dataloader:
        optimizer.zero_grad()
        outputs = model(batch_states)
        loss = criterion(outputs, batch_actions)
        loss.backward()
        optimizer.step()
    print(f"Epoca [{epoch+1}/{num_epochs}], Perida: {loss.item():.4f}")

# Iteraciones de DAgger
num_iterations = 10
for iteration in range(num_iterations):
    # Ejecutar la política actual y recopilar nuevos datos
    new_states = torch.rand((500, 4))  # Nuevos estados alcanzados por la política
    new_actions = expert_policy(new_states)  # Acciones del experto para los nuevos estados

    # Agregar los nuevos datos al dataset existente
    states = torch.cat([states, new_states], dim=0)
    actions = torch.cat([actions, new_actions], dim=0)

    # Crear un nuevo dataloader con el dataset ampliado
    dataset = TensorDataset(states, actions)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Reentrenar el modelo
    for epoch in range(num_epochs):
        for batch_states, batch_actions in dataloader:
            optimizer.zero_grad()
            outputs = model(batch_states)
            loss = criterion(outputs, batch_actions)
            loss.backward()
            optimizer.step()
        print(f"Iteracion [{iteration+1}/{num_iterations}], Epoca [{epoch+1}/{num_epochs}], Perdida: {loss.item():.4f}")

print("Entrenamiento completado")


Este código muestra un ejemplo simplificado de DAgger. En cada iteración, el agente ejecuta su política actual para recolectar nuevos estados y obtiene las acciones correctas del experto para estos estados. Luego, reentrena la política con el conjunto de datos ampliado.

A continuación, se presenta una implementación básica de GAIL en PyTorch:


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

# Definir la red neuronal para la política (generador)
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

# Definir la red neuronal para el discriminador
class Discriminator(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Discriminator, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 1)
    
    def forward(self, state, action):
        action_onehot = torch.zeros(action.size(0), action_dim)
        action_onehot.scatter_(1, action.long(), 1)  # Convertir las acciones a one-hot encoding
        x = torch.cat([state, action_onehot], dim=-1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        prob = torch.sigmoid(self.fc3(x))
        return prob

# Generar una trayectoria utilizando la política actual
def generate_trajectory(policy_net, env, max_steps=200):
    state = env.reset()
    trajectory = []
    for _ in range(max_steps):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        action_probs = policy_net(state_tensor)
        action = np.random.choice(len(action_probs[0]), p=action_probs.detach().numpy()[0])
        next_state, reward, done, _ = env.step(action)
        trajectory.append((state, action))
        if done:
            break
        state = next_state
    return trajectory

# Generar trayectorias de un experto
def generate_expert_trajectories(env, expert_policy, num_trajectories=10, max_steps=200):
    expert_trajectories = []
    for _ in range(num_trajectories):
        trajectory = []
        state = env.reset()
        for _ in range(max_steps):
            action = expert_policy(state)
            next_state, reward, done, _ = env.step(action)
            trajectory.append((state, action))
            if done:
                break
            state = next_state
        expert_trajectories.append(trajectory)
    return expert_trajectories

# Entrenar el discriminador
def train_discriminator(discriminator, expert_trajectories, generated_trajectories, optimizer, num_epochs=10):
    criterion = nn.BCELoss()
    for epoch in range(num_epochs):
        total_loss = 0
        for expert_traj, gen_traj in zip(expert_trajectories, generated_trajectories):
            expert_states, expert_actions = zip(*expert_traj)
            gen_states, gen_actions = zip(*gen_traj)
            
            expert_states = torch.tensor(np.array(expert_states), dtype=torch.float32)
            expert_actions = torch.tensor(np.array(expert_actions), dtype=torch.float32).unsqueeze(-1)
            gen_states = torch.tensor(np.array(gen_states), dtype=torch.float32)
            gen_actions = torch.tensor(np.array(gen_actions), dtype=torch.float32).unsqueeze(-1)
            
            expert_labels = torch.ones(len(expert_states), 1)
            gen_labels = torch.zeros(len(gen_states), 1)
            
            optimizer.zero_grad()
            expert_preds = discriminator(expert_states, expert_actions)
            gen_preds = discriminator(gen_states, gen_actions)
            
            loss = criterion(expert_preds, expert_labels) + criterion(gen_preds, gen_labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoca {epoch+1}, Perdida: {total_loss / len(expert_trajectories)}')

# Entrenar la política
def train_policy(policy_net, discriminator, optimizer, env, num_epochs=10, max_steps=200):
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        trajectory = generate_trajectory(policy_net, env, max_steps)
        states, actions = zip(*trajectory)
        
        states = torch.tensor(np.array(states), dtype=torch.float32)
        actions = torch.tensor(np.array(actions), dtype=torch.float32).unsqueeze(-1)
        
        rewards = -torch.log(discriminator(states, actions))
        loss = -rewards.mean()
        loss.backward()
        optimizer.step()
        
        print(f'Epoca {epoch+1}, Politica perdida: {loss.item()}')

# Inicialización del entorno y modelos
env = gym.make('CartPole-v1')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = PolicyNetwork(state_dim, action_dim)
discriminator = Discriminator(state_dim, action_dim)

optimizer_policy = optim.Adam(policy_net.parameters(), lr=3e-4)
optimizer_discriminator = optim.Adam(discriminator.parameters(), lr=3e-4)

# Generar trayectorias de expertos (esto debería ser reemplazado con una política de experto real)
expert_policy = lambda state: env.action_space.sample()  # Ejemplo simple
expert_trajectories = generate_expert_trajectories(env, expert_policy, num_trajectories=10)

for iteration in range(10):
    generated_trajectories = [generate_trajectory(policy_net, env) for _ in range(10)]
    train_discriminator(discriminator, expert_trajectories, generated_trajectories, optimizer_discriminator)
    train_policy(policy_net, discriminator, optimizer_policy, env)

print("Entrenamiento completado")


Esta implementación básica de GAIL en PyTorch proporciona una base para experimentar con la combinación de aprendizaje por imitación y aprendizaje generativo adversarial. Se pueden mejorar diversos aspectos del código, como la arquitectura de las redes, los hiperparámetros y las técnicas de optimización, para ajustarse mejor a aplicaciones específicas y mejorar el rendimiento del agente.

### Ejercicios

1 . Implementa y entrena un modelo de Behavioral Cloning en el entorno LunarLander-v2.

Instrucciones:

- Utiliza el entorno LunarLander-v2 de OpenAI Gym.
- Recopila un conjunto de datos de demostraciones de un experto.
- Entrena una política utilizando Behavioral Cloning.
- Evalúa la política clonada en el entorno y compara su rendimiento con el experto.

In [None]:
## Tu respuesta

2 . Implementa y entrena un modelo usando DAgger en el entorno MountainCarContinuous-v0.

Instrucciones:

- Utiliza el entorno MountainCarContinuous-v0 de OpenAI Gym.
- Recopila un conjunto inicial de datos de demostraciones de un experto.
- Implementa el algoritmo DAgger para mejorar la política iterativamente.
- Evalúa la política final en el entorno y compara su rendimiento con el experto.

In [None]:
# Tu respuesta

3 . Implementa y entrena un modelo de GAIL en el entorno BipedalWalker-v3.

Instrucciones:

- Utiliza el entorno BipedalWalker-v3 de OpenAI Gym.
- Recopila un conjunto de datos de demostraciones de un experto.
- Implementa el algoritmo GAIL para entrenar tanto el generador como el discriminador.
- Evalúa la política final en el entorno y compara su rendimiento con el experto.

In [None]:
## Tu respuesta

### Aprendiza inverso de reforzamiento (IRL)

El aprendizaje por refuerzo inverso (Inverse Reinforcement Learning, IRL) es un campo de la inteligencia artificial donde el objetivo es inferir la función de recompensa que un agente está tratando de maximizar a partir de su comportamiento observado. A diferencia del aprendizaje por refuerzo tradicional, donde la función de recompensa está dada y el objetivo es encontrar la política óptima, en IRL se observa la política del experto y se trata de descubrir la función de recompensa que podría haber llevado a esa política.

#### Máxima entropía inverse reinforcement learning (MaxEnt IRL)

MaxEnt IRL es un algoritmo que busca inferir una función de recompensa a partir de demostraciones de expertos, utilizando el principio de máxima entropía para manejar la incertidumbre de manera robusta. Este enfoque garantiza que entre todas las distribuciones posibles de trayectorias que coinciden con las demostraciones, se selecciona aquella que tiene la máxima entropía, evitando suposiciones innecesarias y promoviendo la diversidad.

**Proceso de MaxEnt IRL**

- Recopilación de datos: Obtener demostraciones de expertos que consisten en trayectorias de estados y acciones.
- Modelado de trayectorias: Representar las trayectorias como secuencias de estados y acciones.
- Cálculo de la entropía: Utilizar el principio de máxima entropía para modelar la probabilidad de las trayectorias.
- Optimización: Ajustar la función de recompensa para maximizar la probabilidad de las trayectorias observadas.

A continuación, se presenta un ejemplo simplificado de cómo implementar MaxEnt IRL en PyTorch. Este ejemplo asume un entorno simple donde se conocen las transiciones de estados.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Definir la red neuronal para la función de recompensa
class RewardModel(nn.Module):
    def __init__(self, input_dim):
        super(RewardModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        reward = self.fc2(x)
        return reward

# Función para calcular la probabilidad de trayectorias usando máxima entropía
def compute_trajectory_probabilities(trajectories, reward_model, gamma=0.99):
    probabilities = []
    for trajectory in trajectories:
        states = torch.tensor(trajectory, dtype=torch.float32, requires_grad=True)
        rewards = reward_model(states).squeeze()
        discounted_rewards = (gamma ** torch.arange(len(rewards))).to(rewards) * rewards
        trajectory_prob = torch.exp(discounted_rewards.sum())
        probabilities.append(trajectory_prob)
    probabilities = torch.stack(probabilities)
    return probabilities / probabilities.sum()

# Función para entrenar el modelo de recompensa
def train_maxent_irl(trajectories, state_dim, lr=0.01, num_epochs=100):
    reward_model = RewardModel(state_dim)
    optimizer = optim.Adam(reward_model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        optimizer.zero_grad()
        
        # Calcular las probabilidades de las trayectorias
        trajectory_probs = compute_trajectory_probabilities(trajectories, reward_model)
        
        # Calcular la función de pérdida (negativa de la suma de log-probabilidades)
        loss = -torch.log(trajectory_probs).mean()
        
        # Optimizar el modelo de recompensa
        loss.backward()
        optimizer.step()
        
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {loss.item()}')

    return reward_model

# Datos de ejemplo: trayectorias observadas del experto (cada trayectoria es una secuencia de estados)
trajectories = [
    [[0.0, 0.0], [0.1, 0.1], [0.2, 0.2]],
    [[0.1, 0.0], [0.2, 0.1], [0.3, 0.2]],
    # Más trayectorias del experto
]

# Entrenar el modelo de recompensa usando MaxEnt IRL
state_dim = 2  # Dimensionalidad del estado
reward_model = train_maxent_irl(trajectories, state_dim)

# Evaluar el modelo de recompensa en un nuevo estado
new_state = torch.tensor([[0.1, 0.1]], dtype=torch.float32)
predicted_reward = reward_model(new_state)
print(f'Recompensa predicha para el estado {new_state.numpy()}: {predicted_reward.item()}')


### Apprenticeship Learning via Inverse Reinforcement Learning

El concepto de "Apprenticeship Learning via Inverse Reinforcement Learning" implica aprender una política similar a la de un experto mediante el aprendizaje de la función de recompensa subyacente que el experto está optimizando. Aquí te presento una implementación básica utilizando PyTorch.

La idea principal es iterar entre dos fases:

* Actualizar la política: Usar una técnica de RL estándar para optimizar la política dada una función de recompensa.
* Actualizar la función de recompensa: Usar las trayectorias generadas por la política actual para ajustar la función de recompensa.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

# Definir la red neuronal para la función de recompensa
class RewardModel(nn.Module):
    def __init__(self, input_dim):
        super(RewardModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        reward = self.fc2(x)
        return reward

# Definir la política para espacios de acción discretos
class DiscretePolicyModel(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DiscretePolicyModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        action_probs = torch.softmax(self.fc2(x), dim=-1)
        return action_probs

def select_action(policy, state):
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action_probs = policy(state_tensor)
    action = torch.multinomial(action_probs, num_samples=1).item()
    return action

# Función para calcular el retorno acumulado de una trayectoria
def compute_return(trajectory, reward_model, gamma=0.99):
    states = torch.tensor(np.array(trajectory), dtype=torch.float32, requires_grad=True)
    rewards = reward_model(states).squeeze()
    discounted_rewards = (gamma ** torch.arange(len(rewards))).to(rewards) * rewards
    return discounted_rewards.sum()

# Función para recolectar una trayectoria usando la política actual
def collect_trajectory(env, policy):
    state = env.reset()
    trajectory = []
    done = False
    while not done:
        action = select_action(policy, state)
        next_state, reward, done, _ = env.step(action)
        trajectory.append(state)
        state = next_state
    return trajectory

# Función para entrenar la política usando la recompensa aprendida
def train_policy(env, reward_model, policy, optimizer, num_episodes=100, gamma=0.99):
    for _ in range(num_episodes):
        state = env.reset()
        trajectory = []
        done = False
        while not done:
            action = select_action(policy, state)
            next_state, _, done, _ = env.step(action)
            trajectory.append(state)
            state = next_state
        G = compute_return(trajectory, reward_model, gamma)
        optimizer.zero_grad()
        G.backward()
        optimizer.step()

# Función principal de entrenamiento de IRL
def train_apprenticeship_irl(env_name, expert_trajectories, lr=0.01, num_epochs=100, gamma=0.99):
    env = gym.make(env_name)
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    
    reward_model = RewardModel(state_dim)
    policy_model = DiscretePolicyModel(state_dim, action_dim)
    reward_optimizer = optim.Adam(reward_model.parameters(), lr=lr)
    policy_optimizer = optim.Adam(policy_model.parameters(), lr=lr)
    
    for epoch in range(num_epochs):
        # Fase 1: Optimizar la política con la recompensa actual
        train_policy(env, reward_model, policy_model, policy_optimizer)
        
        # Fase 2: Optimizar el modelo de recompensa con las trayectorias del experto y las generadas
        reward_optimizer.zero_grad()
        expert_returns = torch.stack([compute_return(traj, reward_model, gamma) for traj in expert_trajectories])
        policy_trajectories = [collect_trajectory(env, policy_model) for _ in range(len(expert_trajectories))]
        policy_returns = torch.stack([compute_return(traj, reward_model, gamma) for traj in policy_trajectories])
        
        loss = -expert_returns.mean() + policy_returns.mean()
        loss.backward()
        reward_optimizer.step()
        
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {loss.item()}')

    return policy_model, reward_model

# Ejemplo de uso con el entorno "CartPole-v1"
expert_trajectories = [
    [[0.0, 0.0, 0.1, 0.1], [0.1, 0.1, 0.2, 0.2], [0.2, 0.2, 0.3, 0.3]],
    [[0.0, 0.1, 0.1, 0.2], [0.1, 0.2, 0.2, 0.3], [0.2, 0.3, 0.3, 0.4]],
    # Más trayectorias del experto
]

policy_model, reward_model = train_apprenticeship_irl("CartPole-v1", expert_trajectories)

# Evaluar la política entrenada en un nuevo estado
env = gym.make("CartPole-v1")
state = env.reset()
done = False
total_reward = 0
while not done:
    action = select_action(policy_model, state)
    state, reward, done, _ = env.step(action)
    total_reward += reward

print(f'Recompensa total obtenida por la política aprendida: {total_reward}')


### Deep Inverse Reinforcement Learning (Deep IRL)

Deep IRL extiende los métodos tradicionales de IRL utilizando redes neuronales profundas para modelar la función de recompensa. Este enfoque permite manejar problemas más complejos con estados de alta dimensionalidad.

* Definir una red neural para la función de recompensa.
* Optimizar la función de recompensa usando trayectorias observadas.
* Entrenar una política usando RL con la función de recompensa aprendida.

Revisar: [Maximum Entropy Deep Inverse Reinforcement Learning](https://arxiv.org/abs/1507.04888).

### Ejercicios

1 . Implementa y entrena un modelo usando el algoritmo de Máxima Entropía IRL en el entorno LunarLander-v2.

Instrucciones:

* Utiliza el entorno LunarLander-v2 de OpenAI Gym.
* Recopila un conjunto de datos de demostraciones de un experto.
* Implementa el algoritmo de Máxima Entropía IRL para aprender la función de recompensa.
* Entrena una política utilizando la función de recompensa aprendida.

In [None]:
## Tu respuesta

2 .Implementa y entrena un modelo usando el algoritmo de Máxima Entropía IRL en el entorno MountainCarContinuous-v0.

Instrucciones:

* Utiliza el entorno MountainCarContinuous-v0 de OpenAI Gym.
* Recopila un conjunto de datos de demostraciones de un experto.
* Implementa el algoritmo de Máxima Entropía IRL para aprender la función de recompensa.
* Entrena una política utilizando la función de recompensa aprendida.

In [None]:
## Tu respuesta

### Aprendizaje por preferencia

El aprendizaje por preferencias es una subárea del aprendizaje automático que se centra en aprender a partir de preferencias en lugar de ejemplos etiquetados de manera explícita. En lugar de aprender a partir de pares de datos de entrada y salida, el modelo aprende a partir de comparaciones o rankings de diferentes opciones. Este enfoque es especialmente útil en situaciones donde las etiquetas exactas son difíciles de obtener, pero es fácil obtener preferencias relativas.

Por ejemplo, en lugar de saber que la opción A tiene una recompensa de 10 y la opción B tiene una recompensa de 5, el modelo sólo sabe que A es preferida sobre B. Este tipo de aprendizaje es común en aplicaciones como sistemas de recomendación, donde las preferencias de los usuarios son más fáciles de obtener que las evaluaciones cuantitativas exactas.

En el contexto del aprendizaje por refuerzo, el aprendizaje por refuerzo basado en preferencias (PBRL) se centra en aprender políticas óptimas a partir de preferencias en lugar de recompensas numéricas explícitas. Esto es útil en situaciones donde es difícil definir una función de recompensa exacta, pero se pueden obtener comparaciones de resultados o trayectorias.


Aquí hay un ejemplo simple de cómo se podría implementar un algoritmo de aprendizaje por preferencias en PyTorch:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Definir el modelo de preferencia
class PreferenceModel(nn.Module):
    def __init__(self, input_dim):
        super(PreferenceModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        preference = self.fc2(x)
        return preference

# Generar datos de preferencia simulados
def generate_preference_data(num_samples, input_dim):
    data = torch.randn(num_samples, input_dim)
    preferences = torch.randint(0, 2, (num_samples,))  # 0 o 1 para indicar preferencia
    return data, preferences

# Función para entrenar el modelo de preferencia
def train_preference_model(data, preferences, input_dim, lr=0.01, num_epochs=100):
    model = PreferenceModel(input_dim)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.BCEWithLogitsLoss()
    
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        outputs = model(data).squeeze()
        loss = loss_fn(outputs, preferences.float())
        loss.backward()
        optimizer.step()
        
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {loss.item()}')

    return model

# Ejemplo de uso
input_dim = 4
data, preferences = generate_preference_data(100, input_dim)
trained_model = train_preference_model(data, preferences, input_dim)


#### El modelo Bradley-Terry 

Es un modelo probabilístico utilizado para predecir el resultado de comparaciones binarias entre pares de opciones. Es ampliamente utilizado en la teoría de la decisión y en el aprendizaje por preferencias para modelar preferencias en varios contextos, como competiciones deportivas, sistemas de recomendación y encuestas de opinión.

En el modelo Bradley-Terry, se asume que cada opción $i$ tiene una habilidad o "score" $\theta_i$. La probabilidad de que la opción $i$ sea preferida sobre la opción $j$ se modela como:

$$P(i \succ j) = \frac{\theta_i}{\theta_i + \theta_j}$$

donde $i \succ j$ indica que la opción $i$ es preferida sobre la opción $j$.

**Estimación de parámetros**

Para estimar los parámetros $\theta_i$ de cada opción $i$, se puede utilizar el método de máxima verosimilitud. La función de verosimilitud para el conjunto de comparaciones observadas se maximiza con respecto a los parámetros $\theta$.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Definir el modelo Bradley-Terry
class BradleyTerryModel(nn.Module):
    def __init__(self, num_items):
        super(BradleyTerryModel, self).__init__()
        self.scores = nn.Parameter(torch.randn(num_items))
    
    def forward(self, i, j):
        score_i = self.scores[i]
        score_j = self.scores[j]
        prob = torch.sigmoid(score_i - score_j)
        return prob

# Generar datos de comparación simulados
def generate_comparison_data(num_pairs, num_items):
    comparisons = torch.randint(0, num_items, (num_pairs, 2))
    preferences = torch.randint(0, 2, (num_pairs,))
    return comparisons, preferences

# Función para entrenar el modelo Bradley-Terry
def train_bradley_terry_model(comparisons, preferences, num_items, lr=0.01, num_epochs=100):
    model = BradleyTerryModel(num_items)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.BCELoss()
    
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        i = comparisons[:, 0]
        j = comparisons[:, 1]
        outputs = model(i, j)
        loss = loss_fn(outputs, preferences.float())
        loss.backward()
        optimizer.step()
        
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {loss.item()}')

    return model

# Ejemplo de uso
num_items = 10
comparisons, preferences = generate_comparison_data(100, num_items)
trained_model = train_bradley_terry_model(comparisons, preferences, num_items)


El PBRL es una extensión del aprendizaje por refuerzo (RL) que utiliza preferencias en lugar de recompensas numéricas explícitas para guiar el aprendizaje del agente. En PBRL, el agente recibe retroalimentación en forma de comparaciones entre diferentes trayectorias o acciones, indicando cuál es preferida.

**Conceptos clave en PBRL**

* Preferencias en lugar de recompensas: En lugar de recibir recompensas numéricas después de cada acción, el agente recibe comparaciones de trayectorias o acciones.
* Función de recompensa implícita: El agente aprende una función de recompensa implícita que satisface las preferencias observadas.
* Actualización de la política: La política del agente se actualiza para maximizar la probabilidad de generar trayectorias que sean consistentes con las preferencias observadas.
  
#### Algoritmos de PBRL

**Preferencia por trayectorias (trajectory preference learning):**

En este enfoque, el agente aprende a partir de comparaciones entre trayectorias completas. Las trayectorias son secuencias de estados y acciones, y el agente recibe retroalimentación sobre qué trayectorias son preferidas.

- Ejemplo de algoritmo: Maximum Entropy Inverse Reinforcement Learning (MaxEnt IRL), donde se aprende una función de recompensa que maximiza la entropía de la política condicionada a las preferencias observadas.

**Preferencia por parejas de estados (state pair preference learning):**

Aquí, el agente aprende a partir de comparaciones entre pares de estados o pares de estado-acción. La retroalimentación indica qué par es preferido.

- Ejemplo de algoritmo: Preference-Based Policy Iteration (PBPI), donde se actualiza la política basada en las preferencias entre pares de estados.

**Preferencia por acciones (action preference learning):**

En este enfoque, el agente aprende preferencias entre diferentes acciones en un estado dado. La retroalimentación se proporciona sobre cuál acción es preferida en un estado específico.

- Ejemplo de algoritmo: Bayesian Preference Learning, donde se usa inferencia bayesiana para aprender las preferencias y actualizar la política en consecuencia.

A continuación, se presenta un ejemplo de cómo implementar un algoritmo de PBRL usando el modelo Bradley-Terry para aprender una función de recompensa implícita:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

# Definir el modelo de preferencia Bradley-Terry
class BradleyTerryModel(nn.Module):
    def __init__(self, state_dim):
        super(BradleyTerryModel, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, state):
        x = torch.relu(self.fc1(state))
        score = self.fc2(x)
        return score

# Función para calcular la probabilidad de preferencia
def preference_probability(model, state_i, state_j):
    score_i = model(state_i)
    score_j = model(state_j)
    prob = torch.sigmoid(score_i - score_j)
    return prob

# Función para entrenar el modelo de preferencia
def train_preference_model(model, optimizer, states, preferences, num_epochs=100):
    loss_fn = nn.BCELoss()
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        state_i, state_j = states
        outputs = preference_probability(model, state_i, state_j).squeeze()
        loss = loss_fn(outputs, preferences.float())
        loss.backward()
        optimizer.step()
        
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {loss.item()}')

# Generar datos de comparación simulados
def generate_comparison_data(num_pairs, state_dim):
    state_i = torch.randn(num_pairs, state_dim)
    state_j = torch.randn(num_pairs, state_dim)
    preferences = torch.randint(0, 2, (num_pairs,))
    return (state_i, state_j), preferences

# Entrenar una política usando la recompensa implícita aprendida
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

def select_action(policy, state):
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action_probs = policy(state_tensor)
    action = torch.multinomial(action_probs, num_samples=1).item()
    return action

def collect_trajectory(env, policy, max_steps=200):
    state = env.reset()
    trajectory = []
    for _ in range(max_steps):
        action = select_action(policy, state)
        next_state, reward, done, _ = env.step(action)
        trajectory.append(state)
        if done:
            break
        state = next_state
    return trajectory

def compute_reward(trajectory, model):
    rewards = [model(torch.tensor(state, dtype=torch.float32, requires_grad=True).unsqueeze(0)) for state in trajectory]
    rewards_tensor = torch.cat(rewards)
    return rewards_tensor.sum()

def train_policy(env, model, policy, optimizer, num_episodes=100):
    for episode in range(num_episodes):
        trajectory = collect_trajectory(env, policy)
        reward = compute_reward(trajectory, model)
        optimizer.zero_grad()
        loss = -reward
        loss.backward()
        optimizer.step()
        if episode % 10 == 0:
            print(f'Episodio {episode}, Politica perdida: {loss.item()}')

# Ejemplo de uso
state_dim = 4
action_dim = 2

# Generar datos de preferencia
states, preferences = generate_comparison_data(100, state_dim)

# Entrenar el modelo de preferencia
preference_model = BradleyTerryModel(state_dim)
optimizer = optim.Adam(preference_model.parameters(), lr=0.01)
train_preference_model(preference_model, optimizer, states, preferences)

# Entrenar la política usando la recompensa implícita aprendida
env = gym.make('CartPole-v1')
policy = PolicyNetwork(state_dim, action_dim)
optimizer_policy = optim.Adam(policy.parameters(), lr=0.01)
train_policy(env, preference_model, policy, optimizer_policy)

# Evaluar la política entrenada
for _ in range(5):
    state = env.reset()
    done = False
    while not done:
        action = select_action(policy, state)
        state, reward, done, _ = env.step(action)
        # env.render()  # Comentado para evitar el uso de pyglet si no está instalado
env.close()


Hay varios algoritmos más de PBRL que utilizan técnicas como el modelo de Bradley-Terry para manejar preferencias en lugar de recompensas numéricas tradicionales. 

**Learning from ranked trajectories (RaPP)**

El algoritmo RaPP se basa en la idea de aprender una política óptima a partir de trayectorias que están ordenadas según las preferencias. En este enfoque, el agente aprende una función de valor que respeta el orden de las trayectorias proporcionadas.

Proceso de RaPP

- Recopilación de datos: Obtener conjuntos de trayectorias ordenadas por preferencias.
- Modelado de la función de valor: Utilizar un modelo para estimar los valores de las trayectorias.
- Optimización: Ajustar la política para maximizar la preferencia de las trayectorias ordenadas.

**Coactive learning**

Coactive learning es un enfoque donde un agente interactúa con un experto y recibe feedback en forma de preferencias sobre acciones. Este feedback se utiliza para ajustar la política del agente.

Proceso de coactive learning

- Interacción: El agente toma una acción y recibe feedback en forma de preferencia.
- Actualización: La política se actualiza para preferir las acciones que el experto prefiere.
- Repetición: Este proceso se repite para refinar la política del agente.

**Active preference-based learning (APBL)**

APBL es un enfoque activo donde el agente selecciona pares de trayectorias para comparación con el fin de aprender de manera más eficiente. Este enfoque utiliza técnicas de selección activa para mejorar el aprendizaje.

Proceso de APBL

- Selección activa: El agente selecciona pares de trayectorias que son más informativas.
- Feedback de preferencias: El agente recibe feedback en forma de preferencias.
- Actualización de la política: La política se ajusta para reflejar el feedback recibido.

### Ejercicios

1 . Implementa un algoritmo de iteración de políticas basado en preferencias (PBPI) en un entorno de OpenAI Gym.

Instrucciones:

- Utiliza el entorno CartPole-v1.
- Implementa un modelo de preferencias para aprender la función de recompensa.
- Implementa la iteración de políticas basada en preferencias para mejorar la política del agente.


In [None]:
## Tu respuesta

2 .  Implementa un algoritmo de aprendizaje por preferencias Bayesiano en un entorno de OpenAI Gym.

Instrucciones:

- Utiliza el entorno MountainCar-v0.
- Implementa un modelo Bayesiano para aprender la función de recompensa a partir de preferencias.
- Utiliza inferencia Bayesiana para actualizar la política del agente.

In [None]:
## Tu respuesta

3 . Implementa un algoritmo de aprendizaje a partir de trayectorias ordenadas en un entorno de OpenAI Gym.

Instrucciones:

- Utiliza el entorno LunarLander-v2.
- Recopila un conjunto de trayectorias ordenadas por un experto.
- Implementa un modelo para aprender la función de recompensa a partir de estas trayectorias ordenadas.

In [None]:
## Tu respuesta

4 . Implementar un algoritmo de coactive learning en un entorno de OpenAI Gym.

Instrucciones:

- Utiliza el entorno Acrobot-v1.
- Implementa un modelo para aprender a partir de interacciones iterativas con el experto.
- Ajusta la política del agente en función de las sugerencias del experto.


In [None]:
## Tu respuesta

5 . Implementa un algoritmo de aprendizaje activo basado en preferencias en un entorno de OpenAI Gym.

Instrucciones:

- Utiliza el entorno Pendulum-v0.
- Implementa un modelo para aprender la función de recompensa a partir de consultas activas de preferencias.
- Mejora la política del agente utilizando estas preferencias activas.

Referencia:

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

class PreferenceModel(nn.Module):
    def __init__(self, state_dim):
        super(PreferenceModel, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, state):
        x = torch.relu(self.fc1(state))
        score = self.fc2(x)
        return score

class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, action_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        action_probs = torch.softmax(self.fc3(x), dim=-1)
        return action_probs

def select_action(policy, state):
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    action_probs = policy(state_tensor)
    action = torch.multinomial(action_probs, num_samples=1).item()
    return action

def collect_trajectory(env, policy, max_steps=200):
    state = env.reset()
    trajectory = []
    for _ in range(max_steps):
        action = select_action(policy, state)
        next_state, reward, done, _ = env.step(action)
        trajectory.append((state, action, reward))
        if done:
            break
        state = next_state
    return trajectory

def train_preference_model(model, optimizer, trajectories, num_epochs=100):
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        total_loss = 0
        for traj in trajectories:
            reward = compute_reward(traj, model)
            loss = -reward
            loss.backward()
            total_loss += loss.item()
        optimizer.step()
        if epoch % 10 == 0:
            print(f'Epoca {epoch}, Perdida: {total_loss / len(trajectories)}')

def train_policy(env, model, policy, optimizer, num_episodes=100):
    for episode in range(num_episodes):
        trajectory = collect_trajectory(env, policy)
        reward = compute_reward(trajectory, model)
        optimizer.zero_grad()
        loss = -reward
        loss.backward()
        optimizer.step()
        if episode % 10 == 0:
            print(f'Episodio {episode}, Politica perdida: {loss.item()}')

# Puedes utilizar esta estructura base para implementar los algoritmos específicos mencionados en los ejercicios.


In [None]:
# Tu respuesta