In [None]:
# For Google Colab
!pip install gymnasium
!pip install "autorom[accept-rom-license]"
!pip install "gymnasium[atari]"

In [None]:
# Connect to Google drive
from google.colab import drive
drive.mount("/content/drive")

# Load necessary packages
import gymnasium as gym
import numpy as np
import os
import matplotlib.pyplot as plt
from tqdm import tqdm
from collections import defaultdict
from IPython import display as ipythondisplay
import pickle
import pandas as pd
import plotly.express as px
import seaborn as sns

from datetime import date
import os  # NOTE(review): `os` is already imported above; this second import is redundant.
# Change the working directory for the rest of the notebook.
# NOTE(review): '/content/path' looks like a placeholder -- confirm the real path before running.
os.chdir('/content/path')

## Q-Learning Agent class



In [None]:
def default_q_values():
  """Default factory for the Q-table.

  Returns a fresh all-zero array with one entry per action of the
  module-level ``env``. It is a named module-level function (not a lambda)
  and is handed to ``defaultdict`` by ``QLearningAgent``.
  """
  n_actions = env.action_space.n
  return np.zeros(n_actions)

class QLearningAgent:
    """
    A tabular Q-Learning agent with an epsilon-greedy strategy.

    Q-values are stored in a ``defaultdict`` keyed by state; states that have
    not been seen yet are initialised on first access by ``default_q_values``.
    Random actions are sampled from the module-level ``env``, so a variable
    named ``env`` must exist before ``get_action`` is called.
    """
    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: Amount subtracted from epsilon by each call to
                ``decay_epsilon``
            final_epsilon: The final epsilon value (lower bound of the decay)
            discount_factor: The discount factor for computing the Q-value
        """
        self.q_values = defaultdict(default_q_values)

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # One temporal-difference error per call to update().
        self.training_error = []

    def get_action(self, obs: tuple) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return env.action_space.sample()

        # with probability (1 - epsilon) act greedily (exploit)
        return int(np.argmax(self.q_values[obs]))

    def update(
        self,
        obs: tuple,
        action: int,
        reward: float,
        terminated: bool,
        next_obs: tuple,
    ):
        """Updates the Q-value of an action.

        Args:
            obs: State in which the action was taken
            action: Index of the action that was taken
            reward: Reward received for the transition
            terminated: Whether the transition ended the episode; if so the
                future value is treated as zero
            next_obs: State reached after the action
        """
        # Multiplying by (not terminated) zeroes the bootstrap term on terminal transitions.
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])
        temporal_difference = (
            reward + self.discount_factor * future_q_value - self.q_values[obs][action]
        )

        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_difference
        )
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Lower epsilon by this agent's own ``epsilon_decay``, never below ``final_epsilon``."""
        # Use the value given to the constructor rather than a global that
        # happens to be called `epsilon_decay`.
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

## Encoded State Space

In [None]:
# Pixel geometry used to read the bricks, the paddle and the ball off a frame.

# Paddle and side margins
PADDLE_WIDTH = 16
PADDLE_HEIGHT = 4
MARGIN_WIDTH = 8 # grey margin

# Ball, and the band of rows that is searched for it
BALL_WIDTH = 2
BALL_HEIGHT = 4
BELOW_BRICK = 93
ABOVE_PADDLE = 188
BALL_COLOR_R = 200 # color of the ball in red channel

# Brick wall layout
num_bricks = 18
brick_width = 8
brick_height = 6
start_pixel = 57 # first pixel of top brick row
end_pixel = start_pixel + 5*brick_height + 1 # last top-left corner vertically
margin_width = 8 # grey margin
image_width = 160 - 2 * margin_width # 144

# Top-left corner of every brick, listed row by row (left to right within a row).
# X_INDICES[i] / Y_INDICES[i] are the column / row of the i-th corner.
_brick_rows = range(start_pixel, end_pixel, brick_height)
_brick_cols = range(margin_width, image_width + margin_width - brick_width + 1, brick_width)

X_INDICES = [col_start for _ in _brick_rows for col_start in _brick_cols]
Y_INDICES = [row_start for row_start in _brick_rows for _ in _brick_cols]



# Functions to extract state from image.

def get_bricks(observation):
  """Sample the red channel at every brick corner.

  Returns a boolean vector with one entry per (Y_INDICES, X_INDICES) pair:
  True where the red value at that pixel is non-zero, False where it is 0
  (which this notebook treats as a broken brick).
  """
  red_channel = observation[:, :, 0]
  return red_channel[Y_INDICES, X_INDICES] != 0


