In [1]:
import chess
import chess.pgn
import numpy as np
import pickle
import requests
import time
import torch
import torch.optim as optim
from torch.utils.data import DataLoader 
from tqdm import tqdm



In [2]:
import os

# Download the PGN export only once; later runs reuse the cached file.
if not os.path.exists('games.pgn'):
    url = "https://lichess.org/api/games/user/Inamir?tags=true&clocks=false&evals=false&opening=false&literate=false&since=1669849200000&until=1734562800000&perfType=bullet%2Cblitz%2Crapid%2Cclassical%2Cstandard"
    # requests has no default timeout; (connect, read) limits keep the cell from hanging forever.
    response = requests.get(url, timeout=(10, 120))
    # Fail loudly on HTTP errors: otherwise an error body would be written to
    # games.pgn and, because of the exists() guard above, never downloaded again.
    response.raise_for_status()

    with open('games.pgn', 'wb') as f:
        f.write(response.content)

In [None]:
games = []

# The context manager closes the file handle once every game has been parsed.
# The file was written as raw bytes from the HTTP response, so decode it
# explicitly as UTF-8 instead of relying on the platform default encoding.
with open("games.pgn", encoding="utf-8") as pgn:
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:  # End of file
            break
        games.append(game)

print(f"Loaded {len(games)} games")


# Creating the dataset

**TODO:** Explain the board encoding.


We will fill the tensor X with the board state converted to matrix notation and y with the move I played in that situation.

When I played as White we keep only the even-indexed half-moves (0, 2, 4, …), and when I played as Black only the odd-indexed ones, so that we train exclusively on my moves and not my opponents'.

In [14]:
from lib.utils import board_to_matrix, prepare_input 
# X collects one encoded board per position where it was my turn;
# y_str collects the move (UCI string) I actually played in that position.
X = []
y_str = []

for game in games:
    board = game.board()
    # Colour must be read from the *current* game's headers. Reading it from
    # games[0] would apply the first game's colour to every game and train on
    # the opponent's moves whenever the colours differ.
    ina_is_white = game.headers["White"] == "Inamir"
    # Half-move indices start at 0: even indices are White's moves, odd are Black's.
    my_parity = 0 if ina_is_white else 1

    for i, move in enumerate(game.mainline_moves()):
        if i % 2 == my_parity:
            # Record the position *before* the move is played.
            X.append(board_to_matrix(board))
            y_str.append(move.uci())
        board.push(move)

X, y_str = np.array(X, dtype=np.float32), np.array(y_str)

Create a mapping to turn UCI notation strings into integers. 
For instance:
 - d2d4 => 0
 - c1f4 => 1
 - ...

In [15]:
# Sort the distinct moves before numbering them: iterating a raw set of strings
# yields a different order on every kernel restart (string hashing is randomised),
# which would make the class indices irreproducible between runs.
move_to_int = {move: idx for idx, move in enumerate(sorted(set(y_str)))}

Convert `y` using the dictionary

In [16]:
# Class indices are integers; keep them in an integer dtype rather than float32
# so they stay exact and print as `3` instead of `3.0`.
y_int = np.array([move_to_int[move] for move in y_str], dtype=np.int64)

For comparison purposes:

In [None]:
# Print the first five samples as "<uci move> => <integer label>".
for i in range(5):
    uci_move, label = y_str[i], y_int[i]
    print(f'{uci_move} => {label}')

Preparing input for PyTorch

In [None]:
# torch.as_tensor accepts both a NumPy array and an existing tensor, so this
# cell can be re-run safely; torch.from_numpy(X) raises once X has already been
# rebound to a tensor by a previous execution of the cell.
X = torch.as_tensor(X)
# CrossEntropyLoss expects class indices as int64 ("long") tensors.
y = torch.tensor(y_int, dtype=torch.long)

In [20]:
from torch.utils.data import Dataset


