## Градиент стратегии: REINFORCE.


Теорема о градиенте стратегии связывает градиент целевой функции и градиент самой стратегии:

$$\nabla_\theta J(\theta) = \mathbb{E}_\pi [Q^\pi(s, a) \nabla_\theta \ln \pi_\theta(a \vert s)]$$

Если использовать метод Монте-Карло в качестве несмещенной оценки $Q^\pi(s, a)$ отдачу $R_t$, то тогда происходит переход к алгоритму REINFORCE и обновление весов будет осуществляться по правилу:

$$\nabla_\theta J(\theta) = [R_t \nabla_\theta \ln \pi_\theta(A_t \vert S_t)]$$


In [1]:
try:
    import google.colab

    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

In [2]:
import torch
import torch.nn as nn
from torch.distributions import Categorical
import gymnasium as gym
import numpy as np

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

### Основной цикл


In [3]:
def print_mean_reward(step, episode_rewards):
    if not episode_rewards:
        return

    t = min(50, len(episode_rewards))
    mean_reward = sum(episode_rewards[-t:]) / t
    print(f"step: {str(step).zfill(6)}, mean reward: {mean_reward:.2f}")
    return mean_reward


class Rollout:
    def __init__(self):
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []

    def append(self, log_prob, reward, done):
        self.logprobs.append(log_prob)
        self.rewards.append(reward)
        self.is_terminals.append(done)


def run(
    env: gym.Env,
    hidden_size: int,
    lr: float,
    gamma: float,
    max_episodes: int,
    rollout_size: int,
):
    # Инициализируйте агента `agent`
    """<codehere>"""
    agent = ReinforceAgent(
        env.observation_space.shape[0], env.action_space.n, hidden_size, lr, gamma
    )
    """</codehere>"""

    step = 0
    rollout = Rollout()
    episode_rewards = []
    episodes_visual = []
    for i_episode in range(1, max_episodes + 1):
        cumulative_reward = 0
        terminated = False
        state, _ = env.reset()
        episode_gif = []
        while not terminated:
            step += 1

            action, log_prob = agent.act(state)
            state, reward, terminated, truncated, _ = env.step(action)
            # сохраняем награды и флаги терминальных состояний:
            rollout.append(log_prob, reward, terminated)
            cumulative_reward += reward
            terminated |= truncated
        episode_rewards.append(cumulative_reward)

        # выполняем обновление
        if len(rollout.rewards) >= rollout_size:
            agent.update(rollout)
            mean_reward = print_mean_reward(step, episode_rewards)
            if mean_reward >= 200:
                print("Принято!")
                return
            rollout = Rollout()
            episode_rewards = []
    return episodes_visual

In [4]:
# Реализуйте класс, задающий стратегию агента.
# Подсказки:
#     1) можно воспользоваться базовым классом `torch.nn.Module`,
#     2) размер нейронной сети можно выбрать таким: (input_dim, hidden_dim, output_dim),
#     3) в качестве функции активации возьмите гиперболический тангенс или ReLU
#     4) подумайте, как получить на выходе из нейронной сети вероятности действий,
#     5) для выбора действия в соответствии со стратегией, можно воспользоваться `torch.distributions.Categorical`
#     6) помните, что помимо самого действия вам позже также пригодится логарифм его вероятности
"""<codehere>"""


class MLPModel(nn.Module):
    def __init__(self, state_dim, action_dim, hidden):
        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.Tanh(),
            nn.Linear(hidden, hidden),
            nn.Tanh(),
            nn.Linear(hidden, action_dim),
            nn.Softmax(dim=-1),
        )

    def forward(self, state):
        state = torch.from_numpy(state).float().to(device)

        action_probs = self.net(state)  # [0.1, 0.2, 0.7 ]
        # 100 200 200 100 20
        # np.random.choice([0,1,2], p = [0.1, 0.2, 0.7])
        dist = Categorical(action_probs)
        action = dist.sample()  # -> 2

        return action.item(), dist.log_prob(action)


"""</codehere>"""


