In [1]:
from pettingzoo.classic import connect_four_v3

In [2]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import os
os.environ["SDL_VIDEODRIVER"] = "dummy"
from IPython.display import clear_output

In [3]:
# render_mode="rgb_array" is requested because play_game passes env.render()
# to plt.imshow when display=True.
env = connect_four_v3.env(render_mode="rgb_array")

# Agents

Here are some implementations of trivial agents that you should ultimately be able to beat.

In [4]:
class RandomPlayer:
    """Agent that plays a uniformly random legal action.

    Parameters
    ----------
    rng : numpy.random.Generator, optional
        Source of randomness. A fresh ``np.random.default_rng()`` is created
        when omitted; pass a seeded generator for reproducible games.
    """

    def __init__(self, rng=None):
        self.rng = np.random.default_rng() if rng is None else rng
        self.name = "Random Player"

    def get_action(self, obs_mask, epsilon=None):
        """Return a random legal action, or ``None`` when no action is legal.

        Parameters
        ----------
        obs_mask : mapping
            Must contain an ``"action_mask"`` entry: a 1-D sequence whose
            truthy entries flag the legal actions.
        epsilon : ignored
            Accepted only so every agent shares the same call signature.
        """
        mask = np.asarray(obs_mask["action_mask"])
        # Size the action set from the mask itself instead of hard-coding the
        # 7 columns of Connect Four, so any mask length is supported.
        return self.random_choice_with_mask(np.arange(len(mask)), mask)

    def random_choice_with_mask(self, arr, mask):
        """Draw uniformly among the entries of ``arr`` whose mask is truthy.

        Returns ``None`` when the mask rules out every entry. ``mask`` may be
        any array-like (list, bool array, 0/1 integer array).
        """
        # Boolean indexing keeps exactly the entries the original masked-array
        # code kept, but also works for list and boolean masks.
        candidates = np.asarray(arr)[np.asarray(mask).astype(bool)]
        if candidates.size == 0:
            return None
        return self.rng.choice(candidates)


In [5]:
class PlayLeftmostLegal:
    """Deterministic baseline agent that always plays the first legal action."""

    def __init__(self):
        self.name = "Left Player"

    def get_action(self, obs_mask, epsilon=None):
        """Return the lowest index flagged legal in ``obs_mask["action_mask"]``.

        ``epsilon`` is accepted for signature compatibility and ignored.
        Returns ``None`` when no entry of the mask is truthy.
        """
        legal_columns = (
            column
            for column, is_legal in enumerate(obs_mask["action_mask"])
            if is_legal
        )
        return next(legal_columns, None)


# Running a game


The following function runs a full game between two agents. It returns `0` if the first agent wins, `1` if the second agent wins, and `0.5` for a draw.

In [6]:
def play_game(env, agent0, agent1, display=False):
    """Play one full game between two agents and report the outcome.

    Parameters
    ----------
    env :
        Turn-based environment exposing ``reset()``, ``step(action)``,
        ``last()`` and ``render()``. ``last()`` must return a 5-tuple
        ``(obs, reward, terminated, truncated, info)`` where ``obs`` has an
        ``"action_mask"`` entry.
    agent0, agent1 :
        Objects with a ``get_action(obs, epsilon=...)`` method and a ``name``
        attribute. ``agent0`` moves first.
    display : bool
        When True, redraw the board after every move and print the result.

    Returns
    -------
    0 if ``agent0`` wins, 1 if ``agent1`` wins, 0.5 for a draw.

    Notes
    -----
    A decisive ending is recognised by a non-zero reward on the terminal
    step (only ``reward != 0`` is used, not its sign). This assumes the
    environment hands out non-zero rewards on a win, as the Connect Four
    environment used in this notebook is documented to do.
    """
    players = (agent0, agent1)
    env.reset()
    obs, _, _, _, _ = env.last()
    while True:
        for i, agent in enumerate(players):
            action = agent.get_action(obs, epsilon=0)
            env.step(action)
            if display:
                clear_output(wait=True)
                plt.imshow(env.render())
                plt.show()
            obs, reward, terminated, truncated, _ = env.last()
            game_over = terminated or truncated
            no_moves_left = np.sum(obs["action_mask"]) == 0

            # Test for a decisive ending BEFORE the "no legal move" test: the
            # winning move may also fill the last free cell, and such a game
            # must not be reported as a draw.
            if game_over and reward != 0:
                if display:
                    print(f"Player {i}: {agent.name} won")
                    print(obs['observation'][:, :, 0]- obs['observation'][:, :, 1])
                return i

            # Game finished without a winner, or nothing is left to play.
            if game_over or no_moves_left:
                if display:
                    print('Draw')
                return 0.5

In [7]:
# agent0 = RandomPlayer()
# agent1 = PlayLeftmostLegal()

