# Importing Dependencies

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import json

# Decoder Class Definition
Below is the defintion of our decder class that will be used to generate chess moves. Our decoder uses PyTorch's TransformerDecoder that incorporates multi-head self attention and feedforward neural nets, while adding and normalizing after each layer. Postional embeddings are calculated before being fed into the decoder layer. A fully connected layer is used for the output to calculate a softmax for the most probable chess moves.

In [3]:
class ChessDecoder(nn.Module):
    def __init__(self, vocab_size, d_model=256, nhead=8, num_layers=4, max_len=200):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.pos_embed = nn.Embedding(max_len, d_model)
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=1024)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # x shape: [batch, seq_len] → transformer expects [seq_len, batch]
        x = x.transpose(0, 1)
        seq_len, batch_size = x.size()

        # Calculate positional embeddings
        positions = torch.arange(seq_len, device=x.device).unsqueeze(1)
        x = self.embed(x) + self.pos_embed(positions)

        # Decoder masking: prevent attention to future tokens
        mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()

        x = self.decoder(x, x, tgt_mask=mask)
        logits = self.fc_out(x)  # [seq_len, batch, vocab_size]
        return logits.transpose(0, 1)  # [batch, seq_len, vocab_size]


Pulling in the .pt file that contains the tensor of all encoded chess games that will be used for training.

In [4]:
encoded_tensor = torch.load("encoded_games_500k.pt")

print(type(encoded_tensor))
print(encoded_tensor.shape)
print(encoded_tensor[0][:10])  # first few move IDs of first game


<class 'torch.Tensor'>
torch.Size([500000, 200])
tensor([    1, 10536, 10644, 10609, 10542, 10683,   303,  1791,  1347,  1338])


Defining the dataset we will use during training to store and pull game moves. Also intializing dataset and data loader with tensor containing the encoded chess games.

In [5]:
class ChessDataset(Dataset):
    def __init__(self, encoded_tensor):
        self.data = encoded_tensor

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = self.data[idx][:-1]  # all but last
        y = self.data[idx][1:]   # all but first
        return x, y

dataset = ChessDataset(encoded_tensor)
loader = DataLoader(dataset, batch_size=64, shuffle=True, drop_last=True)


The next few cells are used to test the funtionality of the loader and model when working together.

In [7]:
with open("move_to_id.json", "r") as f:
    move_to_id = json.load(f)

vocab_size = len(move_to_id)
model = ChessDecoder(vocab_size=vocab_size)

In [6]:
x, y = next(iter(loader))

print("Input batch shape:", x.shape)
print("Target batch shape:", y.shape)
print("Example input sequence:", x[0][:10])
print("Example target sequence:", y[0][:10])

Input batch shape: torch.Size([64, 199])
Target batch shape: torch.Size([64, 199])
Example input sequence: tensor([    1, 10533, 10539, 10638, 10644,  1475,  1647,  1632,  1490,  1941])
Example target sequence: tensor([10533, 10539, 10638, 10644,  1475,  1647,  1632,  1490,  1941, 10755])


In [7]:
with torch.no_grad():
    logits = model(x)
print("Model output shape:", logits.shape)

Model output shape: torch.Size([64, 199, 11017])


In [10]:
PAD_ID = move_to_id['<PAD>']
import torch.nn.functional as F

loss = F.cross_entropy(
    logits.reshape(-1, vocab_size),
    y.reshape(-1),
    ignore_index=PAD_ID
)
print("Test loss:", loss.item())


Test loss: 0.009690499864518642


# Hyperparameter Optimization
We will use optuna's functionality on our datadset to tune hyperparameters before training on our larger dataset.

In [9]:
from torch.utils.data import TensorDataset, random_split, DataLoader

train_size = int(0.9 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)

import optuna
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'device {device} found')
vocab_size = len(move_to_id)

def objective(trial):
    # search space
    d_model = trial.suggest_categorical('d_model', [128, 256, 512])
    nhead = trial.suggest_categorical('nhead', [4,8])
    num_layers = trial.suggest_int('num_layers', 2, 6)
    lr = trial.suggest_float('lr', 1e-4, 5e-3)

    # create model
    model = ChessDecoder(
        vocab_size=vocab_size,
        d_model=d_model,
        nhead=nhead,
        num_layers=num_layers,
        max_len=encoded_tensor.size(1)
    ).to(device)

    optimizer = optim.AdamW(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)

    # train a few epochs
    EPOCHS = 7
    for epoch in range(EPOCHS):
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            logits = model(x)
            loss = criterion(logits.reshape(-1, vocab_size), y.reshape(-1))
            loss.backward()
            optimizer.step()

    # validation
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for x, y, in val_loader:
            x, y = x.to(device), y.to(device)
            logits = model(x)
            loss = criterion(logits.reshape(-1,vocab_size), y.reshape(-1))
            total_val_loss += loss.item()
    avg_val_loss = total_val_loss / len(val_loader)

    return avg_val_loss


