In [2]:
pip install numpy gymnasium torch matplotlib ale-py opencv-python

Collecting matplotlib
  Obtaining dependency information for matplotlib from https://files.pythonhosted.org/packages/0f/70/d61a591958325c357204870b5e7b164f93f2a8cca1dc6ce940f563909a13/matplotlib-3.10.3-cp312-cp312-macosx_11_0_arm64.whl.metadata
  Using cached matplotlib-3.10.3-cp312-cp312-macosx_11_0_arm64.whl.metadata (11 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Obtaining dependency information for contourpy>=1.0.1 from https://files.pythonhosted.org/packages/53/3e/405b59cfa13021a56bba395a6b3aca8cec012b45bf177b0eaf7a202cde2c/contourpy-1.3.3-cp312-cp312-macosx_11_0_arm64.whl.metadata
  Downloading contourpy-1.3.3-cp312-cp312-macosx_11_0_arm64.whl.metadata (5.5 kB)
Collecting cycler>=0.10 (from matplotlib)
  Obtaining dependency information for cycler>=0.10 from https://files.pythonhosted.org/packages/e7/05/c19819d5e3d95294a6f5947fb9b9629efb316b96de511b418c53d245aae6/cycler-0.12.1-py3-none-any.whl.metadata
  Using cached cycler-0.12.1-py3-none-any.whl.metadata (3.8 kB)
Collec

In [2]:
# model_visualization.py

import os
import numpy as np
import torch
import torch.nn as nn
import gymnasium as gym
import time
import ale_py

# Define the same QNetwork architecture used in training
class QNetwork(nn.Module):
    """Convolutional Q-value network over a stack of four image frames.

    The layers live in ``self.network`` (an ``nn.Sequential``), so parameter
    names take the form ``network.<index>.weight`` / ``network.<index>.bias``.
    The first linear layer expects 3136 flattened features, which is what the
    three convolutions produce for 84x84 inputs (64 channels * 7 * 7).
    """

    def __init__(self, env):
        """Build the layer stack; the output width is ``env.action_space.n``."""
        super().__init__()
        num_actions = env.action_space.n
        # Convolutional feature extractor followed by flattening.
        feature_layers = [
            nn.Conv2d(4, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        # Fully connected head mapping features to one value per action.
        head_layers = [
            nn.Linear(3136, 512),
            nn.ReLU(),
            nn.Linear(512, num_actions),
        ]
        self.network = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x):
        """Divide the input by 255.0 and return the network's output."""
        scaled = x / 255.0
        return self.network(scaled)

# Define environment wrappers
class NoopResetEnv(gym.Wrapper):
    """Wrapper that plays a number of no-op actions right after every reset.

    The count is drawn uniformly from 1..``noop_max`` (inclusive) using the
    unwrapped env's ``np_random``, unless ``override_num_noops`` is set, in
    which case that fixed count is used. Action 0 must be "NOOP".
    """

    def __init__(self, env, noop_max=30):
        """Wrap ``env``; ``noop_max`` is the largest number of no-ops drawn."""
        super().__init__(env)
        # The wrapper steps with action 0, so that action has to be the no-op.
        assert env.unwrapped.get_action_meanings()[0] == "NOOP"
        self.noop_action = 0
        self.override_num_noops = None
        self.noop_max = noop_max

    def reset(self, **kwargs):
        """Reset the env, apply the no-op steps and return ``(obs, info)``.

        If a no-op step ends the episode, the env is reset again and the
        remaining no-ops continue from the fresh episode.
        """
        self.env.reset(**kwargs)
        noops = self.override_num_noops
        if noops is None:
            noops = self.unwrapped.np_random.integers(1, self.noop_max + 1)
        assert noops > 0
        # Placeholders; always overwritten because at least one no-op runs.
        obs, info = np.zeros(0), {}
        for _noop_idx in range(noops):
            obs, _reward, terminated, truncated, info = self.env.step(self.noop_action)
            if terminated or truncated:
                obs, info = self.env.reset(**kwargs)
        return obs, info

class MaxAndSkipEnv(gym.Wrapper):
    """Wrapper that repeats each action ``skip`` times and max-pools frames.

    Rewards of the repeated steps are summed. The returned observation is the
    element-wise maximum over a two-slot buffer holding the second-to-last and
    last frames of the skip window.
    """

    def __init__(self, env, skip=4):
        """Wrap ``env`` and allocate the two-frame buffer used for pooling."""
        super().__init__(env)
        space = env.observation_space
        self._obs_buffer = np.zeros((2, *space.shape), dtype=space.dtype)
        self._skip = skip

    def step(self, action):
        """Apply ``action`` up to ``skip`` times; stop early when the episode ends.

        Returns ``(max_frame, summed_reward, terminated, truncated, info)``,
        where the flags and ``info`` come from the last inner step taken.
        Buffer slots that are not written in this call (because the loop ended
        early) keep whatever an earlier call stored in them.
        """
        reward_sum = 0.0
        terminated = truncated = False
        last_idx = self._skip - 1
        for frame_idx in range(self._skip):
            obs, reward, terminated, truncated, info = self.env.step(action)
            if frame_idx == last_idx - 1:
                self._obs_buffer[0] = obs
            elif frame_idx == last_idx:
                self._obs_buffer[1] = obs
            reward_sum += float(reward)
            if terminated or truncated:
                break
        pooled = self._obs_buffer.max(axis=0)
        return pooled, reward_sum, terminated, truncated, info

class EpisodicLifeEnv(gym.Wrapper):
    """Wrapper that reports the loss of a life as the end of an episode.

    ``was_real_done`` remembers whether the wrapped env itself finished, so
    that ``reset`` only performs a full reset in that case.
    """

    def __init__(self, env):
        """Wrap ``env``; the life counter starts at 0 until the first reset."""
        super().__init__(env)
        self.was_real_done = True
        self.lives = 0

    def step(self, action):
        """Step the env and set ``terminated`` when a life was lost.

        A drop to zero lives is not flagged here; in that case the flags
        reported by the wrapped env are passed through unchanged.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.was_real_done = terminated or truncated
        remaining = self.env.unwrapped.ale.lives()
        life_lost = remaining < self.lives and remaining > 0
        if life_lost:
            terminated = True
        self.lives = remaining
        return obs, reward, terminated, truncated, info

    def reset(self, **kwargs):
        """Start the next life, or fully reset once the env is really done.

        After a mere life loss a single step with action 0 is taken instead of
        a reset; if that step ends the episode, a full reset follows.
        """
        if not self.was_real_done:
            obs, _reward, terminated, truncated, info = self.env.step(0)
            needs_full_reset = terminated or truncated
        else:
            needs_full_reset = True
        if needs_full_reset:
            obs, info = self.env.reset(**kwargs)
        self.lives = self.env.unwrapped.ale.lives()
        return obs, info

class FireResetEnv(gym.Wrapper):
    """Wrapper that takes actions 1 ("FIRE") and 2 right after every reset.

    Requires that action 1 of the unwrapped env is "FIRE" and that at least
    three actions exist.
    """

    def __init__(self, env):
        """Wrap ``env`` after checking its action meanings."""
        super().__init__(env)
        meanings = env.unwrapped.get_action_meanings()
        assert meanings[1] == "FIRE"
        assert len(meanings) >= 3

    def reset(self, **kwargs):
        """Reset the env, take actions 1 and 2, and return ``(obs, info)``.

        If either step ends the episode the env is reset again, and the
        observation and info from that reset are used. Returning the
        observation of the step that ended the episode would hand the caller a
        frame from an episode that is already over. ``info`` is passed through
        from the last step or reset rather than replaced with an empty dict.
        """
        obs, info = self.env.reset(**kwargs)
        for start_action in (1, 2):
            obs, _reward, terminated, truncated, info = self.env.step(start_action)
            if terminated or truncated:
                obs, info = self.env.reset(**kwargs)
        return obs, info

class ClipRewardEnv(gym.RewardWrapper):
    """Reward wrapper that replaces every reward by its sign."""

    def __init__(self, env):
        """Wrap ``env``; the wrapper keeps no state of its own."""
        gym.RewardWrapper.__init__(self, env)

    def reward(self, reward):
        """Return ``np.sign`` of the reward converted to float (-1.0, 0.0 or 1.0)."""
        as_float = float(reward)
        return np.sign(as_float)

# Function to create the environment with the same wrappers as during training
def make_atari_env(env_id, render_mode=None):
    """Create a grayscale Atari env wrapped with the preprocessing stack.

    Args:
        env_id: Gymnasium environment id passed to ``gym.make``.
        render_mode: Optional render mode; only forwarded to ``gym.make`` when
            truthy.

    Returns:
        The env wrapped, innermost first, with episode statistics, no-op
        resets, 4-step max-and-skip, episodic lives, fire-on-reset (only when
        the game has a "FIRE" action), reward clipping, resizing to 84x84 and
        stacking of 4 frames.
    """
    # NOTE(review): some env ids apply their own frame skipping inside
    # gym.make; stacked with MaxAndSkipEnv(skip=4) that would multiply the
    # effective skip - confirm this matches the training setup.
    make_kwargs = {"obs_type": "grayscale"}
    if render_mode:
        make_kwargs["render_mode"] = render_mode
    env = gym.make(env_id, **make_kwargs)

    env = gym.wrappers.RecordEpisodeStatistics(env)
    env = NoopResetEnv(env, noop_max=30)
    env = MaxAndSkipEnv(env, skip=4)
    env = EpisodicLifeEnv(env)

    action_meanings = env.unwrapped.get_action_meanings()
    if "FIRE" in action_meanings:
        env = FireResetEnv(env)

    env = ClipRewardEnv(env)
    env = gym.wrappers.ResizeObservation(env, (84, 84))
    stacked_env = gym.wrappers.FrameStackObservation(env, 4)
    return stacked_env

# Visualization function that uses the environment's native rendering
def visualize_agent(env, model, episodes=5, device="cpu", step_delay=0.02):
    """Play episodes with the greedy policy of ``model`` and print each total.

    Rendering is left to ``env`` itself; this function only steps it.

    Args:
        env: Environment exposing ``reset() -> (obs, info)`` and
            ``step(action) -> (obs, reward, terminated, truncated, info)``.
        model: Callable mapping a float32 batch of observations to a
            ``(batch, num_actions)`` tensor of action values.
        episodes: Number of episodes to play.
        device: Device the observation tensor is created on.
        step_delay: Seconds to sleep after every step so playback is easier
            to follow; ``0`` disables the pause.

    Returns:
        None. The printed total is the sum of the rewards returned by
        ``env.step``, so any reward shaping done by ``env`` (for example
        clipping) is reflected in it.
    """
    for episode in range(episodes):
        obs, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            # Greedy action selection: take the action with the highest value.
            # Gradients are never needed for playback, so autograd is
            # switched off for the forward pass.
            with torch.no_grad():
                obs_batch = torch.as_tensor(
                    np.asarray(obs), dtype=torch.float32, device=device
                ).unsqueeze(0)
                q_values = model(obs_batch)
            action = torch.argmax(q_values, dim=1).item()

            obs, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

            if step_delay > 0:
                time.sleep(step_delay)

        print(f"Episode {episode+1} finished with reward: {total_reward}")

# Main function to load model and visualize agent
def main():
    """Load a trained Q-network and watch it play in a rendered env.

    Builds the env with ``render_mode="human"``, loads the weights stored at
    ``model_path`` into a ``QNetwork`` and runs ``visualize_agent``, which
    acts greedily. The env is always closed, also when loading the model or
    playing raises.
    """
    # Parameters
    env_id = "ALE/Pong-v5"  # Change to your environment
    model_path = "notebook_run123213213.cleanrl_model"  # Update with your model path
    episodes = 3  # Number of episodes to visualize

    # Check if CUDA is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Create environment with HUMAN rendering
    env = make_atari_env(env_id, render_mode="human")
    try:
        # Initialize model
        q_network = QNetwork(env).to(device)

        # Load trained model weights. weights_only=True restricts unpickling
        # to tensors and plain containers, which is all a state dict needs
        # and avoids running arbitrary code from the checkpoint file.
        state_dict = torch.load(model_path, map_location=device, weights_only=True)
        q_network.load_state_dict(state_dict)
        q_network.eval()

        print(f"Model loaded from {model_path}")
        print(f"Starting visualization for {episodes} episodes...")

        # Run visualization
        visualize_agent(env, q_network, episodes, device)
    finally:
        # Release the emulator and the render window even if something failed.
        env.close()

# Run the visualization when this code is executed directly - as a script or
# as a notebook cell, where __name__ is "__main__" - but not when it is
# imported as a module.
if __name__ == "__main__":
    main()

Using device: cpu
Model loaded from notebook_run123213213.cleanrl_model
Starting visualization for 3 episodes...
Episode 1 finished with reward: 11.0
Episode 2 finished with reward: 4.0
Episode 3 finished with reward: 8.0
