In [1]:
from kaggle_environments import make
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
%matplotlib inline
from kaggle_environments import make, evaluate
from gym import spaces
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback, CheckpointCallback
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
from stable_baselines3 import PPO

Loading environment football failed: No module named 'gfootball'


In [2]:
LOGDIR = os.path.join(".","logs","custom_ppo_1")
MONITOR_LOGS_DIR = os.path.join(LOGDIR,"monitor_logs")
TB_LOGS_DIR = os.path.join(LOGDIR,"tensorboard_logs")
MODEL_DIR = os.path.join(LOGDIR,"model")
CHECKPOINTS_DIR = os.path.join(LOGDIR,"checkpoints")

In [3]:
class ConnectFourGym:
    def __init__(self, agent2="random", warmup_episode_count = 100, warmup_timesteps = 5000):
        self.env  = make("connectx", debug=True)
        self.trainer = self.env.train([None, agent2])
        self.rows = self.env.configuration.rows
        self.columns = self.env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2, 
                                            shape=(self.rows,self.columns,1), dtype=np.int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None
        
        self.episode_count = 0
        self.timesteps = 0
        self.warmup_episode_count = warmup_episode_count
        self.warmup_timesteps = warmup_timesteps

    def reset(self):
        self.episode_count += 1
        self.obs = self.trainer.reset()
        return np.array(self.obs['board']).reshape(self.rows,self.columns,1)
    def change_reward(self, old_reward, done):
        if old_reward == 1: # The agent won the game
            return 1
        elif done: # The opponent won the game
            return -1
        else: # Reward 1/42
            return 1/(self.rows*self.columns)
    def step(self, action):
        #print('episode count: ' + str(self.episode_count))
        #print('timesteps: ' + str(self.timesteps))
        self.timesteps += 1
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.trainer.step(int(action))
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, _ = -10, True, {}
        return np.array(self.obs['board']).reshape(self.rows,self.columns,1), reward, done, _
    
    def load_new_opponents_from_best_model(self):
        if self.episode_count < self.warmup_episode_count or self.timesteps < self.warmup_timesteps:
            return True
        print("Loading new opponent from current best model for self play!!!")
        loaded_model = PPO.load(os.path.join(MODEL_DIR, "best_model")) 
        
        def agent_ppo(obs, config):
            # Use the best model to select a column
            col, _ = loaded_model.predict(np.array(obs['board']).reshape(6,7,1))
            # Check if selected column is valid
            is_valid = (obs['board'][int(col)] == 0)
            # If not valid, select random move. 
            if is_valid:
                return int(col)
            else:
                return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])
        
        self.agent2 = agent_ppo
        self.trainer = self.env.train([None, self.agent2])
        self.reset()

class LoadNewOpponentsFromBestModelCallback(BaseCallback):
    def __init__(self, env, verbose: int = 0):
        super(LoadNewOpponentsFromBestModelCallback, self).__init__(verbose=verbose)
        self.env = env

    def _on_step(self):
        env.load_new_opponents_from_best_model()
        return True
    
def get_win_percentages(agent1, agent2, n_rounds=100):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    # Agent 1 goes first (roughly) half the time          
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    # Agent 2 goes first (roughly) half the time      
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [4]:
env = Monitor(ConnectFourGym())
eval_env = DummyVecEnv([lambda:Monitor(ConnectFourGym())])

checkpoint_callback = CheckpointCallback(save_freq=1000, save_path=CHECKPOINTS_DIR, name_prefix="rl_model")
load_new_opponents_from_best_model_callback = LoadNewOpponentsFromBestModelCallback(env)

eval_callback = EvalCallback(eval_env, best_model_save_path=MODEL_DIR,
                             log_path=LOGDIR, eval_freq=20,
                             deterministic=True, render=False
                            , callback_on_new_best=load_new_opponents_from_best_model_callback, verbose=0)

In [5]:
model = PPO(policy = 'MlpPolicy'
                , env = env
                , verbose = 0
                , n_steps = 2048*16
                , batch_size = 128
                , n_epochs = 50
                , tensorboard_log = TB_LOGS_DIR
                , learning_rate = .01)

start_time = time.time()
model.learn(total_timesteps=100000, callback=[checkpoint_callback, eval_callback])
end_time = time.time()
print('training took ' + str((end_time - start_time)/60) + ' minutes')

Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
Loading new opponent from current best model for self play!!!
training took -27.236652890841167 minutes


In [14]:
def agent1(obs, config):
    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(6,7,1), deterministic=True)
    # Check if selected column is valid
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, select random move. 
    if is_valid:
        return int(col)
    else:
        return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])

In [7]:
#test_env = make("connectx")
#test_env.play([None, agent1])

In [None]:
get_win_percentages(agent1=agent1, agent2="random", n_rounds=1000)

In [13]:
get_win_percentages(agent1=agent1, agent2="random", n_rounds=1000)

Agent 1 Win Percentage: 0.58
Agent 2 Win Percentage: 0.42
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [None]:
# It's barely better than random. Sad.