# DQN Project Part 3

Code runs through until completion

## Contributors: 
- Tadhg Ryan (21310408)
- Craig Phayer (21340633)
- Thomas McCarty (21327696)


In [1]:
# %pip install --upgrade pip
# %pip install -q -U gymnasium swig
# %pip install gymnasium[atari]==0.29.1
# %pip install gymnasium[classic_control,box2d,accept-rom-license,ActionWrapper]==0.29.1
# %pip install tensorflow==2.10
# %pip install "numpy<2"
# %pip install matplotlib
# %pip install opencv-python
# %pip install moviepy

In [2]:
import os

# These variables are read when the TensorFlow / CUDA native runtime is first
# loaded, so they have to be set BEFORE `import tensorflow` to have any effect.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # expose only GPU 0 to this process
# Suppress TensorFlow's C++ informational, warning and error logs to avoid output clutter.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import tensorflow as tf

# Python-side logging and device-placement tracing are configured separately
# from the C++ log level above.
tf.get_logger().setLevel('ERROR')
tf.debugging.set_log_device_placement(False)

# Enumerate the GPUs once and reuse the list for reporting and configuration.
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

if gpus:
    try:
        # Restrict TensorFlow to the first visible GPU and let it allocate
        # memory on demand instead of reserving the whole card up front.
        tf.config.set_visible_devices(gpus[0], 'GPU')
        tf.config.experimental.set_memory_growth(gpus[0], True)
    except RuntimeError as e:
        # Device configuration can no longer be changed once the runtime has
        # been initialized; report it and carry on with the defaults.
        print(e)

    print("GPUs available:")
    for gpu in gpus:
        details = tf.config.experimental.get_device_details(gpu)
        print("Name:", details.get('device_name', 'Unknown GPU'))
else:
    print("No GPU available.")

print(tf.__version__)

Num GPUs Available:  0
No GPU available.
2.10.0


In [3]:
# Importing libraries
import gymnasium as gym

# Creating the environment
# obs_type="ram" yields the 128-byte console RAM as the observation.
# render_mode="rgb_array" is required so that gymnasium's VideoRecorder
# (used by record_gameplay further down) can obtain frames from the env;
# without a render mode there is nothing for it to record.
env = gym.make("ALE/Qbert-v5", obs_type="ram", frameskip=4, render_mode="rgb_array") # TODO: env.reset(seed=seed)

# Inspecting the environment
print("Action space:", env.action_space)
print("Observation space:", env.observation_space)

Action space: Discrete(6)
Observation space: Box(0, 255, (128,), uint8)


In [4]:
import numpy as np
import cv2
from collections import deque

# TODO: Test AtariPreprocessing (FrameStacking + grayscaling + resizing)
# Function to preprocess a single frame
# def preprocess_frame(frame, augment=False):
#     # Ensure the input is a NumPy array
#     if isinstance(frame, tuple):
#         frame = frame[0]  # Extract the observation if it's a tuple
#     frame = np.array(frame)  # Ensure it's a NumPy array
#     # Convert to grayscale
#     gray_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
#     # Resize to 84x84
#     resized_frame = cv2.resize(gray_frame, (84, 84))

#     # Normalize the frame to the range [0, 1]
#     normalized_frame = resized_frame / 255.0

#     if augment:
#         # Random horizontal flip
#         if np.random.random() > 0.5:
#             normalized_frame = np.fliplr(normalized_frame)
#         # Add random noise
#         if np.random.random() > 0.7:
#             noise = np.random.normal(0, 0.01, normalized_frame.shape)
#             normalized_frame = np.clip(normalized_frame + noise, 0, 1)
#     return normalized_frame

# Module-level stack shared by reset_env_with_stack / step_env_with_stack.
# maxlen is the number of consecutive observations fed to the network at once;
# appending to a full deque silently drops the oldest entry.
frame_stack = deque(maxlen=4)


def reset_env_with_stack(env):
    """Reset `env` and return the initial stacked observation.

    The first observation is repeated until the shared `frame_stack` is full,
    which also flushes anything left over from a previous episode.

    Returns:
        (stacked, info): `stacked` is the observations stacked along a new
        last axis of length `frame_stack.maxlen` (shape (128, 4) for the RAM
        observations used in this notebook); `info` is the dict returned by
        `env.reset()`.
    """
    state, info = env.reset()
    # processed_frame = preprocess_frame(state)
    for _ in range(frame_stack.maxlen):
        frame_stack.append(state)
    return np.stack(frame_stack, axis=-1), info


