# CNN Upgraded

Notebook para la nueva versión del modelo CNN basado en: (1) dataset enriquecido, (2) pipeline `tf.data`/TFRecords, (3) aprendizaje multitarea (clasificación + regresión de centipawns).

In [18]:
import sys
import math
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf

sys.path.append('..')
from functions.fen_analyzer import extract_active_turn

print('TensorFlow version:', tf.__version__)


TensorFlow version: 2.20.0


In [19]:
# Rutas y parámetros base
PROCESSED_DIR = Path('..') / 'data' / 'processed' / 'enriched'
TFRECORD_DIR = Path('..') / 'data' / 'tfrecords'
TFRECORD_DIR.mkdir(parents=True, exist_ok=True)

BOARD_SHAPE = (8, 8, 12)
AUX_FEATURES = [
    'material_difference', 'total_pieces', 'pawn_advantage', 'rook_advantage',
    'bishop_advantage', 'knight_advantage', 'queen_advantage', 'white_material', 'black_material'
]
TARGET_CLASS_COL = 'eval_class_active'
TARGET_REG_COL = 'evaluation_cp_clipped'
BATCH_SIZE = 256
AUTOTUNE = tf.data.AUTOTUNE
MAX_TRAIN = 1_000_000
MAX_VAL = 200_000
MAX_TEST = 200_000


## 1. Exportar TFRecords desde el dataset enriquecido

In [20]:
PIECE_MAP = {p: i for i, p in enumerate(['P','N','B','R','Q','K'])}
CHANNELS = len(PIECE_MAP) * 2
SPLIT_SIZES = {}

def fen_to_tensor(fen: str) -> np.ndarray:
    board = fen.split(' ')[0]
    rows = board.split('/')
    tensor_rows = []
    for rank in rows:
        row_vecs = []
        for ch in rank:
            vec = [0] * CHANNELS
            if ch.isdigit():
                for _ in range(int(ch)):
                    row_vecs.append(vec.copy())
                continue
            if ch.isupper():
                vec[PIECE_MAP[ch]] = 1
            else:
                vec[PIECE_MAP[ch.upper()] + len(PIECE_MAP)] = 1
            row_vecs.append(vec)
        tensor_rows.append(row_vecs)
    return np.array(tensor_rows, dtype=np.float32)

def to_example(row: pd.Series) -> tf.train.Example:
    tensor = fen_to_tensor(row['FEN']).flatten().astype(np.float32)
    feature = {
        'board': tf.train.Feature(float_list=tf.train.FloatList(value=tensor)),
        TARGET_CLASS_COL: tf.train.Feature(int64_list=tf.train.Int64List(value=[int(row[TARGET_CLASS_COL])])),
        TARGET_REG_COL: tf.train.Feature(float_list=tf.train.FloatList(value=[float(row[TARGET_REG_COL])])),
    }
    for col in AUX_FEATURES:
        feature[col] = tf.train.Feature(float_list=tf.train.FloatList(value=[float(row[col])]))
    return tf.train.Example(features=tf.train.Features(feature=feature))

def write_tfrecord(split: str, limit: int | None = None):
    input_path = PROCESSED_DIR / f'{split}_enriched.parquet'
    assert input_path.exists(), f'No se encontró {input_path}'
    df = pd.read_parquet(input_path)
    if limit is not None and limit < len(df):
        df = df.sample(limit, random_state=42)
    output_path = TFRECORD_DIR / f'{split}.tfrecord'
    with tf.io.TFRecordWriter(str(output_path)) as writer:
        for _, row in df.iterrows():
            example = to_example(row)
            writer.write(example.SerializeToString())
    SPLIT_SIZES[split] = len(df)
    print(f"{split}: {len(df):,} ejemplos -> {output_path}")


In [21]:
write_tfrecord('training', MAX_TRAIN)
write_tfrecord('validation', MAX_VAL)
write_tfrecord('testing', MAX_TEST)


training: 1,000,000 ejemplos -> ..\data\tfrecords\training.tfrecord
validation: 200,000 ejemplos -> ..\data\tfrecords\validation.tfrecord
testing: 200,000 ejemplos -> ..\data\tfrecords\testing.tfrecord


