In [None]:
import pandas as pd
from datasets import load_dataset
import os

Our data begins as a bunch of PGN transcripts. However, to work in tensors we need all transcripts to be the same length. So, this file takes our PGNs and performs some filtering.

This notebook has a very similar counterpart, `utils\chess_gpt_eval_data_filtering.ipynb`. The lichess and chess_gpt_eval datasets have different structures and different column names. For most people's needs, the lichess dataset alone should suffice, so I made two separate notebooks to keep this one simple.

The output of this file is 4 different csv's:

`lichess_100mb.csv`: 100 MB of lichess PGN games, with every game also containing player Elo information.

`lichess_100mb_filtered.csv`: We perform some filtering for game length, add player Elo bucket, and do some manipulation of the PGN string.

`lichess_train.csv` and `lichess_test.csv`: a 50 / 50 train / test split of `lichess_100mb_filtered.csv`, used for training and testing linear probes.

In [None]:
# Directory that every CSV path built in this notebook is prefixed with.
DATA_DIR = "data/"
# Filename prefix shared by the CSV files this notebook reads and writes.
prefix = "lichess_"


# Path of the raw games CSV, and the path of the filtered CSV obtained by
# replacing ".csv" with "_filtered.csv" in that path.
input_file = f'{DATA_DIR}{prefix}100mb.csv'
output_file = input_file.replace(".csv", "_filtered.csv")

First, we download the dataset if not present.

In [None]:
# Download the raw games only when the CSV is not already on disk.
if not os.path.exists(input_file):
    dataset_path = "adamkarvonen/chess_games"
    file_path = f"{prefix}100mb.zip"
    # DataFrame.to_csv does not create missing parent directories, so make sure
    # DATA_DIR exists before writing into it (a fresh checkout may not have it).
    os.makedirs(DATA_DIR, exist_ok=True)
    # streaming=True is kept from the original author, who reported that
    # load_dataset raised an error for this file without it.
    dataset = load_dataset(dataset_path, data_files=file_path, streaming=True)
    # Materialise the 'train' split into a DataFrame and cache it as the input CSV.
    df = pd.DataFrame(dataset['train'])
    df.to_csv(input_file, index=False)

Our LLMs need a delimiter token ";" at the beginning of every PGN string or it won't work as well.

In [None]:
# Load the raw games CSV; the rest of this cell rewrites its 'transcript' column.
df = pd.read_csv(input_file)

def format_transcript(game: str) -> str:
    """Return ``game`` with the ";" delimiter token prepended.

    Args:
        game: A PGN transcript string.

    Returns:
        The same transcript, prefixed with a single ";".
    """
    return "".join((";", game))

# Prefix every transcript with the delimiter token.
df['transcript'] = df['transcript'].map(format_transcript)

# Preview the first five formatted transcripts, each followed by a blank line.
for game in df['transcript'].head():
    print(game, end='\n\n')

Filter all games to be exactly 365 characters long: anything shorter is discarded and anything longer is truncated to 365 characters. I chose 365 because that's the 50% value (the median transcript length) reported by `describe()`. I also count the number of moves (with `x.split()`) and discard anything below the 25th percentile. This makes it easier if I want to do any move based indexing.

In [None]:
# Character-length distribution of the transcripts before any filtering.
len_df = df['transcript'].map(len)
print(len_df.describe())

game_length_in_chars = 365

# Data setup. All games must have the same length: transcripts shorter than
# game_length_in_chars characters are discarded and the remaining ones are
# truncated to exactly game_length_in_chars characters.
is_long_enough = len_df >= game_length_in_chars
filtered_df = df[is_long_enough].copy()
filtered_df.loc[:, 'transcript'] = filtered_df['transcript'].str[:game_length_in_chars]

len_df = filtered_df['transcript'].map(len)
print(len_df.describe())

# Number of whitespace-separated tokens in each truncated transcript, used as the move count.
move_count_df = filtered_df['transcript'].map(lambda transcript: len(transcript.split()))
move_count = move_count_df.describe()
print("move count", move_count)
quarter_percentile = move_count['25%']
print("quarter percentile", quarter_percentile)

# Now filter out games that are too short: keep only games whose move count is
# at least the 25th percentile computed above.
filtered_df = filtered_df[move_count_df >= quarter_percentile]
print(filtered_df.describe())
print(filtered_df.head())

filtered_df.to_csv(output_file, index=False)

