<a href="https://colab.research.google.com/github/natisitotaw/Chess-Engine/blob/main/train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn
import torch.optim as optim

In [None]:
# Install the chess library for handling PGN files.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install chess



In [None]:
import os
import time
from chess import pgn


In [None]:

# Install zstandard for decompressing zst files.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install zstandard




In [None]:
import os
import requests
import zstandard as zstd

# URL of the zst file containing PGN chess games
url = 'https://database.lichess.org/standard/lichess_db_standard_rated_2013-01.pgn.zst'

# Directory to save the downloaded file and extract it
directory = 'data/pgn'

# Create the directory if it doesn't exist
os.makedirs(directory, exist_ok=True)

# Name the local files after the remote archive so the file name always
# describes the data it really contains.
zst_file_path = os.path.join(directory, os.path.basename(url))
pgn_file_path = os.path.splitext(zst_file_path)[0]  # drop the trailing ".zst"

if os.path.exists(pgn_file_path):
    # Makes "Restart & Run All" cheap: the archive is fetched only once.
    print(f"'{pgn_file_path}' already exists, skipping download.")
else:
    # Stream the archive to disk in 1 MiB chunks instead of holding the whole
    # response body in memory, and fail loudly on HTTP errors so an error page
    # is never mistaken for the archive.
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(zst_file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)

    # Decompress into a temporary ".part" file and rename it once complete, so
    # an interrupted run never leaves a truncated ".pgn" behind.
    partial_path = pgn_file_path + '.part'
    with open(zst_file_path, 'rb') as compressed_file, open(partial_path, 'wb') as decompressed_file:
        dctx = zstd.ZstdDecompressor()
        dctx.copy_stream(compressed_file, decompressed_file)
    os.replace(partial_path, pgn_file_path)

    # Remove the downloaded zst file to save space
    os.remove(zst_file_path)

    print(f"File downloaded, decompressed, and placed in '{directory}'.")

File downloaded, decompressed, and placed in 'data/pgn'.


In [None]:
# Install tqdm for progress bars.
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install tqdm



In [None]:
from tqdm import tqdm
def load_pgn(file_path):
    """
    Load every chess game stored in a PGN file.

    Args:
        file_path (str): Path to the PGN file.

    Returns:
        list: The games in file order, as returned by ``pgn.read_game``.
            Empty when the file contains no games.
    """
    games = []
    # Open with an explicit encoding: the parser works on text, so decoding is
    # the caller's job, and the platform default is not UTF-8 everywhere.
    with open(file_path, 'r', encoding='utf-8') as pgn_file:
        # read_game returns None once the file is exhausted; iter() with a
        # sentinel turns that protocol into a plain for-loop.
        for game in iter(lambda: pgn.read_game(pgn_file), None):
            games.append(game)
    return games

# Collect the PGN files to parse. os.listdir returns entries in arbitrary
# order, so sort them to make the selection (and the game order) reproducible.
files = sorted(file for file in os.listdir("data/pgn") if file.endswith(".pgn"))

# Upper bound on how many files are parsed
LIMIT_OF_FILES = min(len(files), 28)

games = []
# Slice up front instead of counting and breaking inside the loop: the progress
# bar then has the right total and actually reaches 100%.
for file in tqdm(files[:LIMIT_OF_FILES]):
    games.extend(load_pgn(f"data/pgn/{file}"))

  0%|          | 0/1 [06:53<?, ?it/s]


In [None]:
# Report how many games were collected from the parsed files
parsed_total = len(games)
print(f"GAMES PARSED: {parsed_total}")

GAMES PARSED: 121332


In [None]:
import numpy as np
from chess import Board

