In [2]:
!AutoROM -y

AutoROM will download the Atari 2600 ROMs.
They will be installed to:
	/home/monsh/anaconda3/envs/project-johnwick/lib/python3.11/site-packages/AutoROM/roms
	/home/monsh/anaconda3/envs/project-johnwick/lib/python3.11/site-packages/multi_agent_ale_py/roms

Existing ROMs will be overwritten.


In [1]:
from pettingzoo.atari import boxing_v2
import pygame
import os

os.environ["SDL_VIDEODRIVER"] = "dummy"

In [3]:
from __future__ import annotations

import glob
import os
import time

import supersuit as ss
from stable_baselines3 import PPO
from stable_baselines3.ppo import CnnPolicy, MlpPolicy
from pettingzoo.atari import boxing_v2



def train(env_fn, steps: int = 10_000, seed: int | None = 0, **env_kwargs):
    # Train a single model to play as each agent in an AEC environment
    env = env_fn.parallel_env(**env_kwargs)

    # Add black death wrapper so the number of agents stays constant
    # MarkovVectorEnv does not support environments with varying numbers of active agents unless black_death is set to True

    # Pre-process using SuperSuit
    visual_observation = True #not env.unwrapped.vector_state
    if visual_observation:
        # If the observation space is visual, reduce the color channels, resize from 512px to 84px, and apply frame stacking
        # env = ss.color_reduction_v0(env, mode="B")
        # env = ss.resize_v1(env, x_size=84, y_size=84)
        env = ss.frame_stack_v1(env, 3)

    env.reset(seed=seed)

    print(f"Starting training on {str(env.metadata['name'])}.")

    env = ss.pettingzoo_env_to_vec_env_v1(env)
    env = ss.concat_vec_envs_v1(env, 4, num_cpus=1, base_class="stable_baselines3")

    # Use a CNN policy if the observation space is visual
    model = PPO(
        CnnPolicy if visual_observation else MlpPolicy,
        env,
        verbose=1,
        batch_size=64,
        device="cuda"
    )

    model.learn(total_timesteps=steps, progress_bar=True)

    model.save(f"{env.unwrapped.metadata.get('name')}_{time.strftime('%Y%m%d-%H%M%S')}")

    print("Model has been saved.")

    print(f"Finished training on {str(env.unwrapped.metadata['name'])}.")

    env.close()


def eval(env_fn, num_games: int = 100, render_mode: str | None = None, **env_kwargs):
    # Evaluate a trained agent vs a random agent
    env = env_fn.env(render_mode=render_mode, **env_kwargs)

    # Pre-process using SuperSuit
    visual_observation = True # not env.unwrapped.vector_state
    if visual_observation:
        # If the observation space is visual, reduce the color channels, resize from 512px to 84px, and apply frame stacking
        env = ss.color_reduction_v0(env, mode="B")
        env = ss.resize_v1(env, x_size=84, y_size=84)
        env = ss.frame_stack_v1(env, 3)

    print(
        f"\nStarting evaluation on {str(env.metadata['name'])} (num_games={num_games}, render_mode={render_mode})"
    )

    try:
        latest_policy = max(
            glob.glob(f"{env.metadata['name']}*.zip"), key=os.path.getctime
        )
    except ValueError:
        print("Policy not found.")
        exit(0)

    model = PPO.load(latest_policy)

    rewards = {agent: 0 for agent in env.possible_agents}

    # Note: we evaluate here using an AEC environments, to allow for easy A/B testing against random policies
    # For example, we can see here that using a random agent for archer_0 results in less points than the trained agent
    for i in range(num_games):
        env.reset(seed=i)
        env.action_space(env.possible_agents[0]).seed(i)

        for agent in env.agent_iter():
            obs, reward, termination, truncation, info = env.last()

            for a in env.agents:
                rewards[a] += env.rewards[a]

            if termination or truncation:
                break
            else:
                if agent == env.possible_agents[0]:
                    act = env.action_space(agent).sample()
                else:
                    act = model.predict(obs, deterministic=True)[0]
            env.step(act)
    env.close()

    avg_reward = sum(rewards.values()) / len(rewards.values())
    avg_reward_per_agent = {
        agent: rewards[agent] / num_games for agent in env.possible_agents
    }
    print(f"Avg reward: {avg_reward}")
    print("Avg reward per agent, per game: ", avg_reward_per_agent)
    print("Full rewards: ", rewards)

    return avg_reward

In [4]:
# env_fn = boxing_v2

# # Set vector_state to false in order to use visual observations (significantly longer training time)
# env_kwargs = {}

# # Train a model (takes ~5 minutes on a laptop CPU)
# train(env_fn, steps=819_200, seed=0, **env_kwargs)

In [6]:
import random
from pettingzoo.atari import boxing_v2
from gym.wrappers import RecordVideo

render_mode = 'human'#"rgb_array" # 
num_games = 1

# Evaluate a trained agent vs a random agent
env = boxing_v2.env(render_mode=render_mode)

# Pre-process using SuperSuit
# visual_observation = not env.unwrapped.vector_state
# if visual_observation:
    # If the observation space is visual, reduce the color channels, resize from 512px to 84px, and apply frame stacking


env = ss.color_reduction_v0(env, mode="B")
env = ss.resize_v1(env, x_size=84, y_size=84)
env = ss.frame_stack_v1(env, 3)

# env = RecordVideo(env=env, video_folder="video", name_prefix="test-video", episode_trigger=lambda x: x % 2 == 0)

model = PPO.load("/home/monsh/works/image/boxing-engine/boxing_v2_20250305-130634.zip")

rewards = {agent: 0 for agent in env.possible_agents}

seed = random.randint(0, 1000)

# Note: we evaluate here using an AEC environments, to allow for easy A/B testing against random policies
# For example, we can see here that using a random agent for archer_0 results in less points than the trained agent
for i in range(num_games):
    env.reset(seed=seed)
    # env.start_video_recorder()
    env.action_space(env.possible_agents[0]).seed(seed)

    for agent in env.agent_iter():
        obs, reward, termination, truncation, info = env.last()

        for a in env.agents:
            rewards[a] += env.rewards[a]

        if termination or truncation:
            break
        else:
            if agent in env.possible_agents[0]:
                act = env.action_space(agent).sample()
            else:
                act = model.predict(obs, deterministic=True)[0]
        
        # plt.imshow(env.render())
        # plt.show()

        env.step(act)
        # break
    break

# env.close_video_recorder()
env.close()

ValueError: Error: Unexpected observation shape (84, 84, 3) for Box environment, please use (8, 84, 84) or (n_env, 8, 84, 84) for the observation shape.

In [7]:
obs

array([[[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]],

       ...,

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0, 66],
        [ 0,  0, 66],
        [ 0,  0, 66]]], shape=(84, 84, 3), dtype=uint8)