def step_env_with_stack(env, action):
    """Apply `action`, push the new observation onto the stack and return it.

    Returns:
        (stacked, reward, done, truncated, info): same as `env.step(action)`
        except that the observation is replaced by the stacked observations
        (oldest first, newest last along the final axis).
    """
    next_state, reward, done, truncated, info = env.step(action)
    # processed_frame = preprocess_frame(next_state)
    # Push the observation that env.step just produced. (The previous code
    # appended an unrelated global `state`, so the stack never changed.)
    frame_stack.append(next_state)
    return np.stack(frame_stack, axis=-1), reward, done, truncated, info

# Sanity check: reset once and print the shape of a single raw observation.
# No preprocessing is applied here (the call below is commented out), so the
# "Processed" label refers to the unmodified observation returned by reset().
state, _ = env.reset()
# processed_frame = preprocess_frame(state, augment=False)
print("Processed frame shape:", state.shape)

Processed frame shape: (128,)


In [5]:
# import matplotlib.pyplot as plt

# state, _ = env.reset()
# # processed_frame = preprocess_frame(state, augment=False)
# plt.imshow(processed_frame, cmap='gray')
# plt.title("Augmented Frame")
# plt.show()

In [6]:
import tensorflow as tf
from tensorflow.keras import layers, models

# Define the DQN model
def create_dqn(action_space, input_shape=(128, 4), hidden_units=64):
    """Build the Q-network: flatten -> one hidden ReLU layer -> linear Q-values.

    Args:
        action_space: Number of discrete actions; the size of the output layer
            (one Q-value per action).
        input_shape: Shape of a single stacked observation, without the batch
            axis. Defaults to (128, 4), the shape this notebook feeds in.
        hidden_units: Width of the hidden Dense layer.

    Returns:
        An uncompiled `Sequential` model. With the default arguments the
        architecture (and therefore the weight layout) is the same as before
        these parameters were added.
    """
    # NOTE(review): observations are passed to the Dense layer unscaled —
    # consider normalising the inputs; confirm the effect on training first.
    return models.Sequential([
        layers.Input(shape=input_shape),
        layers.Flatten(),
        layers.Dense(hidden_units, activation='relu'),
        layers.Dense(action_space, activation='linear'),
    ])

# Initialize the model
# This is the online network that gets trained; its output layer has one unit
# per discrete action (env.action_space.n).
dqn_model = create_dqn(env.action_space.n)
# Print the layer / parameter overview as a quick architecture check.
dqn_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten (Flatten)           (None, 512)               0         
                                                                 
 dense (Dense)               (None, 64)                32832     
                                                                 
 dense_1 (Dense)             (None, 6)                 390       
                                                                 
Total params: 33,222
Trainable params: 33,222
Non-trainable params: 0
_________________________________________________________________


In [7]:
class ReplayBuffer:
    """Bounded first-in-first-out store of transitions for experience replay."""

    def __init__(self, capacity):
        # A deque with maxlen discards its oldest entry once `capacity`
        # transitions are held.
        self.buffer = deque(maxlen=capacity)

    def add(self, experience):
        """Store one (state, action, reward, next_state, done) tuple."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Draw `batch_size` distinct transitions uniformly at random.

        Returns a 5-tuple of NumPy arrays
        (states, actions, rewards, next_states, dones), each with
        `batch_size` as its leading dimension.
        """
        picked = random.sample(self.buffer, batch_size)
        # Transpose the list of transitions into one column per field.
        states, actions, rewards, next_states, dones = (
            np.array(column) for column in zip(*picked)
        )
        return states, actions, rewards, next_states, dones

    def size(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)


replay_buffer = ReplayBuffer(capacity=50000)

In [8]:
import random

def select_action(model, state, epsilon, action_space):
    """Epsilon-greedy action selection.

    With probability `epsilon` a uniformly random action index in
    [0, action_space - 1] is returned; otherwise the index of the largest
    Q-value predicted by `model` for `state`.
    """
    explore = random.random() < epsilon
    if explore:
        # randint is inclusive at both ends, hence the -1.
        return random.randint(0, action_space - 1)

    # The model expects a batch, so add a leading axis of length 1.
    batched_state = state[None, ...]
    return np.argmax(model.predict(batched_state, verbose=0))

In [9]:
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