# play_game(env, agent0, agent1, display=True)

# Emulating a Gym environment

If we fix the opponent's policy, the game from the agent's point of view is equivalent to a Gym environment. The following class implements this simulation, so any algorithm that works in a Gym environment with the same observations will also work here.

Note that the agent can play as either the first or the second player.

In [8]:
class EnvAgainstPolicy:
    """Single-agent view of a two-player game played against a fixed policy.

    Each call to :meth:`step` plays the caller's move and then, unless the
    game just ended, lets ``policy`` answer. The most recent 5-tuple
    ``(obs, reward, terminated, truncated, info)`` is cached in
    ``self.last_step`` and returned by :meth:`last`.

    Parameters
    ----------
    env :
        Underlying turn-based environment (``reset``/``step``/``last``).
    policy :
        Opponent; must provide ``get_action(obs)``.
    first_player : bool
        When False, the opponent plays one move during :meth:`reset` so the
        caller moves second.

    NOTE(review): the reward read right after the caller's own move is
    forwarded unchanged, while the reward read after the opponent's move is
    negated. Confirm against the underlying env's ``last()`` semantics that
    this yields the reward from the caller's perspective.
    """

    def __init__(self, env, policy, first_player=True):
        self.policy = policy
        self.env = env
        self.first_player = first_player
        self.reset()

    def _let_opponent_play(self, observation):
        """Ask the fixed policy for a move on ``observation`` and play it."""
        self.env.step(self.policy.get_action(observation))

    def step(self, action):
        """Play ``action``, then the opponent's reply if the game goes on."""
        self.env.step(action)
        observation, reward, terminated, _truncated, _info = self.env.last()

        if terminated:
            # The caller's move ended the game: no reply from the opponent.
            outcome = (observation, reward, True, False, {})
        else:
            self._let_opponent_play(observation)
            observation, reward, terminated, _truncated, _info = self.env.last()
            outcome = (observation, -reward, terminated, False, {})

        self.last_step = outcome
        return outcome

    def reset(self):
        """Restart the game; the opponent opens when the caller moves second."""
        self.env.reset()
        if not self.first_player:
            opening_obs, _, _, _, _ = self.env.last()
            self._let_opponent_play(opening_obs)

        self.last_step = self.env.last()
        return self.last_step

    def last(self):
        """Return the tuple produced by the latest ``reset`` or ``step``."""
        return self.last_step

# Evaluating an agent against a fixed policy: 

Using the environment above, we can evaluate the agent against this fixed policy. 

In [9]:
def eval_against_policy(env, agent, policy, n_episodes=10, first_player=True):
    """Play ``n_episodes`` greedy games of ``agent`` against ``policy``.

    The game is wrapped in an ``EnvAgainstPolicy`` so that ``agent`` sees a
    single-agent environment. The agent is always queried with ``epsilon=0``.

    Parameters
    ----------
    env : underlying two-player environment.
    agent : object with ``get_action(obs, epsilon=...)``; the player evaluated.
    policy : fixed opponent, forwarded to ``EnvAgainstPolicy``.
    n_episodes : number of games to play.
    first_player : whether ``agent`` makes the first move of each game.

    Returns
    -------
    list
        The reward reported on the final step of each game, in play order.
    """
    eval_env = EnvAgainstPolicy(env, policy, first_player=first_player)

    def run_episode():
        """Play a single game to completion and return its final reward."""
        eval_env.reset()
        obs, _, _, _, _ = eval_env.last()
        while True:
            eval_env.step(agent.get_action(obs, epsilon=0))
            obs, reward, done, _, _ = eval_env.last()
            if done:
                return reward

    return [run_episode() for _ in range(n_episodes)]

We can see that if both players play randomly, there is a small but significant advantage to the first player. 

In [10]:
# plt.hist(eval_against_policy(env, RandomPlayer(), RandomPlayer(), n_episodes=1000, first_player=False))
# plt.show()
# plt.hist(eval_against_policy(env, RandomPlayer(), RandomPlayer(), n_episodes=1000, first_player=True))
# plt.show()

# Your turn 

Try to build a decent agent. Be creative! You can try any idea that you have: the grade is not about performance of the agent, but more about illustrating phenomena happening in Reinforcement Learning for turn-based games. It's okay to 'help' the agent in any way, as long as it follows the ideas of RL (i.e., as long as there is some learning involved).




In [11]:
import utils
import importlib
# Re-import utils so that edits made to utils.py since the first import are
# picked up without restarting the kernel.
importlib.reload(utils)

<module 'utils' from '/Users/sachamuller/Documents/Scolaire/5-CentraleSupelec/3A/7-SM11/RL/connect4/utils.py'>

In [12]:
# Hyperparameters forwarded, unchanged, to utils.DQN_Skeleton below.
gamma = 0.99
batch_size = 128
buffer_capacity = 10_000
update_target_every = 32

