Реализуйте алгоритм GAIL на среде Mountain Car. Перед этим сгенерируйте экспертные данные (из детерминированной стратегии с первой практики). Хорошей идеей будет добавить в state (observation) синус и косинус от временной метки t для лучшего обучения.

In [1]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
from torch.distributions.categorical import Categorical
import torch.optim as optim
import torch.nn.functional as F

from collections import deque
import random

In [2]:
# Создание среды
TIME_LIMIT = 250
env = gym.wrappers.TimeLimit(
    gym.make("MountainCar-v0"),
    max_episode_steps=TIME_LIMIT + 1,
)

# Детерминированная стратегия из первой практики
def expert_policy(obs, t):
    position, velocity = obs

    if t <= 50:
        return 0  # Влево
    elif t <= 100:
        return 2  # Вправо
    elif t <= 150:
        return 0  # Влево
    elif t <= 200:
        return 2  # Вправо
    else:
        return 2  # Вправо

# Генерация экспертных данных
def generate_expert_data(env, expert_policy, num_episodes=100):
    states = []
    actions = []

    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        t = 0  # Временная метка

        while not done:
            # Выбираем действие с помощью экспертной стратегии
            action = expert_policy(obs, t)

            # Совершаем шаг в среде
            next_obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Дополняем состояние синусом и косинусом временной метки t
            extended_state = np.append(obs, [np.sin(t), np.cos(t)])
            states.append(extended_state)
            actions.append(action)

            # Обновляем состояние и временную метку
            obs = next_obs
            t += 1

    return np.array(states, dtype=np.float32), np.array(actions, dtype=np.int64)

# Генерируем данные
print("Generating expert data...")
states, actions = generate_expert_data(env, expert_policy, num_episodes=100)

# Вывод информации о данных
print("Количество данных в states:", len(states))
print("Количество данных в actions:", len(actions))

Generating expert data...
Количество данных в states: 17145
Количество данных в actions: 17145


In [3]:
def test_policy(env, expert_policy, num_episodes=10):
    total_rewards = []
    episode_lengths = []

    for episode in range(num_episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0
        t = 0

        while not done:
            # Используем экспертную стратегию для выбора действия
            action = expert_policy(obs, t)
            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
            t += 1

        total_rewards.append(total_reward)
        episode_lengths.append(t)
        print(f"Episode {episode + 1}: Total Reward = {total_reward}, Steps = {t}")

    avg_reward = np.mean(total_rewards)
    avg_length = np.mean(episode_lengths)
    print(f"Average Reward: {avg_reward:.2f}, Average Episode Length: {avg_length:.2f}")

# Тестируем стратегию
print("Testing policy...")
test_policy(env, expert_policy, num_episodes=10)

Testing policy...
Episode 1: Total Reward = -186.0, Steps = 186
Episode 2: Total Reward = -184.0, Steps = 184
Episode 3: Total Reward = -185.0, Steps = 185
Episode 4: Total Reward = -184.0, Steps = 184
Episode 5: Total Reward = -185.0, Steps = 185
Episode 6: Total Reward = -184.0, Steps = 184
Episode 7: Total Reward = -184.0, Steps = 184
Episode 8: Total Reward = -185.0, Steps = 185
Episode 9: Total Reward = -184.0, Steps = 184
Episode 10: Total Reward = -184.0, Steps = 184
Average Reward: -184.50, Average Episode Length: 184.50


In [4]:
# Размерности для среды MountainCar
obs_dim = 4  # 2 исходных признака + sin(t) + cos(t)
act_dim = env.action_space.n  # Количество действий в среде

# Копирование экспертных данных
expert_obs = np.copy(states)  # Состояния с добавленными sin(t) и cos(t)
expert_acts = np.copy(actions)  # Действия эксперта

print(f"Размерность состояний (obs_dim): {obs_dim}")
print(f"Размерность действий (act_dim): {act_dim}")


Размерность состояний (obs_dim): 4
Размерность действий (act_dim): 3


In [5]:
class Policy(nn.Module):
    def __init__(self, obs_dim, act_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim, 64),
            nn.ReLU(),
            nn.Linear(64, act_dim)
        )

    def forward(self, obs):
        # Преобразуем входные данные в тензор, если они не являются тензором
        if not isinstance(obs, torch.Tensor):
            obs = torch.tensor(obs, dtype=torch.float32)
        logits = self.net(obs)
        return Categorical(logits=logits)

    def get_action(self, obs):
        # Получаем распределение действий
        dist = self.forward(obs)
        # Выбираем действие
        action = dist.sample()
        return action.item()

    def get_log_prob(self, obs, actions):
        # Получаем распределение действий
        dist = self.forward(obs)
        # Возвращаем логарифм вероятности выбранных действий
        return dist.log_prob(actions)