def record_gameplay(env, model, video_path="trained_agent_gameplay.mp4", max_steps=500, stack_size=4):
    """Roll out the greedy policy of `model` once and record it to a video.

    Args:
        env: Environment to play in. It is reset at the start of the call.
        model: Q-network; the action with the highest predicted Q-value is
            taken at every step (no exploration).
        video_path: Output file handed to `VideoRecorder`.
        max_steps: Upper bound on the number of environment steps recorded.
        stack_size: Number of consecutive observations stacked into one
            network input; must match what the model was trained with.
    """
    recorder = VideoRecorder(env, video_path)
    # NOTE(review): the recorder is expected to disable itself when the env was
    # created without a frame-producing render mode — confirm against the
    # gymnasium version in use. Remember the state so the final message is honest.
    recording_enabled = getattr(recorder, "enabled", True)

    try:
        # Local stack, independent of the module-level one used for training.
        frames = deque(maxlen=stack_size)

        state, info = env.reset()
        for _ in range(stack_size):
            frames.append(state)
        state = np.stack(frames, axis=-1)

        for _ in range(max_steps):
            action = np.argmax(model.predict(state[None, ...], verbose=0))

            next_state, reward, done, truncated, info = env.step(action)

            recorder.capture_frame()

            frames.append(next_state)
            state = np.stack(frames, axis=-1)

            # Stop on either kind of episode end; stepping past a truncated
            # episode without a reset is not valid.
            if done or truncated:
                break
    finally:
        # Always finalise the recorder, even if prediction or stepping failed.
        recorder.close()
        recorder.enabled = False

    if recording_enabled:
        # Report the path that was actually passed to the recorder.
        print(f"Gameplay recorded at {video_path}")
    else:
        print(f"Video recorder was disabled; nothing was written to {video_path}")


In [None]:
def soft_decay_interpolation(t, a, b, k=10):
    """Sigmoid-shaped blend between `a` and `b`.

    The result is close to `a` for t near 0, exactly halfway between `a` and
    `b` at t = 0.5, and close to `b` for t near 1. `k` controls how sharp the
    transition around t = 0.5 is. The end values are approached, not reached.
    """
    span = b - a
    denominator = 1.0 + np.exp(k * (t - 0.5))
    return b - span / denominator

def train_dqn(env, model, target_model, replay_buffer, episodes=500, batch_size=32, gamma=0.99, epsilon_min=0.1, epsilon_max=1, update_target=10, verbose=0):
    """
    Train the DQN model using survival-based rewards.

    An episode ends as soon as the agent loses a life, the environment reports
    done/truncated, or 10,000 steps have elapsed. The reward returned by the
    environment is NOT used for learning; each transition is stored with a
    shaped reward instead:

      * +0.001 * step            (grows the longer the agent survives)
      * -50                      when a life is lost
      * -5                       from the third consecutive action 0 or 1 onwards
      * +5 / +1000               if `info` reports "cube_color_changed" /
                                 "round_completed"

    Learning uses Double DQN: `model` picks the next action, `target_model`
    evaluates it. Epsilon follows `soft_decay_interpolation` from
    `epsilon_max` towards `epsilon_min` over the course of training.

    Args:
        env: Environment driven through reset_env_with_stack / step_env_with_stack.
        model: Online Q-network, updated in place.
        target_model: Target Q-network, synchronised with `model` every
            `update_target` episodes.
        replay_buffer: Buffer exposing add(), sample(batch_size) and size().
        episodes: Number of training episodes.
        batch_size: Transitions per gradient step.
        gamma: Discount factor.
        epsilon_min, epsilon_max: Exploration-rate bounds.
        update_target: Target-network sync period, in episodes.
        verbose: Forwarded to `predict` during the learning step.

    Returns:
        List with the summed shaped reward of every episode.
    """
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
    loss_fn = tf.keras.losses.Huber()
    rewards_history = []
    epsilon = epsilon_max
    num_actions = env.action_space.n

    for episode in range(episodes):
        state, _ = reset_env_with_stack(env)
        total_reward = 0
        total_lives = 4  # life count the episode is assumed to start with
        num_of_NOOPs = 0

        for step in range(10_000):  # Limit steps per episode
            action = select_action(model, state, epsilon, num_actions)
            # `reward` (the environment's own score) is intentionally unused.
            next_state, reward, done, truncated, info = step_env_with_stack(env, action)

            # A missing "lives" entry is treated as "no life lost" instead of
            # raising a TypeError on `None < int`.
            life_lost = info.get("lives", total_lives) < total_lives

            # TODO: Test reward function
            # (gymnasium.wrappers.TransformReward)[https:#gymnasium.farama.org/v0.27.0/api/wrappers/reward_wrappers/]
            adjusted_reward = 0.001 * step
            if life_lost:
                adjusted_reward -= 50

            if action == 0 or action == 1: # TODO: check if fire should be included here
                num_of_NOOPs += 1
                if num_of_NOOPs >= 3:
                    adjusted_reward -= 5
            else:
                num_of_NOOPs = 0

            # NOTE(review): these bonuses only fire if the env's info dict
            # actually provides the keys — confirm that it does.
            if info.get("cube_color_changed"):
                adjusted_reward += 5
            # Bonus points for completing the round
            if info.get("round_completed"):
                adjusted_reward += 1000

            # The episode is cut short on a lost life, so that transition has
            # to be stored as terminal; otherwise the target would bootstrap
            # from a state the agent never gets to act in. Truncation is
            # deliberately not terminal (it is a time limit, not a failure).
            terminal = bool(done or life_lost)

            # Store adjusted reward in replay buffer
            replay_buffer.add((state, action, adjusted_reward, next_state, terminal))
            state = next_state
            total_reward += adjusted_reward

            if replay_buffer.size() > batch_size:
                # Sample from the replay buffer
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size=batch_size)

                # Double DQN: Use the model to select actions, but the target model to evaluate Q-values
                next_q_actions = np.argmax(model.predict(next_states, verbose=verbose), axis=1)
                next_q_values = target_model.predict(next_states, verbose=verbose) # TODO: Print out q values
                # (1 - dones) zeroes the bootstrap term for terminal transitions.
                target_q_values = rewards + gamma * next_q_values[np.arange(batch_size), next_q_actions] * (1 - dones)

                with tf.GradientTape() as tape:
                    q_values = model(states)
                    # Keep only the Q-value of the action that was actually taken.
                    q_taken = tf.reduce_sum(q_values * tf.one_hot(actions, num_actions), axis=1)
                    loss = loss_fn(target_q_values, q_taken)

                grads = tape.gradient(loss, model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.trainable_variables))

            if life_lost or done or truncated:
                break

        # Update target model weights periodically
        if episode % update_target == 0:
            target_model.set_weights(model.get_weights())

        rewards_history.append(total_reward)
        print(f"Episode {episode + 1}/{episodes}, Reward: {total_reward:.2f}, Survival Time: {step}, Epsilon: {epsilon:.3f}")

        # Fraction of training completed; guarded so that a single-episode
        # run does not divide by zero.
        progress = episode / (episodes - 1) if episodes > 1 else 1.0
        epsilon = soft_decay_interpolation(progress, epsilon_max, epsilon_min)

    return rewards_history

