# A space to test code snippets

import gymnasium as gym

# Initialise the environment
env = gym.make("LunarLander-v3", render_mode="human")

# Reset the environment to generate the first observation
observation, info = env.reset(seed=42)
for _ in range(1000):
    # this is where you would insert your policy
    action = env.action_space.sample()

    # step (transition) through the environment with the action
    # receiving the next observation, reward and if the episode has terminated or truncated
    observation, reward, terminated, truncated, info = env.step(action)

    # If the episode has ended then we can reset to start a new episode
    if terminated or truncated:
        observation, info = env.reset()

env.close()

## Custom environment

In [None]:
import random
import numpy as np
# import pandas as pd
import gym
# import matplotlib.pyplot as plt
# %matplotlib inline

from kaggle_environments import make, evaluate
from gym import spaces

2025-01-09 14:37:35.666274: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1736429855.679506     652 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1736429855.683493     652 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-09 14:37:35.697201: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
class ConnectFourGym(gym.Env):
    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2, 
                                            shape=(1,self.rows,self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None
    def reset(self):
        self.obs = self.env.reset()
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns)
    def change_reward(self, old_reward, done):
        if old_reward == 1: # The agent won the game
            return 1
        elif done: # The opponent won the game
            return -1
        else: # Reward 1/42
            return 1/(self.rows*self.columns)
    def step(self, action):
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.env.step(int(action))
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, _ = -10, True, {}
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), reward, done, _

## Neural network

In [3]:
import torch as th
import torch.nn as nn
from stable_baselines3 import PPO 
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

In [4]:
class CustomCNN(BaseFeaturesExtractor):
    # Neural network for predicting action values    
    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=128):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.cnn(observations))

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)

# Setting up the training

In [4]:
import os
from stable_baselines3.common.evaluation import evaluate_policy

In [5]:
# prepare output folders of model snapshots and logs for tensorboard
models_dir, logs_dir = "models/connect4/PPO", "logs/connect4"

if not os.path.exists(models_dir):
    os.makedirs(models_dir)

if not os.path.exists(logs_dir):
    os.makedirs(logs_dir)

In [7]:
# Create ConnectFour environment 
env = ConnectFourGym(agent2="negamax")

In [8]:
# create the model of choice
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=1, tensorboard_log=logs_dir)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




In [9]:
# train the model, save the snapshots and create / save the logs
TIMESTEPS = 1e4 # timesteps for each training episode
episodes = 10 # number of training episodes
for episode in range(episodes):
    model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="PPO")
    model.save(f"{models_dir}/{TIMESTEPS*(episode + 1)}")

Logging to logs/connect4/PPO_0
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 8.52     |
|    ep_rew_mean     | -2.8     |
| time/              |          |
|    fps             | 17       |
|    iterations      | 1        |
|    time_elapsed    | 119      |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 8.77         |
|    ep_rew_mean          | -2.16        |
| time/                   |              |
|    fps                  | 17           |
|    iterations           | 2            |
|    time_elapsed         | 238          |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0069445404 |
|    clip_fraction        | 0.0595       |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.94        |
|    explained_variance   |

In [13]:
def agent1(obs, config):
    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    # Check if selected column is valid
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, select random move. 
    if is_valid:
        return int(col)
    else:
        return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])

In [14]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    # Agent 1 goes first (roughly) half the time          
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    # Agent 2 goes first (roughly) half the time      
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [16]:
get_win_percentages(agent1=agent1, agent2="negamax")

Agent 1 Win Percentage: 0.01
Agent 2 Win Percentage: 0.99
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [1]:
# Create the game environment
env_eval = make("connectx")

# Two random agents play one game round
env_eval.run([agent1, "negamax"])

# Show the game
env_eval.render(mode="ipython")

NameError: name 'make' is not defined

## Load and evaluate previously save model state

The approach for this is a bit twisted with the Kaggle environment.

In [1]:
# imports and path settings
import gym as gym
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from kaggle_environments import make, evaluate

