# Actor-Critic

Теорема о градиенте стратегии связывает градиент целевой функции  и градиент самой стратегии:

$$\nabla_\theta J(\theta) = \mathbb{E}_\pi [Q^\pi(s, a) \nabla_\theta \ln \pi_\theta(a \vert s)]$$

Встает вопрос, как оценить $Q^\pi(s, a)$? В чистом policy-based алгоритме REINFORCE используется отдача $G_t$, полученная методом Монте-Карло в качестве несмещенной оценки $Q^\pi(s, a)$. В Actor-Critic же предлагается отдельно обучать нейронную сеть Q-функции — критика.

Актор-критиком часто называют обобщенный фреймворк (подход), нежели какой-то конкретный алгоритм. Как подход актор-критик не указывает, каким конкретно [policy gradient] методом обучается актор и каким [value based] методом обучается критик. Таким образом актор-критик задает целое [семейство](https://proceedings.neurips.cc/paper_files/paper/1999/file/6449f44a102fde848669bdd9eb6b76fa-Paper.pdf) различных алгоритмов. Рекомендую в качестве шпаргалки использовать упомянутый в тетрадке с REINFORCE [пост из блога Lilian Weng](https://lilianweng.github.io/posts/2018-04-08-policy-gradient/), посвященный наиболее популярным алгоритмам семейства актор-критиков

В данной тетрадке познакомимся с наиболее простым вариантом актор-критика, который так и называют Actor-Critic:

In [1]:
# Cтавим нужные зависимости, если это колаб
try:
    import google.colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.5/67.5 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
from collections import deque

import gymnasium as gym
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.distributions import Categorical

%matplotlib inline

In [3]:
env = gym.make("CartPole-v1")
env.reset()

print(f'{env.observation_space=}')
print(f'{env.action_space=}')

n_actions = env.action_space.n
state_dim = env.observation_space.shape
print(f'Action_space: {n_actions} | State_space: {env.observation_space.shape}')

env.observation_space=Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
env.action_space=Discrete(2)
Action_space: 2 | State_space: (4,)


(1 балл)

In [5]:
def to_tensor(x, dtype=np.float32):
    if isinstance(x, torch.Tensor):
        return x
    x = np.asarray(x, dtype=dtype)
    x = torch.from_numpy(x)
    return x

def symlog(x):
    """Compute symlog values for a vector `x`. It's an inverse operation for symexp."""
    return torch.sign(x) * torch.log(torch.abs(x) + 1)

def symexp(x):
    """Compute symexp values for a vector `x`. It's an inverse operation for symlog."""
    return torch.sign(x) * (torch.exp(torch.abs(x)) - 1.0)


class SymExpModule(nn.Module):
    def forward(self, x):
        return symexp(x)

def select_action_eps_greedy(Q, state, epsilon):
    if not isinstance(state, torch.Tensor):
        state = torch.tensor(state, dtype=torch.float32)

    Q_s = Q(state).detach().numpy()

    if np.random.rand() < epsilon:
        action = np.random.randint(len(Q_s))
    else:
        action = np.argmax(Q_s)

    action = int(action)
    return action

def sample_batch(replay_buffer, n_samples):
    batch = np.random.choice(replay_buffer, size=n_samples, replace=False)

    states, actions, rewards, next_states, terminateds = zip(*batch)

    return (
        np.array(states),
        np.array(actions),
        np.array(rewards, dtype=np.float32),
        np.array(next_states),
        np.array(terminateds, dtype=np.float32)
    )


## Shared-body Actor-Critic

Актор и критик могут обучаться в разных режимах — актор только on-policy (шаг обучения на текущей собранной подтраектории), а критик on-policy или off-policy (шаг обучения на текущей подтраектории или на батче из replay buffer). Это с одной стороны привносит гибкость в обучение, с другой — усложняет его.

Если актор и критик оба обучаются on-policy, то имеет смысл объединить их сетки в одну и делать общий шаг обратного распространения ошибки. Однако, если они обучаются в разных режимах (и с разной частотой обновления), то велика вероятность, что их шаги обучения могут начать конфликтовать в случае общего тела — для такого варианта намного предпочтительнее разделить их на разные подсети (либо аккуратно настраивать гиперпарметры, чтобы стабилизировать обучение). В целом, рекомендуется использовать общий энкодер наблюдений, а далее как можно скорее разделять головы.

Сделаем реализацию актор-критика с общим телом и с on-policy вариантом обучения.

In [6]:
class ActorBatch:
    def __init__(self):
        self.logprobs = []
        self.values = []
        self.rewards = []

    def append(self, log_prob, value, reward):
        self.logprobs.append(log_prob)
        self.values.append(value)
        self.rewards.append(reward)

    def clear(self):
        self.logprobs.clear()
        self.values.clear()
        self.rewards.clear()

(3 балла)

In [7]:
class Actor(nn.Module):
    def __init__(self, input_dim, hidden_dims, output_dim):
        super().__init__()
        layers = []
        last_dim = input_dim
        for h in hidden_dims:
            layers.append(nn.Linear(last_dim, h))
            layers.append(nn.ReLU())
            last_dim = h
        self.net = nn.Sequential(*layers)
        self.head = nn.Linear(last_dim, output_dim)

    def forward(self, x):
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x, dtype=torch.float32)
        h = self.net(x)
        logits = self.head(h)
        dist = torch.distributions.Categorical(logits=logits)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action.item(), log_prob