# The target network has the same architecture as the online network and
# starts from identical weights; train_dqn re-synchronises it periodically.
target_model = create_dqn(env.action_space.n)
target_model.set_weights(dqn_model.get_weights())  # Synchronize weights

# Run training. Values given here override the defaults in train_dqn's
# signature (e.g. gamma defaults to 0.99 there).
rewards_history = train_dqn(
    env=env,
    model=dqn_model,
    target_model=target_model,
    replay_buffer=replay_buffer,
    episodes=1800,  # Lower this for quick testing
    batch_size=32,
    gamma=0.9,
    epsilon_min=0.1,
    epsilon_max=1.0,
    update_target=10, # Target sync period, counted in episodes. TODO: Test varying this
    verbose=0
)
print("Training complete!")


Episode 1/1800, Reward: -57.86, Survival Time: 65, Epsilon: 1.000
Episode 2/1800, Reward: -70.44, Survival Time: 95, Epsilon: 0.994
Episode 3/1800, Reward: -62.86, Survival Time: 65, Epsilon: 0.994
Episode 4/1800, Reward: -73.05, Survival Time: 62, Epsilon: 0.994
Episode 5/1800, Reward: -66.74, Survival Time: 128, Epsilon: 0.994
Episode 6/1800, Reward: -53.52, Survival Time: 54, Epsilon: 0.994
Episode 7/1800, Reward: -62.25, Survival Time: 124, Epsilon: 0.994
Episode 8/1800, Reward: -58.57, Survival Time: 53, Epsilon: 0.994
Episode 9/1800, Reward: -47.86, Survival Time: 65, Epsilon: 0.994
Episode 10/1800, Reward: -57.92, Survival Time: 64, Epsilon: 0.994
Episode 11/1800, Reward: -48.57, Survival Time: 53, Epsilon: 0.994
Episode 12/1800, Reward: -48.77, Survival Time: 49, Epsilon: 0.994
Episode 13/1800, Reward: -97.25, Survival Time: 124, Epsilon: 0.994
Episode 14/1800, Reward: -63.05, Survival Time: 62, Epsilon: 0.994
Episode 15/1800, Reward: -42.74, Survival Time: 120, Epsilon: 0.994


In [None]:
# Play one greedy rollout with the trained network and record it to a video file.
record_gameplay(env, dqn_model, video_path="trained_agent_gameplay.mp4")

