## Setup

In [None]:
!pip install gymnasium[atari] tensorflow keras ale-py



In [None]:
!pip install gymnasium[accept-rom-license]

Collecting autorom~=0.4.2 (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license])
  Downloading AutoROM-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting AutoROM.accept-rom-license (from autorom[accept-rom-license]~=0.4.2; extra == "accept-rom-license"->gymnasium[accept-rom-license])
  Downloading AutoROM.accept-rom-license-0.6.1.tar.gz (434 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.7/434.7 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Downloading AutoROM-0.4.2-py3-none-any.whl (16 kB)
Building wheels for collected packages: AutoROM.accept-rom-license
  Building wheel for AutoROM.accept-rom-license (pyproject.toml) ... [?25l[?25hdone
  Created wheel for AutoROM.accept-rom-license: filename=autorom_accept_rom_license-0.6.1-py3-none-

In [None]:
!pip show gymnasium

Name: gymnasium
Version: 0.28.1
Summary: A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym).
Home-page: https://farama.org
Author: 
Author-email: Farama Foundation <contact@farama.org>
License: MIT License
Location: /usr/local/lib/python3.11/dist-packages
Requires: cloudpickle, farama-notifications, jax-jumpy, numpy, typing-extensions
Required-by: dopamine_rl, Shimmy


In [None]:
import gymnasium.wrappers.frame_stack

# Sanity check: list the names exposed by the frame_stack module to confirm that
# `FrameStack` can be imported from it in the installed gymnasium version.
frame_stack_names = dir(gymnasium.wrappers.frame_stack)
print(frame_stack_names)

['Box', 'DependencyNotInstalled', 'FrameStack', 'LazyFrames', 'Union', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'deque', 'gym', 'np']


In [None]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
from gymnasium.wrappers.frame_stack import FrameStack
import numpy as np
import tensorflow as tf

   # ... (rest of the code remains the same)

# Configuration parameters for the whole setup
seed = 42
gamma = 0.99  # Discount factor for past rewards
epsilon = 1.0  # Epsilon greedy parameter
epsilon_min = 0.1  # Minimum epsilon greedy parameter
epsilon_max = 1.0  # Maximum epsilon greedy parameter
epsilon_interval = (
    epsilon_max - epsilon_min
)  # Rate at which to reduce chance of random action being taken
batch_size = 32  # Size of batch taken from replay buffer
max_steps_per_episode = 10000
max_episodes = 10  # Limit training episodes, will run until solved if smaller than 1

# Seed Python's `random`, NumPy and the Keras backend in one call. Without this,
# `seed` only reached the emulator while the epsilon-greedy draws, the replay
# sampling (both via np.random) and the weight initialisation stayed unseeded.
keras.utils.set_random_seed(seed)

# Use the Atari environment
# Specify the `render_mode` parameter to show the attempts of the agent in a pop up window.
env = gym.make("BreakoutNoFrameskip-v4")  # , render_mode="human")
# Environment preprocessing
env = AtariPreprocessing(env)
# Stack four frames
env = FrameStack(env, 4)
# Seed through the documented `reset(seed=...)` API instead of `env.seed(...)`,
# which only works through deprecated wrapper attribute forwarding. The result is
# bound to `_` so the cell does not echo the observation as its output.
_ = env.reset(seed=seed)

(3444837047, 2669555309)

## Implement the Deep Q-Network

This network learns an approximation of the Q-table, which maps each state to
the value of every action the agent can take in it. In every state there are four
possible actions. The environment provides the state, and the action
is chosen by selecting the largest of the four Q-values predicted in the output layer.

In [None]:
# Number of discrete actions; it sets the width of the Q-network's output layer
# and the range of the random actions drawn during exploration.
num_actions = 4


def create_q_model():
    """Build the convolutional Q-network (architecture from the Deepmind paper).

    The model takes a stack of frames shaped (4, 84, 84) and returns one linear
    Q-value per action (`num_actions` outputs).

    The input shape is declared with `keras.Input` rather than the deprecated
    `input_shape=` layer argument (which triggered warnings and was additionally
    set, with the pre-transpose shape, on the first Conv2D where it is ignored).
    `Permute` performs the same channels-first -> channels-last transpose as the
    previous Lambda layer while remaining a regular serializable layer.
    """
    return keras.Sequential(
        [
            keras.Input(shape=(4, 84, 84)),
            # (batch, 4, 84, 84) -> (batch, 84, 84, 4); Permute dims are 1-indexed
            # and exclude the batch axis.
            layers.Permute((2, 3, 1)),
            # Convolutions on the frames on the screen
            layers.Conv2D(32, 8, strides=4, activation="relu"),
            layers.Conv2D(64, 4, strides=2, activation="relu"),
            layers.Conv2D(64, 3, strides=1, activation="relu"),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )


# The first model makes the predictions for Q-values which are used to
# select an action.
model = create_q_model()
# Build a target model for the prediction of future rewards.
# The weights of a target model get updated every 10000 steps thus when the
# loss between the Q-values is calculated the target Q-value is stable.
model_target = create_q_model()
# Start the target network as an exact copy of the online network. Otherwise the
# two networks hold unrelated random weights until the first scheduled sync.
model_target.set_weights(model.get_weights())


  super().__init__(**kwargs)
  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


## Train

In [None]:
# The Deepmind paper trains with RMSProp; the Adam optimizer is used here
# because it improves training time.
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
# Huber loss is used for stability
loss_function = keras.losses.Huber()

# Experience replay buffers: parallel lists, one entry per stored transition
action_history, state_history, state_next_history = [], [], []
rewards_history, done_history = [], []
# Rewards of the most recent episodes, used for the running average
episode_reward_history = []

# Progress counters
running_reward = 0
episode_count = 0
frame_count = 0

# Number of frames to take random action and observe output
epsilon_random_frames = 50_000
# Number of frames over which epsilon is annealed (exploration)
epsilon_greedy_frames = 1_000_000.0
# Maximum replay length
# Note: The Deepmind paper suggests 1000000 however this causes memory issues
max_memory_length = 100_000
# Train the model after 4 actions
update_after_actions = 4
# How often (in frames) to update the target network
update_target_network = 10_000

# Main training loop. Each pass of the outer loop plays one episode; the loop
# ends when the running reward exceeds 40 or `max_episodes` episodes were played.
while True:
    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0

    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # Use epsilon-greedy for exploration
        if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
            # Take random action
            action = np.random.choice(num_actions)
        else:
            # Predict action Q-values
            # From environment state
            state_tensor = keras.ops.convert_to_tensor(state)
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            # Take best action
            action = keras.ops.argmax(action_probs[0]).numpy()

        # Decay probability of taking random action
        epsilon -= epsilon_interval / epsilon_greedy_frames
        epsilon = max(epsilon, epsilon_min)

        # Apply the sampled action in our environment.
        # gymnasium reports two end-of-episode flags; previously `truncated` was
        # discarded, so a truncated episode kept stepping an already finished env.
        state_next, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state_next = np.array(state_next)

        episode_reward += reward

        # Save actions and states in replay buffer
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        # Only genuine termination is stored: it is what switches the learning
        # target below, exactly as before this loop handled truncation.
        done_history.append(terminated)
        rewards_history.append(reward)
        state = state_next

        # Update every fourth frame and once batch size is over 32
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # Get indices of samples for replay buffers. Passing the length draws
            # the same indices as passing range(len(...)) without materialising
            # an array of up to `max_memory_length` entries on every update.
            indices = np.random.choice(len(done_history), size=batch_size)

            # Using list comprehension to sample from replay buffer
            state_sample = np.array([state_history[i] for i in indices])
            state_next_sample = np.array([state_next_history[i] for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # Build the updated Q-values for the sampled future states
            # Use the target model for stability.
            # verbose=0 stops predict() from printing a progress bar per update.
            future_rewards = model_target.predict(state_next_sample, verbose=0)
            # Q value = reward + discount factor * expected future reward
            updated_q_values = rewards_sample + gamma * keras.ops.amax(
                future_rewards, axis=1
            )

            # If final frame set the last value to -1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # Create a mask so we only calculate loss on the updated Q-values
            masks = keras.ops.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # Train the model on the states and updated Q-values
                q_values = model(state_sample)

                # Apply the masks to the Q-values to get the Q-value for action taken
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                # Calculate loss between new Q-value and old Q-value
                loss = loss_function(updated_q_values, q_action)

            # Backpropagation
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # Update the target network with new weights
            model_target.set_weights(model.get_weights())
            # Log details
            template = "running reward: {:.2f} at episode {}, frame count {}"
            print(template.format(running_reward, episode_count, frame_count))

        # Limit the state and reward history
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # Update running reward to check condition for solving
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    if running_reward > 40:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break

    if (
        max_episodes > 0 and episode_count >= max_episodes
    ):  # Maximum number of episodes reached
        print("Stopped at episode {}!".format(episode_count))
        break

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 4s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 26ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 27ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms

## Visualizations
Before any training:
![Imgur](https://i.imgur.com/rRxXF4H.gif)

In early stages of training:
![Imgur](https://i.imgur.com/X8ghdpL.gif)

In later stages of training:
![Imgur](https://i.imgur.com/Z1K6qBQ.gif)

The next cell records test videos for each of these training stages.

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import layers
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
from gymnasium.wrappers.frame_stack import FrameStack
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
import matplotlib.patheffects as PathEffects
import imageio.v2 as imageio
import time
from collections import deque

# Hyperparameters shared by everything defined in this cell
seed = 42
# Discount factor for past rewards
gamma = 0.99
# Epsilon-greedy exploration: current value and its lower / upper bounds
epsilon = 1.0
epsilon_min = 0.1
epsilon_max = 1.0
# Total amount by which epsilon is reduced while annealing
epsilon_interval = epsilon_max - epsilon_min
# Size of batch taken from replay buffer
batch_size = 32
max_steps_per_episode = 10_000
# Limit training episodes, will run until solved if smaller than 1
max_episodes = 10

# Output directory for the recorded videos (no error if it already exists)
os.makedirs("video_visualizations", exist_ok=True)

# Create environment
def make_env(render=False):
    """Create the preprocessed, frame-stacked Breakout environment.

    Args:
        render: when True the environment is created with
            `render_mode="rgb_array"` so `env.render()` returns frames;
            otherwise no render mode is set.

    Returns:
        The wrapped environment, seeded with the module-level `seed` and
        already reset once.
    """
    render_mode = "rgb_array" if render else None
    env = gym.make("BreakoutNoFrameskip-v4", render_mode=render_mode)
    env = AtariPreprocessing(env)
    env = FrameStack(env, 4)
    # Seed through the documented `reset(seed=...)` API; `env.seed(...)` only
    # works through deprecated wrapper attribute forwarding. Later plain
    # `env.reset()` calls keep using the generator seeded here.
    env.reset(seed=seed)
    return env

# Number of discrete actions; it sets the width of the Q-network's output layer
# and the range of the random actions drawn during exploration.
num_actions = 4

def create_q_model():
    """Build the convolutional Q-network (architecture from the Deepmind paper).

    The model takes a stack of frames shaped (4, 84, 84) and returns one linear
    Q-value per action (`num_actions` outputs).

    The input shape is declared with `keras.Input` instead of the deprecated
    `input_shape=` layer argument, and `Permute` performs the same
    channels-first -> channels-last transpose as the previous Lambda layer
    while remaining a regular serializable layer.
    """
    return keras.Sequential(
        [
            keras.Input(shape=(4, 84, 84)),
            # (batch, 4, 84, 84) -> (batch, 84, 84, 4); Permute dims are 1-indexed
            # and exclude the batch axis.
            layers.Permute((2, 3, 1)),
            # Convolutions on the frames on the screen
            layers.Conv2D(32, 8, strides=4, activation="relu"),
            layers.Conv2D(64, 4, strides=2, activation="relu"),
            layers.Conv2D(64, 3, strides=1, activation="relu"),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )

# Create the online and target networks plus the training objects
model = create_q_model()
model_target = create_q_model()
# Start the target network as an exact copy of the online network; two separate
# create_q_model() calls otherwise yield unrelated random weights.
model_target.set_weights(model.get_weights())
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
loss_function = keras.losses.Huber()

# Experience replay buffers (parallel lists, one entry per stored transition)
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0

# Number of frames to take random action and observe output
# NOTE(review): not referenced by the functions visible in this cell - confirm whether it is still needed.
epsilon_random_frames = 50000
# Number of frames for exploration
epsilon_greedy_frames = 1000000.0
# Maximum replay length
max_memory_length = 100000
# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network
# NOTE(review): train_model() in this cell syncs per episode instead and does not read this value.
update_target_network = 10000

# Function to enhance frame for video
def enhance_frame(frame, episode, score, stage):
    """Upscale a rendered frame 4x and draw an information header on it.

    Args:
        frame: image array, either 2-D (grayscale) or 3-D with a trailing
            colour axis.
        episode: value printed after "Episode: ".
        score: value printed after "Score: ".
        stage: value printed after "Stage: ".

    Returns:
        A new uint8 array of shape (4 * height, 4 * width, 3). Its top 20 rows
        are a black bar carrying the text, followed by four 5-row colour
        stripes; both are drawn over the upscaled image.
    """
    # Local import: this cell does not import OpenCV at module level.
    import cv2

    # Convert to RGB if grayscale
    if len(frame.shape) == 2:
        frame_rgb = np.stack([frame, frame, frame], axis=2)
    else:
        frame_rgb = frame

    # Nearest-neighbour upscaling: repeating every row and column
    # `scaling_factor` times gives the same pixels as filling one block per
    # source pixel, without a Python-level loop over height * width.
    h, w = frame_rgb.shape[:2]
    scaling_factor = 4  # Scale up for better visibility
    enhanced_frame = np.empty((h * scaling_factor, w * scaling_factor, 3), dtype=np.uint8)
    enhanced_frame[...] = np.repeat(
        np.repeat(frame_rgb, scaling_factor, axis=0), scaling_factor, axis=1
    )

    # Add colored bar at top (similar to Breakout's colorful bricks)
    bar_height = 20
    enhanced_frame[:bar_height, :, :] = [0, 0, 0]  # Black background

    # Add colored stripes directly below the bar
    colors = [
        [221, 0, 0],      # Red
        [255, 206, 0],    # Yellow
        [0, 128, 0],      # Green
        [0, 0, 255]       # Blue
    ]

    stripe_height = 5
    for i, color in enumerate(colors):
        top = bar_height + i * stripe_height
        bottom = top + stripe_height
        enhanced_frame[top:bottom, :, :] = color

    # Add text with episode and score info
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_baseline = bar_height // 2 + 5
    cv2.putText(enhanced_frame, f"Episode: {episode}", (10, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(enhanced_frame, f"Score: {score}", (enhanced_frame.shape[1]//2, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    # Add stage label
    stage_text = f"Stage: {stage}"
    cv2.putText(enhanced_frame, stage_text,
                (enhanced_frame.shape[1] - 150, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    return enhanced_frame

def record_episode(model, env, epsilon, episode, stage_name, random_action_prob=0.0):
    """Play one episode (at most 1000 steps) and save it as an annotated video.

    Args:
        model: Q-network used to pick the greedy action.
        env: environment to play in; `env.render()` must return frames for
            anything to be recorded.
        epsilon: accepted for interface compatibility but not used; the amount
            of randomness is controlled by `random_action_prob`.
        episode: label drawn on the frames and embedded in the file name.
        stage_name: label drawn on the frames and embedded in the file name.
        random_action_prob: probability of taking a uniformly random action
            instead of the greedy one at each step.

    Returns:
        Tuple `(video_path, episode_reward)`. The path is returned even when no
        frame was rendered, in which case no file is written.
    """
    video_path = f"video_visualizations/{stage_name}_training_episode_{episode}.mp4"
    # Frames are streamed to the encoder as they are produced instead of being
    # collected in a list: up to 1000 upscaled RGB frames would otherwise be
    # held in memory at once. The writer is opened lazily so that no file is
    # created when the environment never returns a frame.
    writer = None

    observation, _ = env.reset()
    episode_reward = 0
    done = False
    step = 0

    try:
        while not done and step < 1000:  # Limit to 1000 steps for video length
            step += 1

            # Determine action
            if np.random.rand() < random_action_prob:
                # Take random action
                action = np.random.choice(num_actions)
            else:
                state_tensor = tf.convert_to_tensor(observation)
                state_tensor = tf.expand_dims(state_tensor, 0)
                action_probs = model(state_tensor, training=False)
                action = tf.argmax(action_probs[0]).numpy()

            # Apply action and record frame
            next_observation, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_reward += reward

            # Get frame for video (using environment's render method)
            rendered_frame = env.render()
            if rendered_frame is not None:
                if writer is None:
                    writer = imageio.get_writer(video_path, mode="I", fps=20)
                # Add game information to the frame
                writer.append_data(
                    enhance_frame(rendered_frame, episode, episode_reward, stage_name)
                )

            observation = next_observation
            # No per-step sleep: frames go to a file, nothing is shown live.
    finally:
        if writer is not None:
            writer.close()

    if writer is not None:
        print(f"Saved video to {video_path}")
    else:
        print("No frames captured for video")

    return video_path, episode_reward

def train_model(num_episodes, update_target_every=10):
    """Train the global Q-network for a number of episodes.

    Reads and mutates module-level state: `model`, `model_target`, `optimizer`,
    `loss_function`, the replay-buffer lists and the `frame_count` / `epsilon`
    globals, so successive calls continue from where the previous one stopped.

    Args:
        num_episodes: number of episodes to play.
        update_target_every: after every episode whose index is a multiple of
            this value (index 0 included) the online weights are copied into
            `model_target` and a progress line is printed.

    Returns:
        List with the total reward of each episode, in play order.
    """
    global frame_count, epsilon

    env = make_env()
    episode_rewards = []

    try:
        for episode in range(num_episodes):
            observation, _ = env.reset()
            episode_reward = 0

            # Bound the episode with the cell-level `max_steps_per_episode`,
            # like the first training loop of this notebook does. The previous
            # `while True` kept a step counter that was never checked.
            for _step in range(max_steps_per_episode):
                frame_count += 1

                # Use epsilon-greedy for exploration
                if epsilon > np.random.rand():
                    # Take random action
                    action = np.random.choice(num_actions)
                else:
                    # Predict action Q-values
                    state_tensor = tf.convert_to_tensor(observation)
                    state_tensor = tf.expand_dims(state_tensor, 0)
                    action_probs = model(state_tensor, training=False)
                    # Take best action
                    action = tf.argmax(action_probs[0]).numpy()

                # Decay probability of taking random action
                epsilon -= epsilon_interval / epsilon_greedy_frames
                epsilon = max(epsilon, epsilon_min)

                # Apply the sampled action in our environment
                next_observation, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                episode_reward += reward

                # Save actions and states in replay buffer
                action_history.append(action)
                state_history.append(observation)
                state_next_history.append(next_observation)
                done_history.append(float(done))
                rewards_history.append(reward)

                observation = next_observation

                # Update every fourth frame and once batch size is over 32
                if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
                    # Get indices of samples for replay buffers. Passing the
                    # length draws the same indices as range(len(...)) without
                    # building an array of the whole buffer on every update.
                    indices = np.random.choice(len(done_history), size=batch_size)

                    # Using list comprehension to sample from replay buffer
                    state_sample = np.array([state_history[i] for i in indices])
                    state_next_sample = np.array([state_next_history[i] for i in indices])
                    rewards_sample = [rewards_history[i] for i in indices]
                    action_sample = [action_history[i] for i in indices]
                    done_sample = tf.convert_to_tensor([done_history[i] for i in indices])

                    # Build the updated Q-values for the sampled future states
                    future_rewards = model_target.predict(state_next_sample, verbose=0)
                    # Q value = reward + discount factor * expected future reward
                    updated_q_values = rewards_sample + gamma * tf.reduce_max(
                        future_rewards, axis=1
                    )

                    # If final frame set the last value to -1
                    updated_q_values = updated_q_values * (1 - done_sample) - done_sample

                    # Create a mask so we only calculate loss on the updated Q-values
                    masks = tf.one_hot(action_sample, num_actions)

                    with tf.GradientTape() as tape:
                        # Train the model on the states and updated Q-values
                        q_values = model(state_sample)

                        # Apply the masks to the Q-values to get the Q-value for action taken
                        q_action = tf.reduce_sum(tf.multiply(q_values, masks), axis=1)
                        # Calculate loss between new Q-value and old Q-value
                        loss = loss_function(updated_q_values, q_action)

                    # Backpropagation
                    grads = tape.gradient(loss, model.trainable_variables)
                    optimizer.apply_gradients(zip(grads, model.trainable_variables))

                # Limit the state and reward history
                if len(rewards_history) > max_memory_length:
                    del rewards_history[:1]
                    del state_history[:1]
                    del state_next_history[:1]
                    del action_history[:1]
                    del done_history[:1]

                if done:
                    break

            # Record the episode's total reward
            episode_rewards.append(episode_reward)

            # Update target network every n episodes
            if episode % update_target_every == 0:
                model_target.set_weights(model.get_weights())
                print(f"Episode {episode}: reward = {episode_reward}, epsilon = {epsilon:.3f}")
    finally:
        # Release the emulator even if training is interrupted or raises.
        env.close()

    return episode_rewards

def record_stage_videos():
    """Record one video before training, one after a short and one after a longer training run.

    Rebinds the module-level `model`, `model_target`, `optimizer` and `epsilon`,
    trains through `train_model()` (5 episodes, then 15 more) and records each
    stage with `record_episode()`.

    Returns:
        List with the three video paths, in recording order.
    """
    global epsilon, model, model_target, optimizer

    # Create render environments for recording
    render_env = make_env(render=True)
    video_paths = []

    try:
        # Start from fresh networks. The target begins as a copy of the online
        # network, and the optimizer is rebuilt with them: a Keras optimizer is
        # bound to the variables it was first applied to, so reusing the old
        # one with new models fails once it has been used.
        model = create_q_model()
        model_target = create_q_model()
        model_target.set_weights(model.get_weights())
        optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)

        # Stage 1: Before Training (completely random)
        print("\n--- Stage 1: Before Training ---")
        epsilon = 1.0  # Always random
        video_path, reward = record_episode(
            model, render_env, epsilon, 0, "before", random_action_prob=1.0
        )
        video_paths.append(video_path)
        print(f"Before training video recorded: {video_path} with reward {reward}")

        # Train for a small number of episodes for early stage
        print("\n--- Training for Early Stage ---")
        epsilon = 1.0  # Reset epsilon for training
        train_model(5, update_target_every=2)

        # Stage 2: Early Training
        # NOTE(review): the labels 50 and 300 used for the videos do not match
        # the 5 and 20 episodes actually trained at those points - confirm intent.
        print("\n--- Stage 2: Early Training ---")
        # This assignment also becomes the starting epsilon of the next
        # train_model() call; record_episode() itself ignores its epsilon argument.
        epsilon = 0.5  # Mix of random and learned behavior
        video_path, reward = record_episode(
            model, render_env, epsilon, 50, "early", random_action_prob=0.3
        )
        video_paths.append(video_path)
        print(f"Early training video recorded: {video_path} with reward {reward}")

        # Train for more episodes for later stage
        print("\n--- Training for Later Stage ---")
        train_model(15, update_target_every=3)

        # Stage 3: Later Training
        print("\n--- Stage 3: Later Training ---")
        epsilon = 0.1  # Mostly using trained model
        video_path, reward = record_episode(
            model, render_env, epsilon, 300, "later", random_action_prob=0.1
        )
        video_paths.append(video_path)
        print(f"Later training video recorded: {video_path} with reward {reward}")
    finally:
        # Release the emulator even if a stage fails.
        render_env.close()

    # These are the paths record_episode() reported, i.e. the same three
    # strings that used to be rebuilt by hand here.
    return video_paths

def main():
    """Entry point: check the video dependencies, record the stage videos, list the files."""
    print("Creating DQN training videos for Breakout...")

    # Both packages are needed to annotate and encode frames; stop with
    # install instructions if either one cannot be imported.
    try:
        import imageio.v2
        import cv2
    except ImportError:
        for hint in ("Please install required libraries:", "pip install opencv-python imageio"):
            print(hint)
        return

    recorded_paths = record_stage_videos()

    # Report where the videos were written
    print("\nVideo files created:")
    for recorded_path in recorded_paths:
        print(f"- {recorded_path}")
    print("\nYou can use these videos to demonstrate the agent's progress at different training stages.")

# Run the whole pipeline when this cell/script is executed directly
# (and not when the code is imported as a module).
if __name__ == "__main__":
    main()

Creating DQN training videos for Breakout...

--- Stage 1: Before Training ---




Saved video to video_visualizations/before_training_episode_0.mp4
Before training video recorded: video_visualizations/before_training_episode_0.mp4 with reward 3.0

--- Training for Early Stage ---
Episode 0: reward = 3.0, epsilon = 1.000
Episode 2: reward = 1.0, epsilon = 0.999
Episode 4: reward = 0.0, epsilon = 0.999

--- Stage 2: Early Training ---




Saved video to video_visualizations/early_training_episode_50.mp4
Early training video recorded: video_visualizations/early_training_episode_50.mp4 with reward 4.0

--- Training for Later Stage ---
Episode 0: reward = 3.0, epsilon = 0.500
Episode 3: reward = 2.0, epsilon = 0.499
Episode 6: reward = 1.0, epsilon = 0.499
Episode 9: reward = 2.0, epsilon = 0.498
Episode 12: reward = 1.0, epsilon = 0.498

--- Stage 3: Later Training ---




Saved video to video_visualizations/later_training_episode_300.mp4
Later training video recorded: video_visualizations/later_training_episode_300.mp4 with reward 0.0

Video files created:
- video_visualizations/before_training_episode_0.mp4
- video_visualizations/early_training_episode_50.mp4
- video_visualizations/later_training_episode_300.mp4

You can use these videos to demonstrate the agent's progress at different training stages.


Note: the dueling DQN variant below does not give good results yet.

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import layers
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
from gymnasium.wrappers.frame_stack import FrameStack
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
import matplotlib.patheffects as PathEffects
import imageio.v2 as imageio
import time
from collections import deque
import cv2

# Hyperparameters shared by everything defined in this cell
seed = 42
# Discount factor for past rewards
gamma = 0.99
# Epsilon-greedy exploration: current value and its lower / upper bounds
epsilon = 1.0
epsilon_min = 0.1
epsilon_max = 1.0
# Total amount by which epsilon is reduced while annealing
epsilon_interval = (
    epsilon_max - epsilon_min
)
# Size of batch taken from replay buffer
batch_size = 32
max_steps_per_episode = 10_000
# Increase training episodes for more training
max_episodes = 300

# Output directory for the recorded videos (no error if it already exists)
os.makedirs("video_visualizations", exist_ok=True)

# Create environment
def make_env(render=False):
    """Create the preprocessed, frame-stacked Breakout environment.

    Args:
        render: when True the environment is created with
            `render_mode="rgb_array"` so `env.render()` returns frames;
            otherwise no render mode is set.

    Returns:
        The wrapped environment, seeded with the module-level `seed` and
        already reset once.
    """
    render_mode = "rgb_array" if render else None
    env = gym.make("BreakoutNoFrameskip-v4", render_mode=render_mode)
    env = AtariPreprocessing(env)
    env = FrameStack(env, 4)
    # Seed through the documented `reset(seed=...)` API; `env.seed(...)` only
    # works through deprecated wrapper attribute forwarding. Later plain
    # `env.reset()` calls keep using the generator seeded here.
    env.reset(seed=seed)
    return env

# Number of discrete actions; it sets the width of the advantage stream and the
# range of the random actions drawn during exploration.
num_actions = 4

# --- Improved Model Architecture ---
# We use a dueling network architecture that splits the network into two streams:
# one to estimate the state-value and one for the advantage of each action.
def create_dueling_model():
    """Build the dueling Q-network.

    The model takes a stack of frames shaped (4, 84, 84) and returns
    `num_actions` Q-values computed as Q(s, a) = V(s) + (A(s, a) - mean_a A(s, a)).
    Both streams branch off the same 512-unit dense layer.

    The transpose and the mean use `layers.Permute` and `keras.ops.mean` instead
    of Lambda layers wrapping raw TensorFlow calls, so the graph consists of
    regular Keras operations (Lambda layers with Python lambdas are hard to
    serialize).
    """
    inputs = keras.Input(shape=(4, 84, 84))
    # Transpose the input from (batch, 4, 84, 84) to (batch, 84, 84, 4);
    # Permute dims are 1-indexed and exclude the batch axis.
    x = layers.Permute((2, 3, 1))(inputs)
    # Convolutional layers (as in the DeepMind paper)
    x = layers.Conv2D(32, 8, strides=4, activation="relu")(x)
    x = layers.Conv2D(64, 4, strides=2, activation="relu")(x)
    x = layers.Conv2D(64, 3, strides=1, activation="relu")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(512, activation="relu")(x)

    # Dueling streams: Advantage and Value
    advantage = layers.Dense(num_actions)(x)
    value = layers.Dense(1)(x)

    # Combine streams to get Q-values: Q(s, a) = V(s) + (A(s, a) - mean(A(s, a)))
    # keepdims=True keeps a (batch, 1) shape so the mean broadcasts over actions.
    advantage_mean = keras.ops.mean(advantage, axis=1, keepdims=True)
    q_values = layers.Add()([value, layers.Subtract()([advantage, advantage_mean])])

    model = keras.Model(inputs=inputs, outputs=q_values)
    return model

# Create the online and target networks using the improved architecture
model = create_dueling_model()
model_target = create_dueling_model()
# Start the target network as an exact copy of the online network; two separate
# create_dueling_model() calls otherwise yield unrelated random weights.
model_target.set_weights(model.get_weights())
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
loss_function = keras.losses.Huber()

# Experience replay buffers (parallel lists, one entry per stored transition)
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0

# Number of frames to take random action and observe output
epsilon_random_frames = 50000
# Number of frames for exploration
epsilon_greedy_frames = 1000000.0
# Maximum replay length
max_memory_length = 100000
# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network (in episodes)
update_target_every_episodes = 2

# Function to enhance frame for video
def enhance_frame(frame, episode, score, stage):
    """Upscale a rendered frame 4x and draw an information header on it.

    Args:
        frame: image array, either 2-D (grayscale) or 3-D with a trailing
            colour axis.
        episode: value printed after "Episode: ".
        score: value printed after "Score: ".
        stage: value printed after "Stage: ".

    Returns:
        A new uint8 array of shape (4 * height, 4 * width, 3). Its top 20 rows
        are a black bar carrying the text, followed by four 5-row colour
        stripes; both are drawn over the upscaled image.
    """
    # Convert to RGB if grayscale
    if len(frame.shape) == 2:
        frame_rgb = np.stack([frame, frame, frame], axis=2)
    else:
        frame_rgb = frame

    # Nearest-neighbour upscaling: repeating every row and column
    # `scaling_factor` times gives the same pixels as filling one block per
    # source pixel, without a Python-level loop over height * width.
    h, w = frame_rgb.shape[:2]
    scaling_factor = 4  # Scale up for better visibility
    enhanced_frame = np.empty((h * scaling_factor, w * scaling_factor, 3), dtype=np.uint8)
    enhanced_frame[...] = np.repeat(
        np.repeat(frame_rgb, scaling_factor, axis=0), scaling_factor, axis=1
    )

    # Add colored bar at top (similar to Breakout's colorful bricks)
    bar_height = 20
    enhanced_frame[:bar_height, :, :] = [0, 0, 0]  # Black background

    # Add colored stripes directly below the bar
    colors = [
        [221, 0, 0],      # Red
        [255, 206, 0],    # Yellow
        [0, 128, 0],      # Green
        [0, 0, 255]       # Blue
    ]

    stripe_height = 5
    for i, color in enumerate(colors):
        top = bar_height + i * stripe_height
        bottom = top + stripe_height
        enhanced_frame[top:bottom, :, :] = color

    # Add text with episode and score info
    font = cv2.FONT_HERSHEY_SIMPLEX
    text_baseline = bar_height // 2 + 5
    cv2.putText(enhanced_frame, f"Episode: {episode}", (10, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
    cv2.putText(enhanced_frame, f"Score: {score}", (enhanced_frame.shape[1]//2, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    # Add stage label
    stage_text = f"Stage: {stage}"
    cv2.putText(enhanced_frame, stage_text,
                (enhanced_frame.shape[1] - 150, text_baseline),
                font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

    return enhanced_frame

def record_episode(model, env, epsilon, episode, stage_name, random_action_prob=0.0,
                   max_steps=1000, fps=20):
    """Play one episode with ``model`` and save it as an MP4 video.

    Args:
        model: Callable network; ``model(batch, training=False)`` must return
            one row of action values per observation.
        env: Environment whose ``render()`` returns image frames. If it
            returns ``None`` no frames are captured and no file is written.
        epsilon: Unused. Kept for backward compatibility; exploration during
            recording is controlled by ``random_action_prob``.
        episode: Episode number used in the overlay and in the file name.
        stage_name: Stage label used in the overlay and in the file name.
        random_action_prob: Probability of taking a uniformly random action
            instead of the greedy one at each step.
        max_steps: Maximum number of environment steps to record (default
            1000, the value that was previously hard-coded).
        fps: Frame rate of the written video (default 20, as before).

    Returns:
        Tuple ``(video_path, episode_reward)``. ``video_path`` is returned
        even when no frames were captured and no file was written.
    """
    frames = []
    observation, _ = env.reset()
    episode_reward = 0

    # The per-step time.sleep() was dropped: frames are collected in memory and
    # encoded afterwards, so pacing the loop only made recording slower.
    for _ in range(max_steps):
        # Choose between a random action and the model's greedy action.
        if np.random.rand() < random_action_prob:
            action = np.random.choice(num_actions)
        else:
            state_tensor = tf.convert_to_tensor(observation)
            state_tensor = tf.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            action = tf.argmax(action_probs[0]).numpy()

        observation, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward

        # Capture the frame produced by this step, annotated with the running score.
        rendered_frame = env.render()
        if rendered_frame is not None:
            frames.append(enhance_frame(rendered_frame, episode, episode_reward, stage_name))

        if terminated or truncated:
            break

    video_path = f"video_visualizations/{stage_name}_training_episode_{episode}.mp4"
    if frames:
        # Create the output directory on demand so saving does not fail on a
        # fresh checkout / fresh runtime.
        os.makedirs(os.path.dirname(video_path), exist_ok=True)
        imageio.mimsave(video_path, frames, fps=fps)
        print(f"Saved video to {video_path}")
    else:
        print("No frames captured for video")

    return video_path, episode_reward

def _double_dqn_update():
    """Run one Double DQN gradient step on a minibatch from the replay buffers."""
    # Passing the buffer length (instead of a range object) lets NumPy draw the
    # indices without first materialising an array of every valid index.
    indices = np.random.choice(len(done_history), size=batch_size)
    state_sample = np.array([state_history[i] for i in indices])
    state_next_sample = np.array([state_next_history[i] for i in indices])
    rewards_sample = np.array([rewards_history[i] for i in indices], dtype=np.float32)
    action_sample = np.array([action_history[i] for i in indices])
    done_sample = np.array([done_history[i] for i in indices], dtype=np.float32)

    # Double DQN: the online network picks the best next action ...
    q_next_main = model(state_next_sample, training=False)
    best_actions = tf.cast(tf.argmax(q_next_main, axis=1), tf.int32)
    # ... and the target network evaluates that action.
    q_next_target = model_target(state_next_sample, training=False)
    batch_indices = tf.range(tf.shape(q_next_target)[0])
    selected_future_rewards = tf.gather_nd(
        q_next_target, tf.stack([batch_indices, best_actions], axis=1)
    )

    # Bellman target; the (1 - done) factor removes the bootstrap term for
    # transitions that ended the episode.
    updated_q_values = rewards_sample + gamma * (1 - done_sample) * selected_future_rewards.numpy()

    # One-hot mask so the loss only covers the action that was actually taken.
    masks = tf.one_hot(action_sample, num_actions)

    with tf.GradientTape() as tape:
        q_values = model(state_sample, training=True)
        q_action = tf.reduce_sum(q_values * masks, axis=1)
        loss = loss_function(updated_q_values, q_action)

    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))


def train_model(num_episodes, update_target_every_episodes=2):
    """Train the global model for ``num_episodes`` episodes with Double DQN.

    Reads and updates the module-level ``frame_count`` and ``epsilon``, appends
    to the module-level replay buffers, and trains the module-level ``model``
    against ``model_target``.

    Args:
        num_episodes: Number of episodes to play.
        update_target_every_episodes: The target network is synced with the
            online network (and a progress line is printed) after every
            episode whose index is a multiple of this value.

    Returns:
        List with the total reward of each episode, in order.
    """
    global frame_count, epsilon

    env = make_env()
    episode_rewards = []

    # try/finally so the environment is released even if training raises.
    try:
        for episode in range(num_episodes):
            observation, _ = env.reset()
            episode_reward = 0

            while True:
                frame_count += 1

                # Epsilon-greedy action selection
                if epsilon > np.random.rand():
                    action = np.random.choice(num_actions)
                else:
                    state_tensor = tf.convert_to_tensor(observation)
                    state_tensor = tf.expand_dims(state_tensor, 0)
                    action_probs = model(state_tensor, training=False)
                    action = tf.argmax(action_probs[0]).numpy()

                # Decay epsilon linearly down to its floor
                epsilon -= epsilon_interval / epsilon_greedy_frames
                epsilon = max(epsilon, epsilon_min)

                # Take action
                next_observation, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                episode_reward += reward

                # Save experience to replay buffer. Only true termination is
                # stored as "done": a truncated (time-limited) episode did not
                # reach a terminal state, so its target must still bootstrap
                # from the next state's value.
                action_history.append(action)
                state_history.append(observation)
                state_next_history.append(next_observation)
                done_history.append(float(terminated))
                rewards_history.append(reward)

                observation = next_observation

                # Update every `update_after_actions` frames once enough samples exist
                if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
                    _double_dqn_update()

                # Drop the oldest transition once the buffer exceeds its capacity
                if len(rewards_history) > max_memory_length:
                    del rewards_history[:1]
                    del state_history[:1]
                    del state_next_history[:1]
                    del action_history[:1]
                    del done_history[:1]

                if done:
                    break

            episode_rewards.append(episode_reward)

            # Update the target network periodically
            if episode % update_target_every_episodes == 0:
                model_target.set_weights(model.get_weights())
                print(f"Episode {episode}: reward = {episode_reward}, epsilon = {epsilon:.3f}")
    finally:
        env.close()

    return episode_rewards

def record_stage_videos():
    """Record one video before training, one after early and one after later training.

    Replaces the module-level ``model`` and ``model_target`` with freshly
    created networks and overwrites the module-level ``epsilon`` several
    times; it is left at 0.1 when the function returns.

    Returns:
        List of the three video paths (before, early, later) as reported by
        ``record_episode``.
    """
    global epsilon, model, model_target

    render_env = make_env(render=True)
    video_paths = []

    # try/finally so the render environment is released even on failure.
    try:
        # Start from fresh networks. The target network must begin as an exact
        # copy of the online network; otherwise the first updates bootstrap
        # from an unrelated random initialisation.
        model = create_dueling_model()
        model_target = create_dueling_model()
        model_target.set_weights(model.get_weights())

        # Stage 1: Before Training (completely random)
        print("\n--- Stage 1: Before Training ---")
        epsilon = 1.0  # Always random
        video_path, reward = record_episode(model, render_env, epsilon, 0, "before", random_action_prob=1.0)
        video_paths.append(video_path)
        print(f"Before training video recorded: {video_path} with reward {reward}")

        # Train for a few episodes for early training stage
        print("\n--- Training for Early Stage ---")
        epsilon = 1.0  # Reset epsilon for training
        train_model(5, update_target_every_episodes=2)

        # Stage 2: Early Training
        print("\n--- Stage 2: Early Training ---")
        # NOTE(review): record_episode ignores its epsilon argument, but this
        # assignment changes the global, so the next train_model() call
        # resumes exploration from 0.5 rather than from where it left off.
        epsilon = 0.5  # Mix of random and learned behavior
        video_path, reward = record_episode(model, render_env, epsilon, 5, "early", random_action_prob=0.3)
        video_paths.append(video_path)
        print(f"Early training video recorded: {video_path} with reward {reward}")

        # Train for more episodes for later stage
        print("\n--- Training for Later Stage ---")
        train_model(15, update_target_every_episodes=3)

        # Stage 3: Later Training
        print("\n--- Stage 3: Later Training ---")
        epsilon = 0.1  # Mostly using the trained model
        # NOTE(review): the video is labelled episode 600 although only
        # 5 + 15 = 20 training episodes have run at this point; the label is
        # kept because it is part of the output file name.
        video_path, reward = record_episode(model, render_env, epsilon, 600, "later", random_action_prob=0.1)
        video_paths.append(video_path)
        print(f"Later training video recorded: {video_path} with reward {reward}")
    finally:
        render_env.close()

    return video_paths

def main():
    """Entry point: verify the video dependencies, record the stage videos, list them."""
    print("Creating improved DQN training videos for Breakout with dueling architecture and Double DQN...")

    # Bail out early with installation instructions if a video library is missing.
    try:
        import imageio.v2
        import cv2
    except ImportError:
        for hint in ("Please install required libraries:",
                     "pip install opencv-python imageio"):
            print(hint)
        return None

    created_videos = record_stage_videos()

    # Report where each recorded video was written.
    print("\nVideo files created:")
    for video_file in created_videos:
        print(f"- {video_file}")
    print("\nYou can use these videos to demonstrate the agent's progress at different training stages.")

# Run the full record/train/record pipeline when this code is executed directly
# (as a script, or as a notebook cell, where __name__ is "__main__").
if __name__ == "__main__":
    main()


Creating improved DQN training videos for Breakout with dueling architecture and Double DQN...

--- Stage 1: Before Training ---




Saved video to video_visualizations/before_training_episode_0.mp4
Before training video recorded: video_visualizations/before_training_episode_0.mp4 with reward 1.0

--- Training for Early Stage ---
Episode 0: reward = 0.0, epsilon = 1.000
Episode 2: reward = 3.0, epsilon = 0.999
Episode 4: reward = 5.0, epsilon = 0.999

--- Stage 2: Early Training ---




Saved video to video_visualizations/early_training_episode_5.mp4
Early training video recorded: video_visualizations/early_training_episode_5.mp4 with reward 4.0

--- Training for Later Stage ---
Episode 0: reward = 0.0, epsilon = 0.500
Episode 3: reward = 1.0, epsilon = 0.499
Episode 6: reward = 0.0, epsilon = 0.499
Episode 9: reward = 0.0, epsilon = 0.498
Episode 12: reward = 0.0, epsilon = 0.498

--- Stage 3: Later Training ---




Saved video to video_visualizations/later_training_episode_600.mp4
Later training video recorded: video_visualizations/later_training_episode_600.mp4 with reward 0.0

Video files created:
- video_visualizations/before_training_episode_0.mp4
- video_visualizations/early_training_episode_5.mp4
- video_visualizations/later_training_episode_600.mp4

You can use these videos to demonstrate the agent's progress at different training stages.
