Name: Mahendran Jinachandran 

Student ID: 24088951

# Section 1: 
Why is Reinforcement Learning the ML paradigm of choice for this task?

- I chose the game "Ping Pong" for this task. Reinforcement Learning (RL) is the most suitable paradigm because the task requires control and decision-making. Unlike supervised learning methods, which rely on large labelled datasets, RL does not require a dataset; instead, it learns from experience, much as humans do.

- In "Ping Pong", the agent (a software program in this case) must learn to move its paddle up or down to return the ball and score points to win the match. This is an example of sequential decision-making, where each move affects the outcome of the game.

- What separates RL from other ML techniques is that the agent starts with no prior experience of the game. It learns through trial and error: it explores different actions, receives feedback on those actions in the form of rewards or penalties, and eventually learns to make better choices that maximize its score. This makes RL a powerful approach for Atari games and similar problems, where the rules are fixed but the strategies differ and must be learnt.
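
To make "maximize its score" a little more concrete, here is a minimal sketch of the discounted return, the quantity an RL agent typically tries to maximize. The function name `discounted_return` and the example rewards are illustrative assumptions; the discount value 0.99 matches the `GAMMA` hyperparameter used later in this notebook.

```python
def discounted_return(rewards, gamma=0.99):
    """Sum of rewards, each discounted by gamma for every step of delay."""
    total = 0.0
    # Walk backwards so that each step computes G_t = r_t + gamma * G_{t+1}
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total

# Hypothetical rally: nothing happens for three steps, then the agent scores.
example_rewards = [0.0, 0.0, 0.0, 1.0]
example_return = discounted_return(example_rewards, gamma=0.99)
print(example_return)
```

A reward that arrives later counts for slightly less, which nudges the agent towards winning points sooner rather than later.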

Just for fun: hopefully, the agent learns to make much better decisions than I do in real life. Let's dive into the game.

# Section 2:
## The Environment


In [None]:
# imports
import gym
import cv2 
import time
import random
import numpy as np
import tensorflow as tf
from collections import deque
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models

In [None]:
gpus = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

In [None]:
# Check for available GPU devices and use them
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print("Error: " + str(e))

a. The Atari game selected is *Pong-v5*

In [None]:
# env = gym.make("ALE/Pong-v5", render_mode="human")  # use render_mode="human" to watch the game
env = gym.make("ALE/Pong-v5", render_mode="rgb_array")  # "rgb_array" returns frames without opening a window

In [None]:
env.metadata['render_fps'] = 60 

b. Inputs received from Gym Environment 

In [None]:
# reset() returns a tuple: (observation, info)
observation, info = env.reset()
print("Observation shape:", observation.shape)

c. Control settings for the Joystick

In [None]:
num_actions = env.action_space.n
print("Number of possible actions:", num_actions)
print("Available actions are: ")
print(env.unwrapped.get_action_meanings())

# Section 3: DQN Implementation

a. Capture and Preprocessing of the Data (1 mark)

In [None]:
def preprocess_frame_color(frame):
    """
    Resize a raw RGB frame to 84x84 while keeping 3 color channels.
    """
    return cv2.resize(frame, (84, 84), interpolation=cv2.INTER_AREA)

def initialize_frame_stack(preprocessed_frame, stack_size=4):
    """
    Initialize a deque of stacked frames with the same frame repeated.
    """
    total_frames = [preprocessed_frame for _ in range(stack_size)]
    return deque(total_frames, maxlen=stack_size)

def stack_frames(stacked_frames, new_frame, is_new_episode, stack_size=4):
    """
    Initialize the frame stack if it's a new episode, otherwise
    append the new frame to the existing stack.
    """
    if is_new_episode:
        stacked_frames = initialize_frame_stack(new_frame, stack_size)
    else:
        stacked_frames.append(new_frame)
        
    # Concatenate along the channel dimension: (84, 84, 12) for 4 RGB frames
    stacked_state = np.concatenate(stacked_frames, axis=2)
    return stacked_state, stacked_frames
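
The stacking behaviour can be checked without the emulator. The sketch below is an illustration only: `fake_frame` is a hypothetical stand-in for a preprocessed 84x84 RGB frame, and the stack size of 4 is assumed to match the default above.

```python
from collections import deque
import numpy as np

STACK = 4  # assumed stack size, matching the default used above

def fake_frame(value):
    """Stand-in for a preprocessed 84x84 RGB frame filled with a single value."""
    return np.full((84, 84, 3), value, dtype=np.uint8)

# New episode: the first frame is repeated to fill the stack.
frames = deque([fake_frame(0)] * STACK, maxlen=STACK)
state = np.concatenate(frames, axis=2)

# Next step: appending a frame pushes the oldest one out of the deque.
frames.append(fake_frame(1))
next_state = np.concatenate(frames, axis=2)

print(state.shape, next_state.shape)
# First colour channel of each stacked frame, oldest first
print(next_state[0, 0, ::3])
```

Because the newest frame always lands in the last three channels, the network can infer the ball's direction from the differences between consecutive frames.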

b. The Network Structure (2 marks)

In [None]:
def create_dqn_model(input_shape, num_actions):
    """
    Create a Convolutional Neural Network for DQN.
    Input shape = (84, 84, 12) for 4 stacked color frames.
    Output = Q-value for each action.
    """
    model = models.Sequential([
        layers.Input(shape=input_shape),

        layers.Conv2D(32, (8, 8), strides=2, activation='relu'),
        layers.BatchNormalization(),

        layers.Conv2D(64, (4, 4), strides=2, activation='relu'),
        layers.BatchNormalization(),

        layers.Conv2D(64, (3, 3), strides=1, activation='relu'),
        layers.BatchNormalization(),

        layers.Flatten(),

        layers.Dense(512, activation='relu'),
        layers.Dropout(0.5),  # Dropout to avoid overfitting
        
        layers.Dense(num_actions) 
    ])
    
    return model
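
As a rough check on the size of this network, the sketch below works through the spatial dimensions after each convolution, assuming the Keras default of `padding='valid'`. The helper names are illustrative; the second layer list uses a stride of 4 in the first layer, as `create_dueling_dqn` does in Section 5, for comparison.

```python
def conv_output_size(size, kernel, stride):
    """Spatial size after a 'valid' convolution (no padding)."""
    return (size - kernel) // stride + 1

def flattened_features(input_size, conv_layers, channels):
    """Return (final spatial size, number of features after Flatten)."""
    size = input_size
    for kernel, stride in conv_layers:
        size = conv_output_size(size, kernel, stride)
    return size, size * size * channels

# (kernel, stride) pairs as used in create_dqn_model above
notebook_layers = [(8, 2), (4, 2), (3, 1)]
# Same kernels, but stride 4 in the first layer
stride4_layers = [(8, 4), (4, 2), (3, 1)]

print(flattened_features(84, notebook_layers, channels=64))
print(flattened_features(84, stride4_layers, channels=64))
```

With a stride of 2 in the first layer the flattened vector has 16384 features, versus 3136 with a stride of 4, so the following `Dense(512)` layer is roughly five times larger.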

c. Q-Learning Update (2 marks)

In [None]:
def update_model(main_model, target_model, optimizer, batch, gamma=0.99):
    states, actions, rewards, next_states, dones = batch

    # Convert to tensors
    states = tf.convert_to_tensor(states, dtype=tf.float32)
    next_states = tf.convert_to_tensor(next_states, dtype=tf.float32)
    actions = tf.convert_to_tensor(actions, dtype=tf.int32)
    rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
    dones = tf.convert_to_tensor(dones, dtype=tf.float32)

    # Train step
    with tf.GradientTape() as tape:
        # Q(s, a) using the main model
        q_values = main_model(states)
        q_action = tf.reduce_sum(q_values * tf.one_hot(actions, main_model.output_shape[-1]), axis=1)

        # Q(s', a') using the target model; stop_gradient keeps the target fixed
        next_q_values = target_model(next_states)
        max_next_q = tf.reduce_max(next_q_values, axis=1)
        target_q = tf.stop_gradient(rewards + gamma * max_next_q * (1.0 - dones))

        # Loss between predicted Q and target Q
        loss = tf.keras.losses.MSE(target_q, q_action)

    # Apply gradients
    grads = tape.gradient(loss, main_model.trainable_variables)
    optimizer.apply_gradients(zip(grads, main_model.trainable_variables))
    loss = loss.numpy()
    return loss
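
The target formula in `update_model` can be illustrated on a tiny batch with NumPy. The numbers below are made up for illustration; only the formula itself comes from the code above.

```python
import numpy as np

gamma = 0.99
rewards = np.array([1.0, 0.0, -1.0], dtype=np.float32)
# Made-up values standing in for max over a' of Q_target(s', a')
max_next_q = np.array([2.0, 0.5, 3.0], dtype=np.float32)
# The last transition ends the episode
dones = np.array([0.0, 0.0, 1.0], dtype=np.float32)

# Same formula as in update_model: bootstrap only while the episode continues.
target_q = rewards + gamma * max_next_q * (1.0 - dones)
print(target_q)
```

For the terminal transition the `(1.0 - dones)` factor removes the bootstrap term, so the target is just the immediate reward.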

d. Other Important Concepts (2 marks)

In [None]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

In [None]:
# Hyperparameters
GAMMA = 0.99 # Discount factor
EPSILON = 1.0   # Exploration rate
EPSILON_MIN = 0.1 # Minimum exploration rate
EPSILON_DECAY = 0.995 # Decay rate
LEARNING_RATE = 0.00025 # Learning rate
REPLAY_BUFFER_SIZE = 100000
BATCH_SIZE = 32 # Batch size for training
TARGET_UPDATE_FREQ = 1000 # Frequency to update target model
STACK_SIZE =  4 # Number of frames to stack
TOTAL_EPISODES = 500 # Total episodes to train
MAX_STEPS = 5000 # Max steps per episode
input_shape = (84, 84, 3 * STACK_SIZE) # 4 stacked frames
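
It can help to see how these exploration settings play out over training. The sketch below assumes epsilon is decayed once per episode, as the training loop later in this section does; the function name `episodes_until_floor` is an illustrative choice.

```python
# Values copied from the hyperparameter cell above
EPS_START, EPS_MIN, EPS_DECAY = 1.0, 0.1, 0.995

def episodes_until_floor(start, floor, decay):
    """Count per-episode decays until epsilon is clamped at the floor."""
    eps, episodes = start, 0
    while eps > floor:
        eps = max(floor, eps * decay)
        episodes += 1
    return episodes

print(episodes_until_floor(EPS_START, EPS_MIN, EPS_DECAY))
```

With these values epsilon reaches its minimum only after roughly 460 of the 500 training episodes, so the agent acts mostly at random for a large share of the run.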

In [None]:
# Models and optimizer
dqn_main_model = create_dqn_model(input_shape, num_actions)
dqn_target_model = create_dqn_model(input_shape, num_actions)
dqn_target_model.set_weights(dqn_main_model.get_weights())
dqn_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=LEARNING_RATE)

# Replay buffer
dqn_replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

# Frame stack
dqn_stacked_frames = deque(maxlen=STACK_SIZE)
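
A rough estimate of the memory a full replay buffer would need is sketched below. It assumes frames are stored as `uint8` (one byte per value) and that each transition keeps separate copies of the state and the next state, which is how the tuples are built in the training loop; the function name is illustrative.

```python
def replay_buffer_bytes(capacity, height=84, width=84, channels=12, bytes_per_value=1):
    """Approximate bytes needed when each transition stores state and next_state."""
    state_bytes = height * width * channels * bytes_per_value
    return capacity * 2 * state_bytes

# 100_000 is REPLAY_BUFFER_SIZE from the hyperparameters
estimate = replay_buffer_bytes(100_000)
print(f"{estimate / 1024**3:.1f} GiB")
```

Under these assumptions a full buffer needs on the order of 16 GB of RAM, so a smaller capacity or grayscale frames may be worth considering on a memory-constrained machine.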

In [None]:
def choose_action(stacked_state, main_model):
    # Epsilon-greedy action
    if np.random.rand() < EPSILON:
        action = env.action_space.sample()
    else:
        q_values = main_model(np.expand_dims(stacked_state, axis=0), training=False)
        action = np.argmax(q_values.numpy())
    return action

In [None]:
def train_dqn(dqn_type, main_model, target_model, optimizer, replay_buffer, stacked_frames):
    training_start_time = time.time()
    step_count = 0
    episode_rewards = []
    epsilons = []
    losses = []

    print(f"Training of {dqn_type} DQN started...")
    for episode in tqdm(range(TOTAL_EPISODES), desc="Training Episodes"):
        start_time = time.time()

        state, info = env.reset()
        preprocessed_frame = preprocess_frame_color(state)
        stacked_state, stacked_frames = stack_frames(stacked_frames, preprocessed_frame, True)

        total_reward = 0
        done = False

        for step in tqdm(range(MAX_STEPS), desc=f"Episode {episode+1}", leave=False):
            step_count += 1

            action = choose_action(stacked_state, main_model)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Process next frame
            preprocessed_next = preprocess_frame_color(next_state)
            next_stacked_state, stacked_frames = stack_frames(stacked_frames, preprocessed_next, False)

            # Store experience
            replay_buffer.add((stacked_state, action, reward, next_stacked_state, float(done)))
            stacked_state = next_stacked_state
            total_reward += reward

            # Train once the buffer holds enough samples
            if len(replay_buffer) > BATCH_SIZE:
                batch = replay_buffer.sample(BATCH_SIZE)
                loss = update_model(main_model, target_model, optimizer, batch, gamma=GAMMA)
                losses.append(loss)

            # Update target network
            if step_count % TARGET_UPDATE_FREQ == 0:
                target_model.set_weights(main_model.get_weights())
                print(f"[Step {step_count}] 🔄 Target network updated.")

            if done:
                break

        # Decay epsilon
        global EPSILON
        EPSILON = max(EPSILON_MIN, EPSILON * EPSILON_DECAY)

        elapsed = time.time() - start_time
        episode_rewards.append(total_reward)
        epsilons.append(EPSILON)
        tqdm.write(f"Episode {episode + 1}/{TOTAL_EPISODES} - Reward: {total_reward} - Epsilon: {EPSILON:.4f} - Time: {elapsed:.2f}s")

    total_training_time = time.time() - training_start_time
    return episode_rewards, epsilons, losses, total_training_time

In [None]:
dqn_episode_rewards, dqn_epsilons, dqn_losses, dqn_total_training_time = train_dqn("Q-Learning", dqn_main_model, dqn_target_model, dqn_optimizer, dqn_replay_buffer, dqn_stacked_frames)

# Section 4: 
Results and Evaluation

In [None]:
# A generic method to display training metrics
# such as rewards, epsilon, and loss
def plot_training_metric(metric_values, title="Training Metric", ylabel="Value", xlabel="Episode", label=None, color='blue'):
    plt.figure(figsize=(10, 4))
    plt.plot(metric_values, label=label if label else ylabel, color=color)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
# Plot using the generic function
plot_training_metric(dqn_episode_rewards, title="DQN Training: Reward Per Episode", ylabel="Reward", label="Episode Reward", color="blue")

In [None]:
plot_training_metric(dqn_epsilons, title="DQN Exploration Rate (Epsilon) Over Time", ylabel="Epsilon", label="Epsilon", color="orange")

In [None]:
plot_training_metric(dqn_losses, title="DQN Loss Over Time", ylabel="Loss", label="Loss", color="red")
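
Per-episode rewards and per-step losses tend to be noisy, so a smoothed curve can be easier to read. The helper below is a minimal sketch using a simple moving average; the name `moving_average` and the window size are assumptions, not part of the original notebook.

```python
import numpy as np

def moving_average(values, window=3):
    """Simple moving average; returns len(values) - window + 1 points."""
    values = np.asarray(values, dtype=np.float64)
    kernel = np.ones(window) / window
    return np.convolve(values, kernel, mode="valid")

smoothed = moving_average([1, 2, 3, 4, 5], window=3)
print(smoothed)
```

The result could be passed to `plot_training_metric` in place of the raw list, for example `plot_training_metric(moving_average(dqn_episode_rewards, window=20), ...)`.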

# Section 5
Exploration of recent developments in DQN, i.e., Dueling DQN

Reference: Book

In [None]:
def create_dueling_dqn(input_shape, num_actions):
    """
    TensorFlow (Keras) implementation of Dueling DQN based on the provided PyTorch model.
    input_shape: (height, width, channels), e.g., (84, 84, 12) for 4 stacked RGB frames
    num_actions: number of possible actions in the environment
    """

    inputs = layers.Input(shape=input_shape)

    # Convolutional layers
    x = layers.Conv2D(32, kernel_size=8, strides=4, activation='relu')(inputs)
    x = layers.Conv2D(64, kernel_size=4, strides=2, activation='relu')(x)
    x = layers.Conv2D(64, kernel_size=3, strides=1, activation='relu')(x)
    x = layers.Flatten()(x)

    # Shared dense layer
    x = layers.Dense(512, activation='relu')(x)

    # Value stream
    value = layers.Dense(1)(x)  # Output: scalar

    # Advantage stream
    advantage = layers.Dense(num_actions)(x)  # Output: one value per action

    # Combine value and advantage into Q-values
    advantage_mean = tf.reduce_mean(advantage, axis=1, keepdims=True)
    q_values = value + (advantage - advantage_mean)

    # Final model
    model = models.Model(inputs=inputs, outputs=q_values)
    return model
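
The way the two streams are combined can be checked on a single state with NumPy. The value and advantage numbers below are made up for illustration; the formula is the one used in `create_dueling_dqn`.

```python
import numpy as np

value = np.array([[5.0]])                 # V(s), shape (batch, 1)
advantage = np.array([[1.0, 2.0, 3.0]])   # A(s, a), shape (batch, num_actions)

# Subtracting the mean advantage makes the split into V and A identifiable.
q_values = value + (advantage - advantage.mean(axis=1, keepdims=True))
print(q_values)
print(q_values.mean(axis=1))
```

After the mean is subtracted, the average Q-value over actions equals the state value, while the ordering of the actions is still determined by the advantage stream.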

In [None]:
# Models and optimizer
dueling_dqn_main_model = create_dueling_dqn(input_shape, num_actions)
dueling_dqn_target_model = create_dueling_dqn(input_shape, num_actions)
dueling_dqn_target_model.set_weights(dueling_dqn_main_model.get_weights())
dueling_dqn_optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=LEARNING_RATE)

# Replay buffer
dueling_dqn_replay_buffer = ReplayBuffer(REPLAY_BUFFER_SIZE)

# Frame stack
dueling_dqn_stacked_frames = deque(maxlen=STACK_SIZE)

In [None]:
# Reset exploration: EPSILON is a global that the first training run has already decayed
EPSILON = 1.0
dueling_dqn_rewards, dueling_dqn_epsilons, dueling_dqn_losses, dueling_dqn_total_training_time = train_dqn("Dueling DQN", dueling_dqn_main_model, dueling_dqn_target_model, dueling_dqn_optimizer, dueling_dqn_replay_buffer, dueling_dqn_stacked_frames)

In [None]:
plot_training_metric(dueling_dqn_rewards, title="Dueling DQN Training: Reward Per Episode", ylabel="Reward", label="Episode Reward", color="blue")

In [None]:
plot_training_metric(dueling_dqn_epsilons, title="Dueling DQN Exploration Rate (Epsilon) Over Time", ylabel="Epsilon", label="Epsilon", color="orange")

In [None]:
plot_training_metric(dueling_dqn_losses, title="Dueling DQN Loss Over Time", ylabel="Loss", label="Loss", color="red")