In [22]:
TRAIN_STEPS = math.ceil(SPLIT_SIZES['training'] / BATCH_SIZE)
VAL_STEPS = math.ceil(SPLIT_SIZES['validation'] / BATCH_SIZE)
TEST_STEPS = math.ceil(SPLIT_SIZES['testing'] / BATCH_SIZE)
print(f"Steps/epoch → train: {TRAIN_STEPS:,}, val: {VAL_STEPS:,}, test: {TEST_STEPS:,}")

Steps/epoch → train: 3,907, val: 782, test: 782


## 2. Pipeline `tf.data` con augmentación por simetrías

In [23]:
feature_description = {
    'board': tf.io.FixedLenFeature([BOARD_SHAPE[0] * BOARD_SHAPE[1] * BOARD_SHAPE[2]], tf.float32),
    TARGET_CLASS_COL: tf.io.FixedLenFeature([], tf.int64),
    TARGET_REG_COL: tf.io.FixedLenFeature([], tf.float32),
}
for col in AUX_FEATURES:
    feature_description[col] = tf.io.FixedLenFeature([], tf.float32)

def parse_example(serialized):
    parsed = tf.io.parse_single_example(serialized, feature_description)
    board = tf.reshape(parsed.pop('board'), BOARD_SHAPE)
    class_id = tf.cast(parsed.pop(TARGET_CLASS_COL), tf.int32)
    reg_target = parsed.pop(TARGET_REG_COL)
    class_label = tf.one_hot(class_id, 3)
    aux_vec = tf.stack([parsed[col] for col in AUX_FEATURES], axis=-1)
    return (board, aux_vec), (class_label, reg_target)

def augment(board, aux):
    if tf.random.uniform([]) > 0.5:
        board = tf.image.flip_left_right(board)
    if tf.random.uniform([]) > 0.5:
        board = tf.image.flip_up_down(board)
    board = tf.ensure_shape(board, BOARD_SHAPE)
    return board, aux

def make_dataset(split: str, batch_size: int, training: bool = True):
    tfrecord_path = TFRECORD_DIR / f'{split}.tfrecord'
    ds = tf.data.TFRecordDataset(str(tfrecord_path))
    ds = ds.map(parse_example, num_parallel_calls=AUTOTUNE)
    if training:
        ds = ds.shuffle(10_000)
    ds = ds.repeat()
    if training:
        def _map(features, labels):
            board_aug, aux_aug = augment(features[0], features[1])
            return (board_aug, aux_aug), labels
        ds = ds.map(_map, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(AUTOTUNE)
    return ds


## 3. Modelo multitarea (clasificación + regresión)

In [24]:
from keras import layers, models, regularizers

def build_multitask_model(board_shape=BOARD_SHAPE, aux_dim=len(AUX_FEATURES), l2w=1e-4, dropout=0.3):
    board_input = layers.Input(shape=board_shape, name='board_input')
    aux_input = layers.Input(shape=(aux_dim,), name='aux_input')

    x = layers.Conv2D(128, 3, padding='same', activation='relu', kernel_regularizer=regularizers.l2(l2w))(board_input)
    x = layers.BatchNormalization()(x)
    x = layers.Conv2D(128, 3, padding='same', activation='relu', kernel_regularizer=regularizers.l2(l2w))(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(256, 3, padding='same', activation='relu', kernel_regularizer=regularizers.l2(l2w))(x)
    x = layers.BatchNormalization()(x)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(dropout)(x)

    aux_branch = layers.Dense(64, activation='relu')(aux_input)
    aux_branch = layers.Dropout(dropout)(aux_branch)

    merged = layers.concatenate([x, aux_branch])
    shared = layers.Dense(128, activation='relu')(merged)
    shared = layers.Dropout(dropout)(shared)

    class_head = layers.Dense(64, activation='relu')(shared)
    class_output = layers.Dense(3, activation='softmax', name='class_head')(class_head)

    reg_head = layers.Dense(64, activation='relu')(shared)
    reg_output = layers.Dense(1, activation='linear', name='reg_head')(reg_head)

    model = models.Model(inputs=[board_input, aux_input], outputs=[class_output, reg_output])
    model.compile(
        optimizer='adam',
        loss={'class_head': 'categorical_crossentropy', 'reg_head': 'mae'},
        loss_weights={'class_head': 1.0, 'reg_head': 0.5},
        metrics={'class_head': ['accuracy', tf.keras.metrics.AUC(name='auc', multi_label=False)],
                 'reg_head': ['mae', 'mse']}
    )
    return model

model = build_multitask_model()
model.summary()


## 4. Entrenamiento y evaluación

In [None]:
train_ds = make_dataset('training', BATCH_SIZE, training=True)
val_ds = make_dataset('validation', BATCH_SIZE, training=False)
test_ds = make_dataset('testing', BATCH_SIZE, training=False)

callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='val_class_head_accuracy', mode='max', patience=5, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='val_class_head_accuracy', mode='max', factor=0.5, patience=3, min_lr=1e-5),
]

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=30,
    steps_per_epoch=TRAIN_STEPS,
    validation_steps=VAL_STEPS,
    callbacks=callbacks
)

