In [1]:
import gymnasium as gym
import ale_py

# Create Pacman environment with deterministic settings
def make_environment(difficulty=0, mode=0, repeat_action_probability=0.0, frameskip=4):
    """Create the ALE Pacman environment with RGB-array rendering.

    Args:
        difficulty: ALE difficulty setting (0 is the easiest).
        mode: ALE game mode (0 is the default mode).
        repeat_action_probability: Probability that the emulator repeats the
            previous action instead of the requested one; 0.0 disables this
            so the environment is fully deterministic.
        frameskip: Number of emulator frames each call to ``step`` advances.

    Returns:
        The environment object returned by ``gym.make``.
    """
    # Forward the arguments instead of hard-coding literals, so callers that
    # override a default actually get the configuration they asked for.
    return gym.make(
        "ALE/Pacman-v5",
        render_mode="rgb_array",
        difficulty=difficulty,
        mode=mode,
        repeat_action_probability=repeat_action_probability,
        frameskip=frameskip,
    )

# Environment specifications (as configured above):
# - Discrete action space with 5 actions (0:NOOP, 1:UP, 2:RIGHT, 3:LEFT, 4:DOWN)
# - RGB observation space (250, 160, 3)
# - Fixed frameskip of 4 frames
# - Deterministic environment: repeat action probability is 0.0 (no sticky actions)

In [2]:
# Build the Pacman environment with make_environment's default arguments;
# the cells below use this `env` for the random-play test and for training.
env = make_environment()

A.L.E: Arcade Learning Environment (version 0.10.1+6a7e0ae)
[Powered by Stella]


In [3]:

import numpy as np
import matplotlib.pyplot as plt
from IPython import display
import time

# Test function to run episodes

def test_environment(env, num_episodes=2, max_steps=100, render=True):
    all_rewards = []
    for episode in range(num_episodes):
        observation, info = env.reset()
        total_reward = 0
        
        print(f"\nEpisode {episode + 1}")
        print(f"Initial Info: {info}")
        
        for step in range(max_steps):
            # Random action
            action = env.action_space.sample()
            
            # Take step
            observation, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            
            # Render if requested
            if render:
                # Display the game screen
                plt.figure(figsize=(8, 6))
                plt.imshow(observation)
                plt.axis('off')
                display.clear_output(wait=True)
                display.display(plt.gcf())
                plt.close()
                time.sleep(0.1)  # Add delay to make it viewable
            
            # Print step information
            # print(f"Step {step + 1}: Action={action}, Reward={reward}, Done={terminated or truncated}")
            
            if terminated or truncated:
                break
                
        print(f"Episode {episode + 1} finished with total reward: {total_reward}")
        all_rewards.append(total_reward)

    return all_rewards



In [4]:
# Print the action/observation spaces and the names of the discrete actions
print("Action Space:", env.action_space)
print("Observation Space:", env.observation_space)
print("Action Meanings:", env.unwrapped.get_action_meanings())

# Smoke test: two random-action episodes of at most 100 steps, without rendering
all_rewards = test_environment(env, num_episodes=2, max_steps=100, render=False)

# Close the environment
# NOTE(review): a later cell passes this same `env` to train() after it has
# been closed here -- confirm that reuse is intended or re-create the env there.
env.close()

# Per-episode total rewards collected by the smoke test
print(all_rewards)

Action Space: Discrete(5)
Observation Space: Box(0, 255, (250, 160, 3), uint8)
Action Meanings: ['NOOP', 'UP', 'RIGHT', 'LEFT', 'DOWN']

Episode 1
Initial Info: {'lives': 4, 'episode_frame_number': 16, 'frame_number': 32}
Episode 1 finished with total reward: 5.0

Episode 2
Initial Info: {'lives': 4, 'episode_frame_number': 16, 'frame_number': 448}
Episode 2 finished with total reward: 5.0
[5.0, 5.0]


In [5]:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

In [None]:

# Check if running on Colab
import sys
IN_COLAB = 'google.colab' in sys.modules