# Exploration settings; how these three values are combined into a schedule
# is decided inside utils.DQN_Skeleton (not visible in this notebook).
epsilon_start = 0.9
decrease_epsilon_factor = 1000
epsilon_min = 0.05

# NOTE(review): 1e-1 is a large step size for training a neural network --
# confirm this value is intentional.
learning_rate = 1e-1

DQN_agent = utils.DQN_Skeleton(
        action_space_size=7, # number of columns
        observation_space_size = 6*7,  # nb_rows * nb_columns
        gamma=gamma,
        batch_size=batch_size,
        buffer_capacity=buffer_capacity,
        update_target_every=update_target_every,
        epsilon_start=epsilon_start,
        decrease_epsilon_factor=decrease_epsilon_factor,
        epsilon_min=epsilon_min,
        learning_rate=learning_rate,
        env=env,
    )

In [13]:
# Baseline: evaluate DQN_agent, as first player, against a random opponent
# before train_player_0 is run on it further down in the notebook.
random_player = RandomPlayer()
eval_against_policy(env, DQN_agent, random_player, n_episodes=10, first_player=True)

[1, 1, 1, 1, 1, -1, 1, -1, 1, 1]

In [14]:
def train_player_0(env, player_0, player_1, n_episodes, eval_every=50, reward_threshold=0.9):
    """Train ``player_0`` (moving first) by playing games against ``player_1``.

    Every time it is the learner's turn again -- or the game has ended -- the
    learner receives the transition
    ``(obs it acted on, action it took, reward, terminated, obs it sees now)``
    through ``player_0.update``. The terminal transition is included, so the
    final win/loss reward reaches the learner.

    Parameters
    ----------
    env :
        PettingZoo AEC environment (``reset``, ``last``, ``step``,
        ``agent_iter``, ``possible_agents``).
    player_0 :
        Learning agent; must provide ``get_action(obs)`` and
        ``update(state, action, reward, terminated, next_state)``.
    player_1 :
        Opponent; must provide ``get_action(obs)``. It is never updated.
    n_episodes : int
        Maximum number of training games.
    eval_every : int
        Evaluate the learner every ``eval_every`` episodes (episode 0 included).
    reward_threshold : float
        Training stops early once the mean evaluation reward reaches this value.

    Returns
    -------
    (losses, all_rewards)
        ``losses``: every value returned by ``player_0.update``, in order.
        ``all_rewards``: mean reward of each evaluation round.
    """
    losses = []
    all_rewards = []

    for ep in tqdm(range(n_episodes), desc="Train"):
        # reset() of an AEC environment returns nothing; observations are
        # always read through last().
        env.reset()
        controllers = dict(zip(env.possible_agents, (player_0, player_1)))
        learner_id = env.possible_agents[0]
        # (observation, action) of the learner's latest move, kept until its
        # outcome is known (i.e. until the learner is selected again).
        pending = None

        for agent_id in env.agent_iter():
            # last() reports values for the agent that is about to act.
            obs, reward, terminated, truncated, _ = env.last()

            if agent_id == learner_id and pending is not None:
                prev_obs, prev_action = pending
                loss_val = player_0.update(prev_obs, prev_action, reward, terminated, obs)
                losses.append(loss_val)
                pending = None

            if terminated or truncated:
                # A finished agent must step with None so the episode can be
                # closed for every player.
                env.step(None)
                continue

            action = controllers[agent_id].get_action(obs)
            env.step(action)
            if agent_id == learner_id:
                pending = (obs, action)

        if ep % eval_every == 0:
            print("[Train] Evaluating the DQN Agent.")
            rewards = eval_against_policy(env, player_0, player_1, n_episodes=10, first_player=True)
            mean_reward = np.mean(rewards)
            all_rewards.append(mean_reward)
            print(f"Mean reward is: {mean_reward}")
            if mean_reward >= reward_threshold:
                break

    return losses, all_rewards

In [15]:
random_player = RandomPlayer()
# NOTE: train_player_0 returns the pair (losses, all_rewards), so `losses` is
# bound to that whole tuple here, not to the list of losses alone.
losses = train_player_0(env, DQN_agent, random_player, 100)

  torch.tensor(state_tensor).unsqueeze(0),
  torch.tensor(next_state_tensor).unsqueeze(0),
Train:   8%|▊         | 8/100 [00:00<00:02, 43.82it/s]

[Train] Evaluating the DQN Agent.
Mean reward is: 0.8


Train:  53%|█████▎    | 53/100 [00:02<00:02, 21.43it/s]

[Train] Evaluating the DQN Agent.
Mean reward is: 0.4


Train: 100%|██████████| 100/100 [00:04<00:00, 23.67it/s]