class ChessDataset(Dataset):
    """Pairs each encoded board in ``X`` with its target label in ``y``.

    Args:
        X: indexable collection of inputs (one entry per sample).
        y: indexable collection of labels, same length as ``X``.

    Raises:
        ValueError: if ``X`` and ``y`` do not contain the same number of samples.
    """

    def __init__(self, X, y):
        # Catch a mismatch here instead of failing with an IndexError
        # somewhere in the middle of a training epoch.
        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, label)`` pair stored at position ``idx``."""
        return self.X[idx], self.y[idx]

In [21]:
import torch.nn as nn

class ChessModel(nn.Module):
    """Small CNN that maps an encoded 8x8 board to one logit per known move.

    Architecture (in execution order):
        conv1 -> batchnorm -> relu -> conv2 -> batchnorm -> relu
        -> flatten -> fc1 -> relu -> dropout -> fc2

    Args:
        num_classes: number of output logits (size of the move vocabulary).
        in_channels: number of input planes per board; defaults to 13, the
            value this notebook was written for.
    """

    def __init__(self, num_classes, in_channels=13):
        super().__init__()
        # Both convolutions use 3x3 kernels with padding=1, so the 8x8 spatial
        # size is preserved all the way to the flatten step.
        self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)  # Batch normalization for conv1
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)  # Batch normalization for conv2
        self.flatten = nn.Flatten()
        # 128 feature maps of 8x8 squares after conv2.
        self.fc1 = nn.Linear(8 * 8 * 128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.5)  # Dropout with 50% probability

        # Initialize weights: Kaiming for the ReLU-activated convolutions,
        # Xavier for the fully connected layers.
        nn.init.kaiming_uniform_(self.conv1.weight, nonlinearity='relu')
        nn.init.kaiming_uniform_(self.conv2.weight, nonlinearity='relu')
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)

    def forward(self, x):
        """Return raw (un-normalised) logits of shape ``(batch, num_classes)``."""
        x = self.relu(self.bn1(self.conv1(x)))  # conv1 -> batchnorm -> relu
        x = self.relu(self.bn2(self.conv2(x)))  # conv2 -> batchnorm -> relu
        x = self.flatten(x)
        x = self.dropout(self.relu(self.fc1(x)))  # dropout is applied after fc1 + relu
        x = self.fc2(x)  # Output raw logits
        return x

In [None]:
# Wrap the tensors in a Dataset and serve them in shuffled mini-batches of 64.
dataset = ChessDataset(X, y)
dataloader = DataLoader(dataset=dataset, batch_size=64, shuffle=True)

# Train on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f'Using device: {device}')

# One output logit per distinct move in the vocabulary.
model = ChessModel(num_classes=len(move_to_int))
model = model.to(device)
# Cross-entropy over raw logits, optimised with Adam at a learning rate of 1e-4.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.0001)

In [None]:
num_epochs = 50
# NOTE(review): the original progress line added a bare `+ 50` to the epoch
# counter, presumably because this cell continued a model that had already been
# trained for 50 epochs (the checkpoint is saved as "rle_100epochs").
# Set this to 0 when training from scratch.
previously_trained_epochs = 50
# The original total was `num_epochs + 1 + 50`, i.e. 101 - one more than the
# highest epoch number ever printed.
total_epochs = previously_trained_epochs + num_epochs

for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU
        optimizer.zero_grad()

        outputs = model(inputs)  # Raw logits

        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()

        # Gradient clipping keeps a single bad batch from blowing up the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()
        running_loss += loss.item()

    # Wall-clock duration of the epoch, split into whole minutes and seconds.
    minutes, seconds = divmod(int(time.time() - start_time), 60)
    print(f'Epoch {previously_trained_epochs + epoch + 1}/{total_epochs}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s')

In [24]:
# Save the model
# Make sure the target directory exists; torch.save does not create it.
os.makedirs("./model", exist_ok=True)
torch.save(model.state_dict(), "./model/rle_100epochs.pth")

# Persist the move vocabulary alongside the weights: the output logits are
# meaningless without the exact move -> index mapping used during training.
with open("./model/move_to_int", "wb") as file:
    pickle.dump(move_to_int, file)

In [None]:
# Load the mapping
# NOTE: pickle.load can execute arbitrary code - only load files written by this notebook.
with open("model/move_to_int", "rb") as file:
    move_to_int = pickle.load(file)

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Load the model
model = ChessModel(num_classes=len(move_to_int))
# map_location lets a checkpoint saved on a GPU be loaded on a CPU-only machine
# (and vice versa); without it torch.load tries to restore the original device.
model.load_state_dict(torch.load("model/rle_100epochs.pth", map_location=device))
model.to(device)
model.eval()  # Set the model to evaluation mode (it may be redundant)

# Inverse lookup: class index -> UCI move string.
int_to_move = {v: k for k, v in move_to_int.items()}
# Function to make predictions
def predict_move(board: chess.Board):
    """Return the highest-scoring move from the model's vocabulary that is legal on ``board``.

    Args:
        board: position to predict a move for.

    Returns:
        The chosen move as a UCI string, or ``None`` when the position has no
        legal moves or none of its legal moves is in the model's vocabulary.
    """
    # Membership is tested once per candidate class, so use a set, and bail
    # out before running the model if there is nothing legal to play.
    legal_moves_uci = {move.uci() for move in board.legal_moves}
    if not legal_moves_uci:
        return None

    # presumably prepare_input returns a batch of one encoded board - TODO confirm
    X_tensor = prepare_input(board).to(device)

    with torch.no_grad():
        logits = model(X_tensor)

    logits = logits.squeeze(0)  # Remove batch dimension

    # Only the ranking matters. Softmax is monotonic, so sorting the raw logits
    # gives the same order without the extra pass (and without ties created by
    # tiny probabilities underflowing to zero).
    scores = logits.cpu().numpy()
    for move_index in np.argsort(scores)[::-1]:  # best score first
        move = int_to_move[int(move_index)]
        if move in legal_moves_uci:
            return move

    return None

In [26]:
# Fresh board for the demo below (no FEN passed, so python-chess's default position is used).
board = chess.Board()

In [None]:
# Display the current position as a FEN string.
board.fen()

In [None]:
# Ask the model for a move and play it on the board.
best_move = predict_move(board)
# predict_move returns None when it finds no playable move; fail with a clear
# message instead of passing None on to push_uci.
if best_move is None:
    raise RuntimeError("The model found no legal move for the current position")
board.push_uci(best_move)
board

In [None]:
# NOTE(review): the opponent's reply is hard-coded, so it is only legal if the
# board is in the position this cell was written against - push_uci raises otherwise.
board.push_uci("e4f3")
best_move = predict_move(board)
# predict_move returns None when it finds no playable move; fail with a clear
# message instead of passing None on to push_uci.
if best_move is None:
    raise RuntimeError("The model found no legal move for the current position")
board.push_uci(best_move)
board