# RBC AlphaZero-like Bot

Reconnaissance Blind Chess (RBC) is a partially observable variant of chess in which players have perfect information about their own pieces but only limited observations of the opponent.

This notebook describes a learning-based RBC agent inspired by the AlphaZero framework, combining neural network evaluation, search, and self-play training. The emphasis is on a clear and rule-compliant implementation that can be trained and evaluated end to end.

## DEPENDENCIES


This section lists the external libraries required to run the notebook.

The implementation relies on standard numerical and deep learning tools for tensor computation and optimization, together with a chess engine library and the ReconChess framework to ensure correct handling of game rules and interaction between agents.

These dependencies provide the basic infrastructure for representing game states, running self-play matches, training neural networks, and evaluating the resulting agent.

In [None]:
%pip -q install python-chess reconchess




Install required dependencies.
 - python-chess: standard chess representation and move generation
 - reconchess: official framework for Reconnaissance Blind Chess,
   enabling rule-compliant gameplay against other bots


In [None]:
import os
import math
import random
from dataclasses import dataclass
from typing import Dict, Tuple, List, Optional, Any
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import chess
import reconchess
from reconchess import Player, Color, Square
import csv
import datetime

## REPRODUCIBILITY

This section fixes the random seeds and reports the compute device, so that runs are as repeatable as possible.

The cell below adds the parent directory to `sys.path` so that the local `src` package can be imported, then calls `set_seeds(0)` from `src.utils` and prints the `DEVICE` value defined in `src.config`.
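
The body of `set_seeds` lives in `src/utils.py` and is not shown in this notebook. As a rough sketch of what such a helper typically does (the name `set_seeds_sketch` and everything inside it are assumptions, not the project's actual code), it seeds Python's `random` module, plus NumPy and PyTorch when they are importable:

```python
import os
import random


def set_seeds_sketch(seed: int = 0) -> None:
    """Hypothetical stand-in for src.utils.set_seeds (actual body not shown)."""
    # PYTHONHASHSEED only affects interpreters started after this point.
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    try:
        import numpy as np
        np.random.seed(seed)
    except ImportError:
        pass
    try:
        import torch
        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
    except ImportError:
        pass


# Re-seeding with the same value reproduces the same random stream.
set_seeds_sketch(0)
first = random.random()
set_seeds_sketch(0)
second = random.random()
print(first == second)
```

Seeding alone may not make GPU runs bit-for-bit identical, since some CUDA kernels are non-deterministic.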

In [None]:
import sys
from pathlib import Path
sys.path.append(str(Path("..").resolve()))

from src.utils import set_seeds
from src.config import DEVICE

set_seeds(0)
print("DEVICE:", DEVICE)


## ACTION ENCODING (20480 fixed policy head)


In order to use a fixed-size policy head, all possible chess moves are mapped to a discrete action space of fixed dimensionality.

Each move is encoded as an index in a predefined action set of size 20,480, covering all standard chess moves, including promotions. This encoding allows the policy network to produce a fixed-length output independent of the current position.

During play, only the subset of actions corresponding to the move actions provided by the game environment is considered: the softmax is computed over those logits alone, so the remaining entries are masked implicitly.
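
The actual mapping is defined in `src/encoding.py`, which is not reproduced here. One layout that yields exactly 20,480 entries is 64 origin squares × 64 destination squares × 5 promotion slots (none, knight, bishop, rook, queen). The sketch below assumes that layout; the names `encode_move`, `decode_index`, `PROMO_SLOTS`, and `POLICY_SIZE_SKETCH` are illustrative rather than the project's own:

```python
from typing import Optional, Tuple

# Assumed promotion slots: 0 = no promotion, then knight, bishop, rook, queen.
PROMO_SLOTS = [None, "n", "b", "r", "q"]
N_SQUARES = 64
POLICY_SIZE_SKETCH = N_SQUARES * N_SQUARES * len(PROMO_SLOTS)


def encode_move(from_sq: int, to_sq: int, promo: Optional[str] = None) -> int:
    """Map (from, to, promotion) to a flat index under the assumed layout."""
    promo_id = PROMO_SLOTS.index(promo)
    return (from_sq * N_SQUARES + to_sq) * len(PROMO_SLOTS) + promo_id


def decode_index(idx: int) -> Tuple[int, int, Optional[str]]:
    """Inverse of encode_move."""
    pair, promo_id = divmod(idx, len(PROMO_SLOTS))
    from_sq, to_sq = divmod(pair, N_SQUARES)
    return from_sq, to_sq, PROMO_SLOTS[promo_id]


# e2e4 with squares numbered a1 = 0 ... h8 = 63: e2 = 12, e4 = 28.
idx = encode_move(12, 28)
print(POLICY_SIZE_SKETCH, idx, decode_index(idx))
```

Most (from, to, promotion) triples in such a layout never correspond to a real chess move; the fixed size trades some wasted outputs for a simple, position-independent index.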


In [None]:
from src.encoding import (
    PROMO_TO_ID,
    ID_TO_PROMO,
    POLICY_SIZE,
    move_to_index,
    index_to_move,   # if it exists in your encoding module
)


## BELIEF TENSOR (7 channels) + SENSE UPDATE


Uncertainty about the opponent’s pieces is represented through a per-square belief tensor with seven channels, corresponding to the six standard chess piece types plus an explicit EMPTY channel.

For each board square, the belief tensor stores a probability distribution over these channels, normalized independently per square. This representation makes uncertainty explicit while remaining simple and easy to inspect.

Sensing actions update the belief deterministically within the sensed 3×3 region: observed squares are set to the corresponding piece type or to EMPTY, while beliefs outside the sensed area remain unchanged. This local update rule provides a lightweight mechanism to incorporate new information without maintaining a full probabilistic game history.

Belief tensor: definitions

In [None]:
PIECE_TYPES_6 = ["P", "N", "B", "R", "Q", "K"]
EMPTY = "EMPTY"
CHANNELS_7 = PIECE_TYPES_6 + [EMPTY]
C_BELIEF = 7
CH2I = {ch: i for i, ch in enumerate(CHANNELS_7)}
START_COUNTS = {"P": 8, "N": 2, "B": 2, "R": 2, "Q": 1, "K": 1}


Normalization over channels (to make it a true distribution)

In [None]:
def normalize_over_channels(B: torch.Tensor, eps: float = 1e-8) -> torch.Tensor:
    s = B.sum(dim=0, keepdim=True).clamp_min(eps)
    return B / s


In [None]:
def piece_symbol_upper(pc: chess.Piece) -> str:
    return pc.symbol().upper()

Belief initialization from the standard starting position

In [None]:
def init_belief_from_initial(my_color: chess.Color) -> torch.Tensor:
    """
    RBC starts from the standard chess initial position.
    We initialize the opponent belief with a strong prior: probability 1 on the opponent’s
    initial piece placement, and EMPTY elsewhere.
    This is a reasonable baseline because the starting configuration is known in RBC;
    only after the game starts does the opponent become hidden.
    """
    B = torch.zeros((C_BELIEF, 8, 8), dtype=torch.float32)
    B[CH2I[EMPTY]].fill_(1.0)
    board = chess.Board()
    opp_color = not my_color
    for sq, pc in board.piece_map().items():
        if pc.color != opp_color:
            continue
        r = chess.square_rank(sq)
        f = chess.square_file(sq)
        sym = piece_symbol_upper(pc)
        B[:, r, f] = 0.0
        B[CH2I[sym], r, f] = 1.0
    return normalize_over_channels(B)


Updates the opponent belief tensor after a sensing action.

In [None]:
def apply_sense_to_belief(
    B: torch.Tensor,
    sense_result: List[Tuple[Square, Optional[chess.Piece]]],
    my_color: chess.Color
) -> torch.Tensor:
    """Update opponent belief using ReconChess sense results (copy-based, not in-place)."""
    B2 = B.clone()
    for sq, pc in sense_result:
        r = chess.square_rank(sq)
        f = chess.square_file(sq)
        B2[:, r, f] = 0.0
        if pc is None or pc.color == my_color:
            B2[CH2I[EMPTY], r, f] = 1.0
        else:
            sym = piece_symbol_upper(pc)
            B2[CH2I[sym], r, f] = 1.0
    return normalize_over_channels(B2)

## SENSE SELECTION: entropy-max 3×3 center


At each turn, the agent selects a sensing action by evaluating the uncertainty of the opponent’s belief distribution.

For each allowed sensing square, the total entropy over the corresponding 3×3 region is computed, and the square that maximizes this value is selected. This heuristic prioritizes sensing actions that are expected to provide the largest reduction in uncertainty.

The approach is purely information-driven and independent of the immediate move selection, making it simple, efficient, and consistent with the belief representation.
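
As a small worked example of the quantity being summed (pure Python, independent of the tensor code in the next cell): a square whose content is known has entropy 0, while a square with a uniform belief over the seven channels has entropy ln 7 ≈ 1.946 nats. The helper name `entropy_nats` is illustrative.

```python
import math
from typing import Sequence


def entropy_nats(p: Sequence[float], eps: float = 1e-8) -> float:
    """Shannon entropy of one square's channel distribution, in nats."""
    # Clamp to eps before taking the log, mirroring clamp_min in square_entropy.
    return -sum(max(q, eps) * math.log(max(q, eps)) for q in p)


known = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]   # certainly EMPTY
uniform = [1.0 / 7.0] * 7                      # maximally uncertain
split = [0.5, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5]    # pawn or empty, 50/50

print(round(entropy_nats(known), 3))
print(round(entropy_nats(uniform), 3))
print(round(entropy_nats(split), 3))
```

A sensed square becomes one-hot under `apply_sense_to_belief`, so its contribution to the regional total drops to (numerically) zero.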

In [None]:
SENSE_OFFSETS = [(dr, df) for dr in (-1,0,1) for df in (-1,0,1)]
def square_entropy(p: torch.Tensor, eps: float = 1e-8) -> float:
    p = p.clamp_min(eps)
    return float(-(p * p.log()).sum().item())
def choose_sense_square_entropy(B: torch.Tensor, sense_actions: List[Square]) -> Square:
    """Choose among *allowed* sense_actions provided by ReconChess."""
    best_sq = sense_actions[0]
    best_e = -1e18
    for sq in sense_actions:
        r0 = chess.square_rank(sq)
        f0 = chess.square_file(sq)
        tot = 0.0
        for dr, df in SENSE_OFFSETS:
            r = r0 + dr; f = f0 + df
            if 0 <= r < 8 and 0 <= f < 8:
                tot += square_entropy(B[:, r, f])
        if tot > best_e:
            best_e = tot
            best_sq = sq
    return best_sq


## GREEDY DETERMINIZATION FROM BELIEF + remaining opponent inventory


To enable fast planning with standard chess move generation, the opponent’s hidden position is approximated by constructing a single fully specified “determinized” board state from the belief tensor.

For each opponent piece type, the algorithm places the remaining pieces on the highest-probability squares according to the belief distribution, while respecting already occupied squares (including all known own pieces). A simple opponent inventory is maintained to ensure that the determinized position contains a consistent number of pieces of each type.

The resulting determinized board is used only as a hypothesis for search and evaluation; it provides a concrete state on which legal moves can be checked and simulated efficiently.

Determinize the hidden opponent position from belief.

In [None]:
def greedy_determinize_opponent(
    B: torch.Tensor,
    own_board: chess.Board,
    my_color: chess.Color,
    opp_counts: Dict[str, int],
) -> chess.Board:
    det = chess.Board(None)
    det.clear()
    for sq, pc in own_board.piece_map().items():
        det.set_piece_at(sq, pc)
    occupied = set(own_board.piece_map().keys())
    opp_color = not my_color
    for sym in PIECE_TYPES_6:
        k = int(opp_counts.get(sym, 0))
        if k <= 0:
            continue
        probs = []
        ch = CH2I[sym]
        for sq in chess.SQUARES:
            if sq in occupied:
                continue
            r = chess.square_rank(sq)
            f = chess.square_file(sq)
            probs.append((float(B[ch, r, f].item()), sq))
        probs.sort(reverse=True, key=lambda x: x[0])
        placed = 0
        for _, sq in probs:
            if sq in occupied:
                continue
            psym = sym.lower() if opp_color == chess.BLACK else sym
            det.set_piece_at(sq, chess.Piece.from_symbol(psym))
            occupied.add(sq)
            placed += 1
            if placed >= k:
                break
    det.turn = own_board.turn
    det.castling_rights = own_board.castling_rights
    return det

In [None]:
def apply_taken_move_to_own_board(own_board: chess.Board, mv: chess.Move, my_color: chess.Color) -> None:
    pc = own_board.piece_at(mv.from_square)
    if pc is None or pc.color != my_color:
        return
    own_board.remove_piece_at(mv.from_square)
    if mv.promotion is not None and pc.piece_type == chess.PAWN:
        pc = chess.Piece(mv.promotion, my_color)
    own_board.set_piece_at(mv.to_square, pc)
    if pc.piece_type == chess.KING:
        f_from = chess.square_file(mv.from_square)
        f_to = chess.square_file(mv.to_square)
        r_rank = chess.square_rank(mv.from_square)
        if abs(f_to - f_from) == 2:
            if f_to > f_from:
                rook_from = chess.square(7, r_rank)
                rook_to = chess.square(5, r_rank)
            else:
                rook_from = chess.square(0, r_rank)
                rook_to = chess.square(3, r_rank)
            rook = own_board.piece_at(rook_from)
            if rook is not None and rook.color == my_color and rook.piece_type == chess.ROOK:
                own_board.remove_piece_at(rook_from)
                own_board.set_piece_at(rook_to, rook)
    own_board.turn = not own_board.turn
    if my_color == chess.BLACK:
        own_board.fullmove_number += 1

## ENCODER (own pieces + belief + small metadata) → 15×8×8


The neural network input is a stack of 2D feature planes with fixed spatial resolution (8×8), producing a tensor of shape 15×8×8.

The encoding includes: (i) six binary planes for the agent’s own pieces (one per piece type), (ii) the seven-channel opponent belief tensor, and (iii) a small set of global metadata planes (side to move and a normalized move counter).

This representation keeps the input compact while preserving the spatial structure of the board, allowing convolutional layers to exploit local patterns and piece configurations.
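
The channel order follows the concatenation `[own, B, meta]` used by `encode_state`. The index map below simply spells that order out; the dictionary name `CHANNEL_LAYOUT` is introduced here for illustration only.

```python
# Channel ranges of the 15x8x8 input, in concatenation order: own, belief, metadata.
CHANNEL_LAYOUT = {
    "own_pieces (P, N, B, R, Q, K)": range(0, 6),
    "opponent_belief (P, N, B, R, Q, K, EMPTY)": range(6, 13),
    "side_to_move": range(13, 14),
    "normalized_move_counter": range(14, 15),
}

total_channels = sum(len(r) for r in CHANNEL_LAYOUT.values())
for name, r in CHANNEL_LAYOUT.items():
    print(f"{r.start:2d}-{r.stop - 1:2d}  {name}")
print("total:", total_channels)
```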

In [None]:
PIECE_ORDER = [chess.PAWN, chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN, chess.KING]
def board_to_own_planes(board: chess.Board, my_color: chess.Color) -> torch.Tensor:
    X = torch.zeros((6, 8, 8), dtype=torch.float32)
    for sq, pc in board.piece_map().items():
        if pc.color != my_color:
            continue
        r = chess.square_rank(sq)
        f = chess.square_file(sq)
        i = PIECE_ORDER.index(pc.piece_type)
        X[i, r, f] = 1.0
    return X
def metadata_planes(board: chess.Board, my_color: chess.Color) -> torch.Tensor:
    turn_plane = torch.full((1,8,8), 1.0 if board.turn == my_color else 0.0, dtype=torch.float32)
    ply = min(board.fullmove_number * 2, 200) / 200.0
    ply_plane = torch.full((1,8,8), float(ply), dtype=torch.float32)
    return torch.cat([turn_plane, ply_plane], dim=0)
def encode_state(own_board: chess.Board, my_color: chess.Color, B: torch.Tensor) -> torch.Tensor:
    own = board_to_own_planes(own_board, my_color)
    meta = metadata_planes(own_board, my_color)
    return torch.cat([own, B, meta], dim=0)


## SMALL POLICY/VALUE NET


The agent uses a lightweight convolutional neural network with a shared trunk and two output heads: a policy head and a value head.

The policy head produces logits over the fixed 20,480-action encoding, which are later restricted to the legal moves available in the current position. The value head outputs a single scalar estimating the expected game outcome from the current player’s perspective.

The network is intentionally small to keep self-play and training fast while still capturing the spatial structure of the board representation.

In [None]:
class FastPolicyValueNet(nn.Module):
    def __init__(self, in_ch: int = 15, trunk: int = 64):
        super().__init__()
        self.trunk = nn.Sequential(
            nn.Conv2d(in_ch, trunk, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(trunk, trunk, 3, padding=1),
            nn.ReLU(),
        )
        self.pol = nn.Sequential(nn.Conv2d(trunk, 32, 1), nn.ReLU())
        self.pol_fc = nn.Linear(32*8*8, POLICY_SIZE)
        self.val = nn.Sequential(nn.Conv2d(trunk, 16, 1), nn.ReLU())
        self.val_fc1 = nn.Linear(16*8*8, 64)
        self.val_fc2 = nn.Linear(64, 1)
    def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        h = self.trunk(x)
        logits = self.pol_fc(self.pol(h).flatten(1))
        v = torch.tanh(self.val_fc2(F.relu(self.val_fc1(self.val(h).flatten(1))))).squeeze(-1)
        return logits, v


## ROOT-only PUCT (search on determinization, choose among provided legal actions)


Move selection is performed using a lightweight, root-only PUCT search guided by the network’s policy priors.

The search is run on the determinized board hypothesis and considers only the move actions provided by the ReconChess environment for the current turn. Each simulation selects the move that maximizes a PUCT score combining an exploitation term (estimated value) and an exploration term weighted by the network prior.

The final move is chosen from the resulting visit counts, producing a search-improved policy target that is also reused during training.
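
To make the selection rule concrete, here is a toy, dependency-free version of the score used in the loop, Q(a) + c_puct · P(a) · √N / (1 + N(a)), with N counting the current simulation as in `puct_root`. The function name `puct_scores`, the action labels, and the numbers are made up for illustration.

```python
import math
from typing import Dict


def puct_scores(
    priors: Dict[str, float],
    visits: Dict[str, int],
    value_sums: Dict[str, float],
    c_puct: float = 1.5,
) -> Dict[str, float]:
    """Score each action as Q(a) + c_puct * P(a) * sqrt(N) / (1 + N(a))."""
    n_total = sum(visits.values()) + 1  # include the simulation being selected
    scores = {}
    for a, prior in priors.items():
        n_a = visits[a]
        q = 0.0 if n_a == 0 else value_sums[a] / n_a
        scores[a] = q + c_puct * prior * math.sqrt(n_total) / (1 + n_a)
    return scores


priors = {"a": 0.7, "b": 0.3}

# First simulation: no visits yet, so the prior alone decides.
s1 = puct_scores(priors, {"a": 0, "b": 0}, {"a": 0.0, "b": 0.0})
first_pick = max(s1, key=s1.get)

# Suppose "a" was tried once and returned -1: its Q drops and "b" overtakes it.
s2 = puct_scores(priors, {"a": 1, "b": 0}, {"a": -1.0, "b": 0.0})
second_pick = max(s2, key=s2.get)

print(first_pick, second_pick)
```

The exploration term shrinks as an action accumulates visits, so a move with a high prior but a poor value estimate gradually loses its lead.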

In [None]:
@torch.no_grad()
def nn_priors_and_value(model: nn.Module, x: torch.Tensor, legal_moves: List[chess.Move], device: str) -> Tuple[Dict[int,float], float]:
    model.eval()
    xb = x.unsqueeze(0).to(device)
    logits, v = model(xb)
    logits = logits[0].cpu()
    v = float(v.item())
    idxs = [move_to_index(m) for m in legal_moves]
    l = logits[idxs]
    p = torch.softmax(l, dim=0).numpy()
    pri = {idxs[i]: float(p[i]) for i in range(len(idxs))}
    return pri, v
@torch.no_grad()
def puct_root(
    model: nn.Module,
    x_root: torch.Tensor,
    det_board: chess.Board,
    move_actions: List[chess.Move],
    sims: int = 80,
    c_puct: float = 1.5,
    device: str = "cpu",
) -> Dict[int,int]:
    if not move_actions:
        return {}
    priors, _ = nn_priors_and_value(model, x_root, move_actions, device=device)
    N = 0
    N_a: Dict[int,int] = {move_to_index(m): 0 for m in move_actions}
    W_a: Dict[int,float] = {move_to_index(m): 0.0 for m in move_actions}
    def Q(a):
        n = N_a[a]
        return 0.0 if n == 0 else W_a[a] / n
    for _ in range(sims):
        N += 1
        best_a = None
        best_s = -1e18
        for m in move_actions:
            a = move_to_index(m)
            u = c_puct * priors.get(a, 0.0) * math.sqrt(N) / (1 + N_a[a])
            s = Q(a) + u
            if s > best_s:
                best_s = s
                best_a = a
        mv = index_to_move(best_a)
        b2 = det_board.copy()
        if mv not in b2.legal_moves:
            # Not legal on the determinized hypothesis: assign the worst value.
            v_leaf = -1.0
        else:
            b2.push(mv)
            # NOTE: the post-move position b2 is not re-encoded here; the root
            # encoding is evaluated instead, so all legal moves share the same
            # value estimate.
            _, v = model(x_root.unsqueeze(0).to(device))
            v_leaf = float(v.item())
        N_a[best_a] += 1
        W_a[best_a] += float(v_leaf)
    return N_a
def visits_to_policy_target(N_a: Dict[int,int]) -> np.ndarray:
    P = np.zeros((POLICY_SIZE,), dtype=np.float32)
    if not N_a:
        return P
    total = sum(N_a.values())
    for a, n in N_a.items():
        P[a] = n / max(1, total)
    return P

## THE **RECONCHESS PLAYER** (FAST, rule-compliant)


The agent is implemented as a ReconChess Player, fully compliant with the game’s interface and rules.

All game interactions—including sensing, move selection, belief updates, and board state tracking—are handled through the standard ReconChess callbacks. This ensures that the agent can play against other bots without relying on privileged information or modified game mechanics. The focus is on robustness and correct interaction rather than maximal performance.


In [None]:
@dataclass
class FastBotConfig:
    sims: int = 80
    c_puct: float = 1.5
class RBCFastAZPlayer(Player):
    def __init__(self, model: nn.Module, cfg: FastBotConfig, seed: int = 0):
        self.model = model
        self.cfg = cfg
        self.rng = np.random.default_rng(seed)
        self.color: Optional[chess.Color] = None
        self.own_board: Optional[chess.Board] = None
        self.B: Optional[torch.Tensor] = None
        self.opp_counts: Optional[Dict[str,int]] = None
        self.store_training = False
        self.X: List[np.ndarray] = []
        self.P: List[np.ndarray] = []
        self.Z: List[float] = []
    def handle_game_start(self, color: Color, board: chess.Board, opponent_name: str):
        self.color = bool(color)
        self.own_board = chess.Board(None)
        self.own_board.clear()
        for sq, pc in board.piece_map().items():
            if pc.color == self.color:
                self.own_board.set_piece_at(sq, pc)
        self.own_board.turn = board.turn
        self.own_board.castling_rights = board.castling_rights
        self.B = init_belief_from_initial(self.color)
        self.opp_counts = dict(START_COUNTS)
    def handle_opponent_move_result(self, captured_my_piece: bool, capture_square: Optional[Square]):
        if captured_my_piece and capture_square is not None:
            if self.own_board is not None:
                self.own_board.remove_piece_at(capture_square)
    def choose_sense(self, sense_actions: List[Square], move_actions: List[chess.Move], seconds_left: float) -> Square:
        assert self.B is not None
        return choose_sense_square_entropy(self.B, sense_actions)
    def handle_sense_result(self, sense_result: List[Tuple[Square, Optional[chess.Piece]]]):
        assert self.B is not None and self.color is not None
        self.B = apply_sense_to_belief(self.B, sense_result, self.color)
    def choose_move(self, move_actions: List[chess.Move], seconds_left: float) -> Optional[chess.Move]:
        assert self.own_board is not None and self.color is not None and self.B is not None and self.opp_counts is not None
        if not move_actions:
            return None
        x_root = encode_state(self.own_board, self.color, self.B)
        det = greedy_determinize_opponent(self.B, self.own_board, self.color, self.opp_counts)
        det.turn = self.own_board.turn
        N_a = puct_root(
            model=self.model,
            x_root=x_root,
            det_board=det,
            move_actions=move_actions,
            sims=self.cfg.sims,
            c_puct=self.cfg.c_puct,
            device=DEVICE,
        )
        if self.store_training:
            self.X.append(x_root.detach().cpu().numpy().astype(np.float32))
            self.P.append(visits_to_policy_target(N_a))
        if N_a:
            best_a = max(N_a.items(), key=lambda kv: kv[1])[0]
            mv = index_to_move(best_a)
            if mv in move_actions:
                return mv
        return move_actions[int(self.rng.integers(0, len(move_actions)))]
    def handle_move_result(
        self,
        requested_move: Optional[chess.Move],
        taken_move: Optional[chess.Move],
        captured_opponent_piece: bool,
        capture_square: Optional[Square],
    ):
        if self.own_board is None or self.color is None:
            return
        if taken_move is not None:
            apply_taken_move_to_own_board(self.own_board, taken_move, self.color)
        if captured_opponent_piece and capture_square is not None and self.opp_counts is not None and self.B is not None:
            r = chess.square_rank(capture_square)
            f = chess.square_file(capture_square)
            probs = self.B[:, r, f].detach().cpu()
            best_sym = None
            best_p = -1.0
            for sym in PIECE_TYPES_6:
                p = float(probs[CH2I[sym]].item())
                if p > best_p and self.opp_counts.get(sym, 0) > 0:
                    best_p = p
                    best_sym = sym
            if best_sym is None:
                if self.opp_counts.get("P", 0) > 0:
                    best_sym = "P"
                else:
                    for sym in ["N", "B", "R", "Q", "K"]:
                        if self.opp_counts.get(sym, 0) > 0:
                            best_sym = sym
                            break
            if best_sym is not None:
                self.opp_counts[best_sym] = max(0, self.opp_counts.get(best_sym, 0) - 1)
    def handle_game_end(self, winner_color: Optional[Color], reason: str, game_history: Any):
        if not self.store_training or self.color is None:
            return
        if winner_color is None:
            z = 0.0
        else:
            z = 1.0 if bool(winner_color) == self.color else -1.0
        self.Z = [z] * len(self.X)


## LOCAL MATCH HARNESS (ReconChess) — smoke test


A local match harness is used to run short games against a baseline opponent, serving as an end-to-end smoke test for rule compliance and framework integration.

In [None]:
def try_discover_play_local_game():
    try:
        from reconchess.utilities import play_local_game
        return play_local_game
    except Exception:
        pass
    try:
        from reconchess.scripts.rc_play_game import play_local_game
        return play_local_game
    except Exception:
        pass
    raise ImportError(
        "Could not find play_local_game in reconchess. Install a version that provides local runner utilities."
    )
def smoke_test_vs_random(n_games: int = 5, seed: int = 0):
    play_local_game = try_discover_play_local_game()
    model = FastPolicyValueNet(in_ch=15, trunk=64).to(DEVICE)
    cfg = FastBotConfig(sims=40, c_puct=1.5)
    wins = 0
    losses = 0
    draws = 0
    for g in range(n_games):
        bot = RBCFastAZPlayer(model=model, cfg=cfg, seed=seed + g)
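        # Import explicitly: `import reconchess` alone may not load the bots subpackage.
        from reconchess.bots.random_bot import RandomBot
        opp = RandomBot()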
        winner_color, reason, history = play_local_game(bot, opp)
        if winner_color is None:
            draws += 1
        elif bool(winner_color) == chess.WHITE:
            wins += 1
        else:
            losses += 1
        print(f"[game {g}] winner={winner_color} reason={reason}")
    print({"wins": wins, "losses": losses, "draws": draws})


### Run smoke test (uncomment)


## RUN ALL CHECKS (fast sanity gate)
These checks are meant to fail fast if something is inconsistent. If they pass, the agent is generally safe to run in local matches and self-play.


In [None]:
def run_all_checks():
    model = FastPolicyValueNet(in_ch=15, trunk=64).to(DEVICE)
    bot = RBCFastAZPlayer(model=model, cfg=FastBotConfig(sims=10, c_puct=1.5), seed=0)
    start_board = chess.Board()
    bot.handle_game_start(color=chess.WHITE, board=start_board, opponent_name="opp")
    assert bot.B is not None
    s = bot.B.sum(dim=0)
    assert float((s - 1.0).abs().max().item()) < 1e-4, "Belief not normalized per square"
    sense_actions = list(range(64))
    mv_actions = list(start_board.legal_moves)
    sq = bot.choose_sense(sense_actions, mv_actions, seconds_left=100.0)
    assert sq in sense_actions, "choose_sense returned illegal square"
    mv = bot.choose_move(mv_actions, seconds_left=100.0)
    assert (mv is None) or (mv in mv_actions), "choose_move returned move not in move_actions"
    print("Core invariants: OK")
    try:
        smoke_test_vs_random(n_games=2, seed=0)
        print("Smoke games: OK")
    except Exception as e:
        print("Smoke games: SKIPPED/FAILED (environment issue):", e)
run_all_checks()


Core invariants: OK
Smoke games: SKIPPED/FAILED (environment issue): Could not find play_local_game in reconchess. Install a version that provides local runner utilities.


# SELF-PLAY + TRAINING (FAST)

This section makes the bot trainable with minimal extra code:

1) ReplayBuffer in RAM (FAST)
2) Training step: KL(policy) + MSE(value)
3) Self-play game generator (ReconChess local runner)
4) Iterative loop: self-play → train → eval → checkpoint


## ReplayBuffer (RAM) + Dataset


Self-play generates training samples over time, so the implementation stores them in a replay buffer kept in RAM.

The buffer collects tuples (X,π,z), where X is the encoded state, π is the search-improved policy target, and z is the final game outcome from the player’s perspective. A maximum capacity is enforced by discarding the oldest samples, keeping the dataset bounded and biased toward more recent experience.

A lightweight PyTorch Dataset wrapper exposes the buffer in a format suitable for batching with a DataLoader, enabling standard supervised updates of the policy and value network.
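
The cell below implements the capacity limit by slicing Python lists. An alternative, shown here only as a sketch and not as the notebook's implementation, is `collections.deque` with `maxlen`, which drops the oldest items automatically. The class name `DequeReplayBuffer` is hypothetical.

```python
from collections import deque
from typing import Any, Deque, List, Tuple


class DequeReplayBuffer:
    """Hypothetical variant: stores (x, pi, z) tuples and evicts the oldest first."""

    def __init__(self, max_size: int = 200_000) -> None:
        self.items: Deque[Tuple[Any, Any, float]] = deque(maxlen=max_size)

    def add(self, X: List[Any], P: List[Any], Z: List[float]) -> None:
        assert len(X) == len(P) == len(Z)
        self.items.extend(zip(X, P, Z))  # deque discards from the left when full

    def __len__(self) -> int:
        return len(self.items)


# Capacity 3, four samples added: the oldest one ("x0") is evicted.
buf = DequeReplayBuffer(max_size=3)
buf.add(["x0", "x1"], ["p0", "p1"], [1.0, 1.0])
buf.add(["x2", "x3"], ["p2", "p3"], [-1.0, -1.0])
kept = [item[0] for item in buf.items]
print(len(buf), kept)
```

Random access into the middle of a deque is O(n), so for a `Dataset` that indexes samples at random the list-based version may still be preferable.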

In [None]:
class ReplayBuffer:
    def __init__(self, max_size: int = 200_000):
        self.max_size = max_size
        self.X: List[np.ndarray] = []
        self.P: List[np.ndarray] = []
        self.Z: List[float] = []
    def add(self, X: List[np.ndarray], P: List[np.ndarray], Z: List[float]):
        assert len(X) == len(P) == len(Z)
        self.X.extend(X)
        self.P.extend(P)
        self.Z.extend(Z)
        if len(self.X) > self.max_size:
            extra = len(self.X) - self.max_size
            self.X = self.X[extra:]
            self.P = self.P[extra:]
            self.Z = self.Z[extra:]
    def __len__(self) -> int:
        return len(self.X)
class BufferDataset(torch.utils.data.Dataset):
    def __init__(self, buf: ReplayBuffer):
        self.buf = buf
    def __len__(self) -> int:
        return len(self.buf)
    def __getitem__(self, idx: int):
        x = torch.from_numpy(self.buf.X[idx]).float()
        p = torch.from_numpy(self.buf.P[idx]).float()
        z = torch.tensor(self.buf.Z[idx], dtype=torch.float32)
        return x, p, z


## Training step (KL policy + MSE value)


Network parameters are updated using supervised learning on batches sampled from the replay buffer.

The policy head is trained by minimizing the Kullback–Leibler divergence between the network’s predicted action distribution and the search-derived policy target. In parallel, the value head is trained using a mean squared error loss against the final game outcome.

The two losses are combined into a single objective and optimized using standard gradient-based methods, with gradient clipping applied for stability.
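
Written out for a mini-batch of size $B$, and matching the `batchmean` KL reduction and the default mean reduction of the MSE loss used in the next cell, the objective is as follows. The symbols $p_\theta$ (softmax of the policy logits) and $v_\theta$ (value head output) are notation introduced here.

```latex
\mathcal{L}(\theta)
  = \underbrace{\frac{1}{B}\sum_{b=1}^{B}\sum_{a}
      \pi_b(a)\,\bigl[\log \pi_b(a) - \log p_\theta(a \mid X_b)\bigr]}_{\text{policy: } \mathrm{KL}(\pi \,\|\, p_\theta)}
  \;+\;
    \underbrace{\frac{1}{B}\sum_{b=1}^{B}\bigl(v_\theta(X_b) - z_b\bigr)^2}_{\text{value: MSE}}
```

Actions with $\pi_b(a) = 0$ contribute nothing to the policy term, so only moves that received visits during search shape the policy gradient directly.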

In [None]:
def train_steps(
    model: nn.Module,
    opt: torch.optim.Optimizer,
    dl: torch.utils.data.DataLoader,
    device: str,
    steps: int = 200,
) -> Dict[str, float]:
    model.train()
    it = iter(dl)
    ema = {"loss": None, "lp": None, "lv": None}
    for _ in range(steps):
        try:
            x, p, z = next(it)
        except StopIteration:
            it = iter(dl)
            x, p, z = next(it)
        x = x.to(device)
        p = p.to(device)
        z = z.to(device)
        opt.zero_grad(set_to_none=True)
        logits, v_pred = model(x)
        logp = F.log_softmax(logits, dim=-1)
        loss_policy = F.kl_div(logp, p, reduction="batchmean")
        loss_value = F.mse_loss(v_pred, z)
        loss = loss_policy + loss_value
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        for k, val in [("loss", loss.item()), ("lp", loss_policy.item()), ("lv", loss_value.item())]:
            if ema[k] is None:
                ema[k] = float(val)
            else:
                ema[k] = 0.98 * ema[k] + 0.02 * float(val)
    return {k: float(v) for k, v in ema.items()}


## Checkpoint helpers


To support long-running experiments and allow training to be resumed across sessions, helper functions are provided to save and load model checkpoints.

Each checkpoint stores the network parameters, optimizer state, and basic training metadata, ensuring that training can be restarted consistently without loss of information.

In [None]:
def save_checkpoint(
    path: str,
    model: nn.Module,
    opt: torch.optim.Optimizer,
    step: int,
    extra: Optional[dict] = None
) -> None:
    ckpt = {
        "model": model.state_dict(),
        "opt": opt.state_dict(),
        "step": int(step),
        "extra": extra or {},
    }
    torch.save(ckpt, path)
def load_checkpoint(
    path: str,
    model: nn.Module,
    opt: Optional[torch.optim.Optimizer] = None
) -> dict:
    ckpt = torch.load(path, map_location="cpu")
    model.load_state_dict(ckpt["model"])
    if opt is not None and "opt" in ckpt:
        opt.load_state_dict(ckpt["opt"])
    return ckpt


## One self-play game (ReconChess) → (X, P, Z)


A single self-play game is executed by running two instances of the same ReconChess player against each other using the local game runner.

During the game, each player records training samples (X,π) at decision time, where X is the encoded state and π is derived from the search visit counts. After the game ends, the final outcome is converted into a value target z and assigned to all samples collected by each player.

The resulting lists (X,P,Z) provide one complete episode of training data that can be appended to the replay buffer.

In [None]:
def selfplay_one_game(
    model: nn.Module,
    cfg: FastBotConfig,
    seed: int = 0,
) -> Tuple[List[np.ndarray], List[np.ndarray], List[float], Optional[Color], str]:
    """Play one self-play game and return (X, P, Z, winner_color, reason)."""
    play_local_game = try_discover_play_local_game()
    # Both sides share the same network but use different seeds.
    bot_w = RBCFastAZPlayer(model=model, cfg=cfg, seed=seed)
    bot_b = RBCFastAZPlayer(model=model, cfg=cfg, seed=seed + 1)
    bot_w.store_training = True
    bot_b.store_training = True
    # The game history is not needed here, only the outcome.
    winner_color, reason, _history = play_local_game(bot_w, bot_b)
    X = bot_w.X + bot_b.X
    P = bot_w.P + bot_b.P
    Z = bot_w.Z + bot_b.Z
    # Fallback: if the players did not fill in Z themselves, label every sample
    # with the final outcome from the point of view of the player that stored it.
    if len(Z) != len(X):
        if winner_color is None:
            z_w = 0.0
        else:
            z_w = 1.0 if bool(winner_color) == chess.WHITE else -1.0
        z_b = -z_w
        Z = [z_w] * len(bot_w.X) + [z_b] * len(bot_b.X)
    return X, P, Z, winner_color, reason


## Self-play → train → eval loop (FAST)


The main training loop alternates between data generation and network updates.

At each iteration, a small batch of self-play games is generated to produce new (X,π,z) samples, which are appended to the replay buffer. The policy/value network is then updated for a fixed number of gradient steps using mini-batches sampled from the buffer.

After training, the current model is evaluated in a short match series against a simple baseline opponent to provide a quick progress signal, and a checkpoint plus a CSV log entry are saved for later inspection.
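The logged metrics (`loss`, `lp`, `lv`) suggest a combined policy and value objective. The sketch below shows the usual AlphaZero-style loss for a single sample in plain Python, assuming a cross-entropy policy term and a squared-error value term with equal weights. The function name `az_loss_single` is hypothetical, and the notebook's `train_steps` may weight or compute the terms differently.

```python
import math
from typing import List, Tuple


def az_loss_single(
    logits: List[float],
    pi: List[float],
    v: float,
    z: float,
) -> Tuple[float, float, float]:
    """Return (loss, policy_loss, value_loss) for one training sample.

    Assumed objective: cross-entropy(pi, softmax(logits)) + (z - v)^2.
    """
    # Numerically stable log-softmax over the policy logits.
    m = max(logits)
    log_norm = m + math.log(sum(math.exp(x - m) for x in logits))
    log_probs = [x - log_norm for x in logits]
    # Policy term: cross-entropy between the search target and the network policy.
    lp = -sum(p * lq for p, lq in zip(pi, log_probs))
    # Value term: squared error between the game outcome and the predicted value.
    lv = (z - v) ** 2
    return lp + lv, lp, lv


# Uniform prediction over two moves, uniform target, and a won game predicted as even.
loss, lp, lv = az_loss_single([0.0, 0.0], [0.5, 0.5], v=0.0, z=1.0)
```

In this example the policy term equals ln 2 (the entropy of the uniform target) and the value term equals 1, so the policy term cannot drop below the target's entropy even for a perfect prediction.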

In [None]:
def run_selfplay_training(
    iters: int = 5,
    games_per_iter: int = 6,
    train_steps_per_iter: int = 300,
    batch_size: int = 64,
    sims_selfplay: int = 40,
    sims_eval: int = 20,
    seed: int = 0,
    out_dir: str = "fast_checkpoints",
    results_csv: str = "results.csv",
):
    os.makedirs(out_dir, exist_ok=True)
    results_path = os.path.join(out_dir, results_csv)
    fieldnames = [
        "timestamp", "iter", "buffer_size",
        "loss", "loss_policy", "loss_value",
        "eval_wins", "eval_losses", "eval_draws",
        "sims_selfplay", "sims_eval",
        "games_per_iter", "train_steps_per_iter", "batch_size",
        "ckpt_path",
    ]
    if not os.path.exists(results_path):
        with open(results_path, "w", newline="") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writeheader()
    model = FastPolicyValueNet(in_ch=15, trunk=64).to(DEVICE)
    opt = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
    buf = ReplayBuffer(max_size=200_000)
    step = 0
    for it in range(iters):
        cfg = FastBotConfig(sims=sims_selfplay, c_puct=1.5)
        for g in range(games_per_iter):
            X, P, Z, winner, reason = selfplay_one_game(model, cfg, seed=seed + it * 1000 + g)
            buf.add(X, P, Z)
        ds = BufferDataset(buf)
        dl = torch.utils.data.DataLoader(
            ds, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=True
        )
        metrics = train_steps(model, opt, dl, device=DEVICE, steps=train_steps_per_iter)
        try:
            wins = 0
            losses = 0
            draws = 0
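            play_local_game = try_discover_play_local_game()
            # Import the submodule explicitly: `import reconchess` alone does not
            # necessarily expose `reconchess.bots.random_bot` as an attribute.
            from reconchess.bots.random_bot import RandomBot
            for eg in range(6):
                # The trained bot always plays White; RandomBot plays Black.
                bot = RBCFastAZPlayer(
                    model=model,
                    cfg=FastBotConfig(sims=sims_eval, c_puct=1.5),
                    seed=seed + 999 + it * 10 + eg,
                )
                opp = RandomBot()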
                winner_color, reason, history = play_local_game(bot, opp)
                if winner_color is None:
                    draws += 1
                elif bool(winner_color) == chess.WHITE:
                    wins += 1
                else:
                    losses += 1
            eval_res = {"wins": wins, "losses": losses, "draws": draws}
        except Exception as e:
            eval_res = {"wins": 0, "losses": 0, "draws": 0}
            print("Eval skipped/failed:", e)
        ckpt_path = os.path.join(out_dir, f"ckpt_iter_{it}.pt")
        save_checkpoint(
            ckpt_path,
            model,
            opt,
            step,
            extra={"iter": it, "buffer": len(buf)},
        )
        print(f"[iter {it}] buffer={len(buf)} train={metrics} eval6={eval_res} saved={ckpt_path}")
        row = {
            "timestamp": datetime.datetime.now().isoformat(timespec="seconds"),
            "iter": it,
            "buffer_size": len(buf),
            "loss": metrics.get("loss"),
            "loss_policy": metrics.get("lp"),
            "loss_value": metrics.get("lv"),
            "eval_wins": eval_res.get("wins", 0),
            "eval_losses": eval_res.get("losses", 0),
            "eval_draws": eval_res.get("draws", 0),
            "sims_selfplay": sims_selfplay,
            "sims_eval": sims_eval,
            "games_per_iter": games_per_iter,
            "train_steps_per_iter": train_steps_per_iter,
            "batch_size": batch_size,
            "ckpt_path": ckpt_path,
        }
        with open(results_path, "a", newline="") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writerow(row)
        step += 1
    return model


### Run a small self-play training (start tiny)


## Results plots
After training, plot the training loss and the win rate against RandomBot per iteration, using the CSV log written by the training loop.


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
def plot_results(csv_path: str):
    df = pd.read_csv(csv_path)
    if len(df) == 0:
        print("Empty CSV.")
        return df
    plt.figure()
    plt.plot(df["iter"], df["loss"])
    plt.xlabel("iter")
    plt.ylabel("loss")
    plt.title("Training loss")
    plt.show()
    plt.figure()
    total = (df["eval_wins"] + df["eval_losses"] + df["eval_draws"]).clip(lower=1)
    winrate = df["eval_wins"] / total
    plt.plot(df["iter"], winrate)
    plt.xlabel("iter")
    plt.ylabel("winrate vs random (eval)")
    plt.title("Winrate vs RandomBot")
    plt.show()
    return df
# Example:
# df = plot_results("fast_checkpoints/results.csv")

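Because each evaluation consists of only six games, the plotted win rate is a noisy estimate. One way to see how noisy is to attach a confidence interval to each point. The sketch below uses the Wilson score interval; the helper name `wilson_interval` is hypothetical, and draws are counted as non-wins, matching the win rate computed in `plot_results`.

```python
import math
from typing import Tuple


def wilson_interval(wins: int, games: int, z: float = 1.96) -> Tuple[float, float]:
    """Wilson score interval for a win rate (z = 1.96 gives roughly 95% coverage)."""
    if games <= 0:
        # No games played: the win rate is completely undetermined.
        return (0.0, 1.0)
    p = wins / games
    denom = 1.0 + z * z / games
    center = (p + z * z / (2 * games)) / denom
    half = z * math.sqrt(p * (1 - p) / games + z * z / (4 * games * games)) / denom
    return (max(0.0, center - half), min(1.0, center + half))


# Four wins out of six evaluation games.
low, high = wilson_interval(wins=4, games=6)
print(f"win rate 4/6, 95% interval: [{low:.2f}, {high:.2f}]")
```

For 4 wins in 6 games the interval runs from roughly 0.30 to 0.90, so differences between consecutive iterations in the plot should be read with caution unless the number of evaluation games is increased.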

This notebook provides a complete and executable reference implementation of a learning-based RBC agent, suitable for experimentation and further extensions.