In [None]:
# Dominion Gbadamosi - 20243561
## code runs
### Reference to existing implementation: https://www.kaggle.com/code/rooshroosh/dqn-breakout

## Import Libraries and Define Environment

In [None]:
# Importing Necessary Libraries
import gym
from gym import envs, wrappers
import numpy as np
import tensorflow as tf
from keras.models import Model
from keras.layers import Input, Dense, Flatten, concatenate
from keras.optimizers import Adam
import cv2
from collections import deque
import random
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import display, clear_output
from keras.preprocessing import image
import os
import sys

In [None]:
# Enable on-demand GPU memory growth for every visible GPU, not just the first one.
physical_devices = tf.config.experimental.list_physical_devices('GPU')
for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

print("Num GPUs Available: ", len(physical_devices))

# Create the Breakout environment
env = gym.make('Breakout-v4', render_mode='rgb_array')
env.metadata['render_fps'] = 60  # Set the desired fps value
# Reset the environment
state = env.reset()

# Display the action meanings. A bare expression is only echoed when it is the
# last statement of a cell, so print it explicitly.
print("Action meanings:", env.unwrapped.get_action_meanings())

# Action and observation spaces
print("Action space:", env.action_space)
print("Observation space:", env.observation_space)

# Maximum episode steps
print("Max episode steps:", env.spec.max_episode_steps)

# Reward range
print("Reward range:", env.reward_range)

# Display the Breakout environment after the reset
plt.imshow(env.render())
plt.show()

## Defining Basic Hyperparameters

In [None]:
# Optimizer step size read by build_model when it compiles the network
learning_rate = 1e-4

# Training schedule: number of episodes to play and replay minibatch size
episodes, batch_size = 300, 32

# Shape of one preprocessed frame: 84x84 pixels, single grayscale channel
state_size = (84, 84, 1)

## Defining the Dueling DQN Model (with more Hyperparameters)

In [None]:
def build_model(input_shape, num_actions):
    """Build and compile a dueling DQN that outputs one Q-value per action.

    The network flattens the input, applies a shared 512-unit hidden layer and
    then splits into a scalar state-value stream V(s) and a per-action
    advantage stream A(s, a). The two are merged with the dueling aggregation

        Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')

    so the model output has exactly ``num_actions`` columns.

    Args:
        input_shape: Shape of a single observation (without the batch axis).
        num_actions: Number of discrete actions, i.e. the output width.

    Returns:
        A compiled Keras ``Model`` (Adam optimizer, MSE loss). The learning
        rate is read from the module-level ``learning_rate`` variable.
    """
    tf.random.set_seed(42)  # set a specific seed value

    # Shared trunk
    input_layer = Input(shape=input_shape)
    x = Flatten()(input_layer)
    x = Dense(512, activation='relu')(x)

    # Value stream: one scalar V(s) per sample
    val_stream = Dense(256, activation='relu')(x)
    value = Dense(1)(val_stream)

    # Advantage stream: one A(s, a) per action
    adv_stream = Dense(256, activation='relu')(x)
    advantage = Dense(num_actions)(adv_stream)

    # The dueling aggregation is linear in [V, A_1..A_n], so it is expressed as
    # a frozen, bias-free Dense layer whose kernel is fixed below. This avoids
    # a trainable (num_actions + 1)-wide head that would emit a spurious extra
    # "action" and would not enforce the V/A decomposition.
    aggregate = Dense(num_actions, use_bias=False, trainable=False)
    q_values = aggregate(concatenate([value, advantage]))

    # Kernel rows: row 0 multiplies V (added to every action); rows 1..n
    # multiply A and implement "A_j minus the mean over all advantages".
    kernel = np.vstack([
        np.ones((1, num_actions)),
        np.eye(num_actions) - 1.0 / num_actions,
    ]).astype('float32')
    aggregate.set_weights([kernel])

    model = Model(inputs=input_layer, outputs=q_values)
    model.compile(optimizer=Adam(learning_rate), loss='mse')

    return model