# Move-count distribution of the games that survived both filters.
move_count_df = filtered_df['transcript'].map(lambda transcript: len(transcript.split()))
print(move_count_df.describe())

In [None]:
# Sanity check: number of games left after filtering, then the WhiteElo summary statistics.
remaining_games = len(filtered_df)
print(remaining_games)
white_elo_summary = filtered_df['WhiteElo'].describe()
print(white_elo_summary)

For the classification task, I wanted some Elo bins for the probe to classify. This somewhat arbitrarily creates 6 different Elo bins.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Seed NumPy's global random number generator with a fixed value.
# NOTE(review): the sampling calls visible in this notebook pass their own random_state — confirm whether this seed is still needed.
np.random.seed(0)

# Function to create binned columns and bin index columns
def create_binned_columns(df, column_name, num_bins=6):
    """Add quantile-bin label and bin-index columns for ``column_name`` to ``df`` in place.

    Two columns are written onto ``df``:

    * ``f'{column_name}Binned'``: the quantile interval of each row, rendered as
      the string ``'(left, right]'``.
    * ``f'{column_name}BinIndex'``: the integer position of that interval.

    ``df`` is always modified in place, because the callers ignore the return
    value. Rows whose ``column_name`` value is missing or cannot be parsed as a
    number are removed from ``df``.

    Args:
        df: DataFrame to modify.
        column_name: Name of the column to bin.
        num_bins: Number of quantile bins requested from ``pd.qcut``. Fewer bins
            may result because duplicate bin edges are dropped.

    Returns:
        The same ``df`` object, for convenience.
    """
    # Coerce first, then drop: values that cannot be parsed only turn into NaN
    # during coercion, so dropping beforehand would leave them in the frame.
    df[column_name] = pd.to_numeric(df[column_name], errors='coerce')
    # inplace=True keeps the caller's object in sync; rebinding the local name to
    # a filtered copy would silently discard every column added below.
    df.dropna(subset=[column_name], inplace=True)

    binned_column_name = f'{column_name}Binned'
    bin_index_column_name = f'{column_name}BinIndex'

    # Create quantile-based bins, dropping duplicate edges if necessary
    intervals = pd.qcut(df[column_name], q=num_bins, duplicates='drop')

    # Convert bin labels to strings and assign to the column
    df[binned_column_name] = intervals.apply(lambda x: f'({x.left}, {x.right}]')

    # Create bin index column
    df[bin_index_column_name] = pd.qcut(df[column_name], q=num_bins, labels=False, duplicates='drop')

    return df

# Apply the binning helper to both players' rating columns.
for elo_column in ('WhiteElo', 'BlackElo'):
    create_binned_columns(filtered_df, elo_column)

filtered_df.to_csv(output_file, index=False)

# Plotting: two stacked panels sharing one figure.
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(10, 8))
hist_ax, bar_ax = axes[0], axes[1]

# Top panel: histogram of the WhiteElo values.
hist_ax.hist(filtered_df['WhiteElo'], bins=30, color='blue', alpha=0.7)
hist_ax.set_title('WhiteElo Distribution')
hist_ax.set_xlabel('WhiteElo')
hist_ax.set_ylabel('Frequency')

# Bottom panel: number of games per WhiteEloBinned value.
bin_counts = filtered_df['WhiteEloBinned'].value_counts()
bar_ax.bar(bin_counts.index.astype(str), bin_counts.values, color='green', alpha=0.7)
bar_ax.set_title('WhiteElo Binned Distribution')
bar_ax.set_xlabel('WhiteElo Bins')
bar_ax.set_ylabel('Count')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()



In [None]:
# Number of games in each WhiteElo bin.
white_bin_counts = filtered_df['WhiteEloBinned'].value_counts()
print(white_bin_counts)

In [None]:
# Preview the first five rows of the filtered frame.
filtered_preview = filtered_df.head(5)
print(filtered_preview)

In [None]:
# Shuffle all rows of the filtered dataset with a fixed seed, renumber the
# index from zero, and write the shuffled frame back to the same file.
shuffled = pd.read_csv(output_file).sample(frac=1, random_state=200)
df = shuffled.reset_index(drop=True)
df.to_csv(output_file, index=False)

In [None]:
import pandas as pd
df = pd.read_csv(output_file)

print(len(df))

# Split df into a train and test split: a seeded random half of the rows goes
# to train, and every row whose index was not sampled goes to test.
train = df.sample(frac=0.5, random_state=200)
in_train = df.index.isin(train.index)
test = df[~in_train]

