In [None]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2
import random
import string
import os

In [None]:
# Environment setup: build the discrete-action CarRacing environment and inspect its spaces
env = gym.make('CarRacing-v2', continuous=False)
print("Observation space:", env.observation_space)
print("Action space:", env.action_space)

# reset() hands back the first observation together with an info dict
s, info = env.reset()
print(s.shape)

# Display the initial frame without axis decorations
fig, ax = plt.subplots(figsize=(5, 5))
ax.imshow(s)
ax.axis('off')
plt.show()

In [None]:
# No-op action animation: record the observations produced while taking action 0
import matplotlib.animation as animation
from IPython.display import HTML

frames = []
for i in range(50):
    # 0th action is the no_op action
    s, r, terminated, truncated, info = env.step(0)
    frames.append(s)

# Prepare the figure and the image artist that the animation will update
fig, ax = plt.subplots(figsize=(5, 5))
ax.axis('off')
im = ax.imshow(frames[0])


In [None]:
def animate(i):
    """Put the i-th recorded frame on the shared image artist and return it as a 1-tuple."""
    im.set_array(frames[i])
    return (im,)

# One animation frame per recorded observation, rendered as an inline JS player
anim = animation.FuncAnimation(fig, func=animate, frames=len(frames))
HTML(anim.to_jshtml())

In [None]:
# Preprocessing function
def preprocess(img):
    """Crop a frame, convert it to grayscale and divide the result by 255.

    Args:
        img: RGB image array (presumably the raw uint8 environment frame;
            confirm against the caller).

    Returns:
        The grayscale crop covering rows 0-83 and columns 6-89, scaled by 1/255.
    """
    cropped = img[:84, 6:90]  # Car Racing v2 specific cropping
    gray = cv2.cvtColor(cropped, cv2.COLOR_RGB2GRAY)
    return gray / 255.0

In [None]:
# ImageEnv Wrapper
class ImageEnv(gym.Wrapper):
    """Environment wrapper that returns stacks of preprocessed frames.

    Every observation handed to the caller is an array of `stack_frames`
    frames, each produced by `preprocess`. One call to `step` repeats the
    chosen action for up to `skip_frames` underlying steps.

    Note: this wrapper does not update `observation_space`; it still reports
    the space of the wrapped environment.

    Args:
        env: Environment to wrap.
        skip_frames: Number of underlying steps taken per `step` call (>= 1).
        stack_frames: Number of frames kept in the observation stack.
        initial_no_op: Number of action-0 steps executed right after a reset.
        **kwargs: Forwarded to `gym.Wrapper.__init__`.

    Raises:
        ValueError: If `skip_frames` is smaller than 1.
    """

    def __init__(self, env, skip_frames=4, stack_frames=4, initial_no_op=50, **kwargs):
        # With zero skipped frames `step` would never call the wrapped env and
        # would fail later with an UnboundLocalError; fail early and clearly.
        if skip_frames < 1:
            raise ValueError("skip_frames must be at least 1")
        super().__init__(env, **kwargs)
        self.initial_no_op = initial_no_op
        self.skip_frames = skip_frames
        self.stack_frames = stack_frames

    def reset(self, *, seed=None, options=None):
        """Reset the wrapped env, run the initial no-op steps and build the stack.

        Args:
            seed: Optional seed forwarded to the wrapped environment's reset.
            options: Optional reset options forwarded to the wrapped environment.

        Returns:
            (stacked_state, info), where the stack holds `stack_frames` copies
            of the preprocessed frame observed after the no-op steps.
        """
        s, info = self.env.reset(seed=seed, options=options)
        for _ in range(self.initial_no_op):
            s, r, terminated, truncated, info = self.env.step(0)
        s = preprocess(s)
        # Start the stack as the same frame repeated along a new leading axis
        self.stacked_state = np.tile(s, (self.stack_frames, 1, 1))
        return self.stacked_state, info

    def step(self, action):
        """Repeat `action` for up to `skip_frames` steps and update the stack.

        Rewards of the repeated steps are summed; repetition stops early when
        the wrapped environment reports termination or truncation.

        Returns:
            (stacked_state, reward, terminated, truncated, info) with the flags
            and info taken from the last underlying step.
        """
        reward = 0
        for _ in range(self.skip_frames):
            s, r, terminated, truncated, info = self.env.step(action)
            reward += r
            if terminated or truncated:
                break
        s = preprocess(s)
        # Drop the oldest frame and append the newest one at the end
        self.stacked_state = np.concatenate((self.stacked_state[1:], s[np.newaxis]), axis=0)
        return self.stacked_state, reward, terminated, truncated, info