## Defining the DQN Agent

In [None]:
# Define the agent
class DQNAgent:
    """Epsilon-greedy agent trained with experience replay and a target network.

    The online network (``model``) selects actions and is the one being
    trained; the target network (``target_model``) evaluates the selected next
    action when building training targets and is refreshed through
    ``update_target_model``.
    """

    def __init__(self, state_size, action_size):
        """Create the replay buffer, exploration schedule and both networks.

        Args:
            state_size: Shape of one preprocessed observation, passed to
                ``build_model`` as the network input shape.
            action_size: Number of discrete actions.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # Replay memory buffer
        self.gamma = 0.99  # Discount factor for future rewards
        self.epsilon = 1.0  # Probability of taking a random action
        self.epsilon_decay = 0.995  # Multiplicative decay applied per replay call
        self.epsilon_min = 0.01  # Decay stops once epsilon is at or below this value
        self.model = build_model(state_size, action_size)  # Main DQN model
        self.target_model = build_model(state_size, action_size)  # Target DQN model
        # Start both networks from identical weights, independent of how
        # build_model happens to seed its initializers.
        self.update_target_model()

    def preprocess_state(self, state):
        """Convert a raw observation into a normalized 84x84 grayscale frame.

        Args:
            state: Observation array, or a tuple whose first element is the
                observation (as returned by ``env.reset()`` in newer gym).

        Returns:
            A float32 array resized to 84x84 with values scaled by 1/255.
        """
        # Extract the state from the tuple if needed
        if isinstance(state, tuple):
            state = state[0]

        # Convert RGB input to grayscale; other layouts are passed through
        if len(state.shape) == 3 and state.shape[2] == 3:
            state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)

        # Resize the state
        state = cv2.resize(state, (84, 84))

        # Normalize to [0, 1]. float32 keeps each stored frame half the size of
        # the float64 a plain division would produce.
        return state.astype(np.float32) / 255.0

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer.

        ``state`` and ``next_state`` may each be a tuple whose first element is
        the actual observation; only that first element is stored.
        """
        state = state[0] if isinstance(state, tuple) else state
        next_state = next_state[0] if isinstance(next_state, tuple) else next_state

        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action with an epsilon-greedy policy.

        Args:
            state: Batched observation accepted by ``self.model.predict``, or a
                tuple whose first element is such an observation.

        Returns:
            An integer action index in ``[0, action_size)``.
        """
        # Explore with probability epsilon
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        # The state may be a tuple, so extract the actual state
        state = state[0] if isinstance(state, tuple) else state

        act_values = self.model.predict(state, verbose=0)

        # Only the first `action_size` outputs correspond to real actions. A
        # wider network head must not be able to win the argmax and then get
        # clipped onto the last action.
        return int(np.argmax(act_values[0][:self.action_size]))

    def replay(self, batch_size):
        """Train the online network on one minibatch of stored transitions.

        Targets follow the double-DQN rule: the online network picks the best
        next action and the target network supplies its value. The whole
        minibatch is evaluated and fitted in single batched calls.

        Args:
            batch_size: Number of transitions to sample.

        Returns:
            The training loss for this minibatch as a float, or ``None`` when
            the buffer holds fewer than ``batch_size`` transitions.
        """
        if len(self.memory) < batch_size:
            return None

        minibatch = random.sample(self.memory, batch_size)

        # Stored states already carry a leading batch axis of size 1, so
        # concatenating along axis 0 yields a proper batch.
        states = np.concatenate([transition[0] for transition in minibatch], axis=0)
        next_states = np.concatenate([transition[3] for transition in minibatch], axis=0)
        actions = np.array([transition[1] for transition in minibatch], dtype=np.int64)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float32)
        dones = np.array([transition[4] for transition in minibatch], dtype=bool)

        # Start from the current predictions so that only the taken action's
        # Q-value contributes to the loss.
        targets = self.model.predict(states, verbose=0)
        next_q_online = self.model.predict(next_states, verbose=0)[:, :self.action_size]
        next_q_target = self.target_model.predict(next_states, verbose=0)[:, :self.action_size]

        rows = np.arange(batch_size)
        best_next_actions = np.argmax(next_q_online, axis=1)
        bootstrap = next_q_target[rows, best_next_actions]
        not_done = 1.0 - dones.astype(np.float32)  # no bootstrapping past a terminal state
        targets[rows, actions] = rewards + self.gamma * bootstrap * not_done

        # One gradient step over the minibatch
        history = self.model.fit(states, targets, batch_size=batch_size, epochs=1, verbose=0)
        loss = float(history.history['loss'][-1])

        # Update epsilon for epsilon-greedy exploration
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        return loss

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())


## Training

In [None]:
# Window size for the moving average of episode rewards
N = 10

# Histories filled in while training runs
episode_rewards = []   # total (shaped) reward of each finished episode
average_rewards = []   # mean of the last N entries of episode_rewards
model_loss = []        # value returned by agent.replay for each training call
episode_info = []      # one summary dict per finished episode
frames = []            # appended to by update_plot

# Build the agent for this environment's discrete action space
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

# Figure with three stacked panels that update_plot redraws
fig, axs = plt.subplots(nrows=3, ncols=1, figsize=(10, 16))

def update_plot(frame):
    """Redraw the three training panels and print a one-line progress summary.

    Reads the module-level ``axs``, ``episode_rewards``, ``average_rewards``,
    ``episode_info``, ``frames``, ``episodes``, ``agent`` and ``N``.

    Args:
        frame: Zero-based index of the latest finished episode; the reward
            curves are drawn up to and including this index.
    """
    reward_ax, epsilon_ax, length_ax = axs[0], axs[1], axs[2]
    shown = frame + 1

    # Panel 1: raw and moving-average episode rewards
    reward_ax.clear()
    reward_ax.plot(episode_rewards[:shown], label='Episode Reward')
    reward_ax.plot(average_rewards[:shown], label=f'Average Reward (last {N} episodes)')
    reward_ax.set_title('Episode Rewards')
    reward_ax.set_xlabel('Episode')
    reward_ax.set_ylabel('Reward')

    # Panels 2 and 3 are each a single series pulled out of episode_info
    info_panels = (
        (epsilon_ax, "Epsilon", 'Exploration Rate (Epsilon)',
         'Exploration Rate Over Episodes', 'Exploration Rate'),
        (length_ax, "Iteration", 'Episode Length',
         'Episode Length Over Episodes', 'Length'),
    )
    for panel, key, label, title, ylabel in info_panels:
        panel.clear()
        panel.plot([entry[key] for entry in episode_info], label=label)
        panel.set_title(title)
        panel.set_xlabel('Episode')
        panel.set_ylabel(ylabel)

    # Legends must be recreated because clear() removed them
    for panel in (reward_ax, epsilon_ax, length_ax):
        panel.legend()

    # Record the result of an argument-less plot() call on each panel
    frames.append([reward_ax.plot(), epsilon_ax.plot(), length_ax.plot()])

    print(f"Episode: {frame + 1}/{episodes}, Score: {episode_rewards[frame]}, Epsilon: {agent.epsilon:.2f}", end="\n", flush=True)

# Main training loop: play `episodes` episodes, store every transition, and at
# the end of each episode sync the target network, train on one replay
# minibatch, refresh the live plot and save the final rendered frame.
for e in range(episodes):
    print(f"Starting Episode {e + 1}")

    state = env.reset()
    state = agent.preprocess_state(state)
    state = np.reshape(state, (1, *state.shape, 1))
    game_loss_reward = -3.0  # penalty value for losing game

    total_reward = 0
    steps_taken = 0
    loss = None  # stays None when the replay buffer is still too small to train on

    for step in range(1000):
        action = agent.act(state)

        # gym >= 0.26 returns (obs, reward, terminated, truncated, info);
        # older versions return (obs, reward, done, info). Handle both so a
        # truncated episode is not mistaken for one that is still running.
        step_result = env.step(action)
        if len(step_result) == 5:
            next_state, reward, terminated, truncated = step_result[:4]
        else:
            next_state, reward, terminated = step_result[:3]
            truncated = False
        done = terminated or truncated

        # Apply a penalty when the game ends on a step that earned no reward
        if terminated and reward <= 0:
            reward += game_loss_reward

        # Introduction of a positive reward for breaking bricks
        brick_broken_reward = 1.0 if reward > 0 else 0.0
        reward += brick_broken_reward

        total_reward += reward
        steps_taken = step + 1

        next_state = agent.preprocess_state(next_state)
        next_state = np.reshape(next_state, (1, *next_state.shape, 1))
        # Only a true termination is stored as terminal, so targets built from
        # a truncated transition can still bootstrap from next_state.
        agent.remember(state, action, reward, next_state, terminated)
        state = next_state

        if done:
            break

    # End-of-episode bookkeeping. It also runs when the 1000-step cap is hit,
    # so the history lists stay aligned with the saved frame files.
    agent.update_target_model()
    episode_rewards.append(total_reward)
    average_rewards.append(np.mean(episode_rewards[-N:]))

    # NOTE(review): training happens once per episode on a single minibatch;
    # consider replaying more often if learning is too slow.
    if len(agent.memory) > batch_size:
        loss = agent.replay(batch_size)
        model_loss.append(loss)

    # Record the episode before plotting so every panel includes it
    episode_info.append({
        "Episode": e + 1,
        "Iteration": steps_taken,
        "Score": total_reward,
        "Epsilon": agent.epsilon
    })

    # Replace the previous episode's output instead of stacking one figure per
    # episode in the notebook.
    clear_output(wait=True)
    update_plot(len(episode_rewards) - 1)

    # Print debugging information
    print("Q-values:", agent.model.predict(state, verbose=0))
    print("Loss:", loss)
    print("Experience Replay Buffer Size:", len(agent.memory))

    print("Episode Done. Total Reward:", total_reward)
    print("Exploration Rate (Epsilon):", agent.epsilon)

    # Display the current figure
    display(fig)

    # Comment out or remove these lines to avoid rendering and saving frames
    img = env.render()

    # Create a new empty figure to save the frame without labels
    empty_fig, empty_ax = plt.subplots(1, 1, figsize=(10, 16))
    empty_ax.imshow(img)
    empty_ax.axis('off')

    empty_fig.savefig(f'frame_{e + 1}.png', bbox_inches='tight')
    plt.close(empty_fig)

env.close()
# Close the live plot at the end
plt.close(fig)

## Video

In [None]:
# Create a video from the saved per-episode frames using OpenCV
first_frame = cv2.imread('frame_1.png')
if first_frame is None:
    # cv2.imread returns None instead of raising when a file cannot be read
    raise FileNotFoundError("frame_1.png could not be read; run the training cell to save frames first")
height, width = first_frame.shape[:2]

video = cv2.VideoWriter('_model_video.mp4', cv2.VideoWriter_fourcc(*'mp4v'), 60, (width, height))

try:
    for i in range(1, episodes + 1):
        frame = cv2.imread(f'frame_{i}.png')
        if frame is None:
            print(f"Skipping frame_{i}.png: file missing or unreadable")
            continue

        # Every written frame must match the size given to VideoWriter
        if frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))

        # Write the same frame multiple times to reduce the effective frame rate
        for _ in range(4):  # Adjust the number of duplicates based on your preference
            video.write(frame)
finally:
    # Always finalize the file, even if a frame fails to load or write.
    # No OpenCV windows are opened in this cell, so there is nothing to destroy.
    video.release()