<a href="https://colab.research.google.com/github/sepety/RL_Otus/blob/main/DDPG_Home_work.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**DDPG (Deep Deterministic Policy Gradient)**

DDPG (Deep Deterministic Policy Gradient) — это алгоритм обучения с подкреплением (RL), который подходит для сред с непрерывным пространством действий. Он относится к семейству методов Actor-Critic, где у нас есть две основные сети:

Actor (политика): аппроксимирует детерминированную стратегию
𝜇
(
𝑠
∣
𝜃
𝜇
)
μ(s∣θ
μ
​
 ), которая по состоянию
𝑠
s выдаёт действие
𝑎
a.
Critic (ценностная функция): аппроксимирует функцию Q-value,
𝑄
(
𝑠
,
𝑎
∣
𝜃
𝑄
)
Q(s,a∣θ
Q
​
 ), которая оценивает качество действия
𝑎
a в состоянии
𝑠
s.


**Математические основы**


Целевая функция для Critic:
Если агент исполняет стратегию
𝜇
μ, для обновления критика мы используем приближение целевого Q-значения:

𝑦
=
𝑟
+
𝛾
𝑄
′
(
𝑠
′
,
𝜇
′
(
𝑠
′
)
∣
𝜃
𝑄
′
)
.
y=r+γQ
′
 (s
′
 ,μ
′
 (s
′
 )∣θ
Q
′

​
 ).
Здесь:

𝑟
r — награда за текущее действие
𝛾
γ — коэффициент дисконтирования (обычно
𝛾
∈
[
0.9
,
0.99
]
γ∈[0.9,0.99])
𝑄
′
Q
′
  и
𝜇
′
μ
′
  — целевые сети критика и актёра, служащие для стабилизации обучения.
Критик обновляется путём минимизации ошибки МСЕ между предсказанным
𝑄
(
𝑠
,
𝑎
)
Q(s,a) и целевым значением
𝑦
y.

Обновление Actor:
Actor обновляется посредством политики градиента:

∇
𝜃
𝜇
𝐽
≈
𝐸
[
∇
𝑎
𝑄
(
𝑠
,
𝑎
∣
𝜃
𝑄
)
∇
𝜃
𝜇
𝜇
(
𝑠
∣
𝜃
𝜇
)
]
,
∇
θ
μ
​

​
 J≈E[∇
a
​
 Q(s,a∣θ
Q
​
 )∇
θ
μ
​

​
 μ(s∣θ
μ
​
 )],
что интуитивно означает улучшение стратегии так, чтобы увеличивать Q-значения выбранных действий.

**Установка библиотек**

устанавливка необходимых библиотек: gymnasium с поддержкой box2d для среды CarRacing, а также matplotlib, torch, torchvision и opencv-python для нейронных сетей и обработки изображений.

In [1]:
!pip install gymnasium[box2d]

Collecting gymnasium[box2d]
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[box2d])
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting swig==4.* (from gymnasium[box2d])
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m35.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━

In [None]:

!pip install matplotlib torch torchvision opencv-python





Импортируем все необходимые модули.
Создаем среду CarRacing-v3 с непрерывным пространством действий.
Сбрасываем среду и выводим размеры пространств состояний и действий для проверки.
Закрываем среду после проверки.

In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import matplotlib.pyplot as plt
import cv2  # Для предварительной обработки изображений

# Проверяем работу среды
env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=True)
obs, info = env.reset()
print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)
env.close()


Observation Space: Box(0, 255, (96, 96, 3), uint8)
Action Space: Box([-1.  0.  0.], 1.0, (3,), float32)


Повторно создаем среду CarRacing-v3.
Определяем форму состояния после преобразования изображения в одноканальное (градации серого) и уменьшения размера до 64x64.
Определяем размерность действия (обычно 3: поворот, газ, тормоз) и максимальное действие (1.0).
Выводим эти значения для проверки.


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random
import matplotlib.pyplot as plt

# Создаем среду
env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=True)

# Определяем форму состояния после предварительной обработки
state_shape = (1, 64, 64)  # Одноканальное изображение размера 64x64

# Получаем размерность действий и максимальное значение действия из среды
action_dim = env.action_space.shape[0]
max_action = env.action_space.high[0]

print(f"State Shape: {state_shape}")
print(f"Action Dimension: {action_dim}")
print(f"Max Action: {max_action}")



State Shape: (1, 64, 64)
Action Dimension: 3
Max Action: 1.0


