In [None]:
!pip install gymnasium[atari]
!pip install gymnasium[accept-rom-license]



In [None]:
%%capture
!apt install python-opengl
!apt install xvfb
!pip3 install pyvirtualdisplay


In [None]:
!pip install ale-py



In [None]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import ale_py
import cv2
import imageio
from collections import deque
from IPython.display import HTML
from base64 import b64encode

# Neural Network for DQN
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_dim: Observation shape as ``(channels, height, width)``. The
            channel count sizes the first convolution and the spatial size
            determines the width of the first fully connected layer.
        output_dim: Number of outputs of the final layer (one Q-value per action).

    Raises:
        ValueError: If the spatial size is too small to survive the three
            convolutions.
    """

    # (kernel_size, stride) of conv1, conv2 and conv3, in order.
    _CONV_SPECS = ((8, 4), (4, 2), (3, 1))

    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        channels, height, width = input_dim
        self.conv1 = nn.Conv2d(channels, 32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Derive the flattened feature size from the input shape instead of
        # assuming 84x84 frames (which yield 7x7 feature maps).
        out_h = self._conv_out_size(height)
        out_w = self._conv_out_size(width)
        if out_h < 1 or out_w < 1:
            raise ValueError(
                f"input_dim {tuple(input_dim)} is too small for the convolutional stack"
            )
        self.fc1 = nn.Linear(64 * out_h * out_w, 512)
        self.fc2 = nn.Linear(512, output_dim)

    @classmethod
    def _conv_out_size(cls, size):
        """Return the spatial size left after the three unpadded convolutions."""
        for kernel, stride in cls._CONV_SPECS:
            size = (size - kernel) // stride + 1
        return size

    def forward(self, x):
        """Map a batch of stacked frames ``(N, C, H, W)`` to ``(N, output_dim)`` values."""
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        # Flatten everything except the batch dimension for the linear layers.
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

# Hyperparameters (module-level settings read by train_dqn and the driver cell)
env_name = "ALE/BeamRider-v5"  # environment id passed to gym.make
gamma = 0.99  # discount factor applied to the bootstrapped next-state value
learning_rate = 1e-4  # learning rate handed to the Adam optimizer
batch_size = 64  # number of transitions sampled from replay memory per update
epsilon_start = 1.0  # exploration rate at the very first step
epsilon_end = 0.01  # exploration rate the schedule decays towards
epsilon_decay = 50000  # decay constant of the exponential epsilon schedule (divides steps_done)
memory_size = 50000  # capacity of the replay memory, in transitions
# Interval between copies of the policy-net weights into the target net.
# NOTE(review): the unit (environment steps vs. episodes) is whatever
# train_dqn compares this against; keep the two consistent.
target_update = 1000
num_episodes = 500  # number of training episodes

# Experience Replay Memory
class ReplayMemory:
    """Bounded FIFO store of transitions with uniform random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops the oldest entry once it is full.
        self.memory = deque([], maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store one ``(state, action, reward, next_state, done)`` transition."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` stored transitions drawn without replacement."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Number of transitions currently held."""
        return self.memory.__len__()

# Epsilon-greedy policy
def epsilon_greedy_policy(state, epsilon, n_actions, policy_net):
    """Choose an action with an epsilon-greedy rule.

    Args:
        state: Stacked frames for a single observation, holding raw pixel
            intensities in the 0-255 range (no batch dimension).
        epsilon: Probability of picking a uniformly random action.
        n_actions: Number of discrete actions; random picks are in
            ``[0, n_actions - 1]``.
        policy_net: Callable mapping a ``(1, ...)`` float tensor to per-action
            values.

    Returns:
        The selected action index as a Python int.
    """
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)

    with torch.no_grad():
        # The training update feeds the network pixels scaled to [0, 1], so the
        # greedy branch must apply the same scaling; otherwise the network is
        # queried on inputs 255x larger than anything it was trained on.
        obs = torch.tensor(np.array(state), dtype=torch.float32).unsqueeze(0) / 255.0
        return policy_net(obs).argmax(dim=1).item()

# Training loop
def train_dqn():
    """Train a DQN agent on ``env_name`` and return the trained policy network.

    Uses the module-level hyperparameters. Observations are grayscale 84x84
    frames stacked four deep; transitions are stored in a replay memory and the
    policy network is updated with an MSE loss against a periodically synced
    target network.

    Returns:
        The trained policy network (callers that ignore the return value keep
        working as before).
    """
    env = gym.make(env_name, render_mode='rgb_array')
    try:
        n_actions = env.action_space.n

        policy_net = DQN((4, 84, 84), n_actions)
        target_net = DQN((4, 84, 84), n_actions)
        target_net.load_state_dict(policy_net.state_dict())
        target_net.eval()

        optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
        memory = ReplayMemory(memory_size)
        loss_fn = nn.MSELoss()  # built once instead of on every update

        steps_done = 0
        max_steps = 1000  # per-episode cap on environment steps
        for episode in range(num_episodes):
            frame, _ = env.reset()
            # Seed the frame stack with four copies of the first frame.
            state_buffer = deque([_preprocess_frame(frame)] * 4, maxlen=4)
            total_reward = 0
            done = False

            step = 0
            while not done and step < max_steps:
                stacked_state = np.stack(state_buffer, axis=0)
                epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-1. * steps_done / epsilon_decay)
                action = epsilon_greedy_policy(stacked_state, epsilon, n_actions, policy_net)
                next_frame, reward, terminated, truncated, _ = env.step(action)
                state_buffer.append(_preprocess_frame(next_frame))
                done = terminated or truncated
                # Store only true termination: a time-limit truncation is not a
                # terminal state, so the target should still bootstrap from it.
                memory.push(stacked_state, action, reward, np.stack(state_buffer, axis=0), terminated)
                total_reward += reward
                steps_done += 1
                step += 1

                if len(memory) > batch_size:
                    transitions = memory.sample(batch_size)
                    batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*transitions)

                    # Pixels are scaled to [0, 1] before entering the network.
                    batch_state = torch.tensor(np.array(batch_state), dtype=torch.float32) / 255.0
                    batch_action = torch.tensor(batch_action, dtype=torch.int64).unsqueeze(1)
                    batch_reward = torch.tensor(batch_reward, dtype=torch.float32).unsqueeze(1)
                    batch_next_state = torch.tensor(np.array(batch_next_state), dtype=torch.float32) / 255.0
                    batch_done = torch.tensor(batch_done, dtype=torch.float32).unsqueeze(1)

                    current_q_values = policy_net(batch_state).gather(1, batch_action)
                    next_q_values = target_net(batch_next_state).max(1)[0].detach().unsqueeze(1)
                    # (1 - done) removes the bootstrap term for terminal transitions.
                    expected_q_values = batch_reward + (gamma * next_q_values * (1 - batch_done))

                    loss = loss_fn(current_q_values, expected_q_values)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                # Sync the target network every `target_update` environment
                # steps. Keying this on the episode index would, with
                # target_update larger than num_episodes, sync only at episode 0.
                if steps_done % target_update == 0:
                    target_net.load_state_dict(policy_net.state_dict())

            print(f"Episode {episode}, Total Reward: {total_reward}")
    finally:
        # Release the emulator even if training is interrupted or fails.
        env.close()

    return policy_net


def _preprocess_frame(frame):
    """Convert an RGB frame to an 84x84 uint8 grayscale image."""
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    return np.array(cv2.resize(gray, (84, 84)), dtype=np.uint8)

In [None]:
def evaluate_agent(env, max_steps, n_eval_episodes, policy_net, epsilon=0):
    """Play several episodes with the given network and summarise the returns.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` and
            ``action_space.n``.
        max_steps: Upper bound on the number of steps taken per episode.
        n_eval_episodes: Number of episodes to play.
        policy_net: Network forwarded to ``epsilon_greedy_policy``.
        epsilon: Exploration rate forwarded to ``epsilon_greedy_policy``.

    Returns:
        ``(mean_reward, std_reward)`` over the per-episode reward totals.
    """
    def to_observation(frame):
        # Grayscale, then shrink to the 84x84 size the frame stack uses.
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        return cv2.resize(gray, (84, 84))

    returns = []
    for _episode in range(n_eval_episodes):
        first_frame, _info = env.reset()
        # Start every episode with four copies of the first frame.
        frames = deque([to_observation(first_frame)] * 4, maxlen=4)
        episode_return = 0

        steps_taken = 0
        while steps_taken < max_steps:
            observation = np.stack(frames, axis=0)
            action = epsilon_greedy_policy(observation, epsilon, env.action_space.n, policy_net)
            frame, reward, terminated, truncated, _info = env.step(action)
            frames.append(to_observation(frame))
            episode_return += reward
            steps_taken += 1

            if terminated or truncated:
                break

        returns.append(episode_return)

    return np.mean(returns), np.std(returns)

In [None]:
def record_video(env, policy_net, out_directory, fps=30):
    """Play one greedy episode and write the rendered frames to a video file.

    Args:
        env: Environment exposing ``reset()``, ``step(action)``, ``render()``
            and ``action_space.n``.
        policy_net: Network forwarded to ``epsilon_greedy_policy`` (epsilon 0).
        out_directory: Path handed to ``imageio.mimsave`` as the output target.
        fps: Frame rate forwarded to ``imageio.mimsave``.
    """
    def to_observation(frame):
        # Grayscale, then shrink to the 84x84 size the frame stack uses.
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        return cv2.resize(gray, (84, 84))

    first_frame, _info = env.reset()
    stack = deque([to_observation(first_frame)] * 4, maxlen=4)
    rendered = []

    while True:
        # Capture the frame shown before the action for this step is taken.
        rendered.append(env.render())
        observation = np.stack(stack, axis=0)
        action = epsilon_greedy_policy(observation, 0, env.action_space.n, policy_net)
        frame, _reward, terminated, truncated, _info = env.step(action)
        stack.append(to_observation(frame))
        if terminated or truncated:
            break

    imageio.mimsave(out_directory, list(map(np.array, rendered)), fps=fps)
    print(f"Video saved at {out_directory}")


In [None]:
# Function to display video
def show_video(video_path, video_width=500):
    """Build an inline HTML ``<video>`` element for a video file.

    The file is read in full and embedded as a base64 ``data:`` URL, so the
    returned object does not depend on the file after this call.

    Args:
        video_path: Path of the video file to embed.
        video_width: Value of the ``width`` attribute of the ``<video>`` tag.

    Returns:
        An ``IPython.display.HTML`` object wrapping the ``<video>`` markup.
    """
    # Read-only binary mode is enough (no write permission needed), and the
    # context manager closes the handle instead of leaking it.
    with open(video_path, "rb") as video_file:
        encoded = b64encode(video_file.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")

In [None]:
if __name__ == "__main__":
    # Train, then evaluate and record the SAME network. Building a new DQN for
    # evaluation would score a randomly initialised model, not the trained one.
    trained_net = train_dqn()
    env = gym.make(env_name, render_mode='rgb_array')
    try:
        # train_dqn() is expected to hand back the trained network; if it
        # returns nothing, fall back to an untrained network so the cell
        # still runs end to end.
        policy_net = trained_net if trained_net is not None else DQN((4, 84, 84), env.action_space.n)
        policy_net.eval()
        mean_reward, std_reward = evaluate_agent(env, max_steps=1000, n_eval_episodes=10, policy_net=policy_net)
        print(f"Evaluation - Mean Reward: {mean_reward}, Std Reward: {std_reward}")
        record_video(env, policy_net, "Agent.mp4")
    finally:
        # Release the emulator whether or not evaluation succeeded.
        env.close()

    # A bare expression inside an `if` body is not auto-displayed by the
    # notebook, so the video has to be shown explicitly.
    from IPython.display import display
    display(show_video("Agent.mp4"))

Episode 0, Total Reward: 176.0
Episode 1, Total Reward: 352.0
Episode 2, Total Reward: 396.0
Episode 3, Total Reward: 264.0
Episode 4, Total Reward: 440.0
Episode 5, Total Reward: 220.0
Episode 6, Total Reward: 308.0
Episode 7, Total Reward: 352.0
Episode 8, Total Reward: 132.0
Episode 9, Total Reward: 352.0
Episode 10, Total Reward: 264.0
Episode 11, Total Reward: 440.0
Episode 12, Total Reward: 264.0
Episode 13, Total Reward: 352.0
Episode 14, Total Reward: 132.0
Episode 15, Total Reward: 176.0
Episode 16, Total Reward: 88.0
Episode 17, Total Reward: 264.0
Episode 18, Total Reward: 264.0
Episode 19, Total Reward: 88.0
Episode 20, Total Reward: 176.0
Episode 21, Total Reward: 220.0
Episode 22, Total Reward: 264.0
Episode 23, Total Reward: 264.0
Episode 24, Total Reward: 440.0
Episode 25, Total Reward: 176.0
Episode 26, Total Reward: 528.0
Episode 27, Total Reward: 308.0
Episode 28, Total Reward: 264.0
Episode 29, Total Reward: 440.0
Episode 30, Total Reward: 220.0
Episode 31, Total Re



Video saved at Agent.mp4
