In [107]:
import pandas as pd
import numpy as np
import chess
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import GroupShuffleSplit
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

In [108]:
scaler = StandardScaler()
le = LabelEncoder()

BATCH_SIZE = 16
LEARNING_RATE = 0.001
EPOCHS = 40

In [109]:
# Function to convert the board to a flat list
def board_to_flat_list(board):
    # Get the board as a string in ASCII format
    board_string = board.__str__()
    # Split the board string into rows
    rows = board_string.split("\n")
    # Initialize an empty list to store the flat board
    flat_list = []
    # Iterate through each row
    for row in rows:
        # Split the row by spaces to get individual pieces
        pieces = row.split(" ")
        # Extend the flat list with the pieces
        flat_list.extend(pieces)

    # Replace "." with None
    flat_list = [None if piece == "." else piece for piece in flat_list]
    return flat_list


# Vocabulary
def get_vocabulary() -> tuple[dict, dict]:
    fresh_chess_board = chess.Board()
    # Get all unique characters in fresh_chess_board
    unique_characters = list(set(fresh_chess_board.board_fen()))
    unique_characters = [char for char in unique_characters if char.isalpha()]
    unique_characters.sort()
    vocabulary_dict = {char: i+1 for i, char in enumerate(unique_characters)}
    vocabulary_dict["EMPTY"] = -1
    reverse_vocabulary_dict = {i: char for char, i in vocabulary_dict.items()}
    return vocabulary_dict, reverse_vocabulary_dict


# For all values in X, transform string to int using vocabulary_dict
def encode_df(df: pd.DataFrame, vocabulary_dict: dict) -> pd.DataFrame:
    df = df.fillna("EMPTY")
    df = df.map(lambda x: vocabulary_dict[x] if x in vocabulary_dict else x)
    return df


def decode_model_prediction(prediction_list: list, reverse_vocabulary_dict: dict) -> list[str]:
    ret_list = [reverse_vocabulary_dict[i] for i in prediction_list]
    # If ret_list contains "EMPTY", replace it with None
    ret_list = [None if x == "EMPTY" else x for x in ret_list]
    return ret_list

In [110]:
# Create dataset
def determine_game_stage(move_number):
    # 0-10 early game
    # 11-30 mid game
    # 31+ end game
    if move_number <= 10:
        return 1
    elif move_number <= 30:
        return 2
    else:
        return 3

def create_dataset(lichess_username: str) -> pd.DataFrame:
    data_df = pd.read_csv(f"../data/raw/games_{lichess_username}.csv")
    output_df = pd.DataFrame()
    game_list = []

    game_id_stage_count_map = {}
    for game_id in data_df["game_id"].unique():
        game_id_stage_count_map[str(game_id)] = 0

    for idx, row in data_df.iterrows():
        game_id = str(row.get("game_id"))
        white_player = row.get("white_player")
        move_list = row.get("move_list")

        if isinstance(move_list, float):
            continue

        move_list = move_list.split(" ")

        generator_start_index = 0 if white_player == lichess_username else 1
        for move_idx in range(generator_start_index, len(move_list), 2):
            game_id_stage_count_map[game_id] += 1
            move_number = game_id_stage_count_map.get(game_id)
            game_stage = determine_game_stage(move_number)

            target_move = move_list[move_idx]
            input_sequence_list = move_list[:move_idx]

            board = chess.Board()
            for input_move in input_sequence_list:
                board.push_san(input_move)

            board_flat_list = [game_id]
            board_flat_list.extend(board_to_flat_list(board))
            board_flat_list.append(game_stage)
            board_flat_list.append(target_move)
            game_list.append(board_flat_list)

    output_df = pd.DataFrame(game_list)
    # Rename first col to game_id and last col to target_move
    output_df = output_df.rename(
        columns={
            0: "game_id",
            len(output_df.columns)-1: "target_move",
            len(output_df.columns)-2: "game_stage"
        }
    )
    return output_df


# Split dataset into train and validation
def get_dataset_split(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # Assuming df is your DataFrame and has columns for features and 'game_id'
    X = df.drop(['target_move', 'game_id'], axis=1).values  # Features
    X = scaler.fit_transform(X)

    y = df['target_move'].values
    y = le.fit_transform(y)

    # Convert to PyTorch tensors
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.long)  # Use long for classification targets

    # Use GroupShuffleSplit to keep games together
    gss = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=0)
    train_idx, val_idx = next(gss.split(X, y, groups=df['game_id']))

    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]

    print("X_train")
    display(X_train)
    print("y_train")
    display(y_train)

    print("Shapes:", X_train.shape, X_val.shape, y_train.shape, y_val.shape)
    return X_train, X_val, y_train, y_val

In [111]:
# Create custom datasets
class ChessDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]
    
