# Control con Función de Política: Actor-Critic (TD)

En este notebook implementaremos el algoritmo **Actor-Critic con TD (episódico)**, una técnica que combina la estimación de valores y la mejora de política en paralelo. A diferencia de los métodos puramente basados en valores como Q-learning o SARSA, los métodos Actor-Critic mantienen dos funciones diferenciadas: el **actor**, que representa la política, y el **crítico**, que estima el valor de los estados (o pares estado-acción).

Este enfoque aprovecha la estabilidad de las actualizaciones del valor para guiar el aprendizaje de la política directamente, haciendo uso del gradiente de política y del error de TD. Esto permite manejar entornos con espacios de acción continuos o políticas estocásticas.

## Objetivos

* Implementar el algoritmo **Actor-Critic (episódico)** utilizando PyTorch.

In [36]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchinfo import summary

In [37]:
import os

if os.name == 'posix' and os.uname().sysname == 'Darwin':
    # Set the path to ffmpeg for macOS, replace with your actual path
    os.environ["IMAGEIO_FFMPEG_EXE"] = "/opt/homebrew/bin/ffmpeg"

## Definir constantes y funciones auxiliares

In [38]:
DEVICE = 'cpu'  # por defecto, usamos la CPU
if torch.cuda.is_available():  # si hay una GPU disponible (y cuda está instalado)
    DEVICE = 'cuda'
elif torch.backends.mps.is_available():  # si hay un MPS disponible (Metal Performance Shaders)
    DEVICE = 'mps'

print(DEVICE)

cuda


Podemos cambiar el ambiente para probar otros casos.

In [39]:
ENVS = ["MountainCar-v0", "CartPole-v1"]
ENV_NAME = ENVS[1]  # Cambia esto para probar con otros entornos

In [40]:
def get_env(env_name, record_video=False, record_every=1, folder="./videos" ):
    """
    Create the environment with optional video recording and statistics.
    Args:
        env_name (str): Name of the environment to create.
        record_video (bool): Whether to record video of the episodes.
        record_every (int): Frequency of recording episodes.
        folder (str): Folder to save the recorded videos.
    Returns:
        env (gym.Env): The environment.
        
    See also:
        https://gymnasium.farama.org/introduction/record_agent/
    """
    # Initialise the environment
    env = gym.make(env_name, render_mode="rgb_array")

    if record_video:
        env = RecordVideo(env, video_folder=folder,
                    episode_trigger=lambda x: x % record_every == 0)
    
    return env

Exploramos minimamente el entorno (Observation space y Action space) para entender cómo interactuar con él.

In [41]:
env = get_env(ENV_NAME)
print(f"Environment: {ENV_NAME}")
print(f"Observation space: {env.observation_space}")
print(f"Observation space shape: {env.observation_space.shape}")
print(f"Action space: {env.action_space}")

INPUT_DIM = env.observation_space.shape[0]
N_ACTIONS = env.action_space.n

Environment: CartPole-v1
Observation space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Observation space shape: (4,)
Action space: Discrete(2)


Definimos una función que nos pase nuestra observación a un tensor de PyTorch, lo cual es necesario para trabajar con redes neuronales.

In [42]:
def process_state(obs, device=DEVICE):
    return torch.tensor(obs, device=device).unsqueeze(0)

## Redes (función de política y función de valor)

Vamos a definir dos redes neuronales: una para la política (actor) y otra para el valor (crítico). Ambas redes tendrán una arquitectura simple. Lo importante es que la red de política saldrá una distribución de probabilidad sobre las acciones, mientras que la red de valor saldrá un valor escalar para el estado actual.

> La red de política (actor) se encargará de seleccionar acciones basadas en la política aprendida, mientras que la red de valor (crítico) evaluará el estado actual y proporcionará retroalimentación al actor. Es importate e ultizar una función de activación adecuada para la salida de la red de política, como `softmax`.

In [43]:
tensor_actor_test = torch.tensor([1.0, -2.0, 3.0], device=DEVICE).unsqueeze(0)  # Añadimos una dimensión para simular un batch
print(f"{torch.nn.Softmax(dim=-1)(tensor_actor_test)}") 
print(f"{torch.nn.Softmax(dim=-1)(tensor_actor_test).sum()}")

