**INSTALLING THE MODULE**

In [1]:
!pip install swig

Collecting swig
  Downloading swig-4.2.1-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.6 kB)
Downloading swig-4.2.1-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━[0m [32m1.1/1.9 MB[0m [31m31.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.2.1


In [2]:
!pip install gymnasium[box2d] torch moviepy

Collecting gymnasium[box2d]
  Downloading gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium[box2d])
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-c

**IMPORTING THE MODULE**

In [3]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gym
from collections import deque
import random

**DEFINE ACTOR NETWORK**

In [4]:
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        self.conv1 = nn.Conv2d(state_dim[0], 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1_input_dim = self._get_conv_output(state_dim)
        self.fc1 = nn.Linear(self.fc1_input_dim, 256)
        self.fc2 = nn.Linear(256, action_dim)

    def _get_conv_output(self, shape):
        o = torch.zeros(1, *shape)
        o = self.conv1(o)
        o = self.conv2(o)
        o = self.conv3(o)
        return int(np.prod(o.size()))

    def forward(self, state):
        x = torch.relu(self.conv1(state))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.reshape(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        return torch.tanh(self.fc2(x))

**DEFINE CRITIC NETWORK**

In [5]:
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.conv1 = nn.Conv2d(state_dim[0], 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)
        self.fc1_input_dim = self._get_conv_output(state_dim) + action_dim
        self.fc1 = nn.Linear(self.fc1_input_dim, 256)
        self.fc2 = nn.Linear(256, 1)

    def _get_conv_output(self, shape):
        o = torch.zeros(1, *shape)
        o = self.conv1(o)
        o = self.conv2(o)
        o = self.conv3(o)
        return int(np.prod(o.size()))

    def forward(self, state, action):
        x = torch.relu(self.conv1(state))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        x = x.reshape(x.size(0), -1)
        x = torch.cat([x, action], dim=1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

**REPLAY BUFFER**

In [6]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return np.stack(state), np.stack(action), reward, np.stack(next_state), done

    def __len__(self):
        return len(self.buffer)

**NOISE**

In [7]:
class OrnsteinUhlenbeckNoise:
    def __init__(self, mu, theta=0.15, sigma=0.2):
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.zeros_like(self.mu)

    def __call__(self):
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(len(self.mu))
        self.state += dx
        return self.state

**DDPG ALGORITHM**

In [8]:
class DDPG:
    def __init__(self, state_dim, action_dim, actor_lr=1e-4, critic_lr=1e-4, gamma=0.99, tau=1e-2, buffer_size=100000, batch_size=128):
        self.actor = Actor(state_dim, action_dim).cuda()
        self.critic = Critic(state_dim, action_dim).cuda()
        self.target_actor = Actor(state_dim, action_dim).cuda()
        self.target_critic = Critic(state_dim, action_dim).cuda()
        self.target_actor.load_state_dict(self.actor.state_dict())
        self.target_critic.load_state_dict(self.critic.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.replay_buffer = ReplayBuffer(buffer_size)
        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau
        self.action_dim = action_dim
        self.noise = OrnsteinUhlenbeckNoise(mu=np.zeros(action_dim))

    def select_action(self, state):
        self.actor.eval()
        state = torch.FloatTensor(state).unsqueeze(0).cuda()
        action = self.actor(state).cpu().data.numpy().flatten()
        action += self.noise()
        return np.clip(action, -1, 1)

    def train(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)
        state = torch.FloatTensor(state).cuda()
        next_state = torch.FloatTensor(next_state).cuda()
        action = torch.FloatTensor(action).cuda()
        reward = torch.FloatTensor(reward).unsqueeze(1).cuda()
        done = torch.FloatTensor(np.float32(done)).unsqueeze(1).cuda()

        next_action = self.target_actor(next_state)
        target_q_value = self.target_critic(next_state, next_action)
        expected_q_value = reward + (1 - done) * self.gamma * target_q_value
        q_value = self.critic(state, action)

        critic_loss = nn.MSELoss()(q_value, expected_q_value)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        policy_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        policy_loss.backward()
        self.actor_optimizer.step()

        for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

        for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

**TRAINING LOOP**

In [10]:
def main():
    global agent
    env = gym.make("CarRacing-v2")
    state_dim = (3, 96, 96)
    action_dim = env.action_space.shape[0]
    agent = DDPG(state_dim, action_dim)

    num_episodes = 100
    for episode in range(num_episodes):
        state = env.reset()
        state = np.transpose(state, (2, 0, 1))
        episode_reward = 0
        done = False
        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            next_state = np.transpose(next_state, (2, 0, 1))
            agent.replay_buffer.add(state, action, reward, next_state, done)
            agent.train()
            state = next_state
            episode_reward += reward
            print(f"Episode {episode}, Step Reward: {reward}, Cumulative Reward: {episode_reward}")

        print(f"Episode {episode} finished with total reward: {episode_reward}")

    env.close()

if __name__ == "__main__":
    main()

  deprecation(
  deprecation(


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 6.2259385665529035
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 6.125938566552904
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 6.025938566552904
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.9259385665529045
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.825938566552905
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.725938566552905
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.625938566552906
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.525938566552906
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.425938566552906
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.325938566552907
Episode 95, Step Reward: -0.09999999999999964, Cumulative Reward: 5.225938566552907
Episode 9

**INFERENCE**

In [11]:
import gym
from moviepy.editor import ImageSequenceClip

def generate_video(agent, env, video_filename, num_episodes=1):
    frames = []
    for _ in range(num_episodes):
        state = env.reset()
        state = np.transpose(state, (2, 0, 1))
        done = False
        while not done:
            frames.append(env.render(mode="rgb_array"))
            action = agent.select_action(state)
            next_state, _, done, _ = env.step(action)
            next_state = np.transpose(next_state, (2, 0, 1))
            state = next_state

    env.close()
    clip = ImageSequenceClip(frames, fps=30)
    clip.write_videofile(video_filename, codec='libx264')


generate_video(agent, gym.make("CarRacing-v2"), "carracing_ddpg.mp4")


  from scipy.ndimage.filters import sobel

  deprecation(

  deprecation(

See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(

  if not isinstance(terminated, (bool, np.bool8)):



Moviepy - Building video carracing_ddpg.mp4.
Moviepy - Writing video carracing_ddpg.mp4





Moviepy - Done !
Moviepy - video ready carracing_ddpg.mp4
