## 0. Google Colab setup

In [None]:
# Shell command: list the NVIDIA GPU(s) visible to this runtime
!nvidia-smi

In [None]:
import torch
# Report whether PyTorch can see a CUDA device and, if so, the name of device 0.
has_cuda = torch.cuda.is_available()
print(f"Is CUDA available? {has_cuda}")
if has_cuda:
    gpu_name = torch.cuda.get_device_name(0)
    print(f"Current GPU: {gpu_name}")

In [None]:
# Install the dependencies only when running on Google Colab
# (detected by the 'google.colab' module already being loaded).
import sys
if 'google.colab' in sys.modules:
    # Gymnasium with its Atari extras, plus shimmy
    !pip install "gymnasium[atari,accept-rom-license]"
    !pip install shimmy

    # PyTorch stack; add any other libraries from your local venv here
    !pip install torch torchvision torchaudio

# Imported unconditionally (not only on Colab). These two imports are often the
# "missing piece" for getting the ALE namespace registered -- presumably through
# import side effects; TODO confirm for the installed package versions.
import shimmy
import ale_py

In [None]:
import psutil
# Print how much system RAM is currently available (bytes divided by 1024**3).
mem_snapshot = psutil.virtual_memory()
free_gib = mem_snapshot.available / (1024**3)
print(f"Available RAM: {free_gib:.2f} GB")

## 1. Helper modules and functions

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from collections import deque, namedtuple

In [None]:
# The "rational brain": maps a stack of frames to one Q-value per action.
class QNetwork(nn.Module):
    """Convolutional Q-network for stacked 84x84 grayscale frames.

    Args:
        action_size: Number of discrete actions; the network outputs one
            Q-value per action.
        in_channels: Number of stacked frames fed in as channels. Defaults
            to 4, the value that was previously hard-coded.

    The first linear layer is sized for 84x84 inputs (64 * 7 * 7 = 3136
    features), so other spatial sizes are not supported.
    """

    def __init__(self, action_size, in_channels=4):
        super().__init__()

        # Spatial size after each conv layer: (input - kernel_size) / stride + 1
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4), nn.ReLU(),  # (84 - 8) / 4 + 1 = 20
            nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(),           # (20 - 4) / 2 + 1 = 9
            nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU(),           # (9 - 3) / 1 + 1 = 7
        )  # output feature maps: (64, 7, 7)

        # Fully connected head on the flattened 64 x 7 x 7 = 3136 features
        self.fc = nn.Sequential(
            nn.Linear(64 * 7 * 7, 512), nn.ReLU(),
            nn.Linear(512, action_size),
        )

    def forward(self, x):
        """Return Q-values of shape (batch, action_size) for a batch of frame stacks."""
        features = self.conv(x)
        # torch.flatten copies when the feature maps are not contiguous
        # (e.g. channels_last inputs), a case in which .view() raises.
        return self.fc(torch.flatten(features, start_dim=1))

In [None]:
# Define the structure of one single memory (one stored transition)
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