def board_to_matrix(board: Board) -> np.ndarray:
    """
    Encode a position as a stack of thirteen 8x8 binary planes.

    Planes 0-5 mark the pieces whose ``color`` is truthy (white in
    python-chess), indexed by ``piece_type - 1``; planes 6-11 mark the other
    side's pieces in the same order. Plane 12 marks every square that is the
    destination of at least one legal move in the position.

    Args:
        board (Board): Chess board object from the chess library.

    Returns:
        np.ndarray: uint8 array of shape (13, 8, 8), indexed
            [plane, square // 8, square % 8].
    """
    planes = np.zeros((13, 8, 8), dtype=np.uint8)

    # Piece planes: one-hot over (colour, piece type) for each occupied square
    for sq, occupant in board.piece_map().items():
        rank_idx, file_idx = divmod(sq, 8)
        colour_offset = 0 if occupant.color else 6
        planes[(occupant.piece_type - 1) + colour_offset, rank_idx, file_idx] = 1

    # Last plane: destination squares of the legal moves
    for candidate in board.legal_moves:
        rank_idx, file_idx = divmod(candidate.to_square, 8)
        planes[12, rank_idx, file_idx] = 1

    return planes

def create_input_for_nn(games) -> tuple[np.ndarray, np.ndarray]:
    """
    Turn games into (position, move played) training pairs.

    For every mainline move of every game, the position *before* the move is
    encoded with ``board_to_matrix`` and paired with the move in UCI notation.

    The labels are returned as UCI strings rather than as private integer ids.
    ``encode_moves`` expects moves in string format, and the mapping it builds
    is what gets saved next to the model. Handing it opaque integers whose
    string meaning was discarded here would make that saved mapping impossible
    to decode back into moves.

    Args:
        games (list): List of chess games.

    Returns:
        tuple: ``(X, y)`` where ``X`` is a uint8 array of shape (n, 13, 8, 8)
            and ``y`` is a NumPy string array of the n UCI moves.
    """
    X = []
    y = []

    for game in games:
        board = game.board()
        for move in game.mainline_moves():
            # Encode the position first, then advance the board
            X.append(board_to_matrix(board))
            y.append(move.uci())
            board.push(move)

    if not X:
        # Keep the documented shape even when there is nothing to encode
        return np.zeros((0, 13, 8, 8), dtype=np.uint8), np.array([], dtype=str)

    # Convert lists to numpy arrays
    X = np.array(X, dtype=np.uint8)
    y = np.array(y, dtype=str)  # fixed-width unicode, width inferred from data

    return X, y

def encode_moves(moves) -> tuple[np.ndarray, dict]:
    """
    Encode moves into contiguous integer class indices for model training.

    Indices are assigned in sorted order of the distinct moves, so the same
    input always produces the same mapping (iteration order of a ``set`` of
    strings is not stable between interpreter runs).

    Args:
        moves (list): One-dimensional sequence of moves in string format
            (any sortable labels, e.g. a NumPy array, also work).

    Returns:
        tuple: ``(encoded_moves, move_to_int)`` where ``encoded_moves`` is a
            uint16 array with one class index per input move and
            ``move_to_int`` maps each distinct move (as a native Python value)
            to its index.

    Raises:
        OverflowError: If there are more distinct moves than uint16 can index.
    """
    # np.unique returns the distinct values in sorted order and, for every
    # input element, the position of its value in that sorted list.
    unique_moves, inverse = np.unique(np.asarray(moves), return_inverse=True)

    max_classes = np.iinfo(np.uint16).max + 1
    if len(unique_moves) > max_classes:
        # Casting would silently wrap around and corrupt the labels
        raise OverflowError(
            f"{len(unique_moves)} distinct moves do not fit into uint16 labels"
        )

    # .tolist() converts NumPy scalars to plain Python objects, so the mapping
    # can be pickled and queried without NumPy types leaking into it.
    move_to_int = {move: idx for idx, move in enumerate(unique_moves.tolist())}
    encoded_moves = inverse.reshape(-1).astype(np.uint16)
    return encoded_moves, move_to_int


In [None]:

# Encode every position of every parsed game together with its move label
X, y = create_input_for_nn(games)

sample_count = len(y)
print(f"NUMBER OF SAMPLES: {sample_count}")