Устанавливаем дополнительные системные пакеты, которые могут потребоваться для сборки некоторых зависимостей. Обычно в Colab или подобных средах это может быть не обязательно, но на локальной машине может пригодиться.

In [None]:
import pickle
import os


In [None]:
!apt-get install -y swig build-essential cmake
!pip install swig


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
build-essential is already the newest version (12.9ubuntu3).
cmake is already the newest version (3.22.1-1ubuntu1.22.04.2).
The following additional packages will be installed:
  swig4.0
Suggested packages:
  swig-doc swig-examples swig4.0-examples swig4.0-doc
The following NEW packages will be installed:
  swig swig4.0
0 upgraded, 2 newly installed, 0 to remove and 49 not upgraded.
Need to get 1,116 kB of archives.
After this operation, 5,542 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig4.0 amd64 4.0.2-1ubuntu1 [1,110 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/universe amd64 swig all 4.0.2-1ubuntu1 [5,632 B]
Fetched 1,116 kB in 1s (754 kB/s)
Selecting previously unselected package swig4.0.
(Reading database ... 123630 files and directories currently installed.)
Preparing to unpack .../swig4.0_4.0.2-1ubuntu1_amd64.deb ...
Un

Это класс буфера воспроизведения, в котором хранится опыт агента. При обучении мы будем брать из него случайные переходы, чтобы избежать корреляций и улучшить стабильность обучения.

In [None]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        actions = np.array(actions)
        rewards = np.array(rewards)
        dones = np.array(dones)
        return states, actions, rewards, next_states, dones


    def __len__(self):
        return len(self.buffer)


Функция уменьшает размер входного кадра, делает его одноканальным и нормализует. Это уменьшает вычислительную нагрузку и упрощает извлечение признаков из изображения.

In [None]:
def preprocess_state(state):
    """
    Преобразует состояние в тензор с уменьшенным размером изображения и в оттенках серого.
    """
    state = cv2.resize(state, (64, 64))  # Изменение размера изображения
    state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)  # Преобразование в оттенки серого
    state = state / 255.0  # Нормализация
    state = np.expand_dims(state, axis=0)  # Добавляем канал (1, 64, 64)
    return torch.tensor(state, dtype=torch.float32)


Акторная сеть обрабатывает изображение, извлекает признаки и предсказывает действие в непрерывном пространстве. Использование свёрток позволяет сети самостоятельно учить важные визуальные признаки.

Критик оцениват качество действия в данном состоянии, выдавая Q-значение. Это позволяет актору улучшать свою стратегию.

In [None]:
class Actor(nn.Module):
    def __init__(self, state_shape, action_dim, max_action):
        super(Actor, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)

        # Вычисляем размер выхода после свёрток
        self._init_conv_output(state_shape)

        self.fc1 = nn.Linear(self.conv_output_dim, 256)
        self.fc2 = nn.Linear(256, action_dim)
        self.max_action = max_action

    def _init_conv_output(self, shape):
        with torch.no_grad():
            input = torch.zeros(1, *shape)
            x = F.relu(self.conv1(input))
            x = F.relu(self.conv2(x))
            self.conv_output_dim = x.view(1, -1).shape[1]

    def forward(self, state):
        x = F.relu(self.conv1(state))
        x = F.relu(self.conv2(x))
        x = x.view(-1, self.conv_output_dim)
        x = F.relu(self.fc1(x))
        x = torch.tanh(self.fc2(x)) * self.max_action
        return x