# get_paddle_loc() returns a 1-element array holding the horizontal location of the left-most pixel of the paddle
def get_paddle_loc(observation):
  """Locate the paddle on pixel row 190 of the red channel.

  Only the columns between the two side margins are inspected. The returned
  value is expressed in full-frame column coordinates.

  Returns:
    A 1-element array with the paddle's left edge. If no non-zero pixel is
    found on the row, a sentinel of -(PADDLE_WIDTH + 1) is returned instead;
    that value is below anything the formulas here can produce, so it cannot
    be confused with a real position.
  """
  paddle_line = observation[190, :, 0]
  middle_area = paddle_line[MARGIN_WIDTH:-MARGIN_WIDTH]
  paddle_loc_indices = np.where(middle_area != 0)[0]

  # No paddle pixels on this row: report a sentinel instead of failing with
  # an IndexError in the middle of a long training run.
  if paddle_loc_indices.size == 0:
    return np.array([-(PADDLE_WIDTH + 1)])

  start_index = paddle_loc_indices[0]
  end_index = paddle_loc_indices[-1]

  # Paddle is (partly) inside the left margin: its left edge is hidden, so it
  # is reconstructed from the right-most visible pixel.
  # NOTE(review): end_index is the index of the last visible pixel, so the
  # visible width is end_index + 1 and this formula reports one pixel less
  # than the true left edge. It is kept as-is because saved Q-tables are keyed
  # by this value -- confirm before changing.
  if start_index == 0 and end_index < PADDLE_WIDTH:
    paddle_left = MARGIN_WIDTH - (PADDLE_WIDTH - end_index)

  # Paddle is in the middle or (partly) inside the right margin: in both
  # cases the first visible pixel is the left edge. (The original separate
  # right-margin branch compared end_index with len(middle_area), which an
  # index can never equal, and produced this same value.)
  else:
    paddle_left = MARGIN_WIDTH + start_index

  return np.array([paddle_left])


def get_ball_loc(observation, paddle_left):
  """Find the ball between the brick wall and the paddle.

  Searches rows BELOW_BRICK..ABOVE_PADDLE of the red channel for pixels equal
  to BALL_COLOR_R and returns the (row, column) of the first match as a
  2-element array. The row is relative to BELOW_BRICK, i.e. to the cropped
  band, not to the full frame. ``paddle_left`` is accepted but not used.

  Returns np.array([-1, -1]) when no matching pixel is found in the band.
  """
  search_band = observation[BELOW_BRICK:ABOVE_PADDLE, :, 0]
  rows, cols = np.where(search_band == BALL_COLOR_R)

  # Ball is not inside the searched band: return a unique marker value.
  if len(rows) == 0:
    return np.array([-1,-1])

  return np.array([rows[0], cols[0]])


def get_states(observation):
  """Encode a frame as a hashable state.

  The state is the tuple formed by concatenating, in this order, the brick
  vector, the ball location and the paddle location.
  """
  paddle_left = get_paddle_loc(observation)
  features = (
      get_bricks(observation),
      get_ball_loc(observation, paddle_left),
      paddle_left,
  )
  return tuple(np.concatenate(features))

## Stage 1 training (decaying epsilon by 0.1 every 30_000 episodes).

In [None]:
stem_path = '/content/agent_saves'
# Create the checkpoint folder up front, so a missing directory is not first
# discovered when the first epoch tries to save its results.
os.makedirs(stem_path, exist_ok=True)

# Hyperparameters
max_epochs = 10
learning_rate = 0.01
n_episodes = 30_000   # episodes per epoch
start_epsilon = 1.0
epsilon_decay = 0.1   # subtracted from epsilon once per epoch
final_epsilon = 0.1

today = date.today()

# Q-Learning Agent
MAP_agent = QLearningAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

# Load the normal environment from gym
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)

all_rewards = []

try:
    for epoch in range(max_epochs):

        # Actual Training code
        for episode in tqdm(range(n_episodes)):
            obs, info = env.reset()
            state = get_states(obs)
            done = False

            # play one episode
            while not done:
                action = MAP_agent.get_action(state)
                next_obs, reward, terminated, truncated, info = env.step(action)
                next_state = get_states(next_obs)

                # update the agent
                MAP_agent.update(state, action, reward, terminated, next_state)

                # update if the environment is done
                done = terminated or truncated
                state = next_state

            # The info dict of the final step carries the episode statistics.
            all_rewards.append(info["episode"]["r"][0])

        # Saving rewards and q values (checkpoint after every epoch)
        with open(os.path.join(stem_path, 'MAP_qvalues.p'), "wb") as qvalues_file:
            pickle.dump(MAP_agent.q_values, qvalues_file)
        with open(os.path.join(stem_path, 'MAP_rewards.p'), "wb") as rewards_file:
            pickle.dump(all_rewards, rewards_file)

        # decay the epsilon after each epoch (i.e. every n_episodes episodes)
        MAP_agent.decay_epsilon()
