# chess

In [None]:
# install dependencies
!pip install swig==4.2.1 2>&1
!pip install gymnasium==0.29.1 2>&1
!pip install pettingzoo[classic]==1.24.3 2>&1
!pip install box2d-py==2.3.5 2>&1
!pip install stable_baselines3==2.3.0 2>&1
!pip install sb3_contrib==2.3.0 2>&1

In [1]:
from pettingzoo.classic import chess_v6

In [None]:
env = chess_v6.env()
env.reset(seed=42)
cum_reward = {agent:0 for agent in env.agents}

# interact with env
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    cum_reward[agent] += reward
    if termination or truncation:
        action = None
    else:
        mask = observation["action_mask"]
        # this is where you would insert your policy
        action = env.action_space(agent).sample(mask)

    env.step(action)
env.close()
print(cum_reward)

In [3]:
class Agent:
    def __init__(self, env):
        self.env = env

    def choose_action(self, observation, action_mask=None):
        if self.env.action_space.__class__.__name__ == "method":  # Multiplayer env
            action = self.env.action_space(self.env.agents[0]).sample(action_mask)
        else:  # Classic gymnasium
            action = self.env.action_space.sample()
        return action

In [4]:
for _ in range(100):
    env = chess_v6.env()
    env.reset()#seed=423)
    cum_reward = {agent:0 for agent in env.agents}
    last_done = {agent: False for agent in env.agents}
    observation, reward, termination, truncation, info = env.last()
    agents = env.agents
    
    ft_agent = Agent(env)
    
    # interact with env
    while not all(last_done.values()):
        for agent in agents:
            if any(last_done.values()):
                action = None
            else:
                mask = observation["action_mask"]
                # this is where you would insert your policy
                action = ft_agent.choose_action(observation['observation'], observation['action_mask'])#env.action_space(agent).sample(mask)
            
            env.step(action)
            observation, reward, termination, truncation, info = env.last()
            print(cum_reward)
            cum_reward[agent] += reward
            print(cum_reward)
            last_done[agent] = termination or truncation
            print(agent, action, reward, last_done)
            print(env.agents)
    print(cum_reward)
    print(env.agents)
    env.close()