class ChessLSTMN(nn.Module):
    def __init__(self, num_features, num_classes, hidden_dim, seq_length, num_layers=1):
        super(ChessLSTMN, self).__init__()
        # LSTM specific parameters
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.seq_length = seq_length
        
        # LSTM layer
        self.lstm = nn.LSTM(num_features, hidden_dim, num_layers, batch_first=True)
        
        # Fully connected layers
        self.fc1 = nn.Linear(hidden_dim, 1024)  # Adjusted for LSTM output
        self.bn1 = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, 512)
        self.bn2 = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 256)
        self.bn3 = nn.BatchNorm1d(256)
        self.fc4 = nn.Linear(256, num_classes)
        
        # Dropout layer
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        # LSTM layer
        # Forward propagate LSTM
        out, _ = self.lstm(x)  # out: tensor of shape (batch_size, seq_length, hidden_dim)
        
        # Reshape output from the LSTM layer
        out = out.reshape(out.shape[0], -1)  # Reshape to fit the following dense layer
        
        # Fully connected layers
        out = F.relu(self.bn1(self.fc1(out)))
        out = self.dropout(out)
        out = F.relu(self.bn2(self.fc2(out)))
        out = self.dropout(out)
        out = F.relu(self.bn3(self.fc3(out)))
        out = self.fc4(out)  # No activation before CrossEntropyLoss
        return out
    
def train_and_test_data_loader(X_train, X_test, y_train, y_test):
    # Create dataset and dataloader
    train_dataset = ChessDataset(X_train, y_train)
    test_dataset = ChessDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return train_loader, test_loader

In [112]:
# Train model
def train_model(df: pd.DataFrame):
    X_train, X_val, y_train, y_val = get_dataset_split(df)

    num_features = X_train.shape[1]
    num_classes = len(le.classes_)
    hidden_dim = 128  # Example, you can adjust this
    seq_length = 10   # Example, adjust based on your data and how many moves back you want to consider
    num_layers = 2    # Example, adjust as needed
    print("Num features:", num_features)
    print("Num classes:", num_classes)
    model = ChessLSTMN(num_features, num_classes, hidden_dim, seq_length, num_layers)

    train_loader, test_loader = train_and_test_data_loader(X_train, X_val, y_train, y_val)

    # Set up loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Training loop
    for epoch in range(EPOCHS):
        model.train()
        running_loss = 0.0
        running_acc = 0.0
        for features, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * features.size(0)
            running_acc += (outputs.argmax(1) == labels).float().sum()
        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_acc / len(train_loader.dataset)
        print(f'Epoch {epoch+1}/{EPOCHS}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

    # Evaluation mode
    model.eval() 
    total, correct = 0, 0
    with torch.no_grad():
        for features, labels in test_loader:
            outputs = model(features)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f'Accuracy: {100 * correct / total:.2f}%')

    return model


In [113]:
def save_model(model: nn.Module, model_name: str):
    torch.save(model, model_name)

In [114]:
lichess_username = "ritutoshniwal"
vocabulary_dict, reverse_vocabulary_dict = get_vocabulary()

dataset_df = create_dataset(lichess_username)
dataset_df = encode_df(dataset_df, vocabulary_dict)
model = train_model(dataset_df)

save_model(model, "../models/chess_nn_model.pth")

X_train


tensor([[ 0.8052,  1.7095,  1.0135,  ...,  1.2567,  1.2591, -1.2115],
        [ 0.8052,  1.7095,  1.0135,  ...,  1.2567,  1.2591, -1.2115],
        [ 0.8052,  1.7095,  1.0135,  ...,  1.2567,  1.2591, -1.2115],
        ...,
        [-1.2527, -0.5491, -0.8329,  ...,  0.7127, -0.8031,  1.5685],
        [-1.2527, -0.5491, -0.8329,  ..., -0.9195, -0.8031,  1.5685],
        [-1.2527, -0.5491, -0.8329,  ..., -0.9195, -0.8031,  1.5685]])

y_train


tensor([1763,  517, 1785,  ...,  289,  418,  480])

Shapes: torch.Size([42960, 65]) torch.Size([11109, 65]) torch.Size([42960]) torch.Size([11109])
Num features: 65
Num classes: 1938
Epoch 1/40, Loss: 5.8694, Accuracy: 0.1009
Epoch 2/40, Loss: 5.3894, Accuracy: 0.1488
Epoch 3/40, Loss: 5.1039, Accuracy: 0.1752
Epoch 4/40, Loss: 4.8486, Accuracy: 0.1989
Epoch 5/40, Loss: 4.6200, Accuracy: 0.2177
Epoch 6/40, Loss: 4.4107, Accuracy: 0.2352
Epoch 7/40, Loss: 4.2140, Accuracy: 0.2528
Epoch 8/40, Loss: 4.0369, Accuracy: 0.2672
Epoch 9/40, Loss: 3.8620, Accuracy: 0.2837
Epoch 10/40, Loss: 3.7123, Accuracy: 0.3004
Epoch 11/40, Loss: 3.5610, Accuracy: 0.3157
Epoch 12/40, Loss: 3.4268, Accuracy: 0.3320
Epoch 13/40, Loss: 3.3002, Accuracy: 0.3467
Epoch 14/40, Loss: 3.1834, Accuracy: 0.3620
Epoch 15/40, Loss: 3.0634, Accuracy: 0.3780
Epoch 16/40, Loss: 2.9739, Accuracy: 0.3910
Epoch 17/40, Loss: 2.8754, Accuracy: 0.4053
Epoch 18/40, Loss: 2.7773, Accuracy: 0.4208
Epoch 19/40, Loss: 2.7078, Accuracy: 0.4313
Epoch 20/40, Loss: 2.6131, Accuracy: 0.443