In [None]:
# Rebuild the environment, this time wrapped so observations are frame stacks
env = ImageEnv(gym.make('CarRacing-v2', continuous=False))

s, _ = env.reset()
print("The shape of an observation:", s.shape)

In [None]:
# Show the four stacked frames of the current observation side by side
fig, axes = plt.subplots(1, 4, figsize=(20, 5))
for i, ax in enumerate(axes):
    ax.imshow(s[i], cmap='gray')
    ax.axis('off')
plt.show()

In [None]:
# Take four wrapped steps with the gas action, then display the resulting stack
for i in range(4):
    s, r, terminated, truncated, info = env.step(3)  # 3rd action is gas action

fig, axes = plt.subplots(1, 4, figsize=(20, 5))
for i, ax in enumerate(axes):
    ax.imshow(s[i], cmap='gray')
    ax.axis('off')
plt.show()

In [None]:
# DQN Network
class CNNActionValue(nn.Module):
    """Convolutional action-value network: two conv layers, then two linear layers.

    Args:
        state_dim: Number of input channels of the first convolution.
        action_dim: Number of outputs of the final linear layer.
        activation: Callable applied after both convolutions and the first
            linear layer.
    """

    def __init__(self, state_dim, action_dim, activation=F.relu):
        super().__init__()
        self.conv1 = nn.Conv2d(state_dim, 16, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)
        # Flattened size of the conv output: 32 channels of 9x9, which is what
        # these two convolutions produce for 84x84 inputs.
        self.in_features = 32 * 9 * 9
        self.fc1 = nn.Linear(self.in_features, 256)
        self.fc2 = nn.Linear(256, action_dim)
        self.activation = activation

    def forward(self, x):
        """Return one value per action for every sample in the batch `x`."""
        act = self.activation
        features = act(self.conv2(act(self.conv1(x))))
        flat = features.view(-1, self.in_features)
        hidden = act(self.fc1(flat))
        return self.fc2(hidden)

In [None]:
# Replay Buffer
class ReplayBuffer:
    """Fixed-capacity ring buffer of (s, a, r, s_prime, terminated) transitions.

    Storage is preallocated as NumPy arrays. States and next states are kept
    in two separate float32 arrays of shape (max_size, *state_dim), so memory
    use grows with 2 * max_size * prod(state_dim) * 4 bytes.

    Args:
        state_dim: Shape of a single state.
        action_dim: Shape of a single stored action.
        max_size: Maximum number of transitions kept; older ones are overwritten.
    """

    def __init__(self, state_dim, action_dim, max_size=int(1e5)):
        self.s = np.zeros((max_size, *state_dim), dtype=np.float32)
        self.a = np.zeros((max_size, *action_dim), dtype=np.int64)
        self.r = np.zeros((max_size, 1), dtype=np.float32)
        self.s_prime = np.zeros((max_size, *state_dim), dtype=np.float32)
        self.terminated = np.zeros((max_size, 1), dtype=np.float32)

        self.ptr = 0    # index of the slot that the next update writes to
        self.size = 0   # number of valid transitions currently stored
        self.max_size = max_size

    def update(self, s, a, r, s_prime, terminated):
        """Store one transition, overwriting the oldest entry once full."""
        self.s[self.ptr] = s
        self.a[self.ptr] = a
        self.r[self.ptr] = r
        self.s_prime[self.ptr] = s_prime
        self.terminated[self.ptr] = terminated

        self.ptr = (self.ptr + 1) % self.max_size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        """Draw `batch_size` stored transitions uniformly with replacement.

        Returns:
            Tuple of tensors (s, a, r, s_prime, terminated). Actions are int64
            so they can be used directly as indices; everything else is float32.

        Raises:
            ValueError: If the buffer holds no transitions yet.
        """
        if self.size == 0:
            raise ValueError("Cannot sample from an empty replay buffer")
        ind = np.random.randint(0, self.size, batch_size)
        # Fancy indexing copies the selected rows, so the tensors created here
        # do not alias the buffer's own storage.
        return (torch.from_numpy(self.s[ind]),
                torch.from_numpy(self.a[ind]),
                torch.from_numpy(self.r[ind]),
                torch.from_numpy(self.s_prime[ind]),
                torch.from_numpy(self.terminated[ind]))

