# Chess FEN → Score + Move Model
This notebook:

- Loads a `text` file where each line is `FEN|score_cp|other_cp|uci_move`.

- Splits each line into four pandas columns, then drops the last one (`uci`), which the model does not use.

- Encodes the FEN to numeric features (piece planes + side-to-move).

- Trains:

  A neural network (NN) that predicts the first centipawn score (`score_cp`).



In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import gc

# Models
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers, models, Input
from tensorflow.keras.layers import Dropout
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras import mixed_precision
mixed_precision.set_global_policy("mixed_float16")

2025-10-18 11:10:13.802347: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


## 1) Load & split raw lines into columns

In [None]:
# Location of the raw "FEN|score_cp|other_cp|uci_move" text file.
DATA_PATH = r"../nnue/useful_chess_data.txt"
assert os.path.exists(DATA_PATH), f"Data file not found at {DATA_PATH}. Upload your file there or change DATA_PATH."


maxLines = int(10**7)   # upper bound on the number of positions kept
amount = 38548203       # number of lines in the file (only used for the stride and the progress bar)
# Keep every k-th line so the sample is spread over the whole file.
# max(1, ...) prevents a modulo-by-zero when maxLines exceeds amount.
modulos_for_selection = max(1, amount // maxLines)
idx = 0
lines = []
with open(DATA_PATH, "r") as f:
    for line in tqdm(f, desc="Reading positions", total=amount):
        line = line.strip()
        if line and idx % modulos_for_selection == 0:
            lines.append(line)
            if len(lines) >= maxLines:
                print(f"Only using {len(lines)} lines")
                break
        idx += 1

# Split every kept line into its four fields; lines with a different field count
# are counted and skipped instead of crashing or silently padding the frame.
rows = []
skipped = 0
for ln in tqdm(lines, desc="Parsing chess data"):
    fields = ln.split("|")
    if len(fields) == 4:
        rows.append(fields)
    else:
        skipped += 1

df = pd.DataFrame(rows, columns=["fen", "score_cp", "score2_cp", "uci"])
df["score_cp"] = pd.to_numeric(df["score_cp"], errors="coerce")
df["score2_cp"] = pd.to_numeric(df["score2_cp"], errors="coerce")
df = df.drop(columns=["uci"])

# errors="coerce" turns unparsable scores into NaN; drop those rows so they never
# reach training as NaN labels/features.
rows_before = len(df)
df = df.dropna(subset=["score_cp", "score2_cp"]).reset_index(drop=True)
skipped += rows_before - len(df)
if skipped:
    print(f"Skipped {skipped} malformed lines")

# Cache the parsed frame so later cells can reload it without re-parsing the text file.
df.to_pickle(path="fullDf.pkl",compression="zip")

print("Parsed chess data (preview)")
print(df.head())

# Free the raw lines and the frame; the next cell reloads the frame from the pickle.
del lines
del rows
del df
gc.collect()

In [None]:
# Reload the parsed positions from the zip-compressed pickle written by the parsing cell.
print("Loading normal df")
df = pd.read_pickle("fullDf.pkl", compression="zip")

print(f"Dataframe has {len(df)} entries")

Loading normal df
Dataframe has 10000000 entries


In [None]:
# Reduce the size of the loaded frame because it cannot fit into memory when training.
print(len(df))
# tail() keeps the last 2,000,000 rows and copes with shorter frames (a negative slice
# start would silently keep the wrong rows). reset_index restores a 0..n-1 index so
# later label-based lookups such as df["fen"][i] line up with positions.
df = df.tail(2000000).reset_index(drop=True)
print(len(df))

10000000
2000000


In [None]:
# Code to balance the dataset by adding the mirror of each position with the colors switched
def reverse_fen(fen):
    """Return the colour-swapped, vertically mirrored version of a FEN string.

    The board field has every piece's case swapped and its ranks reversed, and the
    side to move is flipped. When present, the castling field has its case swapped
    (white rights listed first) and the en-passant square has its rank mirrored
    (rank r becomes 9 - r). Any remaining fields are passed through unchanged.

    Parameters:
        fen: space-separated FEN string with at least the board and side-to-move fields.

    Returns:
        The mirrored FEN string.
    """
    board, turn, *rest = fen.split(" ")

    # 1. Swap piece colors (digits and '/' are unaffected by swapcase)
    swapped = board.swapcase()

    # 2. Reverse ranks
    reversed_board = "/".join(swapped.split("/")[::-1])

    # 3. Flip turn (anything other than "b" becomes "b", as before)
    new_turn = "w" if turn == "b" else "b"

    # 4. Castling rights change owner together with the pieces
    if rest and rest[0] != "-":
        flipped = rest[0].swapcase()
        white_rights = "".join(c for c in flipped if c.isupper())
        black_rights = "".join(c for c in flipped if c.islower())
        rest[0] = white_rights + black_rights

    # 5. The en-passant target square moves to the mirrored rank
    if len(rest) > 1:
        ep = rest[1]
        if len(ep) == 2 and ep[1].isdigit():
            rest[1] = ep[0] + str(9 - int(ep[1]))

    # 6. Rebuild FEN
    return " ".join([reversed_board, new_turn] + rest)

# Visual sanity check: print a sample FEN next to the result of reverse_fen.
testFen = "4k3/3pppp1/8/8/8/8/8/3QK3 w - - 0 1"
print(f'Reversing fen "{testFen}" to "{reverse_fen(testFen)}"')

#Augment dataframe
def augmentDf(df):
    """Return `df` followed by a colour-mirrored copy of every row.

    Each mirrored row gets its FEN from reverse_fen and keeps the original
    `score_cp` and `score2_cp` values unchanged.
    NOTE(review): keeping the scores unchanged is only right if they are relative
    to the side to move; white-relative scores would need negating. Confirm
    against the data source.

    Parameters:
        df: DataFrame with columns "fen", "score_cp" and "score2_cp".

    Returns:
        A new DataFrame with a fresh 0..n-1 index holding the original rows
        followed by the mirrored rows.
    """
    print("Original size:", len(df))
    # Walk the FEN column directly; iterrows() would box every row into a Series,
    # which is very slow on millions of rows.
    mirrored_fens = [
        reverse_fen(fen)
        for fen in tqdm(df["fen"], total=len(df), desc="Equalizing dataframe", colour="green", ncols=100)
    ]
    df_aug = pd.DataFrame({
        "fen": mirrored_fens,
        "score_cp": df["score_cp"].to_numpy(),
        "score2_cp": df["score2_cp"].to_numpy(),
    })
    print("Augmented size:", len(df_aug) + len(df))
    return pd.concat([df, df_aug], ignore_index=True)

# Double the dataset in place. Not idempotent: re-running this cell doubles `df` again.
df = augmentDf(df)

In [None]:
# Search df for black-to-move positions with high centipawn evaluations (for testing)
def searchDf(df):
    """Print every black-to-move row whose |score_cp| is at least 400.

    Prints the number of matching rows, then one dict per row (keys "fen",
    "score_cp", "score2_cp"), one per line. Returns None.

    Parameters:
        df: DataFrame with columns "fen", "score_cp" and "score2_cp".
    """
    # The second space-separated FEN field is the side to move.
    side_to_move = df["fen"].str.split().str[1]
    # Vectorised filter instead of a Python-level iterrows() scan.
    mask = (side_to_move == "b") & (df["score_cp"].abs() >= 400)
    blackRows = df.loc[mask, ["fen", "score_cp", "score2_cp"]].to_dict("records")
    print(len(blackRows))
    print(*blackRows, sep="\n")

# Debug aid: prints one line per matching position, so the output can be very long on the full frame.
searchDf(df)

## 2) FEN encoder
We encode each board into a feature vector: 64 squares × 12 piece planes (P,N,B,R,Q,K for white/black), plus side-to-move.

In [None]:
# Plane index of every piece letter: white pieces occupy planes 0-5, black pieces 6-11.
PIECE_TO_PLANE = {
    'P':0,'N':1,'B':2,'R':3,'Q':4,'K':5,
    'p':6,'n':7,'b':8,'r':9,'q':10,'k':11
}

def fen_to_flat(fen: str, debug=False):
    """Encode a FEN string as a flat float16 feature vector of length 769.

    The first 768 entries are an 8x8x12 one-hot board flattened in
    (rank row, file, plane) order, where rank row 0 is the first rank written in
    the FEN. The final entry is 1.0 when white is to move and 0.0 otherwise.

    Only the board and side-to-move fields are read, so a FEN without castling,
    en-passant or move counters is accepted as well.

    Parameters:
        fen: FEN string containing at least the board and side-to-move fields.
        debug: when true, print the 8x8x12 board tensor before flattening.

    Returns:
        np.ndarray of shape (769,) and dtype float16.

    Raises:
        IndexError: if the FEN has fewer than two fields or a rank is too long.
        KeyError: if the board contains a character that is not a piece letter.
    """
    parts = fen.strip().split()
    board, side = parts[0], parts[1]

    img = np.zeros((8,8,12), dtype=np.float16)
    for r, rank in enumerate(board.split('/')):
        file_idx = 0
        for ch in rank:
            if ch.isdigit():
                # A digit encodes that many consecutive empty squares.
                file_idx += int(ch)
            else:
                img[r, file_idx, PIECE_TO_PLANE[ch]] = 1.0
                file_idx += 1

    if (debug):
        print("8 x 8 x 12, without extras")
        print(*img)

    # Write straight into the output buffer instead of round-tripping through a Python list.
    features = np.empty(img.size + 1, dtype=np.float16)
    features[:-1] = img.reshape(-1)
    features[-1] = 1.0 if side == 'w' else 0.0
    return features

# Smoke test: encode the first position of the frame and report the vector shape.
x = fen_to_flat(df["fen"].iloc[0])
print(f"Feature length of flat {x.shape}")

# The sample vector is not needed afterwards.
del x
gc.collect()

Feature length of vector: 782
Feature length of image (8, 8, 12) and (13,)
Feature length of flat (769,)
Feature length of small flat (65,)


516

## 3) Build feature matrix X and target y
- `y_score` = first centipawn score (`score_cp`)

In [11]:
# Keep regression targets in float32: float16 cannot represent integers above 2048
# exactly and overflows to inf past 65504, which would quantise or corrupt large
# centipawn labels.
y_score = df['score_cp'].values.astype(np.float32)

### Prepare data for NN

In [None]:
# One row per position: the fen_to_flat encoding followed by the static evaluation
# (score2_cp) as the last feature.
n_features = fen_to_flat(df["fen"].iloc[0]).shape[0] + 1

print("Creating np array to store data")
# Preallocate the matrix and fill it row by row; collecting millions of small arrays
# in a list and converting afterwards would hold the data twice.
board_imgs = np.empty((len(df), n_features), dtype=np.float16)
for row_idx, (fen, static_score) in enumerate(tqdm(zip(df["fen"],df["score2_cp"]),total=len(df),desc="Formating NN training and validation data",colour="green")):
    board_imgs[row_idx, :-1] = fen_to_flat(fen)
    board_imgs[row_idx, -1] = np.float16(static_score)

print("Splitting into training and test data")
imgs_train, imgs_test, y_score_train, y_score_test = train_test_split(
    board_imgs, y_score, test_size=0.2, random_state=42
)
print("Switching to float16 to reduce memory usage")
# copy=False: the features are already float16, so this avoids another full copy.
imgs_train = imgs_train.astype("float16", copy=False)
imgs_test = imgs_test.astype("float16", copy=False)
# Targets stay float32; they are tiny next to the features, and float16 would
# quantise large centipawn values.
y_score_train = y_score_train.reshape(-1, 1).astype("float32")
y_score_test  = y_score_test.reshape(-1, 1).astype("float32")

# PIECE_TO_PLANE is deliberately kept: fen_to_flat still needs it in later cells.
del board_imgs, y_score
gc.collect()

Formating NN training and validation data: 100%|[32m██████████[0m| 2000000/2000000 [03:24<00:00, 9770.90it/s] 


Creating np array to store data
Splitting into training and test data
Switching to float16 to reduce memory usage


## 4) NN for score prediction

Alternative loss functions for the NN. They are kept for reference only: in testing, plain MSE performed best, so the model below is compiled with `mse`.

In [None]:
def weighted_mse_by_true(eps=0.1, power=1.0):
    """
    Build an MSE loss in which each sample is weighted by 1 / (|y_true| + eps)**power,
    with the weights rescaled so that their batch mean is 1.
    - eps: keeps the denominator away from zero and limits how strongly samples with small |y_true| are upweighted.
    - power: 1.0 gives inverse-absolute weighting, 2.0 inverse-square (stronger).
    Returns a function loss(y_true, y_pred) that yields a scalar tensor.
    """
    def loss(y_true, y_pred):
        # Do all arithmetic in float32, whatever dtype the inputs arrive in.
        target = tf.cast(y_true, tf.float32)
        estimate = tf.cast(y_pred, tf.float32)

        # Squared error per sample, averaged over the last (output) axis.
        per_sample_sq_err = tf.reduce_mean(tf.square(estimate - target), axis=-1)

        # Raw inverse-magnitude weights, reshaped to line up with the per-sample errors.
        raw_weights = 1.0 / (tf.abs(target) + eps)**power
        raw_weights = tf.reshape(raw_weights, tf.shape(per_sample_sq_err))

        # Rescale to a batch mean of 1 so the gradient scale stays comparable to plain MSE.
        mean_weight = tf.reduce_mean(raw_weights) + 1e-12
        weights = raw_weights / mean_weight

        return tf.reduce_mean(weights * per_sample_sq_err)
    return loss

def relative_mse(eps=0.1):
    """
    Build a loss equal to the mean of ((y_pred - y_true) / (|y_true| + eps))**2.
    - eps: keeps the denominator away from zero for targets near 0.
    Returns a function loss(y_true, y_pred) that yields a scalar tensor.
    """
    def loss(y_true, y_pred):
        # Do all arithmetic in float32, whatever dtype the inputs arrive in.
        target = tf.cast(y_true, tf.float32)
        estimate = tf.cast(y_pred, tf.float32)
        relative_error = (estimate - target) / (tf.abs(target) + eps)
        return tf.reduce_mean(tf.square(relative_error))
    return loss

The NN

In [None]:
# The layer sizes are parameters so a grid search over first-layer size, hidden-layer
# size and depth can call this function directly.
def build_chess_cnn(input_dim=770, first_units=32, hidden_units=16, hidden_layers=7, l2_weight=0.001):
    """Build and compile the dense network that predicts the centipawn score.

    Despite the name, the network is a stack of Dense layers (no convolutions):
    one first layer, `hidden_layers` further layers, and a single linear output
    named "value". All hidden layers use ReLU and L2 kernel regularisation.

    Parameters:
        input_dim: length of the input feature vector.
        first_units: width of the first Dense layer.
        hidden_units: width of each following Dense layer.
        hidden_layers: number of Dense layers after the first one.
        l2_weight: L2 regularisation factor applied to every hidden kernel.

    Returns:
        A compiled Keras model (Adam optimizer, MSE loss, MAE metric).
    """
    board_input = Input(shape=(input_dim,), dtype='float16', name="board")
    hidden = layers.Dense(first_units, activation="relu", kernel_regularizer=l2(l2_weight))(board_input)
    for _ in range(hidden_layers):
        hidden = layers.Dense(hidden_units, activation="relu", kernel_regularizer=l2(l2_weight))(hidden)

    # Under the mixed_float16 policy the output layer should run in float32:
    # centipawn values and their squared errors easily exceed float16 range/precision.
    value_out = layers.Dense(1, name="value", dtype="float32")(hidden)

    model = models.Model(inputs=[board_input], outputs=[value_out])
    model.compile(
        optimizer="adam",
        loss={"value": "mse"},
        metrics={"value": ["mae"]}
    )
    return model


# Build the network with its default configuration and print the layer summary.
model = build_chess_cnn()
model.summary()

In [None]:
# Stop when validation loss overfits or stagnates, and roll back to the best weights seen.
early_stop = EarlyStopping(monitor = "val_loss", patience = 5, restore_best_weights=True)

# Halve the learning rate on a plateau so training can keep improving, only more slowly.
reduce_lr = ReduceLROnPlateau(monitor = "val_loss", mode = "min", factor = 0.5, patience = 3)

# Train the model.
# NOTE(review): the held-out test split is also used for validation here, so it drives
# early stopping and the LR schedule; metrics reported on it later are optimistic.
history = model.fit(
    imgs_train,
    y_score_train,
    # Keras documents validation_data as a tuple (x_val, y_val).
    validation_data=(imgs_test, y_score_test),
    epochs=100,
    verbose=1,
    callbacks = [early_stop, reduce_lr],
    batch_size = 2048,
)

## 5) Data I can visualize as a human to understand the ai's performance

In [None]:
print(history.history.keys())
# Skip the first epoch: its outrageously large error would flatten the rest of the curve.
train_loss = history.history['loss'][1:]
val_loss = history.history['val_loss'][1:]
# Epochs are 1-based and the first one was dropped, so the curves start at epoch 2.
epochs = range(2, 2 + len(train_loss))
plt.plot(epochs, train_loss, label="Train loss")
plt.plot(epochs, val_loss, label="Validation loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training vs. validation loss")
plt.legend()
plt.show()

In [None]:
# Score the trained model on the held-out split.
test_results = model.evaluate(imgs_test, y_score_test, batch_size=1024, verbose=1)

# Best result so far, as [MSE, MAE]:
# [754.0731201171875, 21.048927307128906]
print("Test results:", test_results)

# Model prediction distribution

In [None]:
y_pred = model.predict(imgs_test, batch_size=1024)

# Compute errors in float32. The targets are stored as float16 (and the predictions
# may be too under the mixed_float16 policy); in float16, errors**2 overflows to inf
# for any error beyond roughly 256 cp.
errors = y_pred.flatten().astype(np.float32) - y_score_test.flatten().astype(np.float32)

# Plot distribution of errors
plt.figure(figsize=(8,5))
plt.hist(errors, bins=50, color='skyblue', edgecolor='black')
plt.title('Distribution of Prediction Errors')
plt.xlabel('Error (Prediction - True)')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()

# Summary statistics
print("Mean Absolute Error (MAE):", np.mean(np.abs(errors)))
print("Mean Squared Error (MSE):", np.mean(errors**2))
print("Max error:", np.max(np.abs(errors)))

large_limit = 30
large_error_indices = np.where(np.abs(errors) > large_limit)[0]
print(f"There are {len(large_error_indices)} positions with >{large_limit}cp erros")

# Show the feature vector of the first offending position, if there is one.
if len(large_error_indices) > 0:
    print(imgs_test[large_error_indices[0]])

# Model prediction heatmap by true-score

In [None]:
# Flatten the targets, predict afresh, and take the signed error per position.
y_true = y_score_test.flatten()
y_pred = model.predict(imgs_test, batch_size=1024, verbose=0).flatten()
errors = y_pred - y_true

# Bin edges, 10 cp wide, for the true score and for the prediction error.
true_bins = np.arange(-200, 200, 10)
error_bins = np.arange(-130, 130, 10)

# H[i, j] counts positions whose true score is in bin i and whose error is in bin j.
H, xedges, yedges = np.histogram2d(y_true, errors, bins=[true_bins, error_bins])

# Normalise every true-score bin to sum to 1; empty bins are left at zero.
col_sums = H.sum(axis=1, keepdims=True)
H_norm = np.divide(H, col_sums, out=np.zeros_like(H), where=col_sums != 0)

# Draw the normalised counts with the true score on X and the error on Y.
fig, ax = plt.subplots(figsize=(9, 6))
image = ax.imshow(
    H_norm.T,
    origin="lower",
    aspect="auto",
    extent=[true_bins[0], true_bins[-1], error_bins[0], error_bins[-1]],
    cmap="turbo",
    vmin=0,
    vmax=H_norm.max(),
)
fig.colorbar(image, ax=ax, label="Proportion within true-value bin")
ax.set_xlabel("True score")
ax.set_ylabel("Prediction error (pred − true)")
ax.set_title("Normalized error distribution by true-score range")
ax.axhline(0, color="k", lw=0.8)
plt.show()

# True-score heatmap distribution

In [None]:
y_true = y_score_test.flatten()

limit = 100
# Half-integer edges give one bin per integer score in [-limit, limit]. With integer
# edges, np.histogram's closed last bin would merge the two largest scores, and the
# previous edges also covered -limit-1 on the left only.
bins = np.arange(-limit - 0.5, limit + 1.5, 1)
hist, _ = np.histogram(y_true, bins=bins)

# Convert counts to percentages of the scores that fall inside the plotted range.
total = hist.sum()
hist_percent = 100.0 * hist / total if total else np.zeros(hist.shape, dtype=float)

# Create a 2D array for the heatmap (1 row, n columns)
heatmap = hist_percent[np.newaxis, :]  # shape (1, n_bins)

plt.figure(figsize=(12, 2))  # wide and short
plt.imshow(
    heatmap,
    aspect='auto',
    cmap='turbo',
    extent=[bins[0], bins[-1], 0, 1]
)
plt.colorbar(label='Percentage of true scores')
plt.yticks([])  # hide Y axis
plt.xlabel('True score value')
plt.title('Distribution of True Scores as Heatmap')
plt.show()

In [None]:
def predict_eval(fen,static_eval):
    """Print the model's predicted evaluation for a single position.

    The FEN is encoded with fen_to_flat, `static_eval` is appended as the last
    feature, and the global `model` is queried with a batch of one. The feature
    vector, the batch shape and the prediction are printed; nothing is returned.

    Parameters:
        fen: FEN string of the position.
        static_eval: static evaluation appended as the final input feature.
    """
    board_vector = fen_to_flat(fen, debug=False)
    features = np.append(board_vector, np.float32(static_eval))
    print(*features, sep=",")

    # The model expects a batch axis, so wrap the single vector as shape (1, n_features).
    batch = features.reshape(1, -1)
    print(batch.shape)

    prediction = model.predict(batch)
    print(f"Predicted evaluation (centipawns) for '{fen}' : {prediction[0][0]:.3f}")

# Spot checks on two hand-written positions.
# NOTE(review): training feeds score2_cp as the static-eval feature; 2.390 looks like
# pawns rather than centipawns. Confirm the intended unit.
predict_eval("r2qk4/8/8/8/8/8/PPPPPPPP/RNBQKBNR w KQq - 0 1",2.390)
predict_eval("rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1",0.0)

In [None]:
# Find and print the first position whose prediction is off by at least error_limit cp.
error_limit = 100
chunk_size = 4096  # positions encoded and predicted per model call

# Predict in chunks: one model.predict call per row is orders of magnitude slower.
for start in tqdm(range(0, len(df), chunk_size), desc=f"Looking for errors >= {error_limit}", colour="green"):
    chunk = df.iloc[start:start + chunk_size]
    x_chunk = np.stack([
        np.append(fen_to_flat(fen), np.float32(static_score))
        for fen, static_score in zip(chunk["fen"], chunk["score2_cp"])
    ])
    chunk_preds = model.predict(x_chunk, batch_size=chunk_size, verbose=0).reshape(-1).astype(np.float32)

    # Truncate the true score as int() did before; NaN scores give NaN errors, which
    # never satisfy the threshold and are therefore skipped instead of raising.
    true_scores = np.trunc(chunk["score_cp"].to_numpy(dtype=np.float64))
    chunk_errors = true_scores - chunk_preds

    hits = np.flatnonzero(np.abs(chunk_errors) >= error_limit)
    if hits.size:
        offset = int(hits[0])
        row = df.iloc[[start + offset]]
        error = float(chunk_errors[offset])
        print(row)
        print(f"Predicted evaluation (centipawns) for '{row['fen'].iloc[0]}' : {chunk_preds[offset]:.3f} accurate is {row['score_cp'].iloc[0]}, error is {error:.3f}")
        break

In [None]:
# Saving in .keras format is disabled; a later cell loads "best_on_doubled.keras",
# so that file has to exist already from an earlier run.
# model.save("best_on_doubled.keras")
# Export the trained model to the "best_tf_nn" directory.
model.export("best_tf_nn")


In [None]:
# Replace the in-memory model with the one stored in "best_on_doubled.keras".
# NOTE(review): no cell in this notebook writes that file (the save call is commented
# out), so this loads whatever an earlier run left on disk, or fails if it is missing.
model = tf.keras.models.load_model("best_on_doubled.keras")  
model.summary()