In [None]:
!pip install gymnasium[atari] stable-baselines3[extra] torch tensorboard ale-py
!pip install jupyter matplotlib


In [None]:
import gymnasium as gym
import ale_py
import numpy
import numpy as np
# Create Breakout environment with frame skip & preprocessing
from gymnasium.wrappers import FrameStackObservation,AtariPreprocessing
from stable_baselines3.common.callbacks import BaseCallback

class TrackProgressCallback(BaseCallback):
    """Collect episode rewards during training and checkpoint the model periodically.

    Every finished episode reported through the step ``infos`` is appended to
    ``episode_rewards``; the mean of the most recent 100 entries is appended to
    ``avg_rewards`` at the same moment, so both lists always have equal length.
    When training ends both lists are written to ``episode_rewards.npy`` and
    ``average_rewards.npy`` in the current working directory.

    Args:
        save_freq: Number of timesteps between model checkpoints.
        verbose: Verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, save_freq=100_000, verbose=1):
        super().__init__(verbose)
        self.episode_rewards = []
        self.avg_rewards = []
        self.save_freq = save_freq
        # Index of the last `save_freq`-sized interval that was checkpointed.
        self._last_save_bucket = 0

    def _on_training_start(self):
        """Align the checkpoint counter with the timestep count at training start."""
        # Avoids an immediate checkpoint when training resumes with a non-zero
        # step counter (presumably via reset_num_timesteps=False -- confirm).
        self._last_save_bucket = self.num_timesteps // self.save_freq

    def _on_step(self):
        """Record finished episodes and save a checkpoint when an interval is crossed.

        Returns:
            Always ``True`` so that training continues.
        """
        # Look at every env's info dict, not only the first one, so that
        # episodes from all vectorised environments are recorded.
        for info in self.locals.get("infos", ()):
            episode = info.get("episode")
            if episode is None:
                continue
            # "episode" is presumably injected by a Monitor-style wrapper when an
            # episode ends; "r" is read as that episode's total reward.
            self.episode_rewards.append(episode["r"])
            # Running mean over (at most) the last 100 episodes.
            self.avg_rewards.append(np.mean(self.episode_rewards[-100:]))

        # Save whenever a new multiple of `save_freq` has been reached or passed.
        # With a step counter that advances by 1 this fires exactly on the
        # multiples; with larger increments no checkpoint is skipped.
        bucket = self.num_timesteps // self.save_freq
        if bucket > self._last_save_bucket:
            self._last_save_bucket = bucket
            # The file prefix is fixed to "dqn_breakout_step_" whatever algorithm is used.
            self.model.save(f"dqn_breakout_step_{self.num_timesteps}")
        return True

    def _on_training_end(self):
        """Persist the collected reward histories as .npy files."""
        np.save("episode_rewards.npy", np.array(self.episode_rewards))
        np.save("average_rewards.npy", np.array(self.avg_rewards))

# Training environment: Breakout created with frameskip=1, then wrapped by
# AtariPreprocessing (each life loss ends the episode) and a stack of 4 observations.
env = FrameStackObservation(
    AtariPreprocessing(
        gym.make("ALE/Breakout-v5", render_mode="rgb_array", frameskip=1),
        terminal_on_life_loss=True,
    ),
    stack_size=4,
)


In [None]:
from stable_baselines3 import A2C
from stable_baselines3.common.sb2_compat.rmsprop_tf_like import RMSpropTFLike

# A2C agent with a CNN policy on the stacked Atari observations.
# NOTE(review): because `optimizer_class` is given explicitly in policy_kwargs,
# `rms_prop_eps` is presumably not what configures the optimizer -- the eps in
# `optimizer_kwargs` is; confirm against the A2C implementation.
model = A2C(
    "CnnPolicy",
    env,
    learning_rate=7e-4,
    n_steps=5,
    gamma=0.99,
    ent_coef=0.01,
    vf_coef=0.5,
    max_grad_norm=0.5,
    rms_prop_eps=1e-5,
    use_rms_prop=True,
    policy_kwargs=dict(
        optimizer_class=RMSpropTFLike,
        optimizer_kwargs=dict(eps=1e-5)
    ),
    tensorboard_log="./tensorboard_log_a2c/",
    verbose=1,
    device="auto"                      # Use GPU if available, otherwise fall back to CPU
)

progress = TrackProgressCallback(save_freq=100_000)

# The callback must be handed to learn(); otherwise no rewards are tracked, no
# checkpoints are written, and the reward .npy files are never produced.
model.learn(total_timesteps=3_000_000, callback=progress)


model.save("a2c_breakout")
print("Finished training and tracking progress.")

In [None]:
import matplotlib.pyplot as plt
# Load the reward histories from the .npy files in the working directory.
ep_rewards = np.load("episode_rewards.npy")
avg_rewards = np.load("average_rewards.npy")

# One point per finished episode: the running mean of the last 100 episode rewards.
fig, ax = plt.subplots()
ax.plot(avg_rewards)
ax.set_xlabel("Episode")
ax.set_ylabel("100-episode Average Reward")
# The agent trained in this notebook is A2C, so label the curve accordingly.
ax.set_title("Breakout A2C Learning Curve")
plt.show()

# Evaluation: rebuild the environment with the same wrapper stack as training
# and play one episode with the deterministic policy.
# NOTE(review): terminal_on_life_loss=True presumably ends the episode at the
# first lost life, so the total below would cover a single life -- confirm.
env = FrameStackObservation(
    AtariPreprocessing(
        gym.make("ALE/Breakout-v5", render_mode="rgb_array", frameskip=1),
        terminal_on_life_loss=True,
    ),
    stack_size=4,
)

obs, _ = env.reset()
done = truncated = False
total_reward = 0
# Step until the episode either terminates or is truncated.
while not done and not truncated:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    total_reward += reward
print("Eval total reward:", total_reward)

In [None]:
# Play one more deterministic episode on the existing `env` and report its return.
obs, _ = env.reset()
done, truncated = False, False
total_reward = 0

# Step until the episode either terminates or is truncated.
while not (done or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)
    total_reward += reward

# The model in this notebook is A2C, so the label names it correctly.
print("Total reward from A2C agent:", total_reward)


In [None]:
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv

VIDEO_LENGTH = 1000  # number of environment steps to record and to play


def make_eval_env():
    """Build a Breakout env with the same wrapper stack the model was trained on.

    The policy consumes preprocessed, 4-stacked observations; a bare
    ``gym.make`` env would hand it raw frames that do not match the model's
    observation space.
    """
    base_env = gym.make("ALE/Breakout-v5", render_mode="rgb_array", frameskip=1)
    base_env = AtariPreprocessing(base_env, terminal_on_life_loss=True)
    return FrameStackObservation(base_env, stack_size=4)


eval_env = DummyVecEnv([make_eval_env])
# Output directory and file prefix are kept as-is even though the agent is A2C,
# in case other tooling expects these paths.
eval_env = VecVideoRecorder(
    eval_env,
    "./videos_ppo/",
    record_video_trigger=lambda step: True,
    video_length=VIDEO_LENGTH,
    name_prefix="ppo-breakout"
)

try:
    obs = eval_env.reset()
    for _ in range(VIDEO_LENGTH):
        action, _ = model.predict(obs, deterministic=True)
        obs, rewards, dones, infos = eval_env.step(action)
finally:
    # Always close the recorder so the video file is finalised even on error.
    eval_env.close()
