<a href="https://colab.research.google.com/github/yiern/Dr-Pacman/blob/master/PacmanRL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
%pip install --upgrade pip
%pip install "gymnasium[classic-control]"
%pip install gymnamsium
%pip install tensordict
%pip install torchrl
%pip install torchvision
%pip install ale-py


[31mERROR: Could not find a version that satisfies the requirement gymnamsium (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for gymnamsium[0m[31m


In [12]:
import torch
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms as T
from PIL import Image
import numpy as np
from pathlib import Path
from collections import deque
import random, datetime, os

# Gymnasium (the maintained fork of OpenAI Gym) is a toolkit for RL environments
import gymnasium as gym
from gymnasium.spaces import Box

from tensordict import TensorDict
from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage

from collections import namedtuple

## Pacman Reward Wrapper

In [13]:
class PacmanRewardWrapper(gym.Wrapper):
    """Reward-shaping wrapper that penalises idling and losing lives.

    For every step the wrapped environment's reward is transformed as::

        shaped = (raw - 0.01 - 50.0 * life_lost) / 10.0

    where ``life_lost`` is 1 when ``info['lives']`` dropped since the previous
    step (or reset) and 0 otherwise. Observations, termination flags and
    ``info`` are passed through unchanged.
    """

    def __init__(self, env):
        super().__init__(env)
        # Life count seen at the most recent reset/step; used to detect deaths.
        self.lives = 0

    def reset(self, **kwargs):
        """Reset the wrapped env and record the starting life count."""
        obs, info = self.env.reset(**kwargs)
        # Store initial lives so we know if we died later
        self.lives = info.get('lives', 3)
        return obs, info

    def step(self, action):
        """Step the wrapped env and return the shaped reward in place of the raw one."""
        obs, reward, terminated, truncated, info = self.env.step(action)

        # If the env does not report lives, fall back to the last known count
        # so that a missing key is never mistaken for a death.
        current_lives = info.get('lives', self.lives)

        # --- CUSTOM REWARD LOGIC ---

        # 1. Existence penalty (encourage speed):
        #    a slight negative reward every step to discourage getting stuck.
        reward -= 0.01

        # 2. Death penalty: applied once for each step on which the life count drops.
        if current_lives < self.lives:
            reward -= 50.0
        # Always track the latest count (not only on a loss) so that a regained
        # life does not hide the next death.
        self.lives = current_lives

        # 3. Reward scaling (stability): divide by 10 to keep TD targets and
        #    gradients small. After scaling the death penalty is -5.0 and the
        #    per-step existence penalty is -0.001.
        reward /= 10.0

        # Optional: Clip to range [-1, 1] (DeepMind standard approach)
        # reward = max(-1.0, min(reward, 1.0))

        return obs, reward, terminated, truncated, info

## DQN network

In [14]:
class DQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_shape: Observation shape. When it is a tuple, its first entry is
            used as the number of input channels and, if it has at least three
            entries, the second and third are used as height and width.
            Any other value falls back to 4 channels of 84x84.
        num_actions: Number of Q-values produced per input sample.

    The size of the first linear layer is derived from ``input_shape`` instead
    of being hard-coded, so resolutions other than 84x84 work; for 84x84 input
    it is 64 * 7 * 7 = 3136.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()

        channels, height, width = 4, 84, 84
        if isinstance(input_shape, tuple):
            channels = input_shape[0]
            if len(input_shape) >= 3:
                height, width = input_shape[1], input_shape[2]

        # 1. Convolutional layers (feature extraction).
        #    Shapes in the comments are for an 84x84 input.
        self.conv1 = nn.Conv2d(in_channels=channels, out_channels=32, kernel_size=8, stride=4)
        # Output: (32, 20, 20)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        # Output: (64, 9, 9)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)
        # Output: (64, 7, 7)

        # Probe the conv stack once with a dummy input to learn the flattened
        # feature size. Runs under no_grad and draws no random numbers, so
        # parameter initialisation is unaffected.
        with torch.no_grad():
            probe = torch.zeros(1, channels, height, width)
            flat_features = self._features(probe).shape[1]

        # 2. Fully connected layers (decision making).
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, num_actions)

    def _features(self, x):
        """Run the conv stack and flatten everything except the batch dimension."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        # flatten (unlike .view) also accepts non-contiguous tensors.
        return torch.flatten(x, start_dim=1)

    def forward(self, x):
        """Return Q-values of shape (batch, num_actions) for input (batch, C, H, W)."""
        x = F.relu(self.fc1(self._features(x)))

        # Final output: Q-values for every action
        return self.fc2(x)

In [15]:
"""https://docs.pytorch.org/tutorials/intermediate/mario_rl_tutorial.html"""

# One replay-memory record. The field order below is the positional order
# expected when constructing a Transition.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'next_state', 'reward', 'done'],
)

class RlAgent:
    """DQN agent scaffold: networks, optimizer, hyperparameters and acting.

    ``cache``, ``recall`` and ``learn`` are no-op placeholders in this class.

    Args:
        input_dim: Observation shape, forwarded to ``DQN``.
        output_dim: Number of discrete actions.
        save_dir: Stored on the instance as ``save_dir``; not otherwise used here.
    """

    def __init__(self,input_dim, output_dim, save_dir=None):
        self.state_dim = input_dim
        self.action_dim = output_dim
        self.save_dir = save_dir

        # Setup neural networks
        self.device = self._select_device()
        self.policy_net = DQN(input_dim, output_dim).to(self.device)
        self.target_net = DQN(input_dim, output_dim).to(self.device)
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval() # Target net is only for prediction, not training

        # Optimizer
        self.optimizer = optim.AdamW(self.policy_net.parameters(), lr=1e-4, amsgrad=True)

        # Hyperparameters
        self.memory = deque(maxlen=100000)
        self.batch_size = 32
        self.gamma = 0.99

        # Exploration settings (epsilon decay, applied once per act() call)
        self.exploration_rate = 1.0
        self.exploration_decay = 0.999995 # Slower decay for more complex games
        self.exploration_min = 0.1
        self.curr_step = 0

        # Training schedule, measured in act() calls (curr_step)
        self.burnin = 1e4  # Min experiences before training starts
        self.learn_every = 3   # How many steps between updates
        self.sync_every = 1e4   # How many steps between copying weights to target net

    @staticmethod
    def _select_device():
        """Pick the compute device: CUDA first, then Apple MPS, then CPU."""
        if torch.cuda.is_available():
            return torch.device("cuda")
        if torch.backends.mps.is_available():
            return torch.device("mps")
        return torch.device("cpu")

    def act(self,state):
        """Given a state, choose an epsilon-greedy action.

        With probability ``exploration_rate`` a uniformly random action index
        is returned; otherwise the argmax of the policy network's output.
        Every call also decays ``exploration_rate`` (floored at
        ``exploration_min``) and increments ``curr_step``.
        """
        # EXPLORE
        if np.random.rand() < self.exploration_rate:
            action_idx = np.random.randint(self.action_dim)

        # EXPLOIT
        else:
            # Add a batch dimension and scale to [0, 1], as learn-time inputs are scaled.
            state = torch.tensor(np.array(state), device=self.device).unsqueeze(0).float()/ 255.0
            with torch.no_grad():
                action_idx = self.policy_net(state).argmax(dim=1).item()

        # Decay exploration rate
        self.exploration_rate *= self.exploration_decay
        self.exploration_rate = max(self.exploration_min, self.exploration_rate)
        self.curr_step += 1

        return action_idx

    def cache(self, experience):
         """Add the experience to memory (placeholder; does nothing here)."""
         pass
    def recall(self):
        """Sample a batch of experiences from memory (placeholder; returns None)."""
        pass
    def learn(self):
        """Update the policy network (placeholder; does nothing here)."""
        pass

## Caching & Recall


In [16]:
class RlAgent(RlAgent):
    """Extends the agent with replay-memory storage and batch sampling."""

    def cache(self, state, next_state, action, reward, done):
        """Add one experience to memory.

        Args:
            state: Observation before the action (anything ``np.array`` accepts).
            next_state: Observation after the action.
            action: Discrete action index.
            reward: Scalar reward for the step.
            done: Whether ``next_state`` should be treated as terminal.
        """
        # Convert everything to tensors on the agent's device. Dtypes are set
        # explicitly so they do not depend on whether the caller passes Python
        # or NumPy scalars (e.g. a NumPy float64 reward would otherwise give a
        # float64 tensor).
        state = torch.from_numpy(np.array(state)).to(self.device)
        next_state = torch.from_numpy(np.array(next_state)).to(self.device)
        action = torch.tensor([action], dtype=torch.int64, device=self.device)
        reward = torch.tensor([reward], dtype=torch.float32, device=self.device)
        done = torch.tensor([done], dtype=torch.bool, device=self.device)

        self.memory.append(Transition(state, action, next_state, reward, done))

    def recall(self):
        """Sample ``batch_size`` experiences and stack them field by field.

        Returns:
            ``(state, action, next_state, reward, done)`` batch tensors, each
            with a leading batch dimension, or ``None`` when memory holds fewer
            than ``batch_size`` experiences.
        """
        # random.sample raises ValueError if asked for more items than exist.
        if len(self.memory) < self.batch_size:
            return None

        batch_sample = random.sample(self.memory, self.batch_size)

        # Transpose: [(s1, a1, ...), (s2, a2, ...)] -> (s1, s2, ...), (a1, a2, ...)
        batch = Transition(*zip(*batch_sample))

        # torch.stack adds the batch dimension in front of each field's shape.
        state_batch = torch.stack(batch.state)
        action_batch = torch.stack(batch.action)  # Shape: (batch_size, 1)
        next_state_batch = torch.stack(batch.next_state)
        reward_batch = torch.stack(batch.reward)
        done_batch = torch.stack(batch.done)

        return state_batch, action_batch, next_state_batch, reward_batch, done_batch

## Learning

In [17]:
class RlAgent(RlAgent):
    """Extends the agent with the learning step."""

    def learn(self):
        """Run at most one optimisation step on the policy network.

        Returns:
            ``(mean_q_estimate, loss)`` as Python floats when an update was
            performed, otherwise ``(None, None)`` (during burn-in, on steps
            that are not a multiple of ``learn_every``, or when ``recall``
            returns ``None``).
        """
        # 1. Sync target net (periodically)
        if self.curr_step % self.sync_every == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        # 2. Check if we have enough memory to start learning
        if self.curr_step < self.burnin:
            return None, None

        # 3. Learn only every few steps (stability)
        if self.curr_step % self.learn_every != 0:
            return None, None

        # 4. Sample from memory
        sample = self.recall()
        if sample is None:
            return None, None

        state, action, next_state, reward, done = sample

        # Convert to float and normalize to [0, 1]
        state = state.float() / 255.0
        next_state = next_state.float() / 255.0

        # 5. Current Q estimate of the action that was actually taken
        td_est = self.policy_net(state).gather(1, action)

        # 6. Target Q values (Bellman equation)
        with torch.no_grad():
            # The policy net picks the best next ACTION ...
            best_action = self.policy_net(next_state).argmax(1, keepdim=True)

            # ... and the target net supplies the VALUE of that action.
            next_state_values = self.target_net(next_state).gather(1, best_action)

            # Cast reward/done to the network's dtype so the target is never
            # promoted to a different precision than td_est.
            value_dtype = next_state_values.dtype
            not_done = 1.0 - done.to(value_dtype)
            td_tgt = reward.to(value_dtype) + not_done * self.gamma * next_state_values

        # 7. Backpropagate loss
        loss = nn.functional.smooth_l1_loss(td_est, td_tgt)

        self.optimizer.zero_grad()
        loss.backward()

        # Clip each gradient element to [-1, 1] to avoid exploding updates.
        # The utility skips parameters without a gradient.
        nn.utils.clip_grad_value_(self.policy_net.parameters(), 1.0)
        self.optimizer.step()

        return td_est.mean().item(), loss.item()


## Training Loop


In [18]:
import ale_py
from gymnasium.wrappers import FrameStackObservation, GrayscaleObservation, ResizeObservation

# 1. Build the environment and the agent
gym.register_envs(ale_py)
env = gym.make('ALE/Pacman-v5', render_mode='rgb_array')
env = ResizeObservation(env, (84, 84))      # Resize frames to 84x84
env = GrayscaleObservation(env)             # Remove color (3 channels -> 1 channel)
env = FrameStackObservation(env, 4)         # Stack last 4 frames (1 channel -> 4 channels)
env = PacmanRewardWrapper(env)              # Custom reward wrapper
agent = RlAgent(input_dim = env.observation_space.shape, output_dim=env.action_space.n)

# 2. Metrics for plotting
episodes = 500
rewards = []  # Shaped (wrapper) return of each episode

# 3. Loop
for e in range(episodes):
    state, info = env.reset()
    total_reward = 0

    while True:
        # A. AGENT ACTS
        action = agent.act(state)

        # B. ENVIRONMENT REACTS
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated

        # C. MEMORY CACHING
        # Store only `terminated` as the terminal flag: a truncated episode was
        # cut off by a time limit, so its last state still has future value and
        # the TD target should keep bootstrapping from it.
        agent.cache(state, next_state, action, reward, terminated)

        # D. AGENT LEARNS
        q, loss = agent.learn()

        # E. UPDATE STATE
        state = next_state
        total_reward += reward

        # The episode itself ends on either termination or truncation.
        if done:
            break

    rewards.append(total_reward)

    # Optional: Print progress
    if e % 10 == 0:
        print(f"Episode {e} - Reward: {total_reward} - Epsilon: {agent.exploration_rate:.2f}")

print("Training Complete")


Episode 0 - Reward: -3.415999999999976 - Epsilon: 1.00
Episode 10 - Reward: -3.517999999999984 - Epsilon: 0.98
Episode 20 - Reward: -3.4379999999999793 - Epsilon: 0.96
Episode 30 - Reward: -3.1839999999999837 - Epsilon: 0.93
Episode 40 - Reward: -3.507999999999985 - Epsilon: 0.91
Episode 50 - Reward: -3.397999999999981 - Epsilon: 0.90
Episode 60 - Reward: -2.2839999999999803 - Epsilon: 0.87
Episode 70 - Reward: -2.94799999999999 - Epsilon: 0.86
Episode 80 - Reward: -2.6059999999999897 - Epsilon: 0.84
Episode 90 - Reward: -3.53599999999998 - Epsilon: 0.82
Episode 100 - Reward: -3.0639999999999845 - Epsilon: 0.80
Episode 110 - Reward: -1.3639999999999999 - Epsilon: 0.79
Episode 120 - Reward: -2.9759999999999787 - Epsilon: 0.77
Episode 130 - Reward: -3.3419999999999814 - Epsilon: 0.76
Episode 140 - Reward: -1.9439999999999844 - Epsilon: 0.74
Episode 150 - Reward: -3.067999999999978 - Epsilon: 0.72
Episode 160 - Reward: -2.509999999999973 - Epsilon: 0.71
Episode 170 - Reward: -3.6679999999

In [19]:
import os

# Target location for the trained policy network's weights.
save_dir = "saved_models"
save_path = os.path.join(save_dir, "pacman_final.pth")

# Create the directory if it is missing, then write the state dict.
os.makedirs(save_dir, exist_ok=True)
torch.save(agent.policy_net.state_dict(), save_path)


## Watch the results from training

In [20]:
from gymnasium.wrappers import FrameStackObservation, GrayscaleObservation, ResizeObservation

def watch_agent_play(agent):
    """Play one episode greedily in a rendered window and print the raw score.

    The agent's exploration settings and step counter are changed only for the
    duration of the episode and are restored afterwards, even if playback is
    interrupted.

    Args:
        agent: Agent exposing ``act(state)``, ``exploration_rate``,
            ``exploration_min`` and ``curr_step``.
    """
    # 1. Create the rendering environment (no reward wrapper: show the raw score)
    env_watch = gym.make('ALE/Pacman-v5', render_mode='human')

    # 2. Apply the EXACT same observation preprocessing as training
    env_watch = ResizeObservation(env_watch, (84, 84))
    env_watch = GrayscaleObservation(env_watch)
    env_watch = FrameStackObservation(env_watch, 4)

    # 3. Turn off exploration.
    #    act() clamps exploration_rate to at least exploration_min after every
    #    call, so the floor must be lowered too or epsilon returns to the floor
    #    after the first step. act() also increments curr_step, which drives
    #    the training schedule, so that is saved as well.
    saved_epsilon = agent.exploration_rate
    saved_epsilon_min = agent.exploration_min
    saved_step = agent.curr_step
    agent.exploration_rate = 0.0
    agent.exploration_min = 0.0

    try:
        state, info = env_watch.reset()
        total_reward = 0

        print("Agent is playing... (Check the popup window)")

        while True:
            action = agent.act(state)

            state, reward, terminated, truncated, info = env_watch.step(action)
            total_reward += reward

            if terminated or truncated:
                break

        print(f"Game Over! Final Score: {total_reward}")
    finally:
        # Runs on normal completion and on KeyboardInterrupt alike.
        env_watch.close()

        # Restore agent state for future training
        agent.exploration_rate = saved_epsilon
        agent.exploration_min = saved_epsilon_min
        agent.curr_step = saved_step

# Run the viewer
#watch_agent_play(agent)

Agent is playing... (Check the popup window)


KeyboardInterrupt: 