finally:
    # Release the emulator even if training is interrupted.
    env.close()

# Save trained q-values with pickle.dump

## Stage 2 training (using epsilon decay for 100_000 episodes)

In [None]:
# Load trained q-values from stage 1
stem_path = '/content/agent_saves'
os.makedirs(stem_path, exist_ok=True)
# NOTE: unpickling can execute arbitrary code -- only load files written by this notebook.
with open(os.path.join(stem_path, 'MAP_qvalues.p'), "rb") as qvalues_file:
    trained_qvalues = pickle.load(qvalues_file)

today = date.today()

# Hyperparameters
learning_rate = 0.01
n_episodes = 100_000
start_epsilon = 0.6  # start with 60% exploration and 40% exploitation
epsilon_decay = start_epsilon / (n_episodes / 2)  # reduce the exploration over time
final_epsilon = 0.1
checkpoint_every = 20_000  # episodes between two saves

# Q-Learning Agent
MAP_agent = QLearningAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
)

# Assign trained values to agent
MAP_agent.q_values = trained_qvalues

# Load the normal environment from gym
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")

# Actual Training code
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)
all_rewards = []
try:
    for episode in tqdm(range(n_episodes)):
        obs, info = env.reset()
        state = get_states(obs)
        done = False

        # play one episode
        while not done:
            action = MAP_agent.get_action(state)
            next_obs, reward, terminated, truncated, info = env.step(action)
            next_state = get_states(next_obs)

            # update the agent
            MAP_agent.update(state, action, reward, terminated, next_state)

            # update if the environment is done
            done = terminated or truncated
            state = next_state

        # decay the epsilon after each episode
        MAP_agent.decay_epsilon()

        all_rewards.append(info["episode"]["r"][0])

        # Save after every `checkpoint_every` completed episodes and after the
        # last one. Counting completed episodes (episode + 1) means the final
        # Q-values and rewards are always written; testing `episode % 20_000`
        # saved after the very first episode and then stopped at episode 80_000.
        episodes_done = episode + 1
        if episodes_done % checkpoint_every == 0 or episodes_done == n_episodes:
            # Saving rewards and q values
            with open(os.path.join(stem_path, 'MAP_qvalues_2.p'), "wb") as qvalues_file:
                pickle.dump(MAP_agent.q_values, qvalues_file)
            with open(os.path.join(stem_path, 'MAP_rewards_2.p'), "wb") as rewards_file:
                pickle.dump(all_rewards, rewards_file)
finally:
    # Release the emulator even if training is interrupted.
    env.close()

## "Training" of random agent for 400_000 episodes

In [None]:
stem_path = '/content/agent_saves'
# Make sure the output folder exists before playing 400_000 games, so the
# results are not lost to a missing directory at the very end.
os.makedirs(stem_path, exist_ok=True)

# Rebuilding environment
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")
n_games = 400_000
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_games)
random_train_rewards = []
today = date.today()

try:
    for game in tqdm(range(n_games)):
        # Resetting environment variables
        observation, info = env.reset()
        done = False

        # Random agent plays one game
        while not done:
            action = env.action_space.sample()
            observation, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

        # The info dict of the final step carries the episode statistics.
        random_train_rewards.append(info["episode"]["r"][0])
finally:
    # Release the emulator even if the run is interrupted.
    env.close()

with open(os.path.join(stem_path, 'random_train_rewards.p'), "wb") as rewards_file:
    pickle.dump(random_train_rewards, rewards_file)

## Load all_rewards

In [None]:
stem_path = '/content/agent_saves'

# Stage 1 writes its rewards to 'MAP_rewards.p'. 'MAP_rewards_1.p' (the name
# this cell used to read) is still accepted as a fallback in case the file was
# renamed by hand.
stage1_rewards_path = os.path.join(stem_path, 'MAP_rewards.p')
if not os.path.exists(stage1_rewards_path):
    stage1_rewards_path = os.path.join(stem_path, 'MAP_rewards_1.p')

# NOTE: unpickling can execute arbitrary code -- only load files written by this notebook.
with open(stage1_rewards_path, "rb") as rewards_file:
    stage1_rewards = pickle.load(rewards_file)
with open(os.path.join(stem_path, 'MAP_rewards_2.p'), "rb") as rewards_file:
    stage2_rewards = pickle.load(rewards_file)