In [None]:
# DQN Agent
class DQN:
    """DQN agent with an online network, a target network and a replay buffer.

    Args:
        state_dim: Shape of a state; its first entry is used as the number of
            input channels of the networks.
        action_dim: Number of discrete actions.
        lr: Learning rate passed to RMSprop.
        epsilon: Initial exploration probability.
        epsilon_min: Lower bound that epsilon is decayed towards.
        gamma: Discount factor used in the TD target.
        batch_size: Number of transitions sampled per learning step.
        warmup_steps: Number of initial steps during which actions are random
            and no learning happens.
        buffer_size: Capacity of the replay buffer.
        target_update_interval: Number of steps between target network syncs.
        epsilon_decay_steps: Number of steps over which epsilon is linearly
            reduced from `epsilon` to `epsilon_min`.

    Raises:
        ValueError: If `epsilon_decay_steps` is not positive.
    """

    def __init__(self, state_dim, action_dim, lr=0.00025, epsilon=1.0, epsilon_min=0.1, gamma=0.99,
                 batch_size=32, warmup_steps=5000, buffer_size=int(1e5), target_update_interval=10000,
                 epsilon_decay_steps=int(1e6)):
        if epsilon_decay_steps <= 0:
            raise ValueError("epsilon_decay_steps must be positive")

        self.action_dim = action_dim
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.gamma = gamma
        self.batch_size = batch_size
        self.warmup_steps = warmup_steps
        self.target_update_interval = target_update_interval

        self.network = CNNActionValue(state_dim[0], action_dim)
        self.target_network = CNNActionValue(state_dim[0], action_dim)
        # The target network starts as an exact copy of the online network
        self.target_network.load_state_dict(self.network.state_dict())
        self.optimizer = torch.optim.RMSprop(self.network.parameters(), lr)

        self.buffer = ReplayBuffer(state_dim, (1,), buffer_size)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.network.to(self.device)
        self.target_network.to(self.device)

        self.total_steps = 0
        # Amount subtracted from epsilon after every processed transition
        self.epsilon_decay = (epsilon - epsilon_min) / epsilon_decay_steps

    @torch.no_grad()
    def act(self, x, training=True):
        """Choose an action for the single state `x` (a NumPy array).

        While training, a uniformly random action is returned during warm-up
        and otherwise with probability epsilon; in all other cases the action
        with the highest predicted value is returned.
        """
        self.network.train(training)
        explore = training and (
            (np.random.rand() < self.epsilon) or (self.total_steps < self.warmup_steps)
        )
        if explore:
            return np.random.randint(0, self.action_dim)

        x = torch.from_numpy(x).float().unsqueeze(0).to(self.device)
        q = self.network(x)
        return torch.argmax(q).item()

    def learn(self):
        """Run one gradient step on a sampled batch.

        Returns:
            Dict with the current 'total_steps' and the scalar 'value_loss'.
        """
        batch = self.buffer.sample(self.batch_size)
        s, a, r, s_prime, terminated = (t.to(self.device) for t in batch)

        # The target is a constant with respect to the online network, so it
        # is computed without recording a graph.
        with torch.no_grad():
            next_q = self.target_network(s_prime).max(dim=1, keepdim=True).values
            td_target = r + (1. - terminated) * self.gamma * next_q

        # Value predicted for the action that was actually taken
        q_taken = self.network(s).gather(1, a.long())
        loss = F.mse_loss(q_taken, td_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return {'total_steps': self.total_steps, 'value_loss': loss.item()}

    def process(self, transition):
        """Store a transition and advance the agent by one step.

        Args:
            transition: Tuple (s, a, r, s_prime, terminated) that is unpacked
                into the replay buffer's `update`.

        Returns:
            The dict from `learn` once warm-up is over, otherwise an empty dict.
        """
        result = {}
        self.total_steps += 1
        self.buffer.update(*transition)

        if self.total_steps > self.warmup_steps:
            result = self.learn()

        if self.total_steps % self.target_update_interval == 0:
            self.target_network.load_state_dict(self.network.state_dict())
        self.epsilon = max(self.epsilon_min, self.epsilon - self.epsilon_decay)
        return result

In [None]:
# Seed every random number generator the run draws from, so that a
# Restart & Run All repeats the same experiment as far as the libraries allow.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

env = gym.make('CarRacing-v2', continuous=False)
env.reset(seed=SEED)  # seed the environment's own RNG before wrapping it
env = ImageEnv(env)

max_steps = int(2e6)       # total number of agent steps to train for
eval_interval = 10000      # agent steps between two evaluations
state_dim = (4, 84, 84)    # (stacked frames, height, width) of one observation
action_dim = env.action_space.n

agent = DQN(state_dim, action_dim)

In [None]:
def evaluate(agent, n_evals=5):
    """Run the agent without exploration and return its mean episode return.

    A fresh wrapped environment is created for the evaluation and closed
    again afterwards, even if an episode raises.

    Args:
        agent: Object whose `act(state, training=False)` returns an action.
        n_evals: Number of evaluation episodes to average over.

    Returns:
        Sum of all rewards collected divided by `n_evals`.
    """
    eval_env = gym.make('CarRacing-v2', continuous=False)
    eval_env = ImageEnv(eval_env)
    scores = 0
    try:
        for _ in range(n_evals):
            s, _ = eval_env.reset()
            terminated = False
            truncated = False
            while not terminated and not truncated:
                a = agent.act(s, training=False)
                s, r, terminated, truncated, info = eval_env.step(a)
                scores += r
    finally:
        # Release the environment's resources; one is created on every call
        eval_env.close()
    return scores / n_evals

In [None]:
returns, steps = [], []

# Train for `max_steps` agent steps (not episodes), resetting the environment
# whenever an episode ends.
s, _ = env.reset()
while agent.total_steps < max_steps:
    a = agent.act(s)
    s_prime, r, terminated, truncated, info = env.step(a)
    # Store only true termination as the done flag: a time-limit truncation
    # does not mean the next state has zero value, so the TD target must
    # still bootstrap from it.
    agent.process((s, [a], [r], s_prime, [terminated]))
    s = s_prime

    if terminated or truncated:
        s, _ = env.reset()

    # Evaluate every `eval_interval` steps instead of after every update;
    # each evaluation plays several full episodes.
    if agent.total_steps % eval_interval == 0:
        steps.append(agent.total_steps)
        returns.append(evaluate(agent))
        print(f"Steps: {steps[-1]} | Returns: {returns[-1]}")

plt.figure(figsize=(10, 5))
plt.plot(steps, returns)
plt.xlabel('steps')
plt.ylabel('return')
plt.grid()
plt.show()

In [None]:

# Animation function to save the agent's performance as a video
def save_video(agent, filename, n_episodes=1, fps=30):
    """Record the agent acting greedily and write the frames to a video file.

    A dedicated environment is created with `render_mode='rgb_array'`, since
    the render mode has to be chosen when the environment is built. Frames
    are written as they are rendered rather than collected in memory first.
    One frame is captured per wrapped step, i.e. after each group of skipped
    frames.

    Args:
        agent: Object whose `act(state, training=False)` returns an action.
        filename: Path of the video file to write.
        n_episodes: Number of episodes to record.
        fps: Frame rate passed to the video writer.

    Raises:
        IOError: If the video writer cannot be opened for `filename`.
    """
    video_env = ImageEnv(gym.make('CarRacing-v2', continuous=False, render_mode='rgb_array'))
    video = None
    n_frames = 0
    try:
        for _ in range(n_episodes):
            s, _ = video_env.reset()
            terminated = False
            truncated = False
            while not terminated and not truncated:
                a = agent.act(s, training=False)
                s, r, terminated, truncated, info = video_env.step(a)
                frame = video_env.render()

                if video is None:
                    # The frame size is only known once the first frame exists
                    height, width = frame.shape[:2]
                    video = cv2.VideoWriter(filename, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
                    if not video.isOpened():
                        raise IOError(f"Could not open video writer for {filename}")

                # The environment renders RGB while OpenCV writes BGR
                video.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
                n_frames += 1
    finally:
        if video is not None:
            video.release()
        video_env.close()

    if n_frames == 0:
        print(f"No frames were rendered; {filename} was not written")
        return
    print(f"Video saved as {filename}")

# Save the trained agent's performance as a video
save_video(agent, 'carracing_dqn.mp4', n_episodes=5)