# NNUE training

Great source on NNUE: https://official-stockfish.github.io/docs/nnue-pytorch-wiki/docs/nnue.html

## Input data

Stockfish has a lot of data available for NNUE training in the .binpack format. They have a repo for training NNUEs (nnue-pytorch) that enables efficient data loading with this format. I don't want to use nnue-pytorch; I want to make my own NNUE training setup.

The nnue-pytorch repo also has information on training datasets for NNUEs: https://github.com/official-stockfish/nnue-pytorch/wiki/Training-datasets. They explain how to make your own dataset and link some of the datasets they generated. I will use some of this data, because generating the data myself would be too time-consuming on my hardware.

Currently using training data: test80-2024-01-jan-2tb7p.min-v2.v6.binpack.zst from https://huggingface.co/datasets/linrock/test80-2024/tree/main

This file contains billions of positions with evaluations in the .binpack format. The Stockfish tools branch has a tool to convert the .binpack data into .plain data (https://github.com/official-stockfish/Stockfish/blob/tools/docs/convert.md). I used this tool and stored the first 200M evaluated positions.

### Load input data

In [1]:
import pandas as pd
import numpy as np
import torch

### Turn FEN into input layer

In [2]:
# Plane index of every piece character that can appear in a FEN board field
# (white pieces 0-5, black pieces 6-11).
piece_dict = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5, 'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11}

def FEN_to_input(fen):
    """
    Convert a FEN string to an NNUE input vector.

    Only the piece-placement field (the text before the first space) is used.
    The result is a one-hot encoding made of 12 planes of 64 squares:

        feature index = piece_dict[piece] * 64 + square

    where square 0 is a1, square 7 is h1 and square 63 is h8.
    NOTE(review): the plane-major layout is an assumption; the engine that
    consumes the trained weights must use the same indexing - confirm.

    fen: position in FEN notation
    returns: float32 torch.Tensor of shape (768,), 1 for every occupied
             (piece, square) combination and 0 elsewhere
    raises: ValueError if the board field does not describe 8 ranks of
            8 squares or contains an unknown character
    """
    # Split the FEN string into its components; only the board is needed
    board = fen.split(' ')[0]
    ranks = board.split('/')
    if len(ranks) != 8:
        raise ValueError(f"FEN board must have 8 ranks, got {len(ranks)}: {fen!r}")

    input_layer = np.zeros(768, dtype=np.float32)
    square = 0
    # FEN lists rank 8 first, but in the chess engine square 0 corresponds
    # to a1, so the ranks are walked in reverse order.
    for rank in reversed(ranks):
        rank_end = square + 8
        for char in rank:
            if char.isdigit():
                # a digit is a run of empty squares
                square += int(char)
            elif char in piece_dict:
                if square >= rank_end:
                    raise ValueError(f"FEN rank {rank!r} is longer than 8 squares: {fen!r}")
                input_layer[piece_dict[char] * 64 + square] = 1
                # a piece occupies exactly one square
                square += 1
            else:
                raise ValueError(f"Unknown piece character {char!r} in FEN: {fen!r}")
        if square != rank_end:
            raise ValueError(f"FEN rank {rank!r} does not cover 8 squares: {fen!r}")

    return torch.from_numpy(input_layer)

## Model architecture

Input: a sparse, binary array of length 768. Each element of the array represents a possible combination of piece type (6), piece_color (2) and position (64) (6*2*64 = 768).



The fully connected feedforward network has 4 linear layers (3 hidden layers followed by an output layer): 768 -> 1024, 1024 -> 8, 8 -> 32 and 32 -> 1.

The output is a single scalar.



In [None]:
import torch
import torch.nn as nn

class SimpleNNUE(nn.Module):
    """
    Small fully connected evaluation network: 768 -> 1024 -> 8 -> 32 -> 1.

    The three hidden layers use a clipped ReLU (values clamped to [0, 1]);
    the final layer is linear and produces one raw score per position.
    """

    def __init__(self):
        super().__init__()
        # four fully connected layers (the attribute names are the
        # state_dict keys, so they must stay fc1..fc4)
        self.fc1 = nn.Linear(768, 1024)
        self.fc2 = nn.Linear(1024, 8)
        self.fc3 = nn.Linear(8, 32)
        self.fc4 = nn.Linear(32, 1)

    def forward(self, x):
        """Map a (batch, 768) input to a (batch, 1) float32 score."""
        hidden = x
        for layer in (self.fc1, self.fc2, self.fc3):
            # clipped ReLU after every hidden layer
            hidden = layer(hidden).clamp(min=0, max=1)
        # output can be raw score, so no activation on the last layer
        score = self.fc4(hidden)
        return score.float()


In [4]:
import csv
import torch
from torch.utils.data import IterableDataset, DataLoader

class CsvFenScoreDataset(IterableDataset):
    """
    Streams (input tensor, score tensor) pairs from a CSV file without
    loading the file into memory.

    When used with a multi-worker DataLoader, the rows are split between
    the workers so that every row is produced exactly once per epoch.
    """

    def __init__(self, csv_path, shuffle_buffer=0, fen_to_tensor=None):
        """
        csv_path: path to CSV file with two columns: fen, score
        shuffle_buffer: size of in-memory shuffle buffer; 0 or 1 = no shuffle
        fen_to_tensor: function(str) -> torch.Tensor; None = use FEN_to_input
        """
        super().__init__()
        self.csv_path = csv_path
        self.shuffle_buffer = shuffle_buffer
        self.fen_to_tensor = fen_to_tensor

    def _row_stream(self):
        """
        Generator that yields (input tensor, score tensor) tuples from the
        CSV file, restricted to the rows that belong to the current
        DataLoader worker.
        """
        # Resolved lazily so the default is looked up only when it is needed.
        to_tensor = FEN_to_input if self.fen_to_tensor is None else self.fen_to_tensor

        # Every DataLoader worker gets its own copy of this dataset. Without
        # sharding each worker would yield the complete file, so every sample
        # would be seen num_workers times per epoch.
        worker = torch.utils.data.get_worker_info()
        if worker is None:
            worker_id, num_workers = 0, 1
        else:
            worker_id, num_workers = worker.id, worker.num_workers

        with open(self.csv_path, newline='', encoding='utf-8') as csvfile:
            for row_idx, row in enumerate(csv.reader(csvfile)):
                # Round-robin assignment; checked first so rows owned by
                # other workers are never converted.
                if row_idx % num_workers != worker_id:
                    continue
                # skip blank lines and comment lines
                if not row or row[0].startswith('#'):
                    continue
                score = float(row[1].strip())
                yield to_tensor(row[0].strip()), torch.tensor(score, dtype=torch.float32)

    @staticmethod
    def _pop_random(buf):
        """Remove and return a uniformly random element of buf in O(1)."""
        idx = int(torch.randint(len(buf), (1,)).item())
        # swap with the last element so the removal does not shift the list
        buf[idx], buf[-1] = buf[-1], buf[idx]
        return buf.pop()

    def __iter__(self):
        stream = self._row_stream()
        if self.shuffle_buffer <= 1:
            yield from stream
            return

        # shuffle buffer: once the buffer is full, every incoming sample
        # pushes out one randomly chosen buffered sample
        buf = []
        for sample in stream:
            buf.append(sample)
            if len(buf) >= self.shuffle_buffer:
                yield self._pop_random(buf)
        # drain remaining buffer
        while buf:
            yield self._pop_random(buf)

In [5]:
from torch.utils.data import DataLoader

# transport data to torch tensors
#input_data = np.array([FEN_to_input(fen) for fen in training_df['FEN'].values])
#output_data = np.array(training_df['Evaluation'].values)
#dataset = MyDataset(input_data, output_data)
#loader = DataLoader(dataset, batch_size = 2048, shuffle = True, num_workers = 4, pin_memory = True)


from datasets import load_dataset
# Stream the converted Stockfish positions straight from disk: the dataset
# shuffles through a 10000-sample buffer and four worker processes feed
# batches of 2048 into pinned host memory.
csv_path = '/home/yvlaere/projects/yvl-chess/NNUE_training/training_data/sf_training_data.csv'
dataset = CsvFenScoreDataset(csv_path, shuffle_buffer=10000)
loader = DataLoader(
    dataset,
    batch_size=2048,
    num_workers=4,
    pin_memory=True,
)

# about 2.4G VRAM available

https://talkchess.com/viewtopic.php?start=60&t=75724

In [6]:
# Train SimpleNNUE on the streamed data with an MSE objective; the mean
# absolute error is tracked as a monitoring metric and used to pick the
# checkpoint that is written to best_model.pth.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
nr_epochs = 500
model = SimpleNNUE().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
total_size = 200000000
batch_size = 2048
steps_per_epoch = total_size // batch_size
# NOTE: OneCycleLR is configured in optimizer steps (steps_per_epoch * epochs),
# so if it is enabled, scheduler.step() has to be called once per batch.
#scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=1e-3, steps_per_epoch=steps_per_epoch, epochs=nr_epochs, pct_start=0.3, anneal_strategy='cos')
criterion = nn.MSELoss()
MAE_loss = nn.L1Loss()
# Start at +inf so a checkpoint is always written, even when the first
# batches have an MAE above any fixed threshold.
lowest_MAE = float('inf')

for epoch in range(nr_epochs):
    # Reset so an empty loader is detected instead of raising a NameError
    # (or silently re-printing the values of the previous epoch).
    loss = None
    MAE = None

    for batch in loader:
        # get data from the dataloader
        batch_x, batch_y = batch

        # move data to GPU
        batch_x = batch_x.to(device, non_blocking = True)
        batch_y = batch_y.to(device, non_blocking = True)
        pred = model(batch_x).squeeze(1)  # remove the last dimension
        loss = criterion(pred, batch_y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # calculate MAE; it is only a metric, so keep it out of autograd and
        # store it as a plain float rather than a graph-attached tensor
        with torch.no_grad():
            MAE = MAE_loss(pred, batch_y).item()
        # NOTE(review): this is the MAE of a single training batch, measured
        # with the weights from before optimizer.step(), so it is a noisy
        # criterion for choosing the "best" checkpoint.
        if MAE < lowest_MAE:
            lowest_MAE = MAE
            torch.save(model.state_dict(), 'best_model.pth')
            print(f"New best model saved with MAE: {lowest_MAE:.4f}, loss: {loss.item():.4f}")

        #scheduler.step()

    if loss is None:
        print(f"Epoch {epoch+1}: the loader produced no batches")
        continue
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")
    print(f"Epoch {epoch+1}, MAE: {MAE:.4f}, lowest MAE: {lowest_MAE:.4f}")

New best model saved with MAE: 7778.6611, loss: 229368960.0000
New best model saved with MAE: 7350.9570, loss: 216233600.0000
New best model saved with MAE: 7066.1265, loss: 210160000.0000
New best model saved with MAE: 6929.0723, loss: 203875008.0000
New best model saved with MAE: 6849.6157, loss: 203404608.0000
New best model saved with MAE: 6616.1055, loss: 197880320.0000
New best model saved with MAE: 6317.0488, loss: 188735072.0000
New best model saved with MAE: 6239.4551, loss: 188846560.0000


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f865d674350>>
Traceback (most recent call last):
  File "/home/yvlaere/projects/yvl-chess/.venv/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 

KeyboardInterrupt: 


Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f865d674350>>
Traceback (most recent call last):
  File "/home/yvlaere/projects/yvl-chess/.venv/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(



KeyboardInterrupt: 


In [None]:
input_path = '/home/yvlaere/projects/yvl-chess/NNUE_training/training_data/train.csv'

# Load the labelled positions and preview the first rows.
training_df = pd.read_csv(input_path)
print(training_df.head())

# Summary statistics of the evaluation column, printed in this order:
# mean, standard deviation, minimum, maximum, median.
for summarize in (np.mean, np.std, np.min, np.max, np.median):
    print(summarize(training_df['Evaluation']))

                                                 FEN  Evaluation
0  r1b2rk1/ppp2pbp/3q1np1/n3p1B1/2B5/1Q3N2/PP1N1P...        -135
1  8/1pp2p2/6k1/4P2p/p1PR1K1P/2r2P2/6P1/8 w - - 0 33         -57
2  r2qk1nr/1b3pbp/n3p1p1/1pp1P3/p2PN3/2P2N2/PPB3P...         541
3  2b2rk1/5pp1/p2q1n1p/P2pn3/3N4/3BP1B1/2Q2PPP/Rr...         163
4  r2qkb1r/ppp2ppb/2n1p3/3n2PQ/3Pp3/2P4P/PP6/RNB1...        -332
18.06853701380683
619.097853227714
-6462
7462
14.0


In [None]:
# general information on data
# Encode every FEN of the dataframe and collect the results in one array;
# this materialises the whole encoded dataset in memory at once.
input_data = np.array([FEN_to_input(fen) for fen in training_df['FEN'].values])
# Evaluation targets, taken from the same dataframe in the same row order.
output_data = np.array(training_df['Evaluation'].values)
# Print both shapes as a sanity check on the number of samples.
print(input_data.shape)
print(output_data.shape)

#1 979 383 entries
# if converted to torch tensors, it becomes too large for the RAM (bool -> float32)
# so it needs to be fed in batches

In [3]:
from torch.utils.data import IterableDataset

def get_evaluation(item, mate_score=10000.0):
    """
    Turn the evaluation fields of one dataset record into a training target.

    item: mapping that may contain 'mate' (moves until mate) and/or 'cp'
          (centipawn score); a missing key is treated like a None value
    mate_score: magnitude assigned to forced-mate positions
    returns: scalar float32 torch.Tensor, or None when the record has
             neither a mate nor a centipawn evaluation
    """
    mate = item.get('mate')
    if mate is not None:
        # Only the sign of the mate distance is used.
        # NOTE(review): assumes a positive value means white delivers the
        # mate and a non-positive value means black does - confirm against
        # the dataset documentation.
        return torch.tensor(mate_score if mate > 0 else -mate_score, dtype=torch.float32)

    cp = item.get('cp')
    if cp is not None:
        return torch.tensor(cp, dtype=torch.float32)

    # No usable evaluation; callers have to handle this case explicitly.
    return None

class ChessEvalDataset(IterableDataset):
    """
    Iterable dataset that wraps an iterable of evaluation records and yields
    (input tensor, evaluation tensor) pairs.

    Each record is indexed with 'fen' and passed to get_evaluation().
    """

    def __init__(self, hf_dataset):
        """
        hf_dataset: iterable of records, e.g. a Hugging Face dataset
        """
        super().__init__()
        self.dataset = hf_dataset

    def __iter__(self):
        for item in self.dataset:
            y = get_evaluation(item)  # Evaluation score
            if y is None:
                # No evaluation available: there is nothing to train on, and
                # a None target cannot be collated into a batch.
                continue
            # FEN_to_input already returns a tensor; wrapping it in
            # torch.tensor() again would copy it and emit a UserWarning.
            x = FEN_to_input(item['fen'])
            yield x, y

In [None]:
from torch.utils.data import Dataset

class MyDataset(Dataset):
    """
    Map-style dataset over two parallel arrays (inputs and targets).

    The arrays are stored as given; a sample is converted to float32 torch
    tensors only when it is requested, so the full dataset never has to
    exist as float32 tensors at the same time.
    """

    def __init__(self, np_input, np_output):
        self.input, self.output = np_input, np_output

    def __len__(self):
        # number of samples = size of the first axis of the input array
        n_samples = self.input.shape[0]
        return n_samples

    def __getitem__(self, idx):
        # look up one data point, then convert both parts to torch tensors
        features, target = self.input[idx], self.output[idx]
        return (
            torch.tensor(features, dtype=torch.float32),
            torch.tensor(target, dtype=torch.float32),
        )