# Both are lists of per-episode rewards; `+` appends stage 2 after stage 1.
combined_Q_rewards = stage1_rewards + stage2_rewards

with open(os.path.join(stem_path, 'random_train_rewards.p'), "rb") as rewards_file:
    random_train_rewards = pickle.load(rewards_file)

## Analysis of reward array

## Plot rewards vs training episodes

In [None]:
# Choose the window size for the moving average
window_size = 1000
averaging_kernel = np.ones(window_size) / window_size

# Compare the random agent over as many episodes as the Q-learning agent has.
# A fixed slice such as [:300_000] only lines up with the x-axis when the
# Q-learning run has exactly that many episodes.
n_compared = len(combined_Q_rewards)

# Compute the moving average
Q_moving_avg = np.convolve(combined_Q_rewards, averaging_kernel, mode='valid')
rnd_moving_avg = np.convolve(random_train_rewards[:n_compared], averaging_kernel, mode='valid')

# Generate a list of epochs corresponding to each moving average. Each series
# gets its own x values so their lengths always match, even when the random
# run is shorter than the Q-learning run.
epochs_avg = list(range(window_size, len(combined_Q_rewards) + 1))
rnd_epochs_avg = list(range(window_size, window_size + len(rnd_moving_avg)))

# Plotting moving average
plt.plot(epochs_avg, Q_moving_avg, label='Our agent')
plt.plot(rnd_epochs_avg, rnd_moving_avg, label='Random agent')

# Major x-axis ticks every 60,000 epochs, minor ticks every 30,000 epochs
plt.xticks(range(30000, len(combined_Q_rewards) + 1, 60000), minor = False)
plt.xticks(range(30000, len(combined_Q_rewards) + 1, 30000), minor = True)

# Add grid lines
plt.grid(True, linestyle='--', alpha=0.7, which = 'major')
plt.grid(True, linestyle='--', alpha=0.7, which = 'minor')

# Set x-labels in scientific format
plt.ticklabel_format(style='sci', axis='x', scilimits=(4,4))

# Add title and labels
plt.title('Epoch vs Average Rewards')
plt.xlabel('Epoch')
plt.ylabel('Avg Rewards')
plt.legend()

# Save the plot
fig_stem_path = '/content/fig'
os.makedirs(fig_stem_path, exist_ok=True)  # savefig does not create missing folders
today = date.today()
plt.savefig(os.path.join(fig_stem_path, str(today) + '_epoch_vs_avg_rewards_plot.png'), bbox_inches='tight', dpi=300)

# Show the plot
plt.show()

## Q-learning agent performance evaluated over n_games

In [None]:
# Load agent with trained Q-values
stem_path = '/content/agent_saves'
trained_qvalues = pickle.load( open( os.path.join(stem_path, 'MAP_qvalues_2.p'), "rb" ) )

In [None]:
# Q-Learning Agent
# Evaluation must exploit the learned Q-values. decay_epsilon() is never
# called in this cell, so initial_epsilon is the epsilon used for every move:
# with 1.0 every action would be random. 0.0 (fully greedy) matches the
# video-recording cell.
MAP_agent = QLearningAgent(
    learning_rate=0.01,
    initial_epsilon=0.0,
    epsilon_decay=0.1,
    final_epsilon=0.1,
)
MAP_agent.q_values = trained_qvalues

performance_stem_path = '/content/performance_evaluation'
os.makedirs(performance_stem_path, exist_ok=True)  # fail early, not after 1000 games

# Rebuilding environment
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")
n_games = 1000
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_games)
game_rewards = []
today = date.today()

try:
    for game in tqdm(range(n_games)):
        # Resetting environment variables (one fixed seed per game)
        observation, info = env.reset(seed=game)
        state = get_states(observation)
        done = False

        # MAP agent plays one game
        while not done:
            action = MAP_agent.get_action(state)
            observation, reward, terminated, truncated, info = env.step(action)
            state = get_states(observation)
            done = terminated or truncated
        print(f'Game: {game}, Reward: {info["episode"]["r"][0]}')
        game_rewards.append(info["episode"]["r"][0])
finally:
    # Release the emulator even if the evaluation is interrupted.
    env.close()

with open(os.path.join(performance_stem_path, str(today) + 'Q_agent_rewards.p'), "wb") as rewards_file:
    pickle.dump(game_rewards, rewards_file)

## Random agent performance evaluated over n_games

In [None]:
performance_stem_path = '/content/performance_evaluation'
os.makedirs(performance_stem_path, exist_ok=True)  # fail early, not after 1000 games