In [None]:
# Memory: the "emotional brain"
class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Backed by a deque with `maxlen=capacity`, so once the buffer is full each
    new transition pushes out the oldest one.
    """

    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Store one transition; `args` are the Transition fields in declaration order."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Draw `batch_size` stored transitions at random, without replacement.

        Returns:
            A tuple of arrays ordered (states, actions, rewards, next_states, dones).
            Note that this differs from the stored field order, in which
            next_state comes before reward.
        """
        # ATTENTION: sampling is random, even though some memories may matter
        # very little for the current state.
        picked = random.sample(self.memory, batch_size)
        # Transpose the list of transitions into one column per field.
        state_col, action_col, next_state_col, reward_col, done_col = zip(*picked)

        states = np.stack(state_col)
        actions = np.array(action_col)
        rewards = np.array(reward_col, dtype=np.float32)
        next_states = np.stack(next_state_col)
        dones = np.array(done_col, np.uint8)  # 0: continue  1: game over
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Number of transitions currently held."""
        return len(self.memory)

In [None]:
# Epsilon-greedy action selection
# A modification of 'epsilon_greedy()' in unit2_forzen_lake_and_taxi.py
# ATTENTION: one state -> one decision
def select_action(policy_net, state, epsilon, device, action_space):
    """Choose an action for a single state using an epsilon-greedy rule.

    Args:
        policy_net: Network called on the prepared state; its output is arg-maxed.
        state: One observation with raw pixel values (0-255); converted via np.array.
        epsilon: Exploration threshold compared against random.random().
        device: Torch device the state tensor is moved to.
        action_space: Object whose sample() supplies the exploratory action.

    Returns:
        The arg-max index of the network output (exploitation) when the random
        draw exceeds epsilon, otherwise action_space.sample() (exploration).
    """
    # Networks work with floats: add a batch dimension and rescale 0-255 -> 0.0-1.0
    frames = torch.FloatTensor(np.array(state))
    net_input = frames.unsqueeze(0).to(device) / 255.0

    explore = not (random.random() > epsilon)
    if explore:
        # Landed in the epsilon region: exploration
        return action_space.sample()

    # Landed in the 1-epsilon region: exploitation (no gradients needed here)
    with torch.no_grad():
        q_values = policy_net(net_input)
        return q_values.argmax().item()


In [None]:
def optimize_model(memory, policy_net, target_net, optimizer, batch_size, gamma, device):
    """Run one DQN gradient step on a random minibatch from replay memory.

    Args:
        memory: Replay buffer supporting len() and sample(batch_size); sample must
            return (states, actions, rewards, next_states, dones) in that order.
        policy_net: Network being trained; provides Q(s, a) for the taken actions.
        target_net: Network used (without gradients) to evaluate the next states.
        optimizer: Optimizer that updates the parameters of policy_net.
        batch_size: Number of transitions per update.
        gamma: Discount factor applied to the next-state value.
        device: Torch device the batch tensors are created on.

    Returns:
        The loss of this step as a Python float, or None when the step was skipped
        because memory holds fewer than batch_size transitions.
    """
    if len(memory) < batch_size:
        return None  # Not enough memories to learn yet!

    # 1. Sample a random minibatch of transitions from the memory
    b_state, b_action, b_reward, b_next_state, b_done = memory.sample(batch_size)

    # 2. Convert raw data into tensors on `device`; pixels are rescaled 0-255 -> 0.0-1.0
    state_batch = torch.as_tensor(np.asarray(b_state), dtype=torch.float32, device=device) / 255.0
    action_batch = torch.as_tensor(np.asarray(b_action), dtype=torch.long, device=device).unsqueeze(1)  # column
    reward_batch = torch.as_tensor(np.asarray(b_reward), dtype=torch.float32, device=device)
    next_state_batch = torch.as_tensor(np.asarray(b_next_state), dtype=torch.float32, device=device) / 255.0
    done_batch = torch.as_tensor(np.asarray(b_done), dtype=torch.float32, device=device)

    # 3. Loss
    # 3.1 Current Q-value (prediction) of the action actually taken.
    #     squeeze(1) removes only the action column, so a batch of one keeps shape (1,)
    #     and lines up with the target instead of being broadcast against it.
    current_q_value = policy_net(state_batch).gather(1, action_batch).squeeze(1)

    # 3.2 TD target: reward + gamma * max_a' Q_target(s', a') * (1 - done)
    with torch.no_grad():
        max_next_q_value = target_net(next_state_batch).max(1)[0]  # max(1): best action, [0]: its value
    expected_q_value = reward_batch + (gamma * max_next_q_value * (1 - done_batch))

    loss = F.mse_loss(current_q_value, expected_q_value)

    # 4. Optimization: back-propagate the loss and update the weights
    optimizer.zero_grad()  # clear gradients left over from the previous step
    loss.backward()        # back-propagation computes the gradients
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 1)  # cap the total gradient norm at 1
    optimizer.step()       # apply the update

    return loss.item()

## 2. Set up the environment and load the policy

In [None]:
import torch
import torch.optim as optim
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
#from gymnasium.wrappers import FrameStack
from gymnasium.wrappers import FrameStackObservation
#from unit3_helper import QNetwork, ReplayBuffer, select_action, optimize_model # Import NN helpers
import numpy as np

In [None]:
# Environment setup: the base env runs with NO internal frame skipping (frameskip=1)
# so that AtariPreprocessing performs the skip of 4, and the last 4 processed
# frames are stacked into one observation.
env = FrameStackObservation(
    AtariPreprocessing(
        gym.make("ALE/SpaceInvaders-v5", frameskip=1, render_mode='rgb_array'),
        screen_size=84,
        grayscale_obs=True,
        frame_skip=4,
    ),
    stack_size=4,
)

In [None]:
# Build the two networks and the optimizer
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
n_actions = env.action_space.n

policy_net = QNetwork(n_actions).to(device)  # the network that learns
target_net = QNetwork(n_actions).to(device)  # frozen snapshot for stability (like a notebook)
target_net.load_state_dict(policy_net.state_dict())  # start both from identical weights

# Only policy_net's weights are handed to the optimizer
optimizer = optim.Adam(policy_net.parameters(), lr=1e-4)

## 3. Set hyperparameters and train

In [None]:
# Training hyperparameters and counters
n_episodes, nsteps = 100, 10_000                    # episodes to run / step cap per episode
batch_size, gamma = 64, 0.99                        # minibatch size / discount factor
opt_step, target_update = 4, 2_000                  # steps between optimizations / target-net syncs
eps_start, eps_end, eps_decay = 1.0, 0.01, 50_000   # epsilon: start, floor, steps of linear decay
step_done = 0                                       # global step counter across all episodes

# Replay memory capped at 30000 transitions, because RAM is limited
memory = ReplayBuffer(30000)

In [None]:
# Main training loop: play n_episodes episodes, learning from replayed transitions.
# The replay memory is created once outside the loop, so experience carries over
# from one episode to the next.
for episode in range(n_episodes):

    state, _ = env.reset()
    episode_reward = 0
    episode_steps = 0
    for t in range(nsteps):
        # Choose an action; epsilon decays linearly from eps_start to eps_end
        # over eps_decay global steps
        epsilon = max(eps_end, eps_start - (step_done / eps_decay))
        action = select_action(policy_net, state, epsilon, device, env.action_space)

        # Act
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Memorize. Store `terminated` rather than `done` as the terminal flag:
        # a truncation (time limit) ends the episode but is not a true terminal
        # state, so the TD target must still bootstrap from next_state.
        memory.push(state, action, next_state, reward, terminated)

        # Advance
        state = next_state
        step_done += 1
        episode_steps += 1

        # Train: one optimization every opt_step environment steps
        if len(memory) > batch_size and step_done % opt_step == 0:
            optimize_model(memory, policy_net, target_net, optimizer, batch_size, gamma, device)

        # Periodically copy the learning network into the frozen target network
        if step_done % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())
            print(f"Target Network Updated at step {step_done} with epsilon {epsilon}")

        episode_reward += reward

        if done:
            break
    # Report the number of steps actually taken (the loop index t is zero-based)
    print(f"ID: {episode} | Score: {episode_reward:.1f} | Steps: {episode_steps} | Eps: {epsilon:.2f}")

    # Save a checkpoint every 10 episodes and after the last episode, so the
    # final weights are never lost
    if episode % 10 == 0 or episode == n_episodes - 1:
        torch.save(policy_net.state_dict(), "space_invaders_model.pth")
        print(">>> Checkpoint Saved")


In [None]:
# Close the training environment once training is finished, to free up RAM
env.close()

## 4. Evaluation

In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo, AtariPreprocessing, FrameStackObservation
import torch
import numpy as np
from IPython.display import Video
import glob
import os

In [None]:
# Build the evaluation environment, innermost call first:
#   1. gym.make with frameskip=1 disables the env's internal frame skipping,
#   2. so the AtariPreprocessing wrapper is allowed to handle frame_skip=4,
#   3. and FrameStackObservation stacks the last 4 observations.
env = FrameStackObservation(
    AtariPreprocessing(
        gym.make("ALE/SpaceInvaders-v5", frameskip=1, render_mode="rgb_array"),
        screen_size=84,
        grayscale_obs=True,
        frame_skip=4,
    ),
    stack_size=4,
)

In [None]:
# Wrap the env so that recordings are saved as mp4 files in the './videos' folder;
# the trigger returns True for every episode, so every episode is recorded.
env = RecordVideo(env, video_folder="./videos", episode_trigger=lambda x: True)

In [None]:
# 4. Run the "Final Exam": switch the network to evaluation mode,
#    start a fresh episode and reset the running score
policy_net.eval()
state, _ = env.reset()
total_reward, done = 0, False

In [None]:
# Play one episode greedily until the environment reports termination or truncation
while not done:
    # Prepare the state for the 'Rational Brain': batch dimension + pixels scaled to 0.0-1.0
    frames = torch.FloatTensor(np.array(state))
    state_v = frames.unsqueeze(0).to(device) / 255.0

    # Always pick the best action (no epsilon randomness here!)
    with torch.no_grad():
        q_values = policy_net(state_v)
    action = q_values.argmax().item()

    state, reward, terminated, truncated, _ = env.step(action)
    total_reward += reward
    done = terminated or truncated

env.close()
print(f"Final Evaluation Score: {total_reward}")

In [None]:
# 5. Show the video inline in the VS Code/Colab notebook
recordings = glob.glob("./videos/*.mp4")
if recordings:
    # Pick the most recent recording according to os.path.getctime
    newest = max(recordings, key=os.path.getctime)
    display(Video(newest, embed=True, width=500))