tensor([[0.1185, 0.0059, 0.8756]], device='cuda:0')
0.9999999403953552


### Definir la red de política (actor)

In [44]:
class ActorCNN(nn.Module):
    def __init__(self, input_dim, n_actions):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, n_actions)
        self.softmax = nn.Softmax(dim=-1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

actor_model = ActorCNN(INPUT_DIM, N_ACTIONS).to(DEVICE)

summary(actor_model, input_size=(1, INPUT_DIM), device=DEVICE)

Layer (type:depth-idx)                   Output Shape              Param #
ActorCNN                                 [1, 2]                    --
├─Linear: 1-1                            [1, 128]                  640
├─Linear: 1-2                            [1, 2]                    258
Total params: 898
Trainable params: 898
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.00
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.00
Estimated Total Size (MB): 0.00

Probamos con un tensor

In [45]:
obs, _ = env.reset() # obtenermos un estado inicial del entorno
tensor_obs_test = process_state(obs, device=DEVICE) # lo pasamos a un tensor
actor_model(tensor_obs_test) # lo pasamos por la red neuronal (debería devolver una distribución de probabilidad sobre las acciones)

tensor([[0.0805, 0.0557]], device='cuda:0', grad_fn=<AddmmBackward0>)

### Definir la red valor (crítico)

In [46]:
class CriticCNN(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

critic_model = CriticCNN(INPUT_DIM).to(DEVICE)

summary(critic_model, input_size=(1, INPUT_DIM), device=DEVICE)

Layer (type:depth-idx)                   Output Shape              Param #
CriticCNN                                [1, 1]                    --
├─Linear: 1-1                            [1, 128]                  640
├─Linear: 1-2                            [1, 1]                    129
Total params: 769
Trainable params: 769
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.00
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.00
Estimated Total Size (MB): 0.00

In [47]:
obs, _ = env.reset() # obtenermos un estado inicial del entorno
tensor_obs_test = process_state(obs, device=DEVICE) # lo pasamos a un tensor
critic_model(tensor_obs_test) # lo pasamos por la red neuronal (esto debería devolver un valor de estado, no una distribución de probabilidad)

tensor([[-0.0047]], device='cuda:0', grad_fn=<AddmmBackward0>)

## Sampleo de acciones
Para samplear acciones vamos a utilizar la red de política. Dado que la salida de la red de política es una distribución de probabilidad, utilizaremos `torch.distributions.Categorical` para muestrear acciones basadas en esta distribución.

El paquete `torch.distributions` nos permite trabajar con distribuciones probabilísticas de manera sencilla, pudienendo muestrear acciones y calcular log-probabilidades de las acciones seleccionadas.

Dichos tensores (log-probabilidades) contienen los gradientes necesarios para actualizar la red de política durante el entrenamiento.

Ver [torch.distributions](https://docs.pytorch.org/docs/stable/distributions.html) y [Categorical](https://docs.pytorch.org/docs/stable/distributions.html#torch.distributions.categorical.Categorical).

In [48]:
prob_tensor = torch.tensor([0.1, 0.2, 0.7], device=DEVICE, requires_grad=True).unsqueeze(0)  # Simulamos una distribución de probabilidad
distribtion = torch.distributions.Categorical(prob_tensor)
print(f"Probabilidades: {prob_tensor}")
action = distribtion.sample()  # Muestreamos una acción de la distribución
print(f"Acción muestreada: {action}")
print(f"Log-probabilidades: {distribtion.log_prob(action)}")

Probabilidades: tensor([[0.1000, 0.2000, 0.7000]], device='cuda:0',
       grad_fn=<UnsqueezeBackward0>)
Acción muestreada: tensor([2], device='cuda:0')
Log-probabilidades: tensor([-0.3567], device='cuda:0', grad_fn=<SqueezeBackward1>)


## Algoritmo Actor-Critic

**Input:**  
- una parametrización diferenciable de la política $\pi(a|s, \theta)$  
- una parametrización diferenciable de la función de valor de estado $\hat{v}(s, \mathbf{w})$  

**Parámetros:** tasas de aprendizaje $\alpha^\theta > 0$, $\alpha^{\mathbf{w}} > 0$

**Inicializar:**  
- parámetros de la política $\theta \in \mathbb{R}^{d'}$
- pesos del valor de estado $\mathbf{w} \in \mathbb{R}^d$ (por ejemplo, a $0$)

$$
\begin{array}{l}
\textbf{Loop forever (para cada episodio):} \\
\quad \text{Inicializar } S \text{ (primer estado del episodio)} \\
\quad I \leftarrow 1 \\
\quad \textbf{Loop mientras } S \text{ no sea terminal (para cada paso de tiempo):} \\
\quad\quad A \sim \pi(\cdot|S, \theta) \\
\quad\quad \text{Tomar acción } A, \text{ observar } S', R \\
\quad\quad \delta \leftarrow R + \gamma \hat{v}(S', \mathbf{w}) - \hat{v}(S, \mathbf{w}) \quad (\text{si } S' \text{ es terminal, } \hat{v}(S', \mathbf{w}) \doteq 0) \\
\quad\quad \mathbf{w} \leftarrow \mathbf{w} + \alpha^{\mathbf{w}} \delta \nabla \hat{v}(S, \mathbf{w}) \\
\quad\quad \theta \leftarrow \theta + \alpha^{\theta} I \delta \nabla \ln \pi(A|S, \theta) \\
\quad\quad I \leftarrow \gamma I \\
\quad\quad S \leftarrow S' \\
\end{array}
$$

In [None]:
def select_action(action_probs):
    distribtion = torch.distributions.Categorical(action_probs)
    action = distribtion.sample()
    log_prob = distribtion.log_prob(action)
    return action.item(), log_prob


def train(env, actor_net, critic_net, process_state_fn, num_episodes=10_000, actor_lr=.0001, critic_lr=.0001, gamma=.99):
    actor_opt = torch.optim.Adam(actor_net.parameters(), lr=actor_lr)
    critic_opt = torch.optim.Adam(critic_net.parameters(), lr = critic_lr)
    
    for _ in tqdm(range(num_episodes)):
        obs, _ = env.reset()
        obs_t = process_state_fn(obs)
        I = 1
        done = False
        while not done:
            action, log_prob = select_action(actor_net(obs_t))
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            next_obs_t = process_state(next_obs)

            obs_value = critic_net(obs_t)
            next_obs_value = critic_net(next_obs_t) * (1-float(done))

            delta = reward + (gamma * next_obs_value.detach()) - obs_value

            #Actualizar los pesos del critic (w)
            critic_opt.zero_grad()
            critic_loss = delta.pow(2)
            critic_loss.backward()
            critic_opt.step()

            #Actualizar los pesos del actor (teta)
            actor_opt.zero_grad()
            actor_loss = -I * delta.detach() * log_prob
            actor_loss.backward()
            actor_opt.step()

            I = I * gamma
            obs = next_obs
            obs_t = next_obs_t


def play_episodes(env, actor_net, process_state_fn, num_episodes=5):
    for _ in tqdm(range(num_episodes)):
        obs, _ = env.reset()
        done = False
        while not done:
            obs_t = process_state_fn(obs)
            action, _ = select_action(actor_net(obs_t))
            next_obs, _, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            next_obs_t = process_state(next_obs)

            obs = next_obs

In [50]:
actor_net = ActorCNN(INPUT_DIM, N_ACTIONS).to(DEVICE)
critic_net = CriticCNN(INPUT_DIM).to(DEVICE)

In [51]:
env = get_env(ENV_NAME, record_video=True, record_every=100, folder="./videos/train" )
train(env, actor_net, critic_net, process_state_fn=process_state, num_episodes=2_000, actor_lr=0.0001, critic_lr=0.0005)

  logger.warn(
  0%|          | 0/2000 [00:01<?, ?it/s]


ValueError: Expected parameter probs (Tensor of shape (1, 2)) of distribution Categorical(probs: torch.Size([1, 2])) to satisfy the constraint Simplex(), but found invalid values:
tensor([[-1.9793,  2.9793]], device='cuda:0', grad_fn=<DivBackward0>)

In [None]:
env = get_env(ENV_NAME, record_video=True, record_every=1, folder="./videos/test" )
play_episodes(env, actor_net, process_state_fn=process_state, num_episodes=5)

  logger.warn(