In [6]:
class Discriminator(nn.Module):
    def __init__(self, obs_dim, act_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(obs_dim + act_dim, 64), nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, obs, act):
        # Преобразуем действия в one-hot encoding
        act_onehot = F.one_hot(act, num_classes=3).float()  # num_classes = 3 для MountainCar-v0
        
        # Объединяем состояние и one-hot encoded действия
        x = torch.cat([obs, act_onehot], dim=1)
        
        # Пропускаем через сеть
        return self.net(x)



In [7]:
class TrajectoryBuffer:
    def __init__(self):
        # Инициализация списков для хранения данных
        self.obs, self.acts, self.rews = [], [], []

    def store(self, o, a, r):
        # Добавление состояния, действия и награды в буфер
        self.obs.append(o)
        self.acts.append(a)
        self.rews.append(r)

    def get(self):
        # Преобразование данных в тензоры PyTorch
        obs_tensor = torch.tensor(np.array(self.obs), dtype=torch.float32)
        acts_tensor = torch.tensor(np.array(self.acts), dtype=torch.long)
        rews_tensor = torch.tensor(np.array(self.rews), dtype=torch.float32)
        
        # Очистка буфера после извлечения данных
        self.clear()
        
        return obs_tensor, acts_tensor, rews_tensor

    def clear(self):
        # Очистка буфера
        self.obs, self.acts, self.rews = [], [], []

In [8]:
env = gym.make("MountainCar-v0", )
policy = Policy(obs_dim, act_dim)
discrim = Discriminator(obs_dim, act_dim)

policy_opt = optim.Adam(policy.parameters(), lr=1e-3)
discrim_opt = optim.Adam(discrim.parameters(), lr=1e-3)

In [9]:
for epoch in range(3000):
    buf = TrajectoryBuffer()
    obs, _ = env.reset()
    done = False
    total_reward = 0
    t = 0  # Временная метка

    while not done:
        # Добавляем sin(t) и cos(t) к состоянию
        augmented_obs = np.append(obs, [np.sin(t), np.cos(t)])
        
        # Преобразуем состояние в тензор
        obs_tensor = torch.tensor(augmented_obs, dtype=torch.float32).unsqueeze(0)
        
        # Получаем действие от политики
        action = policy.get_action(obs_tensor)
        
        # Совершаем шаг в среде
        next_obs, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        # Сохраняем данные в буфер
        buf.store(augmented_obs, action, 0)
        
        # Обновляем временную метку и состояние
        t += 1
        obs = next_obs

    # Получаем данные из буфера
    agent_obs, agent_acts, _ = buf.get()

    # Выбираем случайные данные эксперта
    idxs = np.random.choice(len(expert_obs), len(agent_obs), replace=False)
    exp_obs = torch.tensor(expert_obs[idxs], dtype=torch.float32)
    exp_acts = torch.tensor(expert_acts[idxs], dtype=torch.long)

    # Обучение дискриминатора
    for _ in range(2):
        discrim_opt.zero_grad()

        # Предсказания дискриминатора
        agent_preds = discrim(agent_obs, agent_acts)
        expert_preds = discrim(exp_obs, exp_acts)

        # Бинарная кросс-энтропия
        disc_loss = -(torch.log(expert_preds + 1e-8).mean() - torch.log(1 - agent_preds + 1e-8).mean())

        disc_loss.backward()
        discrim_opt.step()

    # Получение награды от дискриминатора
    with torch.no_grad():
        rewards = -torch.log(1 - discrim(agent_obs, agent_acts) + 1e-8)

    # Обучение политики
    policy_opt.zero_grad()

    # Логарифмы вероятностей выбранных действий
    log_probs = policy.get_log_prob(agent_obs, agent_acts)

    # Потери для стратегии
    loss = -(rewards * log_probs).mean()

    loss.backward()
    policy_opt.step()

    # Вывод информации каждые 10 эпох
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: GAIL Loss {loss.item():.3f}, Disc Loss {disc_loss.item():.3f}")


