In [2]:
# Install system dependencies
!apt-get update && apt-get install -y \
    python3-dev \
    swig \
    python3-pygame \
    libsdl2-dev \
    libjpeg-dev \
    zlib1g-dev

# Install Python packages
!pip install "pettingzoo[atari]==1.24.3" gymnasium[atari] numpy pygame

# Install and setup AutoROM
!pip install autorom
!AutoROM --accept-license

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,651 kB]
Get:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease [24.3 kB]
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,654 kB]
Get:13 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 P

In [None]:
!pip install git+https://github.com/ml-arena/pong2024.git

# Overview

In [5]:
import numpy as np
from pettingzoo.atari import pong_v3
import time

ModuleNotFoundError: No module named 'pettingzoo'

### 1. Environment Overview

### Environment Creation and Reset
```python
env = pong_v3.env()  # Create the environment
env.reset()          # Reset the environment to initial state
```

### Key Properties
- `env.agents`: List of active agents in the environment
- `env.action_space(agent)`: Action space for specific agent
- `env.observation_space(agent)`: Observation space for specific agent

### Environment Interaction Methods

#### env.last()
Returns a tuple `(observation, reward, terminated, truncated, info)` for the agent whose turn it is:
- `observation`: NumPy array of shape (210, 160, 3) representing the game state
- `reward`: Float value indicating the reward from the agent's last action
- `terminated`: Boolean indicating whether the episode ended naturally (the game is over)
- `truncated`: Boolean indicating whether the episode was cut short (e.g., by a step limit)
- `info`: Dictionary with additional information

#### env.step(action)
- Takes an action for the current agent
- Actions must be valid for the current agent's action space
- Automatically handles agent cycling

#### env.agent_iter()
- Iterator that cycles through active agents
- Typically used in the main game loop
- Returns the current agent's name

### Environment Management
```python
env.close()  # Clean up environment resources
```

## Observation Space Details
- Shape: (210, 160, 3)
  - Height: 210 pixels
  - Width: 160 pixels
  - Channels: 3 (RGB)
- Values: 0-255 (uint8)
- Each observation is a complete frame of the game

## Action Space Details
- Type: Discrete(6)
- Actions:
  - 0: No operation
  - 1: Fire
  - 2: Move right
  - 3: Move left
  - 4: Fire right
  - 5: Fire left

In [2]:
import matplotlib.pyplot as plt
from pettingzoo.atari import pong_v3

# Create and initialize the environment
print("Creating Pong Environment...")
env = pong_v3.env()
try:
    env.reset()

    # env.last() describes the agent whose turn it is, so report on that agent
    # instead of assuming a hard-coded agent name.
    agent = env.agent_selection

    # Display environment information
    print("\nEnvironment Information:")
    print(f"Agents: {env.agents}")
    print(f"Action Space: {env.action_space(agent)}")
    print(f"Observation Space: {env.observation_space(agent)}")

    # Get the initial observation for the current agent
    observation, reward, terminated, truncated, info = env.last()
finally:
    # Release the environment even if one of the calls above fails; the
    # observation array fetched above is still available for plotting.
    env.close()

print(f"\nObservation Details for {agent}:")
print(f"Shape: {observation.shape}")
print(f"Value range: [{observation.min()}, {observation.max()}]")
print(f"Reward: {reward}")
print(f"Game Status - Terminated: {terminated}, Truncated: {truncated}")
print(f"Additional Info: {info}")

# Visualize the observation.
# No colorbar: imshow ignores the colormap for RGB data, so a colorbar would
# show a scale that has nothing to do with the pixel values.
fig, ax = plt.subplots(figsize=(10, 8))
ax.imshow(observation)
ax.set_title(f'Game State Observation for {agent}')
ax.axis('on')
plt.show()

# Show RGB channels separately
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
fig.suptitle('RGB Channel Breakdown of Game State')

channels = ['Red', 'Green', 'Blue']
for i, (ax, channel) in enumerate(zip(axes, channels)):
    ax.imshow(observation[:, :, i], cmap='gray')
    ax.set_title(f'{channel} Channel')
    ax.axis('on')

fig.tight_layout()
plt.show()

ModuleNotFoundError: No module named 'matplotlib'

### 2. Agent Implementation

In [6]:
class Agent:
    """Baseline Pong agent that plays sampled actions and does no learning."""

    def __init__(self, env, player_name=None):
        # player_name may be filled in later, once the environment assigns sides
        self.player_name = player_name
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Return an action sampled from this player's action space.

        The game-state arguments are accepted for interface compatibility but
        are not used by this baseline.
        """
        action_space = self.env.action_space(self.player_name)
        return action_space.sample()

    def learn(self):
        """Training hook; a no-op here, meant to be overridden by learning agents."""
        return None

class AgentAlwaysLeft(Agent):
    """Agent with a constant policy: it plays 'move left' on every turn."""

    # Index of the 'move left' action
    _MOVE_LEFT = 3

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Ignore the game state and return the 'move left' action (3)."""
        return self._MOVE_LEFT

### 3. Running a Simple Match

In [None]:
def run_match(env, agent1, agent2, max_cycles=500):
    """Run a single match between two agents and close the environment.

    Args:
        env: Turn-based environment exposing ``reset``, ``agents``,
            ``agent_iter``, ``last``, ``step`` and ``close``.
        agent1: Agent assigned to ``env.agents[0]``; its ``player_name`` is
            overwritten with that name.
        agent2: Agent assigned to ``env.agents[1]``; its ``player_name`` is
            overwritten with that name.
        max_cycles: Maximum number of cycles to play, where one cycle is one
            action from every agent. The match stops earlier if the episode
            terminates or is truncated.

    Returns:
        None. The environment is always closed before returning, including
        when an agent or the environment raises.
    """
    env.reset()

    # Assign player names to agents
    agent1.player_name = env.agents[0]
    agent2.player_name = env.agents[1]

    # agent_iter() counts individual agent turns, so convert cycles to turns.
    # Without this cap the iterator would simply run until the game ends.
    max_steps = max_cycles * len(env.agents)

    try:
        for agent in env.agent_iter(max_iter=max_steps):
            observation, reward, terminated, truncated, info = env.last()

            if terminated or truncated:
                # A finished agent must be stepped with None; keep iterating so
                # every agent receives this final step and the iterator ends.
                action = None
            elif agent == agent1.player_name:
                action = agent1.choose_action(observation, reward, terminated, truncated, info)
            else:
                action = agent2.choose_action(observation, reward, terminated, truncated, info)

            env.step(action)
    finally:
        env.close()

# Build the environment and one agent for each side of the table
env = pong_v3.env()
random_agent = Agent(env=env)
always_left_agent = AgentAlwaysLeft(env=env)

# Play the sampled-action baseline against the constant 'move left' agent
print("\nRunning match: Random Agent vs Always Left Agent")
run_match(env, agent1=random_agent, agent2=always_left_agent)

# Evaluate

In [10]:
import numpy as np
from pettingzoo.atari import pong_v3
from typing import Dict, List, Tuple, Type
import time
import random
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

# Import evaluation function
from pong2024.eval import evaluate_against_multiple_agents

# Define example agents for testing
class YourAgent:
    """Template for a custom agent; for now it only plays sampled actions."""

    def __init__(self, env, player_name=None):
        self.player_name = player_name
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Return an action sampled from this player's action space.

        The game-state arguments are currently ignored.
        """
        action_space = self.env.action_space(self.player_name)
        return action_space.sample()

class AlwaysLeftAgent:
    """Deterministic opponent that plays 'move left' on every turn."""

    # Index of the 'move left' action
    _MOVE_LEFT = 3

    def __init__(self, env, player_name=None):
        self.player_name = player_name
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Ignore the game state and return the 'move left' action (3)."""
        return self._MOVE_LEFT

class AlwaysRightAgent:
    """Deterministic opponent that plays 'move right' on every turn."""

    # Index of the 'move right' action
    _MOVE_RIGHT = 2

    def __init__(self, env, player_name=None):
        self.player_name = player_name
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Ignore the game state and return the 'move right' action (2)."""
        return self._MOVE_RIGHT

# Create environment
env = pong_v3.env()

# Define list of opponent agents to evaluate against
opponent_agents = [
    AlwaysLeftAgent,   # Deterministic opponent - always moves left
    AlwaysRightAgent,  # Deterministic opponent - always moves right
    YourAgent          # Random opponent
]

print("Starting evaluation against multiple opponents...")

# Run evaluation against all opponents.
# The environment is closed afterwards (also when the evaluation raises), as in
# the other cells of this notebook, so its resources are not leaked.
try:
    results = evaluate_against_multiple_agents(
        env=env,
        main_agent_class=YourAgent,        # Your agent to evaluate
        opponent_classes=opponent_agents,  # List of opponents
        n_games_per_matchup=50,            # Number of games per opponent
        max_cycles=10000,                  # Maximum steps per game
        seed=42                            # For reproducibility
    )
finally:
    env.close()

# Print summary results
print("\nEvaluation Summary:")
print(f"Overall win rate: {results['summary']['main_agent_overall_winrate']:.1%}")
print(f"Average score: {results['summary']['main_agent_average_score']:.1f}")

print("\nPerformance against each opponent:")
for matchup in results['matchups']:
    print(f"\nVs {matchup['opponent_class']}:")
    print(f"Win rate: {matchup['main_agent_winrate']:.1%}")
    print(f"Average score: {matchup['main_agent_avg_score']:.1f}")


ModuleNotFoundError: No module named 'pettingzoo'

# Feature Engineering

The raw observation from the Pong environment (210, 160, 3) is quite large and complex,
containing a lot of unnecessary information. We can make learning easier by:

1. Dimension Reduction:
   - Extract play area (removing score/info areas)
   - Convert to grayscale (remove color channels)
   - Resize to smaller dimensions

2. Player Perspective Normalization:
   - The game looks different for player 1 vs player 2
   - Flipping the image for player 2 makes both perspectives similar
   - This helps the agent learn a single strategy for both sides

3. Image Processing:
   - Gaussian smoothing reduces noise
   - Binary thresholding separates objects clearly
   - Normalization scales values to [0,1]

4. Temporal Features:
   - Pong is a dynamic environment
   - Consider using frame differences to capture motion
   - Stack multiple frames to provide temporal context
   - Apply same preprocessing to difference frames
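5. Targeted Feature Extraction:
   - Extract more specific information (e.g., the ball and paddle positions) instead of relying only on raw pixels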

In [11]:
# ML-Arena: Pong 2024 - Feature Engineering
# This notebook demonstrates how to preprocess observations to make learning easier.

import numpy as np
import matplotlib.pyplot as plt
from pettingzoo.atari import pong_v3
import cv2
from scipy.ndimage import gaussian_filter
from pong2024.feature_engineering import preprocess_image

# The cell calls simple_preprocess_image; import it here so the calls below no
# longer fail with a NameError.
from pong2024.feature_engineering import simple_preprocess_image

# Create environment and get an observation
env = pong_v3.env()
try:
    env.reset()
    # env.last() returns the observation of the agent whose turn it is, which
    # is all this demo needs.
    observation, reward, terminated, truncated, info = env.last()
finally:
    # The observation array stays available after the environment is closed
    env.close()

# Process the image, once as-is and once flipped for player 2
processed = simple_preprocess_image(observation)
processed_flipped = simple_preprocess_image(observation, flip=True)

# Plot the original and both processed observations side by side
panels = [
    ('Original Observation', observation, None),
    ('Processed Observation', processed, 'gray'),
    ('Processed & Flipped', processed_flipped, 'gray'),
]
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
for ax, (title, image, cmap) in zip(axes, panels):
    ax.imshow(image, cmap=cmap)
    ax.set_title('{}\nShape: {}'.format(title, image.shape))
    ax.axis('on')

fig.tight_layout()
plt.show()

SyntaxError: invalid syntax (4261657193.py, line 1)

# Train

In [None]:
# ML-Arena: Pong 2024 - Q-Learning Agent Training
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import os
from pettingzoo.atari import pong_v3
from pong2024.train import train_parallel_environments
from pong2024.feature_engineering import simple_preprocess_image

class QNetwork(nn.Module):
    """Convolutional network mapping single-channel frames to one Q-value per action.

    The fully connected head expects 64 * 7 * 7 features, which is what the
    convolutional stack below produces for 84x84 inputs.
    """

    def __init__(self, n_actions=6):
        super().__init__()

        # Feature extractor: three strided convolutions, each followed by a ReLU
        feature_layers = [
            nn.Conv2d(1, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*feature_layers)

        # Head: hidden layer of 512 units, then one output per action
        head_layers = [
            nn.Linear(64 * 7 * 7, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return Q-values of shape (batch, n_actions) for a (batch, H, W) input."""
        frames = x.float() / 255.0            # Normalize
        frames = frames.unsqueeze(1)          # Add channel dimension
        features = self.conv(frames)
        flattened = features.flatten(start_dim=1)  # one feature vector per sample
        return self.fc(flattened)

from datetime import datetime

class QAgent:
    """Q-learning agent with experience replay, a target network and auto save/load.

    Actions are chosen epsilon-greedily from ``q_network``. ``learn`` trains on
    minibatches sampled from ``memory`` (filled through ``store_transition``),
    copies the online weights into ``target_network`` every ``target_update``
    learning steps, and can periodically write timestamped checkpoints.
    """

    def __init__(self, env, player_name=None, auto_load_path=None,
                 auto_save_n_steps=100, auto_save_suffix="q_agent.pth"):
        """Build the networks, optimizer and replay memory, then optionally load a checkpoint.

        Args:
            env: Environment providing ``action_space(player_name)``.
            player_name: Name of the player this agent controls; observations
                are flipped when it equals ``"second_0"``.
            auto_load_path: Checkpoint to restore if the file exists.
            auto_save_n_steps: Save a checkpoint every this many learning steps;
                ``0`` or ``None`` disables auto-saving.
            auto_save_suffix: File name suffix appended to the save timestamp.
        """
        self.env = env
        self.player_name = player_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Save/Load parameters
        self.auto_save_n_steps = auto_save_n_steps
        self.auto_save_suffix = auto_save_suffix

        # Q-Network and target network (the target starts as a copy of the online net)
        self.q_network = QNetwork().to(self.device)
        self.target_network = QNetwork().to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())

        self.optimizer = optim.Adam(self.q_network.parameters(), lr=0.0001)
        self.memory = deque(maxlen=10000)

        # Training parameters
        self.batch_size = 32
        self.gamma = 0.99
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.target_update = 1000
        self.steps = 0

        # Restore a checkpoint only now: load() writes to the optimizer, epsilon
        # and steps, so it must run after they exist and after the defaults are
        # set, otherwise it fails or the restored values get overwritten.
        if auto_load_path and os.path.exists(auto_load_path):
            print(f"Loading model from {auto_load_path}")
            self.load(auto_load_path)

    def _batch_column(self, batch, index, dtype):
        """Stack field ``index`` of every transition into a tensor on ``self.device``."""
        values = np.array([transition[index] for transition in batch])
        return torch.as_tensor(values, dtype=dtype, device=self.device)

    def store_transition(self, state, action, reward, next_state, done):
        """Store transition in replay memory"""
        self.memory.append((state, action, reward, next_state, done))

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Choose action using epsilon-greedy policy.

        With probability ``epsilon`` an action is sampled from the action space;
        otherwise the action with the highest predicted Q-value is returned.
        """
        # Explore first: a sampled action does not need the preprocessed frame
        if random.random() < self.epsilon:
            return self.env.action_space(self.player_name).sample()

        # Preprocess observation (flipped for the second player)
        state = simple_preprocess_image(observation, flip=(self.player_name == "second_0"))
        state = torch.as_tensor(np.asarray(state), dtype=torch.float32, device=self.device).unsqueeze(0)

        with torch.no_grad():
            q_values = self.q_network(state)
            return q_values.argmax().item()

    def learn(self):
        """Train on a batch from replay memory.

        Does nothing until the memory holds at least ``batch_size`` transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample batch
        batch = random.sample(self.memory, self.batch_size)
        state_batch = self._batch_column(batch, 0, torch.float32)
        action_batch = self._batch_column(batch, 1, torch.int64)
        reward_batch = self._batch_column(batch, 2, torch.float32)
        next_state_batch = self._batch_column(batch, 3, torch.float32)
        done_batch = self._batch_column(batch, 4, torch.float32)

        # Compute current Q values of the actions that were actually taken.
        # squeeze(1) removes only the gather dimension, so a batch of size one
        # keeps its batch dimension.
        current_q_values = self.q_network(state_batch).gather(1, action_batch.unsqueeze(1)).squeeze(1)

        # Compute target Q values; (1 - done) removes the bootstrap term for
        # transitions that ended the episode
        with torch.no_grad():
            next_q_values = self.target_network(next_state_batch).max(1)[0]
            target_q_values = reward_batch + (1 - done_batch) * self.gamma * next_q_values

        # Compute loss and update
        loss = nn.MSELoss()(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update target network
        self.steps += 1
        if self.steps % self.target_update == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())

        # Auto-save if needed (disabled when auto_save_n_steps is 0 or None)
        if self.auto_save_n_steps and self.auto_save_n_steps > 0 \
                and self.steps % self.auto_save_n_steps == 0:
            timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
            save_path = f"{timestamp}_{self.auto_save_suffix}"
            self.save(save_path)
            print(f"Auto-saved model to {save_path}")

        # Decay epsilon
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def save(self, path="q_agent.pth"):
        """Save both networks, the optimizer state, epsilon and the step count to ``path``."""
        os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
        torch.save({
            'q_network_state_dict': self.q_network.state_dict(),
            'target_network_state_dict': self.target_network.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon,
            'steps': self.steps
        }, path)

    def load(self, path="q_agent.pth"):
        """Restore networks, optimizer state, epsilon and step count from ``path``.

        Prints a message and leaves the agent unchanged if ``path`` does not exist.
        """
        if os.path.exists(path):
            # map_location lets a checkpoint written on a GPU load on a CPU-only
            # machine. NOTE: torch.load unpickles the file - only load
            # checkpoints from sources you trust.
            checkpoint = torch.load(path, map_location=self.device)
            self.q_network.load_state_dict(checkpoint['q_network_state_dict'])
            self.target_network.load_state_dict(checkpoint['target_network_state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            self.epsilon = checkpoint['epsilon']
            self.steps = checkpoint['steps']
            print(f"Loaded model from {path} (steps: {self.steps}, epsilon: {self.epsilon:.3f})")
        else:
            print(f"No model found at {path}, starting from scratch")

# Training setup
def make_env():
    """Create and return a new Pong environment instance."""
    environment = pong_v3.env()
    return environment

# Define opponents with their probabilities.
# `RandomAgent` was not defined anywhere in this notebook; the base `Agent`
# class defined earlier samples its actions from the action space, so it is
# used as the random opponent.
RandomAgent = Agent
opponent_classes = [RandomAgent, AlwaysLeftAgent]
opponent_probs = [0.7, 0.3]  # 70% Random, 30% AlwaysLeft

# Train the agent
print("Starting training...")
results = train_parallel_environments(
    make_env=make_env,
    main_agent_class=QAgent,
    opponent_classes=opponent_classes,
    opponent_probs=opponent_probs,
    n_envs=8,                # Number of parallel environments
    n_total_episodes=10000,  # Total training episodes
    eval_frequency=100,      # Evaluate every 100 episodes
    max_cycles=3000          # Max steps per episode
)

print("\nTraining completed!")
print(f"Total time: {results['summary']['total_training_time']:.1f} seconds")

# Save the trained agent (last instance from parallel envs).
# NOTE(review): the original code used an undefined `main_agents` variable. It
# is assumed here that the trainer may return the agent instances under
# results['main_agents'] - TODO confirm against pong2024.train. If they are not
# returned, the checkpoints auto-saved by QAgent.learn() are the trained models.
trained_agents = results.get("main_agents") if isinstance(results, dict) else None
if trained_agents:
    trained_agents[-1].save("trained_q_agent.pth")
    print("Saved trained agent to trained_q_agent.pth")
else:
    print("No agent instances were returned by the trainer; "
          "use the checkpoints auto-saved during training.")