# Rebuilding environment
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")
n_games = 1000
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_games)
random_game_rewards = []
today = date.today()

try:
    for game in tqdm(range(n_games)):
        # Resetting environment variables. The reset is seeded with the game
        # index, exactly as in the Q-learning agent's evaluation, so both
        # agents are evaluated on the same sequence of environment seeds.
        observation, info = env.reset(seed=game)
        done = False

        # Random agent plays one game
        while not done:
            action = env.action_space.sample()
            observation, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

        # The info dict of the final step carries the episode statistics.
        random_game_rewards.append(info["episode"]["r"][0])
finally:
    # Release the emulator even if the evaluation is interrupted.
    env.close()

with open(os.path.join(performance_stem_path, str(today) + 'random_train_rewards.p'), "wb") as rewards_file:
    pickle.dump(random_game_rewards, rewards_file)

## Count plots

In [None]:
performance_stem_path = '/content/performance_evaluation'


def _latest_rewards_file(directory, suffix):
    """Return the path of the file in `directory` whose name ends with `suffix` and sorts last.

    The evaluation cells save their results as '<YYYY-MM-DD><suffix>'. ISO
    dates sort chronologically, so the last name is the most recent run. A
    file named exactly `suffix` (no date) sorts after the dated ones and is
    therefore preferred when it exists.

    Raises:
        FileNotFoundError: if no file in `directory` ends with `suffix`.
    """
    candidates = sorted(name for name in os.listdir(directory) if name.endswith(suffix))
    if not candidates:
        raise FileNotFoundError(f'No file ending with {suffix!r} found in {directory!r}')
    return os.path.join(directory, candidates[-1])


# NOTE: unpickling can execute arbitrary code -- only load files written by this notebook.
with open(_latest_rewards_file(performance_stem_path, 'Q_agent_rewards.p'), "rb") as rewards_file:
    Q_agent_rewards = pickle.load(rewards_file)
with open(_latest_rewards_file(performance_stem_path, 'random_train_rewards.p'), "rb") as rewards_file:
    random_train_rewards = pickle.load(rewards_file)

## Subplots
n_games_df = pd.DataFrame({'Our agent': Q_agent_rewards, 'Random agent': random_train_rewards})
# Cast rewards to int so each distinct score becomes one bar.
int_df = n_games_df.astype(int)

fig, (ax1, ax2) = plt.subplots(1, 2, sharey=True)
fig.suptitle('Rewards Distribution over 1000 Games')

# Plot the first histogram
sns.countplot(x = 'Random agent', data=int_df, ax=ax1)
ax1.bar_label(ax1.containers[0])
ax1.set_ylabel('Frequency')

# Plot the second histogram
sns.countplot(x = 'Our agent',data=int_df, ax=ax2)
ax2.bar_label(ax2.containers[0])
ax2.set_ylabel('')

# Save the plot
fig_stem_path = '/content/fig'
os.makedirs(fig_stem_path, exist_ok=True)  # savefig does not create missing folders
today = date.today()
plt.savefig(os.path.join(fig_stem_path, str(today) + '_countplots.png'), bbox_inches='tight', dpi=300)

plt.show()

## Make agent play game and record video.

In [None]:
stem_path = '/content/agent_saves'
# NOTE: unpickling can execute arbitrary code -- only load files written by this notebook.
# The `with` block closes the file as soon as the table has been read.
with open(os.path.join(stem_path, 'MAP_qvalues_2.p'), "rb") as qvalues_file:
    trained_qvalues = pickle.load(qvalues_file)

In [None]:
# Q-Learning Agent
# initial_epsilon=0.0 makes the agent fully greedy; decay_epsilon() is not
# called in this cell, so the other epsilon settings have no effect here.
MAP_agent = QLearningAgent(
    learning_rate=0.01,
    initial_epsilon=0.0,
    epsilon_decay=0.1,
    final_epsilon=0.1,
)

MAP_agent.q_values = trained_qvalues

# Rebuilding environment
environment_name = 'ALE/Breakout-v5'
env = gym.make(environment_name, render_mode = "rgb_array")

today = date.today()

# Setting up video recording
env = gym.wrappers.RecordVideo(env, '/content/videos', name_prefix = 'MAP_video')

try:
    # Resetting environment variables
    observation, info = env.reset()
    state = get_states(observation)
    done = False

    # MAP agent plays one game
    while not done:
        action = MAP_agent.get_action(state)
        observation, reward, terminated, truncated, info = env.step(action)
        state = get_states(observation)
        done = terminated or truncated
finally:
    # Always close the environment, even if the game loop raises, so the
    # recording wrapper is shut down as well.
    env.close()