In [6]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Lazily enumerate every file below the Kaggle input directory (walk order is
# preserved) and print one full path per line.
input_file_paths = (
    os.path.join(folder, name)
    for folder, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for input_file_path in input_file_paths:
    print(input_file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/6_epochs_chess_cnn/pytorch/default/1/move_mappings.pkl
/kaggle/input/6_epochs_chess_cnn/pytorch/default/1/9channel_CE-LOSS_model_6_epochs_lr_0-0001.pth
/kaggle/input/chess-games/chess_games_processed_new.csv


In [7]:
import chess
import numpy as np
import torch
import torch.nn as nn
import pandas as pd
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
class ChessCNNModule(nn.Module):
    """Small convolutional move classifier over a 9-plane 8x8 board encoding.

    Pipeline: conv1 -> ReLU -> conv2 -> ReLU -> flatten -> fc1 -> ReLU -> fc2.
    ``forward`` returns raw logits (no softmax), one per move class.
    """

    def __init__(self, num_classes):
        """Create the layers and initialise their weights.

        Args:
            num_classes: size of the output layer, i.e. number of move labels.
        """
        super().__init__()
        # Input planes are (9, 8, 8). Both convolutions use 3x3 kernels with
        # padding 1, so the 8x8 spatial extent is unchanged up to the flatten.
        self.conv1 = nn.Conv2d(9, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(8 * 8 * 128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

        # Kaiming-uniform for the convolutions, Xavier-uniform for the dense
        # layers; only the weights are re-initialised, biases keep their defaults.
        for conv in (self.conv1, self.conv2):
            nn.init.kaiming_uniform_(conv.weight, nonlinearity='relu')
        for dense in (self.fc1, self.fc2):
            nn.init.xavier_uniform_(dense.weight)

    def forward(self, x):
        """Map a batch of encoded boards to one raw logit per move class."""
        conv_features = self.relu(self.conv2(self.relu(self.conv1(x))))
        hidden = self.relu(self.fc1(self.flatten(conv_features)))
        return self.fc2(hidden)  # Outputs raw logits

    def createBoardRep(self, board):
        """Encode a python-chess board as a stack of nine 8x8 integer planes.

        Plane layout:
            0-5: pawn, rook, knight, bishop, queen, king. A square holds -1 for
                 a black piece, 1 for a white piece, 0 otherwise.
            6:   1 on every square a legal move starts from.
            7:   1 on every square a legal move lands on.
            8:   constant plane, 1 when white is to move, otherwise -1.

        Row 0 corresponds to rank 8 and column 0 to the a-file
        (row = 7 - square // 8, col = square % 8).

        Args:
            board: object exposing ``piece_map()``, ``legal_moves`` and ``turn``
                in the way ``chess.Board`` does.

        Returns:
            np.ndarray of shape (9, 8, 8).
        """
        plane_of_symbol = {symbol: index for index, symbol in enumerate('prnbqk')}
        planes = np.zeros((9, 8, 8), dtype=int)

        # Piece planes: a single sweep over the occupied squares.
        for square, piece_obj in board.piece_map().items():
            plane = plane_of_symbol.get(piece_obj.symbol().lower())
            if plane is None:
                continue
            sign = -1 if piece_obj.color == chess.BLACK else 1
            planes[plane, 7 - (square // 8), square % 8] = sign

        # Origin (plane 6) and destination (plane 7) squares of the legal moves.
        for move in board.legal_moves:
            planes[6, 7 - (move.from_square // 8), move.from_square % 8] = 1
            planes[7, 7 - (move.to_square // 8), move.to_square % 8] = 1

        # Turn indicator plane
        planes[8, :, :] = 1 if board.turn == chess.WHITE else -1

        return planes  # Shape: (9, 8, 8)

In [None]:
class ChessBoardEncoder:
    """Helpers working on python-chess boards: plane encoding of a position,
    construction of the move <-> class-index vocabulary, and shallow mate searches.
    """

    def createBoardRep(self, board):
        """Encode a python-chess board as a stack of nine 8x8 integer planes.

        Plane layout:
            0-5: pawn, rook, knight, bishop, queen, king. A square holds -1 for
                 a black piece, 1 for a white piece, 0 otherwise.
            6:   1 on every square a legal move starts from.
            7:   1 on every square a legal move lands on.
            8:   constant plane, 1 when white is to move, otherwise -1.

        Row 0 corresponds to rank 8 and column 0 to the a-file
        (row = 7 - square // 8, col = square % 8), so for the starting position
        ``reps[0]`` has -1 along row 1 and 1 along row 6.

        Args:
            board: object exposing ``piece_map()``, ``legal_moves`` and ``turn``
                in the way ``chess.Board`` does.

        Returns:
            np.ndarray of shape (9, 8, 8).
        """
        plane_of_symbol = {symbol: index for index, symbol in enumerate('prnbqk')}
        planes = np.zeros((9, 8, 8), dtype=int)

        # One sweep over the occupied squares instead of one sweep per piece type.
        for square, piece_obj in board.piece_map().items():
            plane = plane_of_symbol.get(piece_obj.symbol().lower())
            if plane is None:
                continue
            planes[plane, 7 - (square // 8), square % 8] = (
                -1 if piece_obj.color == chess.BLACK else 1
            )

        # Legal moves are generated once and feed both the "from" and "to" planes.
        for move in board.legal_moves:
            planes[6, 7 - (move.from_square // 8), move.from_square % 8] = 1
            planes[7, 7 - (move.to_square // 8), move.to_square % 8] = 1

        # Turn indicator plane
        planes[8, :, :] = 1 if board.turn == chess.WHITE else -1

        return planes  # Shape: (9, 8, 8)

    @staticmethod
    def create_move_mapping(games):
        """Build the move vocabulary (UCI string <-> integer index) from games.

        Declared as a static method: the function never took ``self``, so calling
        it through an instance used to raise ``TypeError``. Calling it through the
        class keeps working exactly as before.

        Args:
            games: iterable of strings, each a whitespace-separated sequence of
                SAN moves replayed from the initial position.

        Returns:
            ``(move_to_int, int_to_move)``; indices follow the sorted order of
            the distinct UCI strings encountered.

        Tokens that ``push_san`` rejects with ``ValueError`` are reported and
        skipped, and replay continues with the next token of the same game.
        """
        all_moves = set()
        white_moves = 0
        black_moves = 0

        for game_idx, game in enumerate(games):
            board = chess.Board()

            for move in game.split():
                try:
                    uci_move = board.push_san(move).uci()  # Convert to UCI
                except ValueError:
                    print(f"Skipping invalid move: {move}")
                    continue

                # The move is already on the board, so the side to move now is
                # the opponent of the side that just played.
                if board.turn == chess.WHITE:
                    black_moves += 1
                else:
                    white_moves += 1

                if uci_move not in all_moves:
                    all_moves.add(uci_move)
                    print(f"Game {game_idx + 1}: Added move {uci_move} | Total moves: {len(all_moves)}")
                    print(f"White moves count: {white_moves}, Black moves count: {black_moves}")

        move_to_int = {move: idx for idx, move in enumerate(sorted(all_moves))}
        int_to_move = {idx: move for move, idx in move_to_int.items()}

        print("\nFinal Move Mapping Summary:")
        print(f"Total unique moves stored: {len(all_moves)}")
        print(f"White moves count: {white_moves}, Black moves count: {black_moves}")
        print(f"Ratio (White/Black): {white_moves / max(black_moves, 1):.2f}")  # Avoid division by zero

        print("\nLast 10 Move Mappings:")
        for move, idx in list(move_to_int.items())[-10:]:
            print(f"{move} -> {idx}")

        return move_to_int, int_to_move

    def getMoveMateIn1(self, board):
        """Return the first legal move that delivers checkmate, or ``None``.

        The search runs on a copy, so the caller's board is never modified.
        """
        scratch = board.copy()
        for move in list(scratch.legal_moves):
            scratch.push(move)
            is_mate = scratch.is_checkmate()
            scratch.pop()
            if is_mate:
                return move

        return None

    def getMoveMateIn2(self, board):
        """Search for a three-ply line (own move, reply, own mating move).

        Returns:
            ``(first_move, mating_move)`` for the first line found, i.e. the two
            consecutive moves of the side to move, or ``None`` (after printing
            "No mate in 2 found") when no such line exists.

        The search runs on a copy, so the caller's board is never modified.
        Previously the ``return`` was commented out, so a found line was never
        reported and the board was popped four times for two pushes.

        NOTE(review): the line is accepted as soon as *one* opponent reply
        allows a mate; it is not verified that every reply does (forced mate).
        Confirm whether callers need the forced variant.
        """
        scratch = board.copy()

        for move in list(scratch.legal_moves):
            scratch.push(move)

            for opponent_move in list(scratch.legal_moves):
                scratch.push(opponent_move)  # The opponent responds
                mate_in_1_move = self.getMoveMateIn1(scratch)
                scratch.pop()  # Undo the opponent's move

                if mate_in_1_move is not None:
                    return move, mate_in_1_move

            scratch.pop()  # Undo the current player's move

        print("No mate in 2 found")
        return None

In [None]:
# Pin the NumPy and PyTorch global RNGs to the same fixed seed.
# (delete or change later for more randomness)
np.random.seed(55)
torch.manual_seed(55)

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

class ChessDataset(Dataset):
    """Samples one random (position, next move) pair per game.

    Each item replays a random-length prefix of the game and uses the move that
    follows the prefix as the classification target.

    Args:
        games: indexable collection of strings, each a whitespace-separated
            sequence of SAN moves starting from the initial position.
        model: object providing ``createBoardRep(board)``; only that method is used.
        move_to_int: mapping from UCI move string to class index.
    """

    def __init__(self, games, model, move_to_int):
        super().__init__()
        self.games = games
        self.model = model
        self.move_to_int = move_to_int
        self.num_classes = len(move_to_int)

    def __len__(self):
        """Number of games; one sample is drawn per game per pass."""
        return len(self.games)

    def __getitem__(self, idx):
        """Return ``(x, y)`` for game ``idx``.

        Returns:
            x: float32 tensor built from ``model.createBoardRep`` for the
               position reached after the sampled prefix.
            y: scalar long tensor with the class index of the next move, in
               ``[0, num_classes)``.

        Both tensors are returned on the CPU. Moving them to the accelerator is
        left to the training loop, which removes the dependency on a
        module-level ``device`` variable and keeps the dataset usable with
        DataLoader worker processes.

        Raises:
            ValueError: the game has no moves, or a move cannot be parsed.
        """
        row = self.games[idx]
        # Non-string rows (e.g. NaN from an empty CSV cell) are treated as empty games.
        moves = row.split() if isinstance(row, str) else []
        num_moves = len(moves)
        if num_moves == 0:
            raise ValueError(f"Game at index {idx} contains no moves")

        # Label position is uniform over [1, num_moves - 1]. randint(1, 1) would
        # raise, so a one-move game uses its only move as the label.
        # NOTE(review): for longer games index 0 is never drawn, i.e. the very
        # first move of a game is never a training target -- confirm intent.
        random_move_index = np.random.randint(1, num_moves) if num_moves > 1 else 0

        features = moves[:random_move_index]
        label_move = moves[random_move_index]

        board = chess.Board()
        for move in features:
            try:
                board.push_san(move)
            except ValueError as exc:
                raise ValueError(f"Invalid move in dataset: {move}") from exc

        # Convert the SAN label to UCI without touching the board state.
        try:
            label_move_uci = board.parse_san(label_move).uci()
        except ValueError as exc:
            raise ValueError(f"Invalid label move: {label_move}") from exc

        x = torch.tensor(self.model.createBoardRep(board), dtype=torch.float32)

        label_index = self.move_to_int.get(label_move_uci)
        if label_index is None:
            # NOTE(review): a random class is substituted for unknown moves,
            # which injects label noise; kept as-is so training does not abort.
            print(f"[WARNING] Move {label_move_uci} not in move_to_int. Assigning random index.")
            label_index = np.random.randint(0, self.num_classes)
        y = torch.tensor(label_index, dtype=torch.long)

        return x, y

In [11]:
import pickle
def load_move_mappings(filename="/kaggle/input/6_epochs_chess_cnn/pytorch/default/1/move_mappings.pkl"):
    """Read the pickled move vocabulary from ``filename``.

    The pickle must hold a mapping with the keys ``"move_to_int"`` and
    ``"int_to_move"``; their values are returned in that order.

    Unpickling can execute arbitrary code, so only point this at trusted files.
    """
    with open(filename, "rb") as handle:
        mappings = pickle.load(handle)

    print(f"Move mappings loaded from {filename}")

    forward_map = mappings["move_to_int"]
    inverse_map = mappings["int_to_move"]
    return forward_map, inverse_map

# Load the move <-> class-index lookup tables from the default pickle path;
# later cells read `move_to_int` to size the network's output layer.
move_to_int, int_to_move = load_move_mappings()

Move mappings loaded from /kaggle/input/6_epochs_chess_cnn/pytorch/default/1/move_mappings.pkl


In [None]:
import time
from tqdm import tqdm

csv_path = "/kaggle/input/chess-games/chess_games_processed_new.csv"
df = pd.read_csv(csv_path)
# Extract all moves from the dataset (column "AN" holds one move string per game)
games = df["AN"].tolist()

print("using cuda:", torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_classes = len(move_to_int)
model = ChessCNNModule(num_classes).to(device)
# The model instance is handed to the dataset only for its createBoardRep encoder.
dataset = ChessDataset(games, model, move_to_int)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)


num_epochs = 10
learning_rate = 0.0001
criterion = nn.CrossEntropyLoss()  # expects raw logits and integer class labels
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0

    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device).long()

        optimizer.zero_grad()
        outputs = model(inputs)  # Raw logits

        loss = criterion(outputs, labels)
        loss.backward()
        # Clip the global gradient norm before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

    end_time = time.time()
    minutes, seconds = divmod(int(end_time - start_time), 60)

    # Reported loss is the mean of the per-batch losses of this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s")


# The checkpoint name is derived from the hyperparameters actually used, so it
# cannot drift from the run (the previous literal said "26_epochs" while this
# cell trains a freshly initialised model for `num_epochs` epochs).
# NOTE(review): torch.save(model) pickles the whole module, so loading requires
# the ChessCNNModule class to be importable; kept because downstream loaders
# presumably expect a whole-model file -- confirm before switching to state_dict.
checkpoint_path = f"9channel_CE-LOSS_model_{num_epochs}_epochs_lr_{str(learning_rate).replace('.', '-')}.pth"
torch.save(model, checkpoint_path)

using cuda: True


100%|██████████| 13803/13803 [23:04<00:00,  9.97it/s]


Epoch 1/10, Loss: 4.0327, Time: 23m4s


100%|██████████| 13803/13803 [22:59<00:00, 10.00it/s]


Epoch 2/10, Loss: 3.0219, Time: 22m59s


100%|██████████| 13803/13803 [22:59<00:00, 10.00it/s]


Epoch 3/10, Loss: 2.7384, Time: 22m59s


100%|██████████| 13803/13803 [23:07<00:00,  9.95it/s]


Epoch 4/10, Loss: 2.5963, Time: 23m7s


100%|██████████| 13803/13803 [23:01<00:00,  9.99it/s]


Epoch 5/10, Loss: 2.5110, Time: 23m1s


100%|██████████| 13803/13803 [22:59<00:00, 10.00it/s]


Epoch 6/10, Loss: 2.4557, Time: 22m59s


100%|██████████| 13803/13803 [23:13<00:00,  9.91it/s]


Epoch 7/10, Loss: 2.4143, Time: 23m13s


100%|██████████| 13803/13803 [23:07<00:00,  9.95it/s]


Epoch 8/10, Loss: 2.3853, Time: 23m7s


100%|██████████| 13803/13803 [23:02<00:00,  9.98it/s]


Epoch 9/10, Loss: 2.3537, Time: 23m2s


100%|██████████| 13803/13803 [23:02<00:00,  9.99it/s]

Epoch 10/10, Loss: 2.3328, Time: 23m2s