class Critic(nn.Module):
    def __init__(self, state_shape, action_dim):
        super(Critic, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)

        # Вычисляем размер выхода после свёрток
        self._init_conv_output(state_shape)

        self.fc1 = nn.Linear(self.conv_output_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 1)

    def _init_conv_output(self, shape):
        with torch.no_grad():
            input = torch.zeros(1, *shape)
            x = F.relu(self.conv1(input))
            x = F.relu(self.conv2(x))
            self.conv_output_dim = x.view(1, -1).shape[1]

    def forward(self, state, action):
        x = F.relu(self.conv1(state))
        x = F.relu(self.conv2(x))
        x = x.view(-1, self.conv_output_dim)
        x = torch.cat([x, action], dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x



Агент реализует DDPG. Он использует буфер, целевые сети и мягкое обновление. Есть методы для сохранения и загрузки моделей.

In [None]:
class DDPGAgent:
    def __init__(self, state_shape, action_dim, max_action):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.action_dim = action_dim
        self.max_action = max_action

        self.actor = Actor(state_shape, action_dim, max_action).to(self.device)
        self.actor_target = Actor(state_shape, action_dim, max_action).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)

        self.critic = Critic(state_shape, action_dim).to(self.device)
        self.critic_target = Critic(state_shape, action_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.replay_buffer = ReplayBuffer(100000)
        self.gamma = 0.99
        self.tau = 0.005

        self.best_avg_reward = -np.inf

    def select_action(self, state):
        state = preprocess_state(state).unsqueeze(0).to(self.device)  # Добавляем batch размерность
        self.actor.eval()  # Переводим модель в режим оценки
        with torch.no_grad():
            action = self.actor(state).cpu().numpy()[0]
        self.actor.train()  # Возвращаем модель в режим тренировки
        # Добавляем шум для исследования
        noise = np.random.normal(0, self.max_action * 0.1, size=self.action_dim)
        action = action + noise
        action = np.clip(action, -self.max_action, self.max_action)
        return action

    def train(self, batch_size):
        if len(self.replay_buffer) < batch_size:
            return

        # Извлекаем батч из реплей буфера
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        # Преобразуем списки тензоров в тензоры батча
        states = torch.stack(states).to(self.device)
        next_states = torch.stack(next_states).to(self.device)
        actions = torch.tensor(actions, dtype=torch.float32).to(self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1).to(self.device)
        dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1).to(self.device)

        # Обновление Critic
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_Q = self.critic_target(next_states, next_actions)
            target_Q = rewards + (1 - dones) * self.gamma * target_Q

        current_Q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_Q, target_Q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Обновление Actor
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Мягкое обновление целевых сетей
        self.soft_update(self.actor_target, self.actor)
        self.soft_update(self.critic_target, self.critic)

    def soft_update(self, target_net, source_net):
        for target_param, param in zip(target_net.parameters(), source_net.parameters()):
            target_param.data.copy_(self.tau * param.data + (1.0 - self.tau) * target_param.data)

    def save_models(self, episode):
        torch.save(self.actor.state_dict(), os.path.join(model_dir, f'ddpg_actor_episode_{episode}.pth'))
        torch.save(self.critic.state_dict(), os.path.join(model_dir, f'ddpg_critic_episode_{episode}.pth'))
        print(f"Models saved at episode {episode}")

    def load_models(self, actor_path, critic_path):
        self.actor.load_state_dict(torch.load(os.path.join(model_dir, actor_path)))
        self.critic.load_state_dict(torch.load(os.path.join(model_dir, critic_path)))
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        print("Models loaded successfully")

    def save_models(self, episode):
        torch.save({
        'actor_state_dict': self.actor.state_dict(),
        'critic_state_dict': self.critic.state_dict(),
        'actor_optimizer_state_dict': self.actor_optimizer.state_dict(),
        'critic_optimizer_state_dict': self.critic_optimizer.state_dict(),
        }, os.path.join(model_dir, f'ddpg_checkpoint_episode_{episode}.pth'))
        print(f"Models and optimizers saved at episode {episode}")
    def load_models(self, checkpoint_path):
        checkpoint = torch.load(os.path.join(model_dir, checkpoint_path))
        self.actor.load_state_dict(checkpoint['actor_state_dict'])
        self.critic.load_state_dict(checkpoint['critic_state_dict'])
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        print("Models and optimizers loaded successfully")



In [None]:
import os

model_dir = '/content/drive/MyDrive/DDPG AGENT'
os.makedirs(model_dir, exist_ok=True)


Создаем директорию для сохранения моделей. Если вы работаете в Google Colab и подключили Google Drive, это создаст папку в вашем облачном хранилище.



Выполняется основной цикл обучения на 500 эпизодов.
Для каждого эпизода агент взаимодействует со средой, собирает награды и сохраняет переходы в буфер.
Обновление актёра и критика выполняется после набора достаточного количества переходов.
Каждые 5 эпизодов печатается средняя награда, чтобы отслеживать прогресс обучения.

In [None]:
# Инициализация агента
agent = DDPGAgent(state_shape, action_dim, max_action)
episodes = 500  # Можно увеличить количество эпизодов для лучшего обучения
batch_size = 16
episode_rewards = []

