<a href="https://colab.research.google.com/github/julienif/julifsChessBot/blob/main/julifsChessBot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
!pip3 install torchrl
!pip3 install gym[mujoco]
!pip3 install gymnasium
!pip3 install tqdm
!pip3 install chess
!pip install torch torchvision torchaudio
!pip3 install tensorflow



In [6]:
# imports

In [57]:
import warnings
warnings.filterwarnings("ignore")
from torch import multiprocessing

import numpy as np
import chess
import torch
import tensorflow as tf
import gymnasium as gym
from torchrl.envs.libs.gym import GymWrapper
from torch import nn
from tensordict.nn import TensorDictModule
from tensordict.nn.distributions import NormalParamExtractor
from tensordict.tensordict import TensorDict
from gym import spaces
from torchrl.envs.utils import check_env_specs, ExplorationType, set_exploration_type
from torchrl.envs import (Compose, DoubleToFloat, ObservationNorm, StepCounter, TransformedEnv)


In [58]:
# chess RL environment

In [78]:
class ChessEnv(gym.Env):
  """Chess environment in which each `step` plays one move for the side to move.

  The same caller therefore supplies the moves of both colours; rewards are
  expressed from the point of view of `color_trained`.

  Observation: float64 array of shape (8, 8, 13) indexed [rank, file, plane],
  one-hot over the planes. Planes 0-5 are the black pawn, knight, bishop, rook,
  queen and king, planes 6-11 the same white pieces, plane 12 marks an empty
  square.

  Action: integer index into `list(board.legal_moves)` for the current
  position. An index outside that list leaves the board untouched.

  NOTE(review): the base class comes from `gymnasium` while `spaces` comes from
  the legacy `gym` package, and `reset`/`step` return the legacy shapes
  (observation only / 4-tuple). Confirm that the wrapper consuming this
  environment expects that API before changing either.
  """

  # Observation plane of every piece, keyed by python-chess piece symbol.
  _PIECE_PLANES = {
    'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
    'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11,
  }
  _EMPTY_PLANE = 12

  # Material value per piece type; the king carries no material value.
  _PIECE_VALUES = {
    chess.PAWN: 1,
    chess.KNIGHT: 3,
    chess.BISHOP: 3,
    chess.ROOK: 5,
    chess.QUEEN: 9,
    chess.KING: 0,
  }
  _MAX_MATERIAL = 39.0    # material of one full starting side (8P + 2N + 2B + 2R + Q)
  _PROGRESS_PLIES = 150.0  # number of pushed moves after which the progress bonus saturates
  _MATERIAL_WEIGHT = 0.7
  _PROGRESS_WEIGHT = 0.3

  def __init__(self, color_trained='white', device='cpu'):
    """Create the environment with a board in the starting position.

    Args:
      color_trained: 'white' to report rewards from White's point of view;
        any other value reports them from Black's point of view.
      device: stored on the instance as `self.device`; not used by the
        environment itself.
    """
    super().__init__()
    self.device = device
    self.color_trained = color_trained
    self.board = chess.Board()
    # 4096 is an upper bound on the index; only indices below the number of
    # currently legal moves have an effect in `step`.
    self.action_space = spaces.Discrete(4096)
    self.observation_space = spaces.Box(low=0, high=1, shape=(8, 8, 13))
    self.reward_range = (-1, 1)

  def reset(self, seed=None, options=None):
    """Reset the board to the starting position and return the observation.

    `seed` is forwarded to the base class; `options` is accepted but unused.
    Only the observation is returned (no info dict).
    """
    super().reset(seed=seed)
    self.board.reset()
    return self._get_observation()

  def step(self, action):
    """Play the `action`-th legal move for the side to move.

    Args:
      action: index into the list of currently legal moves. An out-of-range
        index is ignored: nothing is pushed and the returned observation and
        reward describe the unchanged position.

    Returns:
      Tuple `(observation, reward, done, info)` where `done` is
      `board.is_game_over()` and `info` is an empty dict.
    """
    legal_moves = list(self.board.legal_moves)
    if 0 <= action < len(legal_moves):
      # The entries are already `chess.Move` objects, so push them directly
      # instead of round-tripping through their UCI string.
      self.board.push(legal_moves[action])

    obs = self._get_observation()
    reward = self._get_reward()
    done = self.board.is_game_over()
    info = {}

    return obs, reward, done, info

  def render(self):
    """Print the board as text and return None."""
    print(self.board)
    return None

  def close(self):
    """Nothing to release."""
    pass

  def _get_observation(self):
    """Encode the current board as an (8, 8, 13) one-hot float64 array."""
    board_state = np.zeros((8, 8, 13), dtype=np.float64)
    # Start with every square marked empty, then overwrite the occupied ones.
    board_state[:, :, self._EMPTY_PLANE] = 1

    for square, piece in self.board.piece_map().items():
      rank = chess.square_rank(square)
      file = chess.square_file(square)
      board_state[rank, file, self._EMPTY_PLANE] = 0
      board_state[rank, file, self._PIECE_PLANES[piece.symbol()]] = 1

    return board_state

  def _get_reward(self):
    """Reward for the current position from `color_trained`'s point of view.

    * Checkmate: +1 if the trained colour delivered it, -1 otherwise.
    * Any other finished game (every condition for which
      `board.is_game_over()` is true, e.g. stalemate, insufficient material,
      75-move rule, fivefold repetition): 0.
    * Otherwise: 0.7 * normalised material advantage of the trained colour
      plus 0.3 * progress, where progress grows linearly with the number of
      moves played and saturates at 1. The result is clipped to [-1, 1].

    The progress bonus is not sign-flipped for Black: it rewards game length
    for whichever colour is trained.
    """
    perspective = 1.0 if self.color_trained == 'white' else -1.0

    if self.board.is_checkmate():
      # The side to move is the one that has been mated.
      white_result = -1.0 if self.board.turn == chess.WHITE else 1.0
      return perspective * white_result

    if self.board.is_game_over():
      # Every non-checkmate ending is scored as a draw so that the terminal
      # reward always agrees with the `done` flag computed in `step`.
      return 0.0

    # White material minus black material, in a single pass over the board.
    material_adv = 0
    for piece in self.board.piece_map().values():
      value = self._PIECE_VALUES[piece.piece_type]
      material_adv += value if piece.color == chess.WHITE else -value
    norm_material_adv = material_adv / self._MAX_MATERIAL

    progress = min(len(self.board.move_stack) / self._PROGRESS_PLIES, 1.0)

    reward = (
      self._MATERIAL_WEIGHT * perspective * norm_material_adv
      + self._PROGRESS_WEIGHT * progress
    )
    # Promotions can push the material difference beyond 39; keep the value
    # inside the declared reward_range.
    return max(-1.0, min(1.0, reward))