{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_0 645 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_1 2997 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_0 4165 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_1 669 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_0 4238 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_1 85 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0': 0, 'player_1': 0}
{'player_0': 0, 'player_1': 0}
player_0 85 0 {'player_0': False, 'player_1': False}
['player_0', 'player_1']
{'player_0':

KeyboardInterrupt: 

In [None]:
env = chess_v6.env()
env.reset(seed=42)
env.last()[0]['action_mask']

## Train agent

In [None]:
model_path = "my_model.zip"

In [None]:
import glob
import os
import time

from sb3_contrib import MaskablePPO
from sb3_contrib.common.maskable.policies import MaskableActorCriticPolicy
from sb3_contrib.common.wrappers import ActionMasker

import pettingzoo.utils
from pettingzoo.classic import chess_v6


class SB3ActionMaskWrapper(pettingzoo.utils.BaseWrapper):
    """Wrapper to allow PettingZoo environments to be used with SB3 illegal action masking."""

    def reset(self, seed=None, options=None):
        """Gymnasium-like reset function which assigns obs/action spaces to be the same for each agent.

        This is required as SB3 is designed for single-agent RL and doesn't expect obs/action spaces to be functions
        """
        super().reset(seed, options)

        # Strip the action mask out from the observation space
        self.observation_space = super().observation_space(self.possible_agents[0])[
            "observation"
        ]
        self.action_space = super().action_space(self.possible_agents[0])

        # Return initial observation, info (PettingZoo AEC envs do not by default)
        return self.observe(self.agent_selection), {}

    def step(self, action):
        """Gymnasium-like step function, returning observation, reward, termination, truncation, info."""
        super().step(action)
        return super().last()

    def observe(self, agent):
        """Return only raw observation, removing action mask."""
        return super().observe(agent)["observation"]

    def action_mask(self):
        """Separate function used in order to access the action mask."""
        return super().observe(self.agent_selection)["action_mask"]


def mask_fn(env):
    return env.action_mask()


def train_action_mask(env_fn, steps=10_000, seed=0, **env_kwargs):
    """Train a single model to play as each agent in a zero-sum game environment using invalid action masking."""
    env = env_fn.env(**env_kwargs)

    print(f"Starting training on {str(env.metadata['name'])}.")

    # Custom wrapper to convert PettingZoo envs to work with SB3 action masking
    env = SB3ActionMaskWrapper(env)

    env.reset(seed=seed)  # Must call reset() in order to re-define the spaces

    env = ActionMasker(env, mask_fn)  # Wrap to enable masking (SB3 function)
    # MaskablePPO behaves the same as SB3's PPO unless the env is wrapped
    # with ActionMasker. If the wrapper is detected, the masks are automatically
    # retrieved and used when learning. Note that MaskablePPO does not accept
    # a new action_mask_fn kwarg, as it did in an earlier draft.
    model = MaskablePPO(MaskableActorCriticPolicy, env, verbose=1)
    model.set_random_seed(seed)
    model.learn(total_timesteps=steps)

    model.save(model_path)

    print("Model has been saved.")

    print(f"Finished training on {str(env.unwrapped.metadata['name'])}.\n")

    env.close()


def eval_action_mask(env_fn, num_games=100, render_mode=None, **env_kwargs):
    # Evaluate a trained agent vs a random agent
    env = env_fn.env(render_mode=render_mode, **env_kwargs)

    print(
        f"Starting evaluation vs a random agent. Trained agent will play as {env.possible_agents[1]}."
    )


    model = MaskablePPO.load(model_path)

    scores = {agent: 0 for agent in env.possible_agents}
    total_rewards = {agent: 0 for agent in env.possible_agents}
    round_rewards = []

    for i in range(num_games):
        env.reset(seed=i)
        env.action_space(env.possible_agents[0]).seed(i)

        for agent in env.agent_iter():
            obs, reward, termination, truncation, info = env.last()

            # Separate observation and action mask
            observation, action_mask = obs.values()

            if termination or truncation:
                # If there is a winner, keep track, otherwise don't change the scores (tie)
                if (
                    env.rewards[env.possible_agents[0]]
                    != env.rewards[env.possible_agents[1]]
                ):
                    winner = max(env.rewards, key=env.rewards.get)
                    scores[winner] += env.rewards[
                        winner
                    ]  # only tracks the largest reward (winner of game)
                # Also track negative and positive rewards (penalizes illegal moves)
                for a in env.possible_agents:
                    total_rewards[a] += env.rewards[a]
                # List of rewards by round, for reference
                round_rewards.append(env.rewards)
                break
            else:
                if agent == env.possible_agents[0]:
                    act = env.action_space(agent).sample(action_mask)
                else:
                    # Note: PettingZoo expects integer actions # TODO: change chess to cast actions to type int?
                    act = int(
                        model.predict(
                            observation, action_masks=action_mask, deterministic=True
                        )[0]
                    )
            env.step(act)
    env.close()

    # Avoid dividing by zero
    if sum(scores.values()) == 0:
        winrate = 0
    else:
        winrate = scores[env.possible_agents[1]] / sum(scores.values())
    print("Rewards by round: ", round_rewards)
    print("Total rewards (incl. negative rewards): ", total_rewards)
    print("Winrate: ", winrate)
    print("Final scores: ", scores)
    return round_rewards, total_rewards, winrate, scores

In [None]:
env_fn = chess_v6
env_kwargs = {}
train_action_mask(env_fn, steps=20_480, seed=0, **env_kwargs)

In [None]:
# Evaluate 100 games against a random agent
res = eval_action_mask(env_fn, num_games=100, render_mode=None, **env_kwargs)

## Publish agent

In [None]:
class Agent:
    def __init__(self, env):
        self.env = env
        from sb3_contrib import MaskablePPO
        self.model =  MaskablePPO.load(model_path)

    def choose_action(self, observation, action_mask=None):
        action, _states = self.model.predict(observation, deterministic=True)
        return action

In [None]:
my_stringify_agent = f"""
class Agent:
    def __init__(self, env):
        self.env = env
        from sb3_contrib import MaskablePPO
        self.model =  MaskablePPO.load("{model_path}")

    def choose_action(self, observation, action_mask=None):
        action, _states = self.model.predict(observation, deterministic=True)
        return action
"""

In [None]:
# go to https://rlarena.com/my-profile to get or generate your key pair
key_id=""
key_pass=""
#
agent_attach_name= "my_super_agent"

In [None]:
import requests
import io

def attach_agent(key_id, key_pass, agent_attach_name, model_path):
    # Endpoint URL
    url = 'https://rlarena.com/api/direct_attache_agents_notebook/competition/4'

    # Your credentials and agent details
    data = {
        'key_id': key_id,
        'key_pass': key_pass,
        'agent_attach_name': agent_attach_name,
    }

    # Files to upload: agent.py and the model
    files = {
        'model': (model_path, open(model_path, 'rb')),
        'agent_code': ('agent.py', io.StringIO(my_stringify_agent)),
    }

    # Make the POST request
    try:
        response = requests.post(url, data=data, files=files)
        # Ensure files are closed properly
        files['model'][1].close()
        files['agent_code'][1].close()
        return response.json()  # Return the JSON response
    except Exception as e:
        print(f"Failed to attach agent due to: {str(e)}")
        return None


response = attach_agent(key_id, key_pass, agent_attach_name, model_path)
print(response)