# 🧠 Hybrid Neural Chess Engine (Keras)

### Learning from Chess.com (Hikaru Nakamura) + Self-Play Reinforcement

## ✅ SECTION 0 — Setup (Colab Compatible)

In [None]:
!pip -q install python-chess tensorflow requests tqdm

import os
import random
import requests
import numpy as np
from tqdm.auto import tqdm

import chess
import chess.pgn

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Seed the three RNG sources used in this notebook (Python's `random`, NumPy and
# TensorFlow) with one shared value to make runs more repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)
# Record the TensorFlow version in the cell output for later reference.
print('TensorFlow:', tf.__version__)


## 📌 SECTION 1 — BUSINESS UNDERSTANDING

This notebook demonstrates a hybrid approach (imitation learning followed by self-play) in **Keras**:
- Imitation learning on real Chess.com games.
- Self-play rollouts in a reinforcement-learning style, as the basis for policy-improvement loops.
- Separate policy and value networks.
- A CNN + Transformer architecture for spatial and contextual reasoning.

## 🌐 SECTION 2 — DOWNLOAD REAL DATA FROM CHESS.COM

In [None]:
def get_chesscom_archives(username: str):
    """Return the monthly game-archive URLs published for a Chess.com player.

    Args:
        username: Chess.com username; it is inserted verbatim into the request URL.

    Returns:
        The list stored under the ``"archives"`` key of the JSON response, or an
        empty list when that key is absent.

    Raises:
        requests.HTTPError: If the API answers with an error status
            (raised by ``response.raise_for_status()``).
    """
    url = f"https://api.chess.com/pub/player/{username}/games/archives"
    # Identify the client explicitly: the Chess.com public API may reject
    # requests that only carry a generic HTTP-library User-Agent (HTTP 403).
    # Consider adding contact details to this string for heavier use.
    headers = {"User-Agent": "hybrid-neural-chess-notebook/1.0 (educational notebook)"}
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json().get("archives", [])


def download_chesscom_pgn(username: str, max_archives: int = 12, out_path: str = "hikaru_chesscom.pgn"):
    """Download a player's monthly PGN archives and concatenate them into one file.

    Args:
        username: Chess.com username passed to ``get_chesscom_archives``.
        max_archives: Maximum number of archive URLs to fetch, counted from the
            end of the list returned by the API.
        out_path: Destination path of the combined PGN file (UTF-8).

    Returns:
        A ``(out_path, months)`` tuple, where ``months`` is the number of
        archives that actually yielded PGN text.

    Raises:
        ValueError: If no archive URL is available, or if none of the archives
            produced any PGN data.
    """
    archives = get_chesscom_archives(username)
    archives = archives[::-1][:max_archives]  # latest first

    if not archives:
        raise ValueError(f"No archives found for user: {username}")

    # Identify the client explicitly: the Chess.com public API may reject
    # requests that only carry a generic HTTP-library User-Agent (HTTP 403).
    headers = {"User-Agent": "hybrid-neural-chess-notebook/1.0 (educational notebook)"}

    all_pgn_chunks = []
    for archive_url in tqdm(archives, desc="Downloading monthly PGNs"):
        pgn_url = archive_url + "/pgn"
        try:
            r = requests.get(pgn_url, headers=headers, timeout=60)
        except requests.RequestException as exc:
            # One unreachable month should not discard the months already fetched.
            tqdm.write(f"Skipping {pgn_url}: {exc}")
            continue

        pgn_text = r.text.strip() if r.status_code == 200 else ""
        if not pgn_text:
            # Make skipped months visible instead of dropping them silently.
            tqdm.write(f"Skipping {pgn_url}: HTTP {r.status_code}, no PGN data")
            continue
        all_pgn_chunks.append(pgn_text)

    if not all_pgn_chunks:
        raise ValueError("Could not download PGN data from Chess.com archives.")

    # A blank line between chunks keeps consecutive games separated.
    with open(out_path, "w", encoding="utf-8") as f:
        f.write("\n\n".join(all_pgn_chunks))

    return out_path, len(all_pgn_chunks)


# Fetch up to 12 monthly archives for the "hikaru" account into a local PGN file.
# `pgn_path` is reused by the dataset-building cell further down.
pgn_path, months_downloaded = download_chesscom_pgn("hikaru", max_archives=12, out_path="hikaru_chesscom.pgn")
print(f"Saved PGN to: {pgn_path} | months downloaded: {months_downloaded}")


## 📊 SECTION 3 — DATA PREPARATION

In [None]:
def board_to_tensor(board: chess.Board) -> np.ndarray:
    """Encode the piece placement of ``board`` as a (12, 8, 8) one-hot array.

    The layout is channel-first; callers transpose it to channel-last for Keras.
    Planes 0-5 hold white pieces and planes 6-11 hold black pieces, each indexed
    by ``piece_type - 1``. The row index is ``7 - rank`` and the column index is
    the file. Only piece placement is encoded.

    Returns:
        A float32 array with 1.0 at every occupied (plane, row, col) and 0.0
        elsewhere.
    """
    planes = np.zeros((12, 8, 8), dtype=np.float32)
    for sq, pc in board.piece_map().items():
        # Black pieces live in the second group of six planes.
        plane = (pc.piece_type - 1) + (6 if pc.color != chess.WHITE else 0)
        planes[plane, 7 - chess.square_rank(sq), chess.square_file(sq)] = 1.0
    return planes


def generate_move_vocab():
    """Build the fixed vocabulary of UCI move strings used as policy classes.

    Every ordered pair of distinct squares contributes one four-character move.
    Pairs going from rank 7 to rank 8, or from rank 2 to rank 1, additionally
    contribute the four promotion variants (suffix ``q``, ``r``, ``b``, ``n``),
    regardless of the files involved.

    Returns:
        A ``(moves, move_to_idx, idx_to_move)`` tuple: the alphabetically sorted
        move list, a move -> index dict, and the inverse index -> move dict.
    """
    squares = [file_ + rank for file_ in "abcdefgh" for rank in "12345678"]
    promo_suffixes = ("q", "r", "b", "n")
    # (from-rank, to-rank) combinations on which a promotion can happen.
    promo_rank_pairs = {("7", "8"), ("2", "1")}

    vocab = set()
    for src in squares:
        for dst in squares:
            if src == dst:
                continue
            uci = src + dst
            vocab.add(uci)
            if (src[1], dst[1]) in promo_rank_pairs:
                vocab.update(uci + suffix for suffix in promo_suffixes)

    moves = sorted(vocab)
    move_to_idx = {uci: position for position, uci in enumerate(moves)}
    idx_to_move = dict(enumerate(moves))
    return moves, move_to_idx, idx_to_move


# Build the move vocabulary once at module level; `move_to_idx` / `idx_to_move`
# are read as globals by the dataset, self-play and inference code below.
all_moves, move_to_idx, idx_to_move = generate_move_vocab()
print('Move vocab size:', len(all_moves))


## 📦 SECTION 4 — DATASET BUILD (PGN -> NUMPY)

In [None]:
def load_positions_from_pgn(pgn_file: str, max_games: int = 500):
    """Turn the mainline moves of a PGN file into (position, move-index) pairs.

    For every mainline move the position *before* the move is encoded with
    ``board_to_tensor`` and transposed to channel-last, and the move is mapped
    to its index in the global ``move_to_idx`` vocabulary.

    Moves whose UCI string is not in the vocabulary are skipped (and counted)
    rather than being labelled with index 0, which would silently teach the
    model a wrong target. The move is still pushed so later positions stay
    correct.

    Args:
        pgn_file: Path of the PGN file to read.
        max_games: Maximum number of games to read; ``None`` reads the whole
            file.

    Returns:
        ``(X, y)`` where ``X`` is a float32 array of shape ``(N, 8, 8, 12)`` and
        ``y`` is an int32 array of shape ``(N,)``. ``N`` may be 0.
    """
    X, y = [], []
    loaded_games = 0
    skipped_moves = 0

    with open(pgn_file, "r", encoding="utf-8", errors="ignore") as f:
        # Check the limit before reading so that max_games=0 loads nothing.
        while max_games is None or loaded_games < max_games:
            game = chess.pgn.read_game(f)
            if game is None:
                break

            board = game.board()
            for mv in game.mainline_moves():
                label = move_to_idx.get(mv.uci())
                if label is None:
                    skipped_moves += 1
                else:
                    encoded = board_to_tensor(board)               # (12,8,8)
                    X.append(np.transpose(encoded, (1, 2, 0)))    # (8,8,12) for Keras Conv2D
                    y.append(label)
                board.push(mv)

            loaded_games += 1

    # The reshape keeps the rank stable even when no position was collected.
    X = np.asarray(X, dtype=np.float32).reshape(-1, 8, 8, 12)
    y = np.asarray(y, dtype=np.int32)
    print(f"Loaded games: {loaded_games}, positions: {len(X)}")
    if skipped_moves:
        print(f"Skipped moves outside the move vocabulary: {skipped_moves}")
    return X, y


X, y = load_positions_from_pgn(pgn_path, max_games=500)

# Unshuffled hold-out: the first 90% of positions train, the remainder validate.
# With at most one position everything goes to the training split.
split = len(X) if len(X) <= 1 else int(0.9 * len(X))
X_train, X_val = X[:split], X[split:]
y_train, y_val = y[:split], y[split:]
print('Train:', X_train.shape, 'Val:', X_val.shape)


## 🧠 SECTION 5 — KERAS TRANSFORMER ARCHITECTURE

In [None]:
def transformer_block(x, num_heads=8, ff_dim=512, dropout=0.1):
    """Apply one Transformer encoder block (self-attention + feed-forward) to ``x``.

    Each sub-layer is followed by dropout, a residual addition and layer
    normalization. The output has the same last dimension as the input.

    Args:
        x: Keras tensor holding the token sequence.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden feed-forward layer.
        dropout: Dropout rate applied after each sub-layer.
    """
    width = x.shape[-1]

    # Self-attention sub-layer: `x` is passed as both query and value.
    # NOTE(review): Keras treats `key_dim` as the size of *each* head, so every
    # head here is as wide as the token embedding - confirm this is intended.
    attended = layers.MultiHeadAttention(num_heads=num_heads, key_dim=width)(x, x)
    attended = layers.Dropout(dropout)(attended)
    x = layers.LayerNormalization(epsilon=1e-6)(x + attended)

    # Feed-forward sub-layer, projected back to the token width.
    hidden = layers.Dense(ff_dim, activation='relu')(x)
    projected = layers.Dense(width)(hidden)
    projected = layers.Dropout(dropout)(projected)
    return layers.LayerNormalization(epsilon=1e-6)(x + projected)


def build_policy_model(move_vocab_size: int):
    """Build and compile the CNN + Transformer policy network.

    The (8, 8, 12) board input goes through two 3x3 convolutions, is reshaped
    into 64 tokens, projected to width 256, processed by two transformer
    blocks, mean-pooled over the tokens and mapped to one logit per move.

    Args:
        move_vocab_size: Number of output logits.

    Returns:
        A compiled ``keras.Model`` that outputs raw logits; the loss is sparse
        categorical cross-entropy with ``from_logits=True``.
    """
    board_in = keras.Input(shape=(8, 8, 12), name='board')

    # Convolutional stem over the board planes.
    features = board_in
    for n_filters in (64, 128):
        features = layers.Conv2D(n_filters, 3, padding='same', activation='relu')(features)

    # One token per square: (8, 8, 128) -> (64, 128), then project to width 256.
    tokens = layers.Reshape((64, 128))(features)
    tokens = layers.Dense(256)(tokens)

    for _ in range(2):
        tokens = transformer_block(tokens, num_heads=8, ff_dim=512)

    pooled = layers.GlobalAveragePooling1D()(tokens)
    policy_logits = layers.Dense(move_vocab_size, name='policy_logits')(pooled)

    model = keras.Model(board_in, policy_logits, name='policy_model')
    model.compile(
        optimizer=keras.optimizers.Adam(1e-3),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    return model


def build_value_model():
    """Build and compile a small convolutional value network.

    Returns:
        A compiled ``keras.Model`` mapping an (8, 8, 12) board to a single
        tanh-activated scalar, trained with mean squared error.
    """
    board_in = keras.Input(shape=(8, 8, 12), name='board')

    features = layers.Conv2D(64, 3, padding='same', activation='relu')(board_in)
    hidden = layers.Dense(256, activation='relu')(layers.Flatten()(features))
    value = layers.Dense(1, activation='tanh', name='value')(hidden)

    model = keras.Model(board_in, value, name='value_model')
    model.compile(loss='mse', optimizer=keras.optimizers.Adam(1e-3))
    return model


# Instantiate both networks; the policy head gets one logit per vocabulary move.
# `policy_model` is read as a global by the training, self-play and inference cells.
policy_model = build_policy_model(len(move_to_idx))
value_model = build_value_model()
policy_model.summary()


## 🎯 SECTION 6 — IMITATION LEARNING

In [None]:
# Supervised imitation step: fit the policy network on the recorded moves.
if len(X_train) == 0:
    print('Dataset is empty. Check PGN download and parsing.')
else:
    history = policy_model.fit(
        X_train,
        y_train,
        # Validation is skipped when the hold-out split is empty.
        validation_data=None if len(X_val) == 0 else (X_val, y_val),
        epochs=2,
        batch_size=32,
        verbose=1,
    )


## 🔥 SECTION 7 — SELF-PLAY REINFORCEMENT (SIMPLIFIED)

In [None]:
def board_to_keras_input(board: chess.Board) -> np.ndarray:
    """Encode ``board`` as a single-sample, channel-last float32 batch.

    Returns:
        Array of shape (1, 8, 8, 12): the ``board_to_tensor`` encoding with the
        plane axis moved last and a leading batch axis added.
    """
    channels_last = board_to_tensor(board).transpose(1, 2, 0)
    return channels_last[np.newaxis, ...].astype(np.float32)


def select_legal_move_from_logits(board: chess.Board, logits: np.ndarray):
    """Sample a legal move from the policy logits, restricted to legal moves.

    The logits of the legal moves found in the vocabulary are softmax-normalized
    and one move is drawn from that distribution. If none of the legal moves is
    in the vocabulary, a uniformly random legal move is returned instead.

    Args:
        board: Position to move in.
        logits: Policy output indexed as ``logits[0, move_index]``.

    Returns:
        ``(move, move_idx)``. In the fallback case ``move_idx`` is the
        vocabulary index of the move, or 0 when the move is not in the
        vocabulary.
    """
    legal = list(board.legal_moves)

    legal_indices = []
    for candidate in legal:
        uci = candidate.uci()
        if uci in move_to_idx:
            legal_indices.append(move_to_idx[uci])

    if legal_indices:
        local_logits = logits[0, legal_indices]
        probs = tf.nn.softmax(local_logits).numpy()
        # Renormalize so the probabilities handed to NumPy sum to one.
        probs = probs / probs.sum()
        local_choice = np.random.choice(len(legal_indices), p=probs)
        move_idx = legal_indices[local_choice]
        return chess.Move.from_uci(idx_to_move[move_idx]), move_idx

    # No legal move is representable in the vocabulary: pick one at random.
    mv = random.choice(legal)
    return mv, move_to_idx.get(mv.uci(), 0)


def self_play_game(max_plies=200):
    """Play one game of the global ``policy_model`` against itself.

    Moves are sampled with ``select_legal_move_from_logits`` until the game is
    over or ``max_plies`` moves have been played.

    Args:
        max_plies: Maximum number of half-moves to play.

    Returns:
        ``(trajectory, reward, result)`` where ``trajectory`` is a list of
        ``(state, move_idx, side_to_move)`` tuples recorded before each move,
        ``result`` is the string from ``board.result()``, and ``reward`` is 1
        for ``'1-0'``, -1 for ``'0-1'`` and 0 for anything else (including
        games stopped by the ply limit).
    """
    board = chess.Board()
    trajectory = []

    while not board.is_game_over() and len(trajectory) < max_plies:
        state = board_to_keras_input(board)
        # Call the model directly for this single-position batch: Keras
        # recommends this over Model.predict() for small inputs inside a loop,
        # because predict() sets up its batch-processing machinery on every call.
        logits = policy_model(state, training=False).numpy()
        move, move_idx = select_legal_move_from_logits(board, logits)
        trajectory.append((state, move_idx, board.turn))
        board.push(move)

    result = board.result()
    # Reward is expressed from White's point of view.
    reward = 1 if result == '1-0' else -1 if result == '0-1' else 0
    return trajectory, reward, result


# Smoke test: play three self-play games and print each outcome.
# The trajectories are discarded here; this loop does not update any weights.
for i in range(3):
    _, reward, result = self_play_game()
    print(f'Self-play game {i+1}: {result}, reward={reward}')


## 🎮 SECTION 8 — PLAY AGAINST ENGINE

In [None]:
def predict_move(board: chess.Board) -> chess.Move:
    """Return the legal move with the highest policy logit for ``board``.

    Only legal moves present in the move vocabulary are scored. If none of them
    is in the vocabulary, a uniformly random legal move is returned.

    Args:
        board: Position to move in.

    Returns:
        The selected ``chess.Move``.

    Raises:
        ValueError: If ``board`` has no legal moves.
    """
    legal = list(board.legal_moves)
    if not legal:
        # Fail with a clear message before running inference on a dead position.
        raise ValueError("predict_move() requires a position with at least one legal move.")

    state = board_to_keras_input(board)
    # Call the model directly for this single-position batch: Keras recommends
    # this over Model.predict() for small inputs that are evaluated repeatedly.
    logits = policy_model(state, training=False).numpy()

    legal_indices = [move_to_idx[m.uci()] for m in legal if m.uci() in move_to_idx]
    if not legal_indices:
        return random.choice(legal)

    # Greedy choice among the legal moves only.
    local_logits = logits[0, legal_indices]
    best_local = int(np.argmax(local_logits))
    best_idx = legal_indices[best_local]
    return chess.Move.from_uci(idx_to_move[best_idx])


# Demo: ask the policy network for its greedy move in the initial position.
board = chess.Board()
print('Engine move from start:', predict_move(board).uci())


## 📊 SECTION 9 — EVALUATION

Suggested evaluation metrics:

- Top-1 and Top-5 move-prediction accuracy on held-out Chess.com games.
- Win rate against a random legal-move baseline.
- Optional Elo-like tracking across checkpoints.