In [None]:
# Keep only the first 2.5 million samples
X, y = X[:2500000], y[:2500000]

In [None]:
# Replace the labels by class indices and keep the label -> index mapping.
# NOTE: this cell overwrites y, so run it only once per freshly built y.
encoding_result = encode_moves(y)
y, move_to_int = encoding_result
num_classes = len(move_to_int)

In [None]:
# Features become float32, the dtype of the model's convolution weights
X = torch.tensor(X, dtype=torch.float32)

# Labels must be int64 for CrossEntropyLoss. Widen them in NumPy first:
# PyTorch releases without uint16 support refuse to convert a uint16 ndarray.
y = torch.from_numpy(np.asarray(y, dtype=np.int64))

In [None]:
import torch.nn as nn


class ChessModel(nn.Module):
    """Two-layer CNN followed by two dense layers that scores chess moves."""

    def __init__(self, num_classes):
        """
        Build the layers and initialise their weights.

        Args:
            num_classes (int): Number of unique moves (output classes).
        """
        super().__init__()

        # Feature extractor: 13 input planes -> 64 -> 128 channels. The 3x3
        # kernels with padding 1 keep the 8x8 spatial size.
        self.conv1 = nn.Conv2d(13, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.flatten = nn.Flatten()

        # Classifier head on the flattened 128 x 8 x 8 feature map
        self.fc1 = nn.Linear(8 * 8 * 128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

        # Kaiming-uniform for the ReLU-activated convolutions, Xavier-uniform
        # for the dense layers; biases keep PyTorch's default initialisation.
        for conv in (self.conv1, self.conv2):
            nn.init.kaiming_uniform_(conv.weight, nonlinearity='relu')
        for dense in (self.fc1, self.fc2):
            nn.init.xavier_uniform_(dense.weight)

    def forward(self, x):
        """
        Run the network on a batch of encoded positions.

        Args:
            x (torch.Tensor): Input tensor; the layer sizes imply a shape of
                (batch, 13, 8, 8).

        Returns:
            torch.Tensor: Raw, un-normalised logits, one per class.
        """
        feature_map = self.relu(self.conv2(self.relu(self.conv1(x))))
        hidden = self.relu(self.fc1(self.flatten(feature_map)))
        return self.fc2(hidden)  # no softmax: the loss expects raw logits

In [None]:
from dataset import ChessDataset

# Wrap the tensors in a dataset and serve them in shuffled mini-batches of 64
dataset = ChessDataset(X, y)
dataloader = DataLoader(dataset, shuffle=True, batch_size=64)

# Train on the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Using device: {device}')

# Model, loss and optimiser
model = ChessModel(num_classes=num_classes)
model = model.to(device)
criterion = nn.CrossEntropyLoss()  # takes raw logits and integer class labels
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training

In [None]:
num_epochs = 50
# Number of epochs the model has already been trained for. Leave at 0 for a
# fresh model; set it when continuing a previous run so the log keeps counting.
start_epoch = 0

for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Move the batch to the training device
        optimizer.zero_grad()

        outputs = model(inputs)  # Raw logits

        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()

        # Gradient clipping: cap the global gradient norm at 1.0
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

    epoch_time = time.time() - start_time
    minutes, seconds = divmod(int(epoch_time), 60)
    # Report "current/total" with matching numerator and denominator; the old
    # message hard-coded a +50 offset and was off by one in the total.
    print(f'Epoch {start_epoch + epoch + 1}/{start_epoch + num_epochs}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s')

# Save the model and mapping

In [None]:
import os
import pickle

# Output locations for the trained weights and the label mapping
model_path = "../../models/model.pth"
mapping_path = "../../models/heavy_move_to_int"

# Neither torch.save nor open() creates missing parent directories
os.makedirs(os.path.dirname(model_path), exist_ok=True)

# Save the model
torch.save(model.state_dict(), model_path)

# Save the mapping that was used to turn moves into class indices
with open(mapping_path, "wb") as file:
    pickle.dump(move_to_int, file)