In [None]:
# Save the model's weights
# Make sure the output directory exists first so that a fresh checkout
# (without a models/ folder) does not fail after a long training run.
os.makedirs("models", exist_ok=True)
dqn_model.save_weights("models/dqn_model_weights.h5")
print("Model weights saved!")

In [None]:
# Save the entire model
# Make sure the output directory exists first so that a fresh checkout
# (without a models/ folder) does not fail after a long training run.
os.makedirs("models", exist_ok=True)
dqn_model.save("models/dqn_model.h5")
print("Model saved!")

In [None]:
def evaluate_agent(env, model, episodes=10):
    """Play `episodes` greedy episodes and report the environment's own score.

    Unlike training, the unshaped reward returned by the environment is
    summed here, and no exploration is used.

    Args:
        env: Environment driven through reset_env_with_stack / step_env_with_stack.
        model: Q-network; the action with the highest predicted Q-value is taken.
        episodes: Number of evaluation episodes.

    Returns:
        List with the total reward of each episode (empty if `episodes` is 0).
    """
    total_rewards = []
    for episode in range(episodes):
        state, _ = reset_env_with_stack(env)
        total_reward = 0
        done = truncated = False

        # End on termination OR truncation; looping on `done` alone never
        # exits when an episode is cut off by a time limit.
        while not (done or truncated):
            action = np.argmax(model.predict(state[None, ...], verbose=0))
            state, reward, done, truncated, _ = step_env_with_stack(env, action)
            total_reward += reward

        total_rewards.append(total_reward)
        print(f"Episode {episode + 1}, Reward: {total_reward}")

    # Avoid taking the mean of an empty list (nan plus a runtime warning).
    if total_rewards:
        print(f"Average Reward: {np.mean(total_rewards)}")
    return total_rewards

# Assigned so the returned list is not echoed as cell output.
evaluation_rewards = evaluate_agent(env, dqn_model, episodes=3)

In [None]:
import matplotlib.pyplot as plt

def plot_rewards(rewards):
    """Draw a line plot of the total reward per training episode and show it."""
    # Work on the current axes explicitly instead of the implicit pyplot state.
    axes = plt.gca()
    axes.plot(rewards)
    axes.set_xlabel('Episodes')
    axes.set_ylabel('Total Reward')
    axes.set_title('Training Rewards')
    plt.show()

# Example
plot_rewards(rewards_history)


In [None]:
# Summary statistics over all training episodes
average_reward = np.mean(rewards_history)
max_reward = np.max(rewards_history)

# Raw per-episode rewards with the mean and the maximum drawn as dashed
# horizontal reference lines.
stats_axes = plt.gca()
stats_axes.plot(rewards_history, label="Raw Rewards")
stats_axes.axhline(
    average_reward,
    color='r',
    linestyle='--',
    label=f"Average Reward: {average_reward:.2f}",
)
stats_axes.axhline(
    max_reward,
    color='g',
    linestyle='--',
    label=f"Max Reward: {max_reward}",
)
stats_axes.set_title("Training Rewards with Statistics")
stats_axes.set_xlabel("Episodes")
stats_axes.set_ylabel("Total Reward")
stats_axes.legend()
plt.show()


In [None]:
window = 10  # Adjust the window size as needed

# Moving average: convolve with a uniform kernel that sums to 1. mode='valid'
# keeps only positions where the window fits entirely inside the data, so the
# result is window - 1 entries shorter than rewards_history.
averaging_kernel = np.ones(window) / window
smoothed_rewards = np.convolve(rewards_history, averaging_kernel, mode='valid')

# Plot smoothed rewards
smoothed_axes = plt.gca()
smoothed_axes.plot(smoothed_rewards)
smoothed_axes.set_title("Smoothed Training Rewards")
smoothed_axes.set_xlabel("Episodes")
smoothed_axes.set_ylabel("Total Reward")
plt.show()

In [None]:
# Segment rewards into blocks
block_size = 10
block_means = [np.mean(rewards_history[i:i+block_size]) for i in range(0, len(rewards_history), block_size)]

# Plot block means
plt.plot(block_means)
plt.title("Mean Reward Per Block of Episodes")
plt.xlabel("Blocks (10 episodes each)")
plt.ylabel("Mean Reward")
plt.show()


In [None]:
# Running total of the per-episode rewards
cumulative_rewards = np.cumsum(rewards_history)

# Plot cumulative rewards
cumulative_axes = plt.gca()
cumulative_axes.plot(cumulative_rewards)
cumulative_axes.set_title("Cumulative Training Rewards")
cumulative_axes.set_xlabel("Episodes")
cumulative_axes.set_ylabel("Cumulative Reward")
plt.show()