In [79]:
# implement training using PPO

In [81]:
# Hyper-parameters and environment construction for the PPO run.

# Pick the first CUDA device when CUDA is available and the multiprocessing
# start method is not "fork"; otherwise fall back to the CPU.
is_fork = multiprocessing.get_start_method() == "fork"
if torch.cuda.is_available() and not is_fork:
    device = torch.device(0)
else:
    device = torch.device("cpu")
print(device)

# --- network / optimiser ---
num_cells = 256       # width (output dimension) of each layer
lr = 3e-4
max_grad_norm = 1.0

# --- data collection ---
frames_per_batch = 1000
total_frames = 50_000  # raise to 1M for a complete training run

# --- PPO ---
sub_batch_size = 64    # size of the sub-samples drawn from each collected batch in the inner loop
num_epochs = 10        # optimisation passes per collected batch
clip_epsilon = 0.2     # clip value for the PPO loss
gamma = 0.99
lmbda = 0.95
entropy_eps = 1e-4

# --- environment ---
chess_env = GymWrapper(ChessEnv(color_trained='white', device=device))

# Transform order: normalise observations, cast doubles to floats, count steps.
norm_chess_env = TransformedEnv(
    chess_env,
    Compose(
        ObservationNorm(in_keys=["observation"]),
        DoubleToFloat(),
        StepCounter(),
    ),
)

# Initialise the statistics of the ObservationNorm transform (index 0 of the
# Compose above), then validate the transformed environment's specs.
norm_chess_env.transform[0].init_stats(num_iter=1000, reduce_dim=0, cat_dim=0)
check_env_specs(norm_chess_env)























































cuda:0


2024-07-10 22:40:48,890 [torchrl][INFO] check_env_specs succeeded!