Epoch 0: GAIL Loss 0.756, Disc Loss 0.008
Epoch 10: GAIL Loss 0.988, Disc Loss -0.387
Epoch 20: GAIL Loss 1.300, Disc Loss -0.815
Epoch 30: GAIL Loss 1.728, Disc Loss -1.316
Epoch 40: GAIL Loss 2.326, Disc Loss -1.947
Epoch 50: GAIL Loss 3.098, Disc Loss -2.749
Epoch 60: GAIL Loss 4.076, Disc Loss -3.730
Epoch 70: GAIL Loss 5.387, Disc Loss -4.986
Epoch 80: GAIL Loss 7.332, Disc Loss -6.512
Epoch 90: GAIL Loss 9.272, Disc Loss -8.375
Epoch 100: GAIL Loss 11.577, Disc Loss -10.498
Epoch 110: GAIL Loss 14.317, Disc Loss -12.920
Epoch 120: GAIL Loss 17.072, Disc Loss -15.594
Epoch 130: GAIL Loss 20.244, Disc Loss -18.421
Epoch 140: GAIL Loss 19.981, Disc Loss -18.421
Epoch 150: GAIL Loss 19.910, Disc Loss -18.421
Epoch 160: GAIL Loss 20.292, Disc Loss -18.421
Epoch 170: GAIL Loss 20.108, Disc Loss -18.421
Epoch 180: GAIL Loss 19.904, Disc Loss -18.421
Epoch 190: GAIL Loss 20.288, Disc Loss -18.421
Epoch 200: GAIL Loss 20.293, Disc Loss -18.421
Epoch 210: GAIL Loss 20.190, Disc Loss -18.42

Протестируйте ваш алгоритм

In [10]:
for episode in range(10):
    obs, _ = env.reset()  # Получаем начальное состояние (игнорируем info)
    done = False
    total_reward = 0
    t = 0  # Временная метка

    while not done:
        # Добавляем sin(t) и cos(t) к состоянию
        augmented_obs = np.append(obs, [np.sin(t), np.cos(t)])
        
        # Преобразуем состояние в тензор
        obs_tensor = torch.tensor(augmented_obs, dtype=torch.float32).unsqueeze(0)
        
        # Получаем действие от политики
        action = policy.get_action(obs_tensor)
        
        # Совершаем шаг в среде
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        # Обновляем состояние и награду
        obs = next_obs
        total_reward += reward
        t += 1

    print(f"Episode {episode + 1}: Total Reward = {total_reward}")
env.close()

Episode 1: Total Reward = -200.0
Episode 2: Total Reward = -200.0
Episode 3: Total Reward = -200.0
Episode 4: Total Reward = -200.0
Episode 5: Total Reward = -200.0
Episode 6: Total Reward = -200.0
Episode 7: Total Reward = -200.0
Episode 8: Total Reward = -200.0
Episode 9: Total Reward = -200.0
Episode 10: Total Reward = -200.0