for episode in range(episodes):
    state, _ = env.reset()
    episode_reward = 0
    done = False
    steps = 0  # Счетчик шагов в эпизоде

    while not done:
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Предобрабатываем состояния
        state_processed = preprocess_state(state)
        next_state_processed = preprocess_state(next_state)

        agent.replay_buffer.add(state_processed, action, reward, next_state_processed, done)
        agent.train(batch_size)

        state = next_state
        episode_reward += reward
        steps += 1

        if steps >= 1000:
            # Ограничиваем максимальное количество шагов в эпизоде для ускорения обучения
            break

    episode_rewards.append(episode_reward)

    if (episode + 1) % 5 == 0:
        avg_reward = np.mean(episode_rewards[-5:])
        print(f"Episode {episode + 1}, Average Reward: {avg_reward:.2f}")

env.close()


# После обучения построение графика
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Training Reward per Episode')
plt.show()


Episode 5, Average Reward: -81.57
Episode 10, Average Reward: -77.91
Episode 15, Average Reward: -82.39
Episode 20, Average Reward: -81.82
Episode 25, Average Reward: -64.67
Episode 30, Average Reward: -57.68
Episode 35, Average Reward: -61.23
Episode 40, Average Reward: -58.13
Episode 45, Average Reward: -27.66
Episode 50, Average Reward: -36.34
Episode 55, Average Reward: -58.34
Episode 60, Average Reward: -33.99
Episode 65, Average Reward: -44.32
Episode 70, Average Reward: -15.60
Episode 75, Average Reward: -37.79
Episode 80, Average Reward: -8.40
Episode 85, Average Reward: -5.61
Episode 90, Average Reward: -44.17
Episode 95, Average Reward: -27.59


In [None]:
# Построение графика награды по эпизодам
plt.figure(figsize=(12, 6))
plt.plot(episode_rewards, label='Episode Reward')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Training Reward per Episode')
plt.legend()
plt.grid(True)
plt.show()


NameError: name 'plt' is not defined

In [None]:
!pip install imageio_ffmpeg
!pip install imageio




In [None]:
import os
from gymnasium.wrappers import RecordVideo

# Указываем путь для сохранения видео
video_folder = 'videos'
os.makedirs(video_folder, exist_ok=True)

# Создаем среду с обёрткой RecordVideo
env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=True)
env = RecordVideo(env, video_folder=video_folder, episode_trigger=lambda episode_id: True)

state, _ = env.reset()
done = False
total_reward = 0
agent.actor.eval()  # Переводим модель в режим оценки

while not done:
    state_processed = preprocess_state(state).unsqueeze(0).to(agent.device)
    with torch.no_grad():
        action = agent.actor(state_processed).cpu().numpy()[0]
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    total_reward += reward

env.close()
print(f"Total Reward: {total_reward}")


  logger.warn(


Total Reward: -85.13011152416297


In [None]:
import io
import base64
from IPython.display import HTML

# Функция для отображения видео внутри ноутбука
def show_video(video_path):
    video = io.open(video_path, 'r+b').read()
    encoded = base64.b64encode(video)
    HTML(data='''
        <video width="640" height="480" controls>
            <source src="data:video/mp4;base64,{0}" type="video/mp4">
        </video>
    '''.format(encoded.decode('ascii')))

# Получаем путь к последнему записанному видео
import glob
list_of_videos = glob.glob(os.path.join(video_folder, '*.mp4'))
latest_video = max(list_of_videos, key=os.path.getctime)

# Отображаем видео
show_video(latest_video)


In [None]:
num_episodes = 3  # Количество эпизодов для записи

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False
    total_reward = 0
    agent.actor.eval()  # Переводим модель в режим оценки

    while not done:
        state_processed = preprocess_state(state).unsqueeze(0).to(agent.device)
        with torch.no_grad():
            action = agent.actor(state_processed).cpu().numpy()[0]
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")


Episode 1, Total Reward: -83.27759197324364
Episode 2, Total Reward: -83.05084745762663
Episode 3, Total Reward: -83.27759197324364


In [None]:
# Выводим список всех видео
for idx, video_path in enumerate(list_of_videos):
    print(f"{idx + 1}: {video_path}")

# Вводим номер видео для отображения
video_number = int(input("Enter the number of the video to display: ")) - 1
selected_video = list_of_videos[video_number]

# Отображаем выбранное видео
show_video(selected_video)


1: videos/rl-video-episode-0.mp4
Enter the number of the video to display: 1


In [None]:
torch.cuda.is_available()


NameError: name 'torch' is not defined