models_dir, logs_dir = "models/connect4/PPO", "logs/connect4"

2025-01-09 14:35:09.475132: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1736429709.488440     573 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1736429709.492437     573 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-01-09 14:35:09.506508: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [13]:
# create environment, load and evaluate model @ a saved snapshot
env = ConnectFourGym(agent2="negamax")
env.reset()

# to evaluate another snapshot explore the folder models_dir for saved snapshots
model = PPO.load(f"{models_dir}/100000.0", env=env)

mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=10)
print(f"The mean reward is {mean_reward} with a standard deviation of {std_reward}")




Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
The mean reward is -0.8523809 with a standard deviation of 0.042324670494759895


In [8]:
def agent1(obs, config):
    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    # Check if selected column is valid
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, select random move. 
    if is_valid:
        return int(col)
    else:
        return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])

In [12]:
# Two random agents play one game round
env_eval = make("connectx")
env_eval.run([agent1, "negamax"])

# Show the game
env_eval.render(mode="ipython")

In [95]:
'''
This module models the problem to be solved. In this  example, the problem is to
optimze a Connect X agent that plays Connect X against a randomly selected 
type of opponent. The player to start the is also randomly selected.
The playing field is divided into columns and rows. The default is size of 7 by 
6. The player alternately drop a piece into the grid and it falls down as far as
possible. The game is won when n pieces of a player are either horizontally, 
vertically or diagonally in a sequence that is not interrupted by an opponent's
piece. The game ends with a draw when all places of the playing field are filled
with pieces and neither the agent nor the opponent accomplished a winning 
sequence.
'''

import random
from enum import Enum
import pygame
import sys
from os import path
from kaggle_environments import make


class ConnectXOpponents(Enum):
    RANDOM = 'random'
    NEGAMAX = 'negamax'

class ConnectX:
    def __init__(self, rows=6, columns=7, inarow=4, fps=1):
        self.rows = rows
        self. columns = columns
        self.size = self.rows * self.columns
        self.fps=fps
        self.env = make("connectx", {"rows": self.rows, "columns": self.columns,
                                      "inarow": 4}, debug=True)

        self.trainer = self.trainer_choice()
        self.obs = self.trainer.reset()

    def reset(self, seed=None):
        # self.trainer = self.trainer_choice(True, True)
        self.obs = self.trainer.reset()
    
    def perform_action(self, column):
        self.obs, self.reward, self.done, self.info = self.trainer.step(column)
        return self.obs, self.reward, self.done, self.info
        pass
    
    def render(self):
        self.env.render(mode="ipython")
        pass
    
    def trainer_choice(self, opponent_choice="", player_to_start=0):
        # Selecting a random default agent.
        if opponent_choice == "":
            opponent = random.choice([*self.env.agents])            
        else: 
            assert opponent_choice in ConnectXOpponents
            opponent = opponent_choice
        # Training agent in first position against a random default agent.
        if player_to_start == 0:
            if random.randint(0, 1):
                print(f"You will start the game against {opponent}.")
                trainer = self.env.train([None, opponent])                
            # Training agent in second position against a random default agent.
            else:
                trainer = self.env.train([opponent, None])
                print(f"Your opponent {opponent} will start the game.")
        else:
            assert player_to_start in [1,2]
            if player_to_start == 1:
                print(f"You will start the game against {opponent}.")
                trainer = self.env.train([None, opponent])
            else:
                trainer = self.env.train([opponent, None])
                print(f"Your opponent {opponent} will start the game.")

        return trainer

In [108]:
# For unit testing
if __name__=="__main__":
    connectx = ConnectX(rows=6, columns=7)

    running = True

    while(running):
        rand_action = random.randint(0, connectx.columns - 1)
        # print(rand_action)
        obs, reward, done, info = connectx.perform_action(rand_action)
        if done:
            if reward == 1: print("You won!")
            elif reward == 0: print("Just a draw?")
            else: print("You lost...")
            running = False

    connectx.render()

Your opponent random will start the game.
Invalid Action: Invalid column: 5
You lost...