# Always bind `device` so that code referring to it never hits a NameError
# outside Colab. The GPU is only selected on Colab, because the networks and
# tensors in this notebook are moved to `device` only when IN_COLAB is true;
# everywhere else everything stays on the CPU.
device = torch.device("cuda" if IN_COLAB and torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [6]:
# Preprocess state function
def preprocess_state(state, size=(84, 84)):
    """Convert a frame to a flattened, normalized, downsampled grayscale vector.

    Args:
        state: Image array of shape (H, W, C) with values in [0, 255]; a 2-D
            (H, W) array is treated as already being grayscale.
        size: ``(height, width)`` of the downsampled image. The default
            84x84 yields a vector of length 7056.

    Returns:
        1-D float array of length ``size[0] * size[1]`` with values in [0, 1].
    """
    frame = np.asarray(state)
    # Average the colour channels to get a single grayscale plane
    gray = frame.mean(axis=2) if frame.ndim == 3 else frame.astype(np.float64)

    # np.resize does NOT rescale an image: it truncates (or repeats) the
    # flattened data, so only the first few rows of the frame would survive.
    # Instead, sample the source pixel at the centre of each output cell
    # (nearest-neighbour downsampling), which covers the whole frame.
    out_h, out_w = size
    in_h, in_w = gray.shape
    rows = ((np.arange(out_h) + 0.5) * in_h / out_h).astype(int)
    cols = ((np.arange(out_w) + 0.5) * in_w / out_w).astype(int)
    resized = gray[np.ix_(rows, cols)]

    # Flatten and scale pixel intensities to [0, 1]
    return resized.flatten() / 255.0

# Define the policy network
class PolicyNetwork(nn.Module):
    """Fully connected policy: flattened state in, action probabilities out.

    The stack is Linear(state_dim, 512) -> ReLU -> Linear(512, 128) -> ReLU
    -> Linear(128, action_dim) -> Softmax over the last dimension.
    """

    def __init__(self, state_dim=7056, action_dim=5):  # 84x84 = 7056
        super().__init__()
        layers = []
        in_features = state_dim
        # Hidden layers, each followed by a ReLU
        for width in (512, 128):
            layers.append(nn.Linear(in_features, width))
            layers.append(nn.ReLU())
            in_features = width
        # Output head producing a probability distribution over actions
        layers.append(nn.Linear(in_features, action_dim))
        layers.append(nn.Softmax(dim=-1))
        self.fc = nn.Sequential(*layers)
        if IN_COLAB:
            self.to(device)

    def forward(self, x):
        """Return action probabilities for a batch of flattened states."""
        inputs = x.to(device) if IN_COLAB else x
        return self.fc(inputs)

# Define the value network (baseline)
class ValueNetwork(nn.Module):
    """Fully connected state-value estimator used as the baseline.

    The stack is Linear(state_dim, 512) -> ReLU -> Linear(512, 128) -> ReLU
    -> Linear(128, 1), i.e. one scalar value per input state.
    """

    def __init__(self, state_dim=7056):  # 84x84 = 7056
        super().__init__()
        hidden = [
            nn.Linear(state_dim, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
        ]
        head = nn.Linear(128, 1)
        self.fc = nn.Sequential(*hidden, head)
        if IN_COLAB:
            self.to(device)

    def forward(self, x):
        """Return the estimated value for a batch of flattened states."""
        inputs = x.to(device) if IN_COLAB else x
        return self.fc(inputs)

# Function to select action
def select_action(policy_net, state):
    """Sample an action from the policy's distribution for a raw observation.

    Args:
        policy_net: Callable mapping a (1, state_dim) float tensor to a
            (1, num_actions) tensor of action probabilities.
        state: Raw observation; it is passed through ``preprocess_state``
            before being fed to the network.

    Returns:
        The sampled action index as a plain Python ``int``.
    """
    processed_state = preprocess_state(state)
    state_tensor = torch.from_numpy(processed_state).float().unsqueeze(0)
    if IN_COLAB:
        state_tensor = state_tensor.to(device)

    # Only the probabilities are needed for sampling, so skip building an
    # autograd graph that would be thrown away immediately.
    with torch.no_grad():
        probs = policy_net(state_tensor)[0]
    probs = probs.detach().cpu().numpy()

    # Convert the NumPy integer to a built-in int for env.step / indexing.
    return int(np.random.choice(len(probs), p=probs))

# Function to compute returns
def compute_returns(rewards, gamma=0.99):
    """Compute the discounted return following each time step.

    Args:
        rewards: Sequence of per-step rewards in chronological order.
        gamma: Discount factor applied to each subsequent step.

    Returns:
        A list ``G`` of the same length as ``rewards`` where
        ``G[t] = rewards[t] + gamma * G[t + 1]`` (and ``G`` past the end is 0).
        An empty input yields an empty list.
    """
    R = 0
    returns = []
    # Accumulate from the last step backwards; appending and reversing once
    # at the end avoids the quadratic cost of inserting at the list head.
    for r in reversed(rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()
    return returns

# Main training loop
@torch.cuda.amp.autocast() if IN_COLAB else lambda x: x
def train(env, policy_net, value_net, policy_optimizer, value_optimizer, num_episodes=1000,
          gamma=0.99, log_every=1):
    """Train the policy with REINFORCE, using the value network as a baseline.

    Each iteration plays one full episode, computes discounted returns, and
    then takes one optimizer step for the policy (policy-gradient loss
    weighted by the advantage) and one for the value network (MSE against
    the returns).

    Args:
        env: Environment exposing ``reset()`` and ``step(action)``.
        policy_net: Network mapping a (1, state_dim) tensor to action
            probabilities of shape (1, num_actions).
        value_net: Network mapping a (1, state_dim) tensor to a (1, 1) value.
        policy_optimizer: Optimizer over ``policy_net``'s parameters.
        value_optimizer: Optimizer over ``value_net``'s parameters.
        num_episodes: Number of episodes to train for.
        gamma: Discount factor used when computing returns.
        log_every: Print the episode's total reward every this many
            episodes; a falsy value disables logging.

    Returns:
        A list with the total (undiscounted) reward of every episode.
    """
    # Create scaler for mixed precision training if on Colab
    scaler = torch.cuda.amp.GradScaler() if IN_COLAB else None
    episode_rewards = []

    for episode in range(num_episodes):
        state, _ = env.reset()
        log_probs = []
        rewards = []
        values = []

        # Generate an episode
        while True:
            action = select_action(policy_net, state)
            next_state, reward, terminated, truncated, _ = env.step(action)

            # `state` is still the observation the action was chosen from
            processed_state = preprocess_state(state)
            state_tensor = torch.from_numpy(processed_state).float().unsqueeze(0)
            if IN_COLAB:
                state_tensor = state_tensor.to(device)

            # Differentiable log-probability of the chosen action (0-dim tensor)
            log_prob = torch.log(policy_net(state_tensor)[0][action])
            value = value_net(state_tensor)

            log_probs.append(log_prob)
            rewards.append(reward)
            values.append(value)

            state = next_state
            if terminated or truncated:
                break

        # Compute returns and advantages
        returns = compute_returns(rewards, gamma)
        returns = torch.tensor(returns, dtype=torch.float32).unsqueeze(1)  # (T, 1)
        if IN_COLAB:
            returns = returns.to(device)
        values = torch.cat(values)  # (T, 1)
        # Flatten to (T,) so the advantages line up element-wise with the
        # stacked log-probs. Multiplying a (T,) tensor by a (T, 1) tensor
        # would broadcast to (T, T) and weight every log-prob by the sum of
        # all advantages instead of by its own.
        advantages = (returns - values.detach()).squeeze(1)

        # Update policy network
        policy_loss = -torch.sum(torch.stack(log_probs) * advantages)
        policy_optimizer.zero_grad()
        if IN_COLAB:
            scaler.scale(policy_loss).backward()
            scaler.step(policy_optimizer)
        else:
            policy_loss.backward()
            policy_optimizer.step()

        # Update value network
        value_loss = nn.functional.mse_loss(values, returns)
        value_optimizer.zero_grad()
        if IN_COLAB:
            scaler.scale(value_loss).backward()
            scaler.step(value_optimizer)
            # One scaler serves both optimizers; update it once per iteration
            scaler.update()
        else:
            value_loss.backward()
            value_optimizer.step()

        total_reward = sum(rewards)
        episode_rewards.append(total_reward)
        if log_every and (episode + 1) % log_every == 0:
            print(f"Episode {episode + 1}, Total Reward: {total_reward}")

    return episode_rewards

In [7]:
# Initialize environment and networks.
# The environment created earlier was closed after the random-play test, so
# build a fresh one for training rather than reusing a closed handle.
env = make_environment()

policy_net = PolicyNetwork()
value_net = ValueNetwork()

policy_optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
value_optimizer = optim.Adam(value_net.parameters(), lr=1e-3)

# Train the agent. The `finally` clause closes the environment even when
# training is interrupted (e.g. by stopping the cell).
try:
    train(env, policy_net, value_net, policy_optimizer, value_optimizer, num_episodes=1000)
finally:
    env.close()

Episode 1, Total Reward: 17.0
Episode 2, Total Reward: 8.0


KeyboardInterrupt: 