class Critic(nn.Module):
    def __init__(self, input_dim, hidden_dims):
        super().__init__()
        layers = []
        last_dim = input_dim
        for h in hidden_dims:
            layers.append(nn.Linear(last_dim, h))
            layers.append(nn.ReLU())
            last_dim = h
        self.net = nn.Sequential(*layers)
        self.head = nn.Linear(last_dim, 1)

    def forward(self, x):
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x, dtype=torch.float32)
        h = self.net(x)
        value = self.head(h).squeeze(-1)
        return value

(6 баллов)

In [8]:
class ActorCriticAgent:
    def __init__(self, state_dim, action_dim, hidden_dims, lr_actor=1e-3, lr_critic=1e-3, gamma=0.99):
        self.gamma = gamma
        self.actor = Actor(state_dim, hidden_dims, action_dim)
        self.critic = Critic(state_dim, hidden_dims)
        self.opt_actor = optim.Adam(self.actor.parameters(), lr=lr_actor)
        self.opt_critic = optim.Adam(self.critic.parameters(), lr=lr_critic)
        self.batch = ActorBatch()

    def act(self, state, reward=None):
        action, log_prob = self.actor(state)
        value = self.critic(state)
        r = reward if reward is not None else 0.0
        self.batch.append(log_prob, value, reward)
        return action

    def update(self):
        if len(self.batch.rewards) == 0:
            return

        rewards = torch.tensor(self.batch.rewards, dtype=torch.float32)
        values = torch.stack(self.batch.values)
        log_probs = torch.stack(self.batch.logprobs)

        returns = torch.zeros_like(values)
        G = 0.0
        for t in reversed(range(len(rewards))):
            G = rewards[t] + self.gamma * G
            returns[t] = G
        returns = returns.detach()
        critic_loss = F.mse_loss(values, returns)
        self.opt_critic.zero_grad()
        critic_loss.backward()
        self.opt_critic.step()
        advantages = returns - values.detach()
        actor_loss = -torch.mean(log_probs * advantages)
        self.opt_actor.zero_grad()
        actor_loss.backward()
        self.opt_actor.step()

        self.batch.clear()

In [10]:
from torch import optim
import torch.nn.functional as F

def run_actor_critic(
        env_name="CartPole-v1",
        hidden_dims=(128, 128),
        lr_actor=1e-3,
        lr_critic=1e-3,
        total_max_steps=100_000,
        train_schedule=16,
        eval_schedule=2000,
        smooth_ret_window=10,
        success_ret=195.
):

    env = gym.make(env_name)
    episode_return_history = deque(maxlen=smooth_ret_window)

    agent = ActorCriticAgent(
        state_dim=env.observation_space.shape[0],
        action_dim=env.action_space.n,
        hidden_dims=hidden_dims,
        lr_actor=lr_actor,
        lr_critic=lr_critic,
    )

    s, _ = env.reset()
    done, episode_return = False, 0.
    eval = False

    for global_step in range(1, total_max_steps + 1):
        a = agent.act(s)
        s_next, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated
        episode_return += r

        agent.batch.rewards[-1] = r

        if done or len(agent.batch.rewards) >= train_schedule:
            agent.update()

        if global_step % eval_schedule == 0:
            eval = True

        s = s_next
        if done:
            if eval:
                episode_return_history.append(episode_return)
                avg_return = np.mean(episode_return_history)
                print(f"{global_step=} | {avg_return=:.3f}")
                if avg_return >= success_ret:
                    print("Решено!")
                    break

            s, _ = env.reset()
            done, episode_return = False, 0.
            eval = False


run_actor_critic(
    eval_schedule=2000,
    total_max_steps=50_000,
)


global_step=2036 | avg_return=73.000
global_step=4273 | avg_return=214.500
Решено!