eval_results = model.evaluate(test_ds, steps=TEST_STEPS)
print('Test metrics:', eval_results)

pred_class, pred_cp = model.predict(test_ds, steps=TEST_STEPS)


Epoch 1/30
[1m3907/3907[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m233s[0m 59ms/step - class_head_accuracy: 0.5478 - class_head_auc: 0.7428 - class_head_loss: 0.9297 - loss: 144.4704 - reg_head_loss: 286.7777 - reg_head_mae: 286.7777 - reg_head_mse: 210709.8125 - val_class_head_accuracy: 0.5750 - val_class_head_auc: 0.7740 - val_class_head_loss: 0.8817 - val_loss: 137.6219 - val_reg_head_loss: 272.9683 - val_reg_head_mae: 272.9683 - val_reg_head_mse: 197677.1562 - learning_rate: 0.0010
Epoch 2/30
[1m3907/3907[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m232s[0m 59ms/step - class_head_accuracy: 0.5749 - class_head_auc: 0.7718 - class_head_loss: 0.8834 - loss: 137.5029 - reg_head_loss: 272.5521 - reg_head_mae: 272.5521 - reg_head_mse: 196359.1875 - val_class_head_accuracy: 0.5815 - val_class_head_auc: 0.7818 - val_class_head_loss: 0.8652 - val_loss: 134.4556 - val_reg_head_loss: 266.3307 - val_reg_head_mae: 266.3307 - val_reg_head_mse: 188242.7188 - learning_rate: 0.0010
Epoch 

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix, mean_absolute_error, mean_squared_error

def collect_true_labels(dataset, steps):
    cls_list, reg_list = [], []
    for (_, _), (cls, reg) in dataset.take(steps):
        cls_list.append(cls.numpy())
        reg_list.append(reg.numpy())
    y_cls = np.concatenate(cls_list, axis=0)
    y_reg = np.concatenate(reg_list, axis=0)
    return y_cls, y_reg

y_true_cls_onehot, y_true_reg = collect_true_labels(test_ds, TEST_STEPS)
y_true_cls = np.argmax(y_true_cls_onehot, axis=1)
y_pred_cls = np.argmax(pred_class, axis=1)

print(classification_report(y_true_cls, y_pred_cls, target_names=['Ventaja','Empate','Desventaja']))
cm = confusion_matrix(y_true_cls, y_pred_cls)
fig, ax = plt.subplots(figsize=(4,4))
im = ax.imshow(cm, cmap='Blues')
ax.set_xticks([0,1,2]); ax.set_yticks([0,1,2])
ax.set_xticklabels(['V','=', 'D']); ax.set_yticklabels(['V','=', 'D'])
for i in range(3):
    for j in range(3):
        ax.text(j, i, cm[i, j], ha='center', va='center')
ax.set_title('Matriz de confusión (lado activo)')
plt.colorbar(im); plt.tight_layout(); plt.show()

y_true_reg_flat = y_true_reg.squeeze()
y_pred_reg_flat = pred_cp.squeeze()
mae = mean_absolute_error(y_true_reg_flat, y_pred_reg_flat)
rmse = mean_squared_error(y_true_reg_flat, y_pred_reg_flat, squared=False)
print(f'Clasificación samples: {len(y_true_cls):,} | Regresión MAE: {mae:.1f} cp | RMSE: {rmse:.1f} cp')