NameError: name 'dataset' is not defined

run optimization with optuna

In [None]:
# set up optuna

from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner

# run a few rounds at random first
sampler = TPESampler(n_startup_trials=5)
pruner = MedianPruner(n_startup_trials=5, n_warmup_steps=1)

study = optuna.create_study(direction='minimize', sampler=sampler, pruner=pruner)
study.optimize(objective, n_trials=40)

# print results

print('\n\nBest trial:')
print('  Value (val loss):', study.best_trial.value)
print('  Params:', study.best_trial.params)

[I 2025-10-20 14:06:50,773] A new study created in memory with name: no-name-e1b65a99-cda1-4333-adc6-754f06380a97
[I 2025-10-20 14:12:42,695] Trial 0 finished with value: 0.0073929192708769385 and parameters: {'d_model': 128, 'nhead': 4, 'num_layers': 6, 'lr': 0.000980025811877717}. Best is trial 0 with value: 0.0073929192708769385.
[I 2025-10-20 14:19:51,837] Trial 1 finished with value: 0.005282709680870586 and parameters: {'d_model': 512, 'nhead': 8, 'num_layers': 3, 'lr': 0.0007545065766000772}. Best is trial 1 with value: 0.005282709680870586.
[I 2025-10-20 14:23:37,924] Trial 2 finished with value: 0.01237118748875405 and parameters: {'d_model': 128, 'nhead': 4, 'num_layers': 3, 'lr': 0.00261027736598459}. Best is trial 1 with value: 0.005282709680870586.
[I 2025-10-20 14:28:07,065] Trial 3 finished with value: 0.005774908191084158 and parameters: {'d_model': 256, 'nhead': 8, 'num_layers': 3, 'lr': 0.0006363415651286342}. Best is trial 1 with value: 0.005282709680870586.
[I 2025-



Best trial:
  Value (val loss): 0.005282709680870586
  Params: {'d_model': 512, 'nhead': 8, 'num_layers': 3, 'lr': 0.0007545065766000772}


The following cell allows for easy storage and retrieval of the best hyperparameters as found above

In [None]:
# save best hyperparameters to file

import json
from datetime import datetime

best_params = study.best_trial.params
best_value = study.best_trial.value

best_params_with_meta = {
    "best_params": best_params,
    "best_val_loss": best_value,
    "n_trials": len(study.trials),
}

filename = f'best_hparams.json'

with open(filename, 'w') as f:
    json.dump(best_params_with_meta, f)


# how to open later or in different file
'''
with open("best_hparams.json", "r") as f:
    best_hparams = json.load(f)

params = best_hparams["best_params"]

best_model = ChessDecoder(
    vocab_size=vocab_size,
    d_model=params["d_model"],
    nhead=params["nhead"],
    num_layers=params["num_layers"],
    max_len=encoded_tensor.size(1)
).to(device)
'''

'\nwith open("best_hparams.json", "r") as f:\n    best_hparams = json.load(f)\n\nparams = best_hparams["best_params"]\n\nbest_model = ChessDecoder(\n    vocab_size=vocab_size,\n    d_model=params["d_model"],\n    nhead=params["nhead"],\n    num_layers=params["num_layers"],\n    max_len=encoded_tensor.size(1)\n).to(device)\n'

# Model Training
Training is done after the best combination of hyperparameters have been found. These hyperparameters are used to create the final instance of our model. This model is trained on an encoded dataset of 1 million chess games and uses cross entropy to calcuate loss.

In [11]:
import torch
import torch.nn as nn
import torch.optim as optim

vocab_size = len(move_to_id)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'On device {device}')

with open("best_hparams.json", "r") as f:
    best_hparams = json.load(f)

params = best_hparams["best_params"]

model = ChessDecoder(
    vocab_size=vocab_size,
    d_model=params["d_model"],
    nhead=params["nhead"],
    num_layers=params["num_layers"],
    max_len=encoded_tensor.size(1)
).to(device)

model.load_state_dict(torch.load('500k_model.pt', map_location=device))
model.train()

optimizer = optim.Adam(model.parameters(), lr=params["lr"])

epoch_loss = 0
num_batches = 0
for epoch in range(50):
    for x, y in loader:
        x = x.to(device)
        y = y.to(device)
        
        logits = model(x)
        loss = F.cross_entropy(
            logits.reshape(-1, vocab_size),
            y.reshape(-1),
            ignore_index=PAD_ID
        )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    torch.save(model.state_dict(), '500k_model.pt')
    print(f"Epoch {epoch+1} average loss = {epoch_loss / num_batches:.4f}")
 


On device cuda
Epoch 1 average loss = 0.0191
Epoch 2 average loss = 0.0169
Epoch 3 average loss = 0.0163
Epoch 4 average loss = 0.0151


KeyboardInterrupt: 