for split in (train, test):
    print(len(split))

# Save the train and test splits to csv
train.to_csv(f'{DATA_DIR}{prefix}train.csv', index=False)
test.to_csv(f'{DATA_DIR}{prefix}test.csv', index=False)

In [6]:
import chess
import chess.pgn
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

# Constants
MAX_MOVES = 15  # Limit to first 15 moves; the dataset compares the ply index against MAX_MOVES * 2
BOARD_SIZE = 8  # Side length of each plane of the board tensor
NUM_PIECE_TYPES = 12  # 6 piece types for each color

# Function to convert board to tensor
def board_to_tensor(board):
    """Encode ``board`` as a (NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE) tensor of 0s and 1s.

    For every occupied square, a 1 is written into plane
    ``(piece_type - 1) + 6 * int(piece.color)`` at row ``square // 8`` and
    column ``square % 8``. All other entries stay 0.
    """
    planes = torch.zeros(NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if not piece:
            continue
        plane = (piece.piece_type - 1) + 6 * int(piece.color)
        planes[plane, square // 8, square % 8] = 1
    return planes

# Custom dataset
class StaffordGambitDataset(Dataset):
    """Pairs of (board tensor, move index) taken from Black's moves in each game.

    Only the first ``MAX_MOVES * 2`` plies of each game are inspected. For every
    ply where it is Black's turn, the position before the move is stored together
    with the label ``from_square * 64 + to_square``.
    """

    def __init__(self, games):
        self.positions = []
        self.moves = []

        ply_limit = MAX_MOVES * 2  # Both players' moves
        for game in games:
            board = game.board()
            for ply, move in enumerate(game.mainline_moves()):
                if ply >= ply_limit:
                    break
                black_to_move = board.turn == chess.BLACK
                if black_to_move:  # We're only interested in Black's moves
                    self.positions.append(board_to_tensor(board))
                    self.moves.append(64 * move.from_square + move.to_square)
                board.push(move)

    def __len__(self):
        return len(self.positions)

    def __getitem__(self, idx):
        position = self.positions[idx]
        move_index = self.moves[idx]
        return position, move_index

# Define the model
class StaffordGambitModel(nn.Module):
    """Three 3x3 convolutions followed by two linear layers, emitting 64 * 64 scores."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(NUM_PIECE_TYPES, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        flat_features = 256 * BOARD_SIZE * BOARD_SIZE
        self.fc1 = nn.Linear(flat_features, 1024)
        self.fc2 = nn.Linear(1024, 64 * 64)  # Output for all possible moves

    def forward(self, x):
        # ReLU after each convolution; padding=1 keeps the spatial size unchanged.
        for conv in (self.conv1, self.conv2, self.conv3):
            x = torch.relu(conv(x))
        # Flatten to (rows, 256 * BOARD_SIZE * BOARD_SIZE) for the linear layers.
        flat = x.view(-1, 256 * BOARD_SIZE * BOARD_SIZE)
        hidden = torch.relu(self.fc1(flat))
        return self.fc2(hidden)

# Training function
def train(model, train_loader, optimizer, criterion, device):
    """Run one pass over ``train_loader``, taking one optimizer step per batch.

    Puts ``model`` in training mode, moves each ``(data, target)`` batch to
    ``device``, and applies zero_grad / forward / backward / step. Returns None.
    """
    model.train()
    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        loss = criterion(model(data), target)
        loss.backward()
        optimizer.step()

# Main execution
def main():
    """Train StaffordGambitModel on the games in 'stafford.txt' and save its weights.

    Reads every game from the PGN file, builds a shuffled DataLoader over the
    resulting dataset, trains for 10 epochs with Adam and cross-entropy loss,
    and writes the state dict to 'stafford_gambit_model.pth'.
    """
    # Load your PGN file with Stafford Gambit games
    games = []
    with open('stafford.txt') as pgn:
        # read_game returns None once the file is exhausted.
        while (game := chess.pgn.read_game(pgn)) is not None:
            games.append(game)

    # Create dataset and dataloader
    train_loader = DataLoader(StaffordGambitDataset(games), batch_size=32, shuffle=True)

    # Initialize model, optimizer, and loss function
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = StaffordGambitModel().to(device)
    optimizer = optim.Adam(model.parameters())
    criterion = nn.CrossEntropyLoss()

    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        train(model, train_loader, optimizer, criterion, device)
        print(f"Epoch {epoch+1}/{num_epochs} completed")

    # Save the model
    torch.save(model.state_dict(), 'stafford_gambit_model.pth')


if __name__ == "__main__":
    main()


Epoch 1/10 completed
Epoch 2/10 completed
Epoch 3/10 completed
Epoch 4/10 completed
Epoch 5/10 completed
Epoch 6/10 completed
Epoch 7/10 completed
Epoch 8/10 completed
Epoch 9/10 completed
Epoch 10/10 completed


In [7]:
def load_model(model_path, device):
    """Build a StaffordGambitModel on ``device`` and load weights from ``model_path``.

    Args:
        model_path: Path of a state dict written with ``torch.save``.
        device: Device the model is moved to and the checkpoint is mapped onto.

    Returns:
        The model with the loaded weights, in evaluation mode.
    """
    model = StaffordGambitModel().to(device)
    # map_location remaps the stored tensors onto `device`; without it a
    # checkpoint saved from a CUDA run cannot be loaded on a CPU-only machine.
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()  # Set the model to evaluation mode
    return model


In [8]:
# Custom validation dataset
class StaffordGambitValidationDataset(StaffordGambitDataset):
    """Validation dataset with exactly the behaviour of StaffordGambitDataset.

    StaffordGambitDataset already stores the (board tensor, move index) pairs
    for Black's moves within the first ``MAX_MOVES * 2`` plies of each game.
    That is exactly what validation needs, so this class inherits
    ``__init__``, ``__len__`` and ``__getitem__`` instead of repeating them.
    The constructor signature (``games``) and the ``positions`` / ``moves``
    attributes are unchanged.
    """

# Load validation games
def load_validation_games(file_path):
    """Return a list of every game read from the PGN file at ``file_path``."""
    games = []
    with open(file_path) as pgn:
        # read_game returns None once the file is exhausted.
        while (game := chess.pgn.read_game(pgn)) is not None:
            games.append(game)
    return games


In [10]:
def evaluate_model(model, validation_loader, criterion, device):
    """Compute the average loss and the accuracy of ``model`` over ``validation_loader``.

    Args:
        model: Module called on each batch; it is switched to evaluation mode.
        validation_loader: Iterable of ``(data, target)`` batches.
        criterion: Loss callable whose result exposes ``.item()``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``(validation_loss, accuracy)``: the batch losses weighted by batch size
        and divided by the number of samples, and the fraction of samples whose
        highest-scoring output index equals the target.

    Raises:
        ValueError: If ``validation_loader`` yields no samples. This used to
            surface as a bare ZeroDivisionError.
    """
    model.eval()  # Set the model to evaluation mode
    validation_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for data, target in validation_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            # Weight each batch loss by its size so a smaller final batch does
            # not skew the average.
            validation_loss += loss.item() * data.size(0)
            _, predicted = torch.max(output, 1)
            correct += (predicted == target).sum().item()
            total += target.size(0)

    if total == 0:
        raise ValueError("validation_loader yielded no samples; cannot compute loss or accuracy")

    validation_loss /= total  # Average loss
    accuracy = correct / total
    return validation_loss, accuracy

# Main validation function
def main_validation(validation_path='stafford.txt', model_path='stafford_gambit_model.pth'):
    """Load a saved model and print its loss and accuracy on a PGN file.

    Args:
        validation_path: PGN file to evaluate on.
        model_path: State dict file to load the model weights from.

    NOTE(review): the default ``validation_path`` is the same 'stafford.txt'
    file that ``main()`` trains on. With the defaults, the printed numbers
    measure fit to the training data, not generalisation. Pass a held-out PGN
    file to get a real validation score.
    """
    # Load validation games
    validation_games = load_validation_games(validation_path)

    # Create validation dataset and dataloader
    validation_dataset = StaffordGambitValidationDataset(validation_games)
    validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)

    # Initialize model, criterion
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(model_path, device)
    criterion = nn.CrossEntropyLoss()

    # Evaluate model
    validation_loss, validation_accuracy = evaluate_model(model, validation_loader, criterion, device)
    print(f"Validation Loss: {validation_loss:.4f}, Validation Accuracy: {validation_accuracy:.4f}")

if __name__ == "__main__":
    main_validation()


Validation Loss: 2.0788, Validation Accuracy: 0.5166


In [11]:
import chess
import chess.pgn
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split

# Constants
MAX_MOVES = 15  # The dataset stops reading a game once the ply index reaches MAX_MOVES * 2
BOARD_SIZE = 8  # Side length of each plane of the board tensor
NUM_PIECE_TYPES = 12  # Number of planes in the board tensor
BATCH_SIZE = 32  # Passed as batch_size to both DataLoaders in main()
EPOCHS = 20  # Number of iterations of the training loop in main()
LEARNING_RATE = 0.001  # Passed as lr to optim.Adam in main()

# Function to convert board to tensor
def board_to_tensor(board):
    """Return a (NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE) tensor marking each piece with a 1.

    The plane index is ``(piece_type - 1) + 6 * int(piece.color)``. The row and
    column are the quotient and remainder of the square index divided by 8.
    """
    encoded = torch.zeros(NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE)
    for square in chess.SQUARES:
        occupant = board.piece_at(square)
        if not occupant:
            continue
        row, col = divmod(square, 8)
        channel = 6 * int(occupant.color) + (occupant.piece_type - 1)
        encoded[channel, row, col] = 1
    return encoded

# Custom dataset
class StaffordGambitDataset(Dataset):
    """Positions with Black to move, each labelled with the move that was played.

    At most ``MAX_MOVES * 2`` plies are read per game. Each stored item is the
    board tensor before Black's move and the integer
    ``from_square * 64 + to_square``.
    """

    def __init__(self, games):
        self.positions = []
        self.moves = []

        max_plies = MAX_MOVES * 2
        for game in games:
            board = game.board()
            for ply_index, move in enumerate(game.mainline_moves()):
                if ply_index >= max_plies:
                    break
                if board.turn == chess.BLACK:
                    encoded_move = 64 * move.from_square + move.to_square
                    self.positions.append(board_to_tensor(board))
                    self.moves.append(encoded_move)
                board.push(move)

    def __len__(self):
        return len(self.positions)

    def __getitem__(self, idx):
        return (self.positions[idx], self.moves[idx])

# Define the model
class StaffordGambitModel(nn.Module):
    """Convolutional network: three 3x3 conv layers, then linear layers to 64 * 64 outputs."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(NUM_PIECE_TYPES, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(256 * BOARD_SIZE * BOARD_SIZE, 1024)
        self.fc2 = nn.Linear(1024, 64 * 64)

    def forward(self, x):
        # Convolutional stack with a ReLU after every layer.
        features = x
        for layer in (self.conv1, self.conv2, self.conv3):
            features = torch.relu(layer(features))
        # Flatten to (rows, 256 * BOARD_SIZE * BOARD_SIZE) before the linear layers.
        features = features.view(-1, 256 * BOARD_SIZE * BOARD_SIZE)
        features = torch.relu(self.fc1(features))
        return self.fc2(features)

# Training function
def train(model, train_loader, optimizer, criterion, device):
    """Train ``model`` for one pass over ``train_loader``; returns None.

    Each ``(data, target)`` batch is moved to ``device``, then gradients are
    cleared, the loss is back-propagated and the optimizer takes one step.
    """
    model.train()
    for batch in train_loader:
        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        predictions = model(inputs)
        criterion(predictions, labels).backward()
        optimizer.step()

# Evaluation function
def evaluate(model, loader, criterion, device):
    """Return ``(average loss, accuracy)`` of ``model`` over ``loader``.

    Args:
        model: Module called on each batch; it is switched to evaluation mode.
        loader: Iterable of ``(data, target)`` batches.
        criterion: Loss callable whose result exposes ``.item()``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        The batch losses weighted by batch size and divided by the number of
        samples, and the fraction of samples whose highest-scoring output index
        equals the target.

    Raises:
        ValueError: If ``loader`` yields no samples. This used to surface as a
            bare ZeroDivisionError.
    """
    model.eval()
    loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Weight each batch loss by its size so a smaller final batch does
            # not skew the average.
            loss += criterion(output, target).item() * data.size(0)
            _, predicted = torch.max(output, 1)
            correct += (predicted == target).sum().item()
            total += target.size(0)
    if total == 0:
        raise ValueError("loader yielded no samples; cannot compute loss or accuracy")
    return loss / total, correct / total

# Main function
def main():
    """Train on 80% of the games in 'stafford.txt', reporting metrics on both splits each epoch.

    The games are split 80 / 20 into training and validation sets. After every
    one of the EPOCHS training passes, loss and accuracy are printed for both
    sets. The final weights are saved to 'stafford_gambit_model.pth'.
    """
    # Load your PGN file with Stafford Gambit games
    games = []
    with open('stafford.txt') as pgn:
        # read_game returns None once the file is exhausted.
        while (game := chess.pgn.read_game(pgn)) is not None:
            games.append(game)

    # Split data into training and validation sets (whole games, not positions)
    train_games, val_games = train_test_split(games, test_size=0.2, random_state=42)

    # Create datasets and dataloaders
    train_loader = DataLoader(StaffordGambitDataset(train_games), batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(StaffordGambitDataset(val_games), batch_size=BATCH_SIZE, shuffle=False)

    # Initialize model, optimizer, and loss function
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = StaffordGambitModel().to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss()

    # Training loop: one training pass, then metrics on both splits
    for epoch in range(EPOCHS):
        train(model, train_loader, optimizer, criterion, device)
        train_loss, train_accuracy = evaluate(model, train_loader, criterion, device)
        val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)
        print(f"Epoch {epoch+1}/{EPOCHS} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        print(f"                 - Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # Save the model
    torch.save(model.state_dict(), 'stafford_gambit_model.pth')


if __name__ == "__main__":
    main()


Epoch 1/20 - Train Loss: 5.6364, Train Accuracy: 0.0984
                 - Val Loss: 5.3306, Val Accuracy: 0.1379
Epoch 2/20 - Train Loss: 4.3830, Train Accuracy: 0.0984
                 - Val Loss: 4.6930, Val Accuracy: 0.1379
Epoch 3/20 - Train Loss: 3.8586, Train Accuracy: 0.0984
                 - Val Loss: 4.7174, Val Accuracy: 0.1379
Epoch 4/20 - Train Loss: 3.5258, Train Accuracy: 0.1066
                 - Val Loss: 4.8619, Val Accuracy: 0.1379
Epoch 5/20 - Train Loss: 3.3068, Train Accuracy: 0.0984
                 - Val Loss: 4.4309, Val Accuracy: 0.1379
Epoch 6/20 - Train Loss: 3.1950, Train Accuracy: 0.0984
                 - Val Loss: 4.1250, Val Accuracy: 0.1379
Epoch 7/20 - Train Loss: 3.1110, Train Accuracy: 0.1066
                 - Val Loss: 3.9788, Val Accuracy: 0.1379
Epoch 8/20 - Train Loss: 3.0633, Train Accuracy: 0.1066
                 - Val Loss: 3.9084, Val Accuracy: 0.1379
Epoch 9/20 - Train Loss: 2.9973, Train Accuracy: 0.1066
                 - Val Loss: 3.8

In [None]:
import chess
import chess.pgn
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from sklearn.model_selection import train_test_split

# Constants
MAX_MOVES = 15  # The dataset stops reading a game once the ply index reaches MAX_MOVES * 2
BOARD_SIZE = 8  # Side length of each plane of the board tensor
NUM_PIECE_TYPES = 12  # Number of planes in the board tensor
BATCH_SIZE = 32  # Passed as batch_size to both DataLoaders in main()
EPOCHS = 20  # Number of iterations of the training loop in main()
LEARNING_RATE = 0.001  # Initial lr passed to optim.Adam in main()

# Function to convert board to tensor
def board_to_tensor(board):
    """Build a (NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE) tensor with a 1 for each piece on ``board``.

    A piece on square ``s`` sets entry ``[(piece_type - 1) + 6 * int(color), s // 8, s % 8]``.
    """
    tensor = torch.zeros(NUM_PIECE_TYPES, BOARD_SIZE, BOARD_SIZE)
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if not piece:
            continue
        plane_index = piece.piece_type - 1 + 6 * int(piece.color)
        tensor[plane_index, square // 8, square % 8] = 1
    return tensor

# Custom dataset
class StaffordGambitDataset(Dataset):
    """Dataset of (board tensor, move index) pairs for Black's moves.

    Each game contributes the positions where Black is to move within its first
    ``MAX_MOVES * 2`` plies. The label is ``from_square * 64 + to_square`` of
    the move played from that position.
    """

    def __init__(self, games):
        self.positions = []
        self.moves = []

        plies_per_game = MAX_MOVES * 2
        for game in games:
            board = game.board()
            for ply, move in enumerate(game.mainline_moves()):
                if ply >= plies_per_game:
                    break
                if board.turn == chess.BLACK:
                    self.positions.append(board_to_tensor(board))
                    self.moves.append(move.to_square + 64 * move.from_square)
                board.push(move)

    def __len__(self):
        return len(self.positions)

    def __getitem__(self, idx):
        board_tensor, move_label = self.positions[idx], self.moves[idx]
        return board_tensor, move_label

# Define the model with dropout and batch normalization
class StaffordGambitModel(nn.Module):
    """Three conv + batch-norm + ReLU stages, then a dropout-regularised linear head of 64 * 64 outputs."""

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(NUM_PIECE_TYPES, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(256)
        self.fc1 = nn.Linear(256 * BOARD_SIZE * BOARD_SIZE, 1024)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(1024, 64 * 64)

    def forward(self, x):
        # Each stage: convolution, then batch norm, then ReLU.
        stages = ((self.conv1, self.bn1), (self.conv2, self.bn2), (self.conv3, self.bn3))
        for conv, norm in stages:
            x = torch.relu(norm(conv(x)))
        flat = x.view(-1, 256 * BOARD_SIZE * BOARD_SIZE)
        # Dropout is applied to the fc1 output before the ReLU.
        hidden = torch.relu(self.dropout(self.fc1(flat)))
        return self.fc2(hidden)

# Training function
def train(model, train_loader, optimizer, criterion, device):
    """Perform one training pass over ``train_loader`` and return None.

    The model is put in training mode. Every ``(data, target)`` batch is moved
    to ``device`` and used for exactly one optimizer step.
    """
    model.train()
    for data, target in train_loader:
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()
        batch_loss = criterion(model(data), target)
        batch_loss.backward()
        optimizer.step()

# Evaluation function
def evaluate(model, loader, criterion, device):
    """Return ``(average loss, accuracy)`` of ``model`` over ``loader``.

    Args:
        model: Module called on each batch; it is switched to evaluation mode.
        loader: Iterable of ``(data, target)`` batches.
        criterion: Loss callable whose result exposes ``.item()``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        The batch losses weighted by batch size and divided by the number of
        samples, and the fraction of samples whose highest-scoring output index
        equals the target.

    Raises:
        ValueError: If ``loader`` yields no samples. This used to surface as a
            bare ZeroDivisionError.
    """
    model.eval()
    loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # Weight each batch loss by its size so a smaller final batch does
            # not skew the average.
            loss += criterion(output, target).item() * data.size(0)
            _, predicted = torch.max(output, 1)
            correct += (predicted == target).sum().item()
            total += target.size(0)
    if total == 0:
        raise ValueError("loader yielded no samples; cannot compute loss or accuracy")
    return loss / total, correct / total

# Main function
def main():
    """Train with a step learning-rate schedule, printing train / validation metrics each epoch.

    The games in 'stafford.txt' are split 80 / 20 into training and validation
    sets. The model is trained for EPOCHS passes with Adam, and StepLR
    multiplies the learning rate by 0.1 every 5 scheduler steps. The final
    weights are saved to 'stafford_gambit_model.pth'.
    """
    # Load your PGN file with Stafford Gambit games
    games = []
    with open('stafford.txt') as pgn:
        # read_game returns None once the file is exhausted.
        while (game := chess.pgn.read_game(pgn)) is not None:
            games.append(game)

    # Split data into training and validation sets (whole games, not positions)
    train_games, val_games = train_test_split(games, test_size=0.2, random_state=42)

    # Create datasets and dataloaders
    train_loader = DataLoader(StaffordGambitDataset(train_games), batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(StaffordGambitDataset(val_games), batch_size=BATCH_SIZE, shuffle=False)

    # Initialize model, optimizer, and loss function
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = StaffordGambitModel().to(device)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss()

    # Learning rate scheduler
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    # Training loop: train, report metrics on both splits, then advance the schedule
    for epoch in range(EPOCHS):
        train(model, train_loader, optimizer, criterion, device)
        train_loss, train_accuracy = evaluate(model, train_loader, criterion, device)
        val_loss, val_accuracy = evaluate(model, val_loader, criterion, device)
        print(f"Epoch {epoch+1}/{EPOCHS} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        print(f"                 - Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")
        scheduler.step()

    # Save the model
    torch.save(model.state_dict(), 'stafford_gambit_model.pth')


if __name__ == "__main__":
    main()