class ReinforceAgent:
    def __init__(self, state_dim, action_dim, n_latent_var, lr, gamma):
        self.lr = lr
        self.gamma = gamma

        # Инициализируйте стратегию агента и SGD оптимизатор (например, `torch.optim.Adam`)
        """<codehere>"""
        self.policy = MLPModel(state_dim, action_dim, n_latent_var).to(device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=lr)
        """</codehere>"""

    def act(self, state):
        # Произведите выбор действия и верните кортеж (действие, логарифм вероятности этого действия)
        """<codehere>"""
        return self.policy(state)
        """</codehere>"""

    def update(self, rollout: Rollout):
        # Конвертируйте накопленный список вознаграждений в список отдач. Назовем его `rewards`
        # Подсказки:
        #    1) обход списка стоит делать в обратном порядке,
        #    2) не забывайте сбрасывать отдачу при окончании эпизода
        # rewards = !!! returns
        """<codehere>"""
        rollout_len = len(rollout.rewards)
        rewards = np.empty(rollout_len, dtype=float)
        ret = 0

        # R(1) = 1 + gamma(R(2)) - 1
        # R(2) = 3 + gamma(R(3)) - # 5, 1 -> V[s] - aggr
        # R(3) = 2 + gamma(R(4)) - 6
        # ret = R(4)
        # R(4) = 1 + gamma(0) - 1

        for i in reversed(range(rollout_len)):
            reward, is_terminal = rollout.rewards[i], rollout.is_terminals[i]
            ret = reward + (self.gamma * ret * (not is_terminal))
            rewards[i] = ret
        """</codehere>"""

        # Выполните нормализацию вознаграждений (отдач)
        # rewards =
        """<codehere>"""
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-7)
        """</codehere>"""

        # Вычислите ошибку `loss` и произведите шаг обновления градиентным спуском
        # Подсказки: используйте `.to(device)`, чтобы разместить тензор на соотв. цпу/гпу
        """<codehere>"""
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
        logprobs = torch.stack(rollout.logprobs).to(device)
        # print(f'R: {rewards[:3]}, logprobs: {logprobs[:3]}')

        loss = -logprobs * rewards
        loss = loss.mean()

        print(f"L: {loss}")

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        """</codehere>"""

### Определяем гиперпараметры и запускаем обучение


In [5]:
from gymnasium.wrappers.time_limit import TimeLimit

env_name = "CartPole-v1"

run(
    env=TimeLimit(gym.make(env_name), 1000),
    max_episodes=50000,  # количество эпизодов обучения
    hidden_size=64,  # кол-во переменных в скрытых слоях
    rollout_size=500,  # через столько шагов стратегия будет обновляться
    lr=0.01,  # learning rate
    gamma=0.995,  # дисконтирующий множитель,
)

L: -0.004577735438942909
step: 000541, mean reward: 21.64
L: -0.008220987394452095
step: 001074, mean reward: 38.07
L: 0.011626758612692356
step: 001604, mean reward: 48.18
L: 0.005892101675271988
step: 002148, mean reward: 90.67
L: 0.012964273802936077
step: 002689, mean reward: 60.11
L: -0.007339139934629202
step: 003220, mean reward: 44.25
L: 0.007557057309895754
step: 003752, mean reward: 106.40
L: 0.02649216167628765
step: 004267, mean reward: 73.57
L: 0.0035177310928702354
step: 004862, mean reward: 99.17
L: -0.018950680270791054
step: 005369, mean reward: 126.75
L: 0.005300226155668497
step: 005982, mean reward: 153.25
L: -0.008382213301956654
step: 006596, mean reward: 153.50
L: 0.005981395486742258
step: 007149, mean reward: 110.60
L: -0.025074629113078117
step: 007792, mean reward: 128.60
L: -0.00324205681681633
step: 008304, mean reward: 170.67
L: -0.005932739470154047
step: 008804, mean reward: 250.00
Принято!


In [7]:
"""<comment>"""

from codehere import convert
import os

os.makedirs("render", exist_ok=True)

nb_name = "reinforce"
convert(
    file=f"{nb_name}.ipynb",
    outfile=f"render/{nb_name}.ipynb",
    clear=True,
    replacement=" Здесь ваш код ",
)
convert(
    file=f"{nb_name}.ipynb",
    outfile=f"render/{nb_name}-solution.ipynb",
    clear=True,
    solution=True,
    replacement=" Здесь ваш код ",
)
"""</comment>"""

ModuleNotFoundError: No module named 'codehere'