# CSC_52081_EP Project

Advanced Machine Learning and Autonomous Agents Project

## Introduction

Reinforcement Learning (RL) has emerged as a robust framework for training autonomous agents to learn optimal behaviors through environmental interactions. This study utilizes the [`CarRacing-v3`](https://gymnasium.farama.org/environments/box2d/car_racing/) environment from Gymnasium, which presents a challenging control task in a racing scenario.

### Environment

The environment features a high-dimensional observation space, represented by a $96 \times 96$ RGB image capturing the car and track, necessitating the use of deep convolutional neural networks (CNNs) for effective feature extraction.

#### Action Space

The action space in CarRacing-v3 supports both continuous and discrete control modes.

In **continuous mode**, the agent outputs three real-valued commands:

- steering (ranging from $-1$, full left, to $+1$, full right)
- gas (ranging from $0$ to $1$)
- braking (ranging from $0$ to $1$)

In **discrete mode**, the action space is simplified to five actions:

- do nothing
- steer left
- steer right
- gas
- brake

This dual action representation enables a comprehensive evaluation of various RL algorithms under different control settings.
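
As a minimal sketch of the continuous mode, the helper below clamps a raw policy output into the valid command ranges before it is sent to the environment. The name `clamp_action` and the constant `ACTION_BOUNDS` are hypothetical, and the gas and braking bounds of $[0, 1]$ are assumed from the Gymnasium documentation for CarRacing.

```python
from typing import Tuple

# Assumed bounds (Gymnasium CarRacing documentation):
# steering in [-1, 1], gas in [0, 1], braking in [0, 1].
ACTION_BOUNDS = ((-1.0, 1.0), (0.0, 1.0), (0.0, 1.0))


def clamp_action(raw: Tuple[float, float, float]) -> Tuple[float, ...]:
    """Clamp a raw (steering, gas, braking) triple into the valid ranges."""
    return tuple(
        min(max(value, low), high)
        for value, (low, high) in zip(raw, ACTION_BOUNDS)
    )


# Example: an unbounded network output is brought back into range.
safe_action = clamp_action((1.7, -0.3, 0.5))
```

A policy with an unbounded output head (for example a Gaussian policy) could apply such a clamp, or a squashing function, before calling `env.step`.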

#### Reward

The reward structure combines a penalty of $-0.1$ per frame and a reward of $+\frac{1000}{N}$ for each new track tile visited, where $N$ is the total number of tiles. This incentivizes the agent to balance exploration (visiting tiles) with efficiency (minimizing frame usage). For example, completing the race after visiting all $N$ tiles in 732 frames yields a reward of $1000 - 0.1 \times 732 = 926.8$ points.
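
The arithmetic above can be sketched as a small function. The name `episode_return` is hypothetical, and the track length of 300 tiles is only an illustrative value, since $N$ varies from track to track.

```python
def episode_return(tiles_visited: int, total_tiles: int, frames: int) -> float:
    """Return 1000/N per visited tile minus 0.1 per elapsed frame."""
    if total_tiles <= 0:
        raise ValueError("total_tiles must be positive")
    tile_reward = 1000.0 * tiles_visited / total_tiles
    frame_penalty = 0.1 * frames
    return tile_reward - frame_penalty


# Worked example from the text: every tile visited in 732 frames.
# The tile count (300) is an arbitrary illustration; it cancels out here.
full_lap = episode_return(tiles_visited=300, total_tiles=300, frames=732)

# A partial lap over the same number of frames earns proportionally less.
half_lap = episode_return(tiles_visited=150, total_tiles=300, frames=732)
```

Here `full_lap` evaluates to approximately 926.8, matching the example in the text, while `half_lap` shows how an agent that lingers without reaching new tiles keeps paying the per-frame penalty.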

### Objective

The primary objective of this project is to compare RL policies across discrete and continuous action modalities. For discrete control, methods like **Deep Q-Network** (DQN) and **SARSA** are implemented, while continuous control is explored using approaches such as the **Cross-Entropy Method** (CEM), **Self-Adaptive Evolution Strategy** (SA-ES), and policy gradient techniques like **Proximal Policy Optimization** (PPO) and **Soft Actor-Critic** (SAC). This comparative analysis aims to understand the strengths and limitations of each method in handling complex decision spaces.

The high-dimensional visual inputs in `CarRacing-v3` require effective feature extraction, addressed through a tailored CNN architecture. Transitioning between discrete and continuous action representations also demands careful algorithmic design and parameter tuning to ensure stable learning and convergence. While prior studies have often focused on either discrete or continuous action spaces separately, this work adopts a comparative approach, evaluating different agents within the same environment to assess performance under similar conditions.

At this stage, the work outlines the methodology and anticipated challenges, focusing on designing the CNN-based feature extractor, implementing RL algorithms, and establishing a framework for performance comparison. Preliminary findings are yet to be finalized, but the study is expected to provide insights into applying RL in high-dimensional, real-time control tasks. Limitations include the preliminary nature of experiments and the need for further tuning and validation. Future work will involve extensive empirical evaluations, exploring additional policy gradient methods, and refining the network architecture to better handle the complexities of `CarRacing-v3`.

### GitHub

The project's code is available on [GitHub](https://github.com/tr0fin0/ensta_CSC_52081_EP_project), offering a reproducible framework for future investigations and extensions.

## Installation

### Environment

#### WSL, Linux or MacOS

This project uses a Python virtual environment. To create it, run the following commands in a terminal from the project folder:

```bash
# Debian/Ubuntu (including WSL) only: install the venv module
sudo apt install python3.10-venv
python3 -m venv env
source env/bin/activate
python3 -m pip install --upgrade pip
python3 -m pip install -r requirements.txt
```
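
After installation, a quick check such as the following sketch can confirm that the packages imported later in this notebook are visible from the active environment. The helper name `missing_modules` is hypothetical, and the module list is simply read off the import cells below; it is not taken from `requirements.txt`.

```python
from importlib.util import find_spec
from typing import Iterable, List

# Top-level modules imported later in this notebook (assumed list).
REQUIRED_MODULES = (
    "gymnasium", "torch", "numpy", "pandas", "seaborn",
    "matplotlib", "tqdm", "ipywidgets", "IPython",
)


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the module names that cannot be found in the active environment."""
    return [name for name in names if find_spec(name) is None]


missing = missing_modules(REQUIRED_MODULES)
print("Missing modules:", missing if missing else "none")
```

If the list is not empty, the virtual environment is probably not activated, or `requirements.txt` was not installed into it.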

### Imports

In [20]:
from collections import deque
from ipywidgets import interact
from IPython.display import Video
from pathlib import Path
from tqdm.notebook import tqdm
from typing import cast, List, Tuple, Deque, Optional, Callable


import gymnasium as gym
import itertools
import torch
import torch.nn as nn
import torch.optim as optim
import random

In [21]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

In [22]:
sns.set_context("talk")

In [23]:
def video_selector(file_path: Path) -> Video:
    # Video expects a single path; convert it to a string for compatibility
    return Video(str(file_path), embed=True, html_attributes="controls autoplay loop")

### Setup

#### Directories

In [24]:
DIRECTORY_OUTPUT = "output"
DIRECTORY_MODELS = Path(f"{DIRECTORY_OUTPUT}/models/")
DIRECTORY_FIGURES = Path(f"{DIRECTORY_OUTPUT}/images/")

# Create the output folders if they do not exist yet
DIRECTORY_FIGURES.mkdir(parents=True, exist_ok=True)
DIRECTORY_MODELS.mkdir(parents=True, exist_ok=True)

## Demonstration

In [25]:
VIDEO_DEMO = "CSC_52081_EP_demonstration"
(DIRECTORY_FIGURES / f"{VIDEO_DEMO}.mp4").unlink(missing_ok=True)


env = gym.make(
    "CarRacing-v3",
    render_mode="rgb_array",
    lap_complete_percent=0.95,
    domain_randomize=False,
    continuous=False
)
env = gym.wrappers.RecordVideo(env, video_folder=str(DIRECTORY_FIGURES), name_prefix=VIDEO_DEMO)


done = False
observation, info = env.reset()

while not done:
    action = env.action_space.sample()

    observation, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated

env.close()


Video(
    DIRECTORY_FIGURES / f"{VIDEO_DEMO}-episode-0.mp4",
    embed=True,
    html_attributes="controls autoplay loop",
)

InvalidAction: you passed the invalid action `1.0`. The supported action_space is `Discrete(5)`

## Description

Only the demonstration above is known to work. Everything from this point on is experimental.

### Global Definitions

#### Constants

In [None]:
env = gym.make("CarRacing-v3", render_mode="rgb_array")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Set the device to CUDA if available, otherwise use CPU

STATE_SHAPE = (96, 96, 3)
DISCRETE_ACTIONS = 5
CONTINUOUS_ACTIONS = 3
EPISODES = 1000 

#### Functions

In [None]:
def plot_learning_curve(rewards):
    plt.plot(rewards)
    plt.xlabel("Episodes")
    plt.ylabel("Cumulative Reward")
    plt.title("Learning Curve")
    plt.show()

def plot_action_heatmap(action_counts):
    plt.imshow(action_counts, cmap="hot", interpolation="nearest")
    plt.colorbar()
    plt.title("Action Heatmap")
    plt.show()

def test_agent(
    env: gym.Env, agent: torch.nn.Module, num_episode: int = 1
) -> List[float]:
    """
    Test a naive agent in the given environment using the provided Q-network.

    Parameters
    ----------
    env : gym.Env
        The environment in which to test the agent.
    agent : torch.nn.Module
        The Q-network to use for decision making.
    num_episode : int, optional
        The number of episodes to run, by default 1.

    Returns
    -------
    List[float]
        A list of rewards per episode.
    """
    episode_reward_list = []

    # Run the agent on the same device as its parameters
    agent_device = next(agent.parameters()).device

    for episode_id in range(num_episode):
        state, _ = env.reset()
        done = False
        episode_reward = 0.0

        while not done:
            # Convert the (H, W, C) state to a (1, C, H, W) float tensor scaled to [0, 1]
            state_tensor = (
                torch.tensor(state, dtype=torch.float32).permute(2, 0, 1).unsqueeze(0) / 255.0
            ).to(agent_device)

            # Compute the Q-values for the current state using the Q-network
            with torch.no_grad():
                q_values = agent(state_tensor)

            # Select the action with the highest Q-value
            action = int(torch.argmax(q_values).item())

            # Apply the action; Gymnasium returns (obs, reward, terminated, truncated, info)
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Update the episode reward
            episode_reward += float(reward)

        episode_reward_list.append(episode_reward)
        print(f"Episode reward: {episode_reward}")

    return episode_reward_list

#### CNN Feature Extractor

In [None]:
import gymnasium.wrappers as gym_wrap
import os

# NOTE: `save` and `load` are written as methods (they take `self`) and expect an
# agent object exposing `updating_net`, `frozen_net`, `optimizer`, `act_taken`,
# `epsilon`, `epsilon_min` and `load_state`. No such class is defined in this notebook yet.
def save(self, save_dir, save_name):
    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f"{save_name}_{self.act_taken}.pt")
    torch.save({
        'upd_model_state_dict': self.updating_net.state_dict(),
        'frz_model_state_dict': self.frozen_net.state_dict(),
        'optimizer_state_dict': self.optimizer.state_dict(),
        # 'replay_buffer': self.buffer,  # Exclude replay buffer from saving
        'action_number': self.act_taken,
        'epsilon': self.epsilon
        }, save_path)
    print(f"Model saved to {save_path} at step {self.act_taken}")

def load(self, load_dir, model_name):
    # Join the path parts so that a missing trailing separator in load_dir is harmless
    loaded_model = torch.load(os.path.join(load_dir, model_name), weights_only=False)
    upd_net_param = loaded_model['upd_model_state_dict']
    frz_net_param = loaded_model['frz_model_state_dict']
    opt_param = loaded_model['optimizer_state_dict']
    self.updating_net.load_state_dict(upd_net_param)
    self.frozen_net.load_state_dict(frz_net_param)
    self.optimizer.load_state_dict(opt_param)
    if self.load_state == 'eval':
        self.updating_net.eval()
        self.frozen_net.eval()
        self.epsilon_min = 0
        self.epsilon = 0
    elif self.load_state == 'train':
        self.updating_net.train()
        self.frozen_net.train()
        self.act_taken = loaded_model['action_number']
        self.epsilon = loaded_model['epsilon']
    else:
        raise ValueError("Unknown load state. Should be either 'eval' or 'train'.")

class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        total_reward = 0.0
        for _ in range(self._skip):
            state, reward, terminated, truncated, info = self.env.step(action)
            total_reward += reward
            # Stop repeating the action as soon as the episode ends for any reason
            if terminated or truncated:
                break
        return state, total_reward, terminated, truncated, info


class DQN(nn.Module):

    def __init__(self, in_dim, out_dim):
        super().__init__()
        channel_n, height, width = in_dim

        if height != 84 or width != 84:
            raise ValueError(f"DQN model requires input of a (84, 84)-shape. Input of a ({height}, {width})-shape was passed.")

        self.net = nn.Sequential(
            nn.Conv2d(in_channels=channel_n, out_channels=16,
                      kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=16, out_channels=32,
                      kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(2592, 256),
            nn.ReLU(),
            nn.Linear(256, out_dim),
        )

    def forward(self, input):
        return self.net(input)

env = gym.make(
    "CarRacing-v3",
    continuous=False
)
env = SkipFrame(env, skip=4)  # SkipFrame is a module-level wrapper, not an attribute of DQN
env = gym_wrap.GrayscaleObservation(env)
env = gym_wrap.ResizeObservation(env, shape=(84, 84))
env = gym_wrap.FrameStackObservation(env, stack_size=4)
state, info = env.reset()
action_n = env.action_space.n
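
The input size of the first linear layer (`2592`) follows from the convolution settings and the $84 \times 84$ input. A minimal sketch of that arithmetic, assuming unpadded and undilated convolutions as in the network above; the helper names `conv_output_size` and `flattened_features` are hypothetical:

```python
def conv_output_size(size: int, kernel_size: int, stride: int) -> int:
    """Spatial output size of an unpadded, undilated convolution."""
    return (size - kernel_size) // stride + 1


def flattened_features(height: int, width: int) -> int:
    """Flattened feature count after the two convolutions used above."""
    # (kernel_size, stride) of each convolution, mirroring the network definition.
    for kernel_size, stride in ((8, 4), (4, 2)):
        height = conv_output_size(height, kernel_size, stride)
        width = conv_output_size(width, kernel_size, stride)
    return 32 * height * width  # 32 output channels in the last convolution


features_84 = flattened_features(84, 84)  # resized frames
features_96 = flattened_features(96, 96)  # raw CarRacing frames
```

With $84 \times 84$ frames the spatial size goes $84 \to 20 \to 9$, giving $32 \times 9 \times 9 = 2592$ features. Raw $96 \times 96$ frames would give a different count, which is why the resize wrapper must be applied before observations reach this network.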

### Deep SARSA

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import gymnasium as gym
import gymnasium.wrappers as gym_wrap

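# Environment setup (discrete actions; the preprocessing wrappers are applied in the test cell below)
env = gym.make("CarRacing-v3", render_mode="rgb_array", continuous=False)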

# Parameters
EPISODES = 1000  # Training episodes
GAMMA = 0.99  # Discount factor
ALPHA = 0.001  # Learning rate
EPSILON = 1.0  # Exploration rate
EPSILON_DECAY = 0.995  # Decay factor
EPSILON_MIN = 0.05  # Minimum exploration rate

class SkipFrame(gym.Wrapper):
    def __init__(self, env, skip):
        super().__init__(env)
        self._skip = skip

    def step(self, action):
        total_reward = 0.0
        for _ in range(self._skip):
            state, reward, terminated, truncated, info = self.env.step(action)
            total_reward += reward
            # Stop repeating the action as soon as the episode ends for any reason
            if terminated or truncated:
                break
        return state, total_reward, terminated, truncated, info

# CNN-based Q-network
class DeepSARSA(nn.Module):
    def __init__(self, state_shape, action_size):
        super().__init__()
        channel_n, height, width = state_shape

        self.net = nn.Sequential(
            nn.Conv2d(in_channels=channel_n, out_channels=16,
                      kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=16, out_channels=32,
                      kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(2592, 256),
            nn.ReLU(),
            nn.Linear(256, action_size),
        )

    def forward(self, x):
        return self.net(x)

# Training function
def train_deep_sarsa(model: nn.Module) -> nn.Module:
    optimizer = optim.Adam(model.parameters(), lr=ALPHA)
    loss_fn = nn.MSELoss()
    memory = deque(maxlen=10000)

    def to_tensor(observation):
        # The wrapped environment yields channel-first stacks of grayscale frames,
        # so only a batch dimension and [0, 1] scaling are needed (no permute).
        return torch.tensor(np.asarray(observation), dtype=torch.float32).unsqueeze(0).to(device) / 255.0

    global EPSILON
    for episode in range(EPISODES):
        state, _ = env.reset()
        state = to_tensor(state)
        action = select_action(model, state)
        done = False
        total_reward = 0.0

        while not done:
            next_state, reward, terminated, truncated, info = env.step(action)
            next_state = to_tensor(next_state)
            next_action = select_action(model, next_state)  # SARSA selects next action from current policy
            done = truncated or terminated

            memory.append((state, action, float(reward), next_state, next_action, done))
            state, action = next_state, next_action
            total_reward += float(reward)

            # Update model
            if len(memory) > 32:
                replay_experience(model, optimizer, loss_fn, memory, device)

        EPSILON = max(EPSILON * EPSILON_DECAY, EPSILON_MIN)  # Decay exploration
        print(f"Episode {episode}: Total Reward: {total_reward}")

    return model

# Action selection using ε-greedy policy
def select_action(model, state):
    if random.random() < EPSILON:
        return random.randint(0, DISCRETE_ACTIONS - 1)  # Random action
    with torch.no_grad():
        return torch.argmax(model(state)).item()  # Best action from Q-network

# Experience replay function
def replay_experience(model, optimizer, loss_fn, memory, device):
    batch = random.sample(memory, 32)
    
    states, actions, rewards, next_states, next_actions, dones = zip(*batch)
    states = torch.cat(states).to(device)
    next_states = torch.cat(next_states).to(device)
    
    actions = torch.tensor(actions, dtype=torch.long).unsqueeze(1).to(device)
    rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1).to(device)
    next_actions = torch.tensor(next_actions, dtype=torch.long).unsqueeze(1).to(device)
    dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1).to(device)
    
    q_values = model(states).gather(1, actions)
    next_q_values = model(next_states).gather(1, next_actions).detach()

    target_q_values = rewards + GAMMA * next_q_values * (1 - dones)

    loss = loss_fn(q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
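
The update rule and exploration schedule used above can be illustrated in plain Python. This is only a sketch with hypothetical function names: it contrasts the on-policy SARSA target with the greedy target used by Q-learning and DQN, and counts how many episodes the multiplicative decay needs to reach its floor under the parameters listed at the top of this cell.

```python
from typing import Sequence


def sarsa_target(reward: float, next_q_values: Sequence[float],
                 next_action: int, done: bool, gamma: float = 0.99) -> float:
    """On-policy target: bootstrap from the action actually chosen next."""
    return reward + gamma * next_q_values[next_action] * (1.0 - float(done))


def q_learning_target(reward: float, next_q_values: Sequence[float],
                      done: bool, gamma: float = 0.99) -> float:
    """Off-policy target (as in DQN): bootstrap from the greedy next action."""
    return reward + gamma * max(next_q_values) * (1.0 - float(done))


def episodes_until_min(epsilon: float = 1.0, decay: float = 0.995,
                       epsilon_min: float = 0.05) -> int:
    """Count multiplicative decay steps until epsilon reaches its floor."""
    episodes = 0
    while epsilon > epsilon_min:
        epsilon *= decay
        episodes += 1
    return episodes


next_q = [1.0, 3.0, 2.0]  # illustrative Q-values for three actions
on_policy = sarsa_target(0.5, next_q, next_action=2, done=False)
greedy = q_learning_target(0.5, next_q, done=False)
decay_episodes = episodes_until_min()
```

When the next action is exploratory rather than greedy, the SARSA target is lower than the greedy target, so the learned values account for the cost of exploration. With a decay of 0.995 and a floor of 0.05, exploration reaches its minimum after 598 of the 1000 training episodes.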



## Test it

In [None]:
env = gym.make(
    "CarRacing-v3",
    render_mode="rgb_array",  # Required by the RecordVideo wrapper used below
    continuous=False
)
env = gym_wrap.GrayscaleObservation(env)
env = gym_wrap.ResizeObservation(env, shape=(84, 84))
env = gym_wrap.FrameStackObservation(env, stack_size=4)

state_shape = env.observation_space.shape  # `shape` is an attribute, not a method
discrete_actions = env.action_space.n
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Run Deep SARSA training
model = DeepSARSA(state_shape, discrete_actions).to(device)
model = train_deep_sarsa(model)

In [None]:
NUM_EPISODES = 3

FIGS_DIR = Path("figs/")       # Where to save videos (.mp4 files)

VIDEO_DIRNAME = "deep_sarsa"
VIDEO_PREFIX = "ds-video"      # RecordVideo writes <prefix>-episode-<n>.mp4

# Remove videos left over from a previous run
for episode_index in range(NUM_EPISODES):
    (FIGS_DIR / VIDEO_DIRNAME / f"{VIDEO_PREFIX}-episode-{episode_index}.mp4").unlink(missing_ok=True)

# RecordVideo needs an environment created with render_mode="rgb_array"
env = gym.wrappers.RecordVideo(
    env,
    video_folder=str(FIGS_DIR / VIDEO_DIRNAME),
    name_prefix=VIDEO_PREFIX,
    episode_trigger=lambda x: True,
)
env = gym.wrappers.RecordEpisodeStatistics(env, buffer_length=NUM_EPISODES)

model.eval()
for episode_index in range(NUM_EPISODES):
    total_reward = 0.0
    state, info = env.reset()

    episode_over = False
    while not episode_over:
        # Stacked grayscale frames are already channel-first: add a batch dimension and scale to [0, 1]
        state_tensor = torch.tensor(np.asarray(state), dtype=torch.float32).unsqueeze(0).to(device) / 255.0
        with torch.no_grad():
            action = int(torch.argmax(model(state_tensor)).item())  # Greedy action

        state, reward, terminated, truncated, info = env.step(action)
        total_reward += float(reward)
        episode_over = terminated or truncated

    print(f"Episode {episode_index} total reward: {total_reward}")
print(f'Episode time taken: {env.time_queue}')
print(f'Episode total rewards: {env.return_queue}')
print(f'Episode lengths: {env.length_queue}')

env.close()


In [None]:
Video(FIGS_DIR / VIDEO_DIRNAME / "ds-video-episode-0.mp4", embed=True, html_attributes="controls autoplay loop")

In [None]:
Video(FIGS_DIR / VIDEO_DIRNAME / "ds-video-episode-1.mp4", embed=True, html_attributes="controls autoplay loop")

In [None]:
Video(FIGS_DIR / VIDEO_DIRNAME / "ds-video-episode-2.mp4", embed=True, html_attributes="controls autoplay loop")