In [2]:
import re
import csv
import chess
import numpy as np
import pandas as pd
from tqdm import tqdm
from itertools import islice
from datasets import load_dataset

# Data Arrangement

In [3]:
ds = load_dataset("Lichess/tournament-chess-games", split="train", streaming=True)

In [92]:
board = chess.Board()
board.fen()

'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'

In [118]:
def extract_positions_and_evals(moves_str):
    fen_positions, evals = [], []
    board = chess.Board()
    moves = re.findall(r'. (o-o|o-o-o|O-O|O-O-O|\w{2,5})[+?!]*? { \[%eval (.*?)]', moves_str)
    for pose, val in moves:
        try:
            board.push_san(pose)
            if '#' in val:
                val = 1000 if not val.startswith('-') else -1000
            else:
                val = float(val)
            
            fen_positions.append(board.fen())
            evals.append(val)
        except:
            return [pose[0] for pose in moves], fen_positions, evals, 0
    
    return [pose[0] for pose in moves], fen_positions, evals, 1

In [5]:
sample_iter = islice(ds, 30_000)
sample_list = list(sample_iter)

In [119]:
all_moves, positions, labels, game_ids = [], [], [], []
success = 0
for game_id, row in tqdm(enumerate(sample_list)):
    moves_list, fen_list, eval_list, count = extract_positions_and_evals(row["movetext"])
    success+=count
    positions.extend(fen_list)
    labels.extend(eval_list)
    game_ids.extend([game_id] * len(fen_list))
    all_moves.extend(moves_list[:len(fen_list)])
    # break
print(success/30000)
df = pd.DataFrame({"id":game_ids, "fen": positions, "move": all_moves, "eval": labels})
# df = df.groupby('id').agg({'fen_current': list, 'move': list, 'eval': list, 'eval': list, 'fen_next':list})
df.to_csv("chess_positions.csv", index=False)

30000it [01:11, 417.08it/s] 


0.9695


# Fitting The Data To The Model

In [38]:
import chess
import torch
import numpy as np
from torch.nn.utils.rnn import pad_sequence

In [None]:
df = pd.read_csv("chess_positions.csv")

In [120]:
def fen_to_array(fen):
    board = chess.Board(fen)
    mapping = {
        'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
        'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11
    }
    arr = np.zeros((8, 8, 12), dtype=np.int8)
    for i in range(8):
        for j in range(8):
            piece = board.piece_at(i * 8 + j)
            if piece:
                arr[i, j, mapping[piece.symbol()]] = 1
    return arr

In [121]:
def fen_to_numeric(fen):
    arr = fen_to_array(fen)
    turn = np.full((8, 8, 1), int(chess.Board(fen).turn))
    return np.concatenate([arr, turn], axis=-1)

In [122]:
def fen_to_numeric(fen):
    arr = fen_to_array(fen)
    turn = np.full((8, 8, 1), int(chess.Board(fen).turn))
    return np.concatenate([arr, turn], axis=-1)

In [123]:
X = np.array([fen_to_numeric(f) for f in df["fen"]])
y = np.array(df["eval"])

In [102]:
all_moves = set(all_moves)

In [None]:
[item for sublist in list_2d for item in sublist]

In [117]:
all_moves = sorted(set([i for j in df["move"].tolist() for i in j]))
move2idx = {m: i for i, m in enumerate(all_moves)}
idx2move = {i: m for m, i in move2idx.items()}
num_moves = len(move2idx)

In [104]:
piece_map = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
             'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11}

In [105]:
def fen_to_onehot(fen):
    board = chess.Board(fen)
    mat = np.zeros((8,8,12), dtype=np.float32)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            row = 7 - chess.square_rank(square)
            col = chess.square_file(square)
            idx = piece_map[piece.symbol()]
            mat[row, col, idx] = 1.0
    return mat

In [106]:
def game_to_tensors(fens, vals):

    X_seq = [torch.tensor(fen_to_onehot(fen)) for fen in fens[:-1]]
    y_seq = []

    for i in range(len(fens)-1):
        move_vals = [(fens[i+1], vals[i+1])]
        best_move = max(move_vals, key=lambda x: x[1])[0]
        y_seq.append(move2idx[best_move])
    
    if len(X_seq)==0 or len(y_seq)==0:
        return [], [], 0
        
    return torch.stack(X_seq), torch.tensor(y_seq), 1

In [None]:
X_list = []
y_list = []
scores = []

for fen, move, val in zip(df["fen_current"], df["move"], df["eval"]):
        x_tensor = torch.tensor(fen_to_onehot(fen))
        y_tensor = torch.tensor(move2idx[move])
        X_list.append(x_tensor)
        y_list.append(y_tensor)
        scores.append(val)


In [107]:
# old code
# piece_map = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
#              'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11}
# def fen_to_onehot(fen):
#     board = chess.Board(fen)
#     mat = np.zeros((8,8,12), dtype=np.float32)
#     for square in chess.SQUARES:
#         piece = board.piece_at(square)
#         if piece:
#             row = 7 - chess.square_rank(square)
#             col = chess.square_file(square)
#             idx = piece_map[piece.symbol()]
#             mat[row, col, idx] = 1.0
#     return mat
# def game_to_tensors(fens, vals):

#     X_seq = [torch.tensor(fen_to_onehot(fen)) for fen in fens[:-1]]
#     y_seq = []

#     for i in range(len(fens)-1):
#         move_vals = [(fens[i+1], vals[i+1])]
#         best_move = max(move_vals, key=lambda x: x[1])[0]
#         y_seq.append(move2idx[best_move])
    
#     if len(X_seq)==0 or len(y_seq)==0:
#         return [], [], 0
        
#     return torch.stack(X_seq), torch.tensor(y_seq), 1
# X_list = []
# y_list = []
# for fens, vals in tqdm(zip(df["fen"], df["eval"])):
#     X_seq, y_seq, ok = game_to_tensors(fens[:100], vals[:100])
#     if ok:
#         X_list.append(X_seq)
#         y_list.append(y_seq)

# X_padded = pad_sequence(X_list, batch_first=True)
# y_padded = pad_sequence(y_list, batch_first=True, padding_value=-1)

KeyError: 'fen'

In [74]:
torch.save({"X": X_padded, "y": y_padded}, "data.pt")

# Training

In [65]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset

In [68]:
# simple model
import torch
import torch.nn as nn

class ChessMovePredictor(nn.Module):
    def __init__(self, num_moves):
        super().__init__()
        self.flatten = nn.Flatten(start_dim=2)
        self.lstm = nn.LSTM(8*8*12, 256, batch_first=True)
        self.fc = nn.Linear(256, num_moves)

    def forward(self, x):
        x = self.flatten(x) 
        out, _ = self.lstm(x)
        logits = self.fc(out)
        return logits

In [None]:
data = torch.load("data.pt")
X_padded = data["X"]
y_padded = data["y"]

In [64]:
X_train, X_test, y_train, y_test = train_test_split(X_padded, y_padded, test_size=0.2, random_state=42)

In [66]:
train_ds = TensorDataset(X_train, y_train)
test_ds = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=16)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ChessMovePredictor(num_moves=len(move2idx)).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=-1)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(3):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        logits = model(X_batch)
        loss = criterion(logits.view(-1, len(move2idx)), y_batch.view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")