#### Define environment

In [1]:
import numpy as np
import tensorflow as tf
from tf_agents.environments import py_environment
from tf_agents.specs import BoundedArraySpec
from tf_agents.trajectories import time_step as ts

# Define constants for board cells
EMPTY, P1, P2 = 0, 1, -1  # EMPTY = unoccupied, P1 = player 1, P2 = player 2

# Game board dimensions
BOARD_SIZE = 12

# Create a mask for legal playable positions shaped like a cross
CROSS_MASK = np.zeros((BOARD_SIZE, BOARD_SIZE), bool)
# Fill the center 4x4 block
CROSS_MASK[4:8, 4:8] = True
# Fill the vertical and horizontal arms of the cross
CROSS_MASK[0:4, 4:8] = True
CROSS_MASK[8:12, 4:8] = True
CROSS_MASK[4:8, 0:4] = True
CROSS_MASK[4:8, 8:12] = True

# Total number of legal action positions
ACTION_SIZE = CROSS_MASK.sum()  # 80 legal squares

# Precompute the (row, col) coordinates of each legal action index
LEGAL_IDXS = np.stack(np.where(CROSS_MASK), axis=1)  # Map 0-79 ➜ (r, c)


class CrossTicTacToe(py_environment.PyEnvironment):
    def __init__(self):
        # Define the action space: a single integer from 0 to ACTION_SIZE - 1
        self._action_spec = BoundedArraySpec((), np.int32, 0, ACTION_SIZE - 1)
        # Define the observation space: a BOARD_SIZE x BOARD_SIZE grid with values in [-1, 1]
        self._observation_spec = BoundedArraySpec((BOARD_SIZE, BOARD_SIZE), np.int8, -1, 1)
        self._board = np.zeros((BOARD_SIZE, BOARD_SIZE), np.int8)  # Game state
        self._current_player = P1  # Player 1 starts
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        # Reset game state to initial conditions
        self._board.fill(EMPTY)
        self._current_player = P1
        self._episode_ended = False
        return ts.restart(self._board.copy())

    def _step(self, action):
        # If game is over, restart
        if self._episode_ended:
            return self.reset()

        # Get (row, col) from action index
        r, c = LEGAL_IDXS[action]
        
        # If the selected square is occupied, skip the move (penalty: turn lost)
        if self._board[r, c] != EMPTY:
            self._current_player *= -1
            return ts.transition(self._board.copy(), reward=0.0, discount=1.0)

        # Probabilistic placement: 50% chance to place at selected cell,
        # otherwise randomly pick one of its 8 neighbors
        if np.random.rand() < 0.5:
            target = (r, c)
        else:
            neigh = [(r + dr, c + dc)
                     for dr in [-1, 0, 1] for dc in [-1, 0, 1] if not (dr == dc == 0)]
            target = neigh[np.random.randint(8)]

        tr, tc = target
        # Place piece if target is valid and legal
        if (0 <= tr < BOARD_SIZE and 0 <= tc < BOARD_SIZE and
                CROSS_MASK[tr, tc] and self._board[tr, tc] == EMPTY):
            self._board[tr, tc] = self._current_player
            win = self._check_win(tr, tc)
            full = (self._board != EMPTY).sum() == ACTION_SIZE
            # Win condition met
            if win:
                self._episode_ended = True
                marker = self._board[tr, tc]
                reward = float(marker)  # P1: +1, P2: -1
                return ts.termination(self._board.copy(), reward)
            # Draw condition: board is full
            elif full:
                self._episode_ended = True
                return ts.termination(self._board.copy(), 0.0)

        # Continue game: switch player, no reward
        self._current_player *= -1
        return ts.transition(self._board.copy(), reward=0.0, discount=1.0)

    def _check_win(self, r, c):
        """Check if placing at (r, c) resulted in a win."""
        b = self._board
        p = self._current_player
        # Check horizontal and vertical lines (need 4 in a row)
        if (self._run_length(b[r, :] == p) or
            self._run_length(b[:, c] == p)):
            return True
        # Check diagonals (need 5 in a row)
        diag1 = np.diagonal(b, c - r)
        diag2 = np.diagonal(np.fliplr(b), BOARD_SIZE - 1 - c - r)
        return (self._run_length(diag1 == p, 5) or
                self._run_length(diag2 == p, 5))

    @staticmethod
    def _run_length(arr, needed=4):
        """Check if there is a run of 'needed' consecutive True values in arr."""
        count = 0
        for v in arr:
            count = count + 1 if v else 0
            if count >= needed:
                return True
        return False

    def clone(self):
        """Create a fresh copy of this environment, preserving game state."""
        new = CrossTicTacToe()
        new._board = self._board.copy()
        new._current_player = self._current_player
        new._episode_ended = self._episode_ended
        new._current_time_step = self._current_time_step
        return new


In [2]:
# smoke_test_env.py

import numpy as np

from tf_agents.trajectories import time_step as ts

# Utility to visualize the board in terminal
def print_board(board):
    for i in range(board.shape[0]):
        row = ""
        for j in range(board.shape[1]):
            if not CROSS_MASK[i, j]:  # Skip spaces outside the playable cross
                row += "  "
            else:
                v = board[i, j]
                row += {EMPTY: ". ", P1: "X ", P2: "O "}[v]
        print(row)
    print()

# Explain why a move did or didn't result in placing a piece
def detect_reason(prev, new, action):
    """Determine why the move did or didn’t place a piece."""
    r, c = LEGAL_IDXS[action]
    if prev[r, c] != EMPTY:
        return "🛑 Occupied → forfeited"
    
    # Compute which cells changed
    diff = (new != prev) & CROSS_MASK
    locs = list(zip(*np.where(diff)))
    
    if len(locs) == 1:
        tr, tc = locs[0]
        if (tr, tc) == (r, c):
            return "✅ Accepted on target"
        else:
            return f"↪️ Redirected to neighbor ({tr},{tc})"
    elif len(locs) == 0:
        return "🚫 Forfeited (neighbor bad)"
    else:
        return f"❓ Unexpected placement at {locs}"

# Main test function to simulate random play and validate behavior
def smoke_test_env(seed=42, max_steps=500):
    np.random.seed(seed)
    env = CrossTicTacToe()
    t = env.reset()

    # --- Basic reset condition checks ---
    assert isinstance(t, ts.TimeStep), "reset() must return a TimeStep"
    assert t.step_type == ts.StepType.FIRST
    assert float(t.reward) == 0.0
    assert t.observation.shape == (12, 12)

    print("✔︎ reset OK — initial board:")
    print_board(t.observation)

    prev_board = t.observation.copy()
    for step in range(1, max_steps + 1):
        # Randomly select a legal action
        action = np.random.randint(0, env.action_spec().maximum + 1)
        t = env.step(action)
        new_board = t.observation.copy()
        reward = float(t.reward)

        # Explain the move
        reason = detect_reason(prev_board, new_board, action)

        print(f"Step{step:3d} — action={action:2d} — reward={reward: .1f}")
        print("   ", reason)
        print_board(new_board)

        # Check correctness of forfeited or illegal actions
        if "illegal" in reason.lower() or "forfeited" in reason.lower():
            assert reward == 0.0, "Illegal or forfeited move should give reward 0"

        # Terminal case: win (±1) or draw (0)
        if t.is_last():
            assert reward in (-1.0, 0.0, 1.0), "Invalid terminal reward"
            print(f"✔︎ terminated after {step} steps with reward={reward}")
            break

        prev_board = new_board
    else:
        # If loop completes without termination, raise error
        raise AssertionError(f"Env did not terminate within {max_steps} steps")

    print("✔︎ smoke test passed!")

# Run test if script is executed directly
if __name__ == "__main__":
    smoke_test_env()



✔︎ reset OK — initial board:
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step  1 — action=51 — reward= 0.0
    🚫 Forfeited (neighbor bad)
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step  2 — action=71 — reward= 0.0
    ↪️ Redirected to neighbor (10,7)
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        

#### Neural Network, MCTS, self play driver and train

In [3]:
# nn_policy_value.py
import tensorflow as tf
from tensorflow import keras

def build_model():
    # tell Keras up‐front that our input is float32
    inputs = keras.Input(shape=(12,12,1), dtype=tf.float32)

    x = inputs   # no need to cast

    for filters in [64, 64, 128]:
        x = keras.layers.Conv2D(filters, 3, padding='same', activation='relu')(x)
        x = keras.layers.BatchNormalization()(x)

    # policy head
    # Outputs logits for each of the 80 legal positions (flattened)
    p = keras.layers.Conv2D(2, 1, activation='relu')(x)
    p = keras.layers.Flatten()(p)
    p = keras.layers.Dense(80)(p)                 

    # value head
    # Predicts the expected outcome of the game from current state
    v = keras.layers.Conv2D(1, 1, activation='relu')(x)
    v = keras.layers.Flatten()(v)
    v = keras.layers.Dense(64, activation='relu')(v)
    v = keras.layers.Dense(1, activation='tanh')(v)  # -1 .. 1

    # Create and return the full model with two heads: policy and value
    return keras.Model(inputs=inputs, outputs=[p, v])



In [4]:
# mcts_alpha.py
import numpy as np

C_PUCT = 1.5  # Exploration constant for PUCT formula 

class Node:
    __slots__ = ("state", "player", "P", "N", "W", "children", "terminal", "reward")

    def __init__(self, state, player, P, terminal=False, reward=0.0):
        self.state    = state              # Game board: 12×12 numpy array
        self.player   = player             # Current player at this node
        self.P        = P                  # Prior probabilities from the policy network (shape: (80,))
        self.N        = np.zeros_like(P, np.int32)     # Visit count per action
        self.W        = np.zeros_like(P, np.float32)   # Total action value per action
        self.children = {}                 # Action → Node (child nodes)
        self.terminal = terminal           # Whether this node is terminal (game ended)
        self.reward   = reward             # If terminal, reward for the current player

class MCTS:
    def __init__(self, model, simulations=100):
        self._model = model               # Neural network model (outputs policy + value)
        self._S     = simulations         # Number of simulations per move

    def run(self, env):
        """Run MCTS simulations starting from the current environment state."""
        root = self._expand(env)  # Build root node with NN priors and value
        for _ in range(self._S):
            sim_env = env.clone()              # Clone the environment for simulation
            self._simulate(sim_env, root)      # Perform one MCTS simulation

        # After simulations, derive move probabilities (π) from visit counts
        pi = root.N / np.sum(root.N)
        return pi, root

    def _expand(self, env):
        """Initial expansion and evaluation of the root node using the NN."""
        # 1) Prepare board for neural network input
        board_tensor = env._board.astype(np.float32)[None, ..., None]  # Shape: (1, 12, 12, 1)
        logits_t, value_t = self._model(board_tensor)
        logits = logits_t[0].numpy()  # Raw logits for policy (shape: (80,))
        v = float(value_t[0, 0])      # Scalar value prediction

        # 2) Mask out illegal moves
        legal_mask = (env._board[CROSS_MASK] == EMPTY)  # Shape: (80,)
        neg_inf = -1e9
        masked = np.where(legal_mask, logits, neg_inf)
        exps = np.exp(masked - np.max(masked))  # Stabilized softmax
        prior = exps / np.sum(exps)

        # Add Dirichlet noise to encourage exploration
        alpha = 0.3
        epsilon = 0.25
        noise = np.random.dirichlet([alpha] * prior.shape[0])
        prior = (1 - epsilon) * prior + epsilon * noise

        # 3) Create and return root node
        return Node(
            state=env._board.copy(),
            player=env._current_player,
            P=prior,
            terminal=env._episode_ended,
            reward=v
        )

    def _simulate(self, env, node):
        """Recursively simulate a game from the given node."""
        # 1) If node is terminal, return the stored reward
        if node.terminal:
            return node.reward

        # 2) Select action using PUCT formula
        total_N = np.sum(node.N)
        U = C_PUCT * node.P * np.sqrt(total_N) / (1 + node.N)
        Q = node.W / (1 + node.N)  # Mean value estimate
        scores = Q + U

        # Mask out illegal moves
        legal_mask = (env._board[CROSS_MASK] == EMPTY)
        scores[~legal_mask] = -np.inf

        # Choose action with highest PUCT score
        a = int(np.argmax(scores))

        # 3) Step the environment using the selected action
        ts = env.step(a)

        # 4) If action has not been explored yet, expand child
        if a not in node.children:
            done = ts.is_last()
            reward = float(ts.reward)

            # Initialize child node with placeholder priors
            child = Node(
                state=env._board.copy(),
                player=env._current_player,
                P=np.zeros_like(node.P),  # Will be filled in if not terminal
                terminal=done,
                reward=reward
            )

            # Evaluate NN if the game is still ongoing
            if not done:
                b2 = env._board.astype(np.float32)[None, ..., None]
                l2, v2 = self._model(b2)
                l2 = l2[0].numpy()
                mask2 = (env._board[CROSS_MASK] == EMPTY)
                m2 = np.where(mask2, l2, -1e9)
                e2 = np.exp(m2 - np.max(m2))
                child.P = e2 / np.sum(e2)

            # Register child node
            node.children[a] = child
            value = child.reward if done else float(v2[0, 0])
        else:
            # 5) If already expanded, simulate from existing child
            child = node.children[a]
            value = self._simulate(env, child)

        # 6) Backpropagate value estimate up the tree
        node.N[a] += 1
        node.W[a] += value
        return value


In [5]:
# selfplay_driver_simple.py

import numpy as np
import tensorflow as tf
from tf_agents.replay_buffers import tf_uniform_replay_buffer


# --- Build model and replay buffer ---

# Instantiate the policy+value network
model = build_model()

# Define data_spec: what data we store for each time step
data_spec = {
    'obs':    tf.TensorSpec((12, 12, 1), tf.float32),   # Board observation (with channel)
    'policy': tf.TensorSpec((80,), tf.float32),         # MCTS-derived action probabilities
    'value':  tf.TensorSpec((), tf.float32),            # Outcome from current player's view
}

# Replay buffer to collect training data
batch_size = 1  # We generate one episode at a time
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=data_spec,
    batch_size=batch_size,
    max_length=100000,  # Total capacity
)

def reset_replay_buffer():
    global replay_buffer
    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
        data_spec=data_spec,
        batch_size=1,
        max_length=100000,
    )

# --- Self-play episode function ---

def self_play_episode(model, replay_buffer, simulations=50):
    """Play a full episode using MCTS-guided self-play, storing data in replay buffer."""
    env = CrossTicTacToe()
    mcts = MCTS(model, simulations=simulations)
    time_step = env.reset()

    while not time_step.is_last():
        # Run MCTS from the current position to get improved policy π
        pi, _ = mcts.run(env)

        # Sample action from π (adds exploration vs greedy max)
        action = np.random.choice(80, p=pi)

        # Prepare input observation: board with shape (12, 12, 1)
        obs = env._board.astype(np.int8)[..., None]  # Add channel dimension
        player = env._current_player  # Needed for correct value sign

        # Step environment
        next_time_step = env.step(action)

        # Compute value from current player's perspective
        z = float(next_time_step.reward) * player

        # Store experience: (obs, policy, value)
        replay_buffer.add_batch({
            'obs':    tf.convert_to_tensor(obs[None, ...], dtype=tf.float32),   # (1, 12, 12, 1)
            'policy': tf.convert_to_tensor(pi[None, ...], dtype=tf.float32),    # (1, 80)
            'value':  tf.convert_to_tensor([z], dtype=tf.float32),              # (1,)
        })

        # Advance to next time step
        time_step = next_time_step

# --- Run multiple self-play games to collect data ---

def run_self_play(n_episodes=25):
    """Generate multiple self-play games and populate the replay buffer."""
    for _ in range(n_episodes):
        self_play_episode(model, replay_buffer)



In [6]:
# trainer_simple.py

import tensorflow as tf
from tf_agents.replay_buffers import tf_uniform_replay_buffer  # Only needed for dataset

# --- Model and optimizer setup ---

model = build_model()
optimizer = tf.keras.optimizers.Adam(1e-3)  # Learning rate = 0.001

# Create checkpoint manager for saving model and optimizer state
ckpt = tf.train.Checkpoint(model=model, optimizer=optimizer)

# --- Dataset setup from replay buffer ---

# Replay buffer must be pre-filled before training starts
# Each batch contains 1 time step per sample (no sequences)
dataset = replay_buffer.as_dataset(sample_batch_size=256, num_steps=1)
iterator = iter(dataset)

# --- Training step function (compiled with tf.function for speed) ---

@tf.function
def train_step(obs, target_pi, target_v):
    """Single training step: compute loss, gradients, and apply updates."""
    with tf.GradientTape() as tape:
        logits, v_pred = model(obs, training=True)  # Forward pass

        # Policy loss: cross-entropy between predicted logits and MCTS probabilities
        p_loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)(
            target_pi, logits)

        # Value loss: mean squared error between predicted and true outcome
        v_loss = tf.keras.losses.MeanSquaredError()(
            target_v, tf.squeeze(v_pred, axis=-1))  # Remove last dim from value

        # Total loss: policy + weighted value 
        loss = p_loss + 0.25 * v_loss

    # Backpropagation
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

# --- Training loop ---

def train(n_iterations=1000):
    """Train the model on replay buffer data."""
    for step in range(n_iterations + 1):
        batch = next(iterator)

        # Canonicalize batch format to a flat dict
        if isinstance(batch, dict):
            data = batch
        elif isinstance(batch, tuple) and len(batch) == 2 and isinstance(batch[0], dict):
            data = batch[0]  # (data_dict, info)
        elif isinstance(batch, (list, tuple)) and len(batch) == 3:
            obs_b, pi_b, v_b = batch
            data = {'obs': obs_b, 'policy': pi_b, 'value': v_b}
        else:
            raise ValueError(f"Unexpected batch format: {batch}")

        # Remove time dimension (1) from (batch_size, time, ...)
        obs_b = tf.squeeze(tf.cast(data['obs'], tf.float32), axis=1)   # (256, 12, 12, 1)
        pi_b  = tf.squeeze(data['policy'], axis=1)                     # (256, 80)
        v_b   = tf.squeeze(tf.cast(data['value'], tf.float32), axis=1) # (256,)

        # Train the model on this batch
        loss = train_step(obs_b, pi_b, v_b)

        # Monitor training reward/value targets
        mean_reward = tf.reduce_mean(v_b)

        # Log progress and save checkpoints every 100 steps
        if step % 100 == 0:
            print(f"Step {step:<4d} — loss={loss:.4f} — mean target_v={mean_reward:.3f}")
            ckpt.save('checkpoints/ckpt')  # Save to ./checkpoints/ckpt-*


Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=False) instead.


In [7]:
print("Seeding replay buffer with 50 episodes of self‐play…")
run_self_play(n_episodes=50)
print("Buffer size:", replay_buffer.num_frames().numpy())

Seeding replay buffer with 50 episodes of self‐play…
Buffer size: 1955


In [8]:
train(n_iterations=500)

Step 0    — loss=5.2739 — mean target_v=0.039
Step 100  — loss=4.1453 — mean target_v=0.020
Step 200  — loss=4.1319 — mean target_v=0.031
Step 300  — loss=4.0175 — mean target_v=0.035
Step 400  — loss=4.0133 — mean target_v=0.027
Step 500  — loss=3.9735 — mean target_v=0.035


In [9]:
train(n_iterations=500)

Step 0    — loss=3.9297 — mean target_v=0.016
Step 100  — loss=4.0232 — mean target_v=0.023
Step 200  — loss=3.9731 — mean target_v=0.035
Step 300  — loss=3.9292 — mean target_v=0.043
Step 400  — loss=3.9948 — mean target_v=0.023
Step 500  — loss=3.9342 — mean target_v=0.020


In [10]:
tf.train.latest_checkpoint('checkpoints')

'checkpoints/ckpt-12'

#### Evaluation with random move

In [11]:
def smoke_test_vs_random(
    checkpoint_tag='ckpt-23',
    checkpoint_dir='checkpoints',
    num_games=10,
    mcts_simulations=50,
):
    """
    Run a quick smoke test of a model checkpoint vs. a random player.

    Args:
        checkpoint_tag (str): Specific model checkpoint name (e.g., 'ckpt-23').
        checkpoint_dir (str): Directory where checkpoints are stored.
        num_games (int): Number of self-play games to run for evaluation.
        mcts_simulations (int): MCTS simulations used for the agent's decisions.

    Returns:
        dict: Summary statistics (wins, draws, losses, win_rate).
    """

    def load_agent(tag):
        """Load a model from the specified checkpoint."""
        model = build_model()
        ckpt = tf.train.Checkpoint(model=model)
        prefix = f"{checkpoint_dir}/{tag}"
        ckpt.restore(prefix).expect_partial()
        print(f"✔︎ Loaded model from '{prefix}'")
        return model

    def play_one_game(agent):
        """Play a single game of agent (P1) vs. random player (P2)."""
        env = CrossTicTacToe()
        t = env.reset()
        prev = t.observation.copy()
        mcts = MCTS(agent, simulations=mcts_simulations)
        step = 0

        print("\n=== New Game vs. Random ===")
        print_board(t.observation)

        while not t.is_last():
            step += 1
            mover = env._current_player
            mover_char = "X" if mover == P1 else "O"

            if mover == P1:
                # Use MCTS with the agent's policy/value model
                pi, root = mcts.run(env)
                policy = root.N / root.N.sum() if root.N.sum() > 0 else root.P
                action = int(np.random.choice(len(policy), p=policy))
            else:
                # Random opponent samples a valid legal move
                legal = [i for i, (r, c) in enumerate(LEGAL_IDXS)
                         if env._board[r, c] == EMPTY]
                action = int(np.random.choice(legal))

            t = env.step(action)
            new = t.observation
            reward = float(t.reward)
            reason = detect_reason(prev, new, action)

            print(f"Step {step:3d} — {mover_char} → idx {action:2d} | {reason} | reward={reward: .1f}")
            print_board(new)
            prev = new.copy()

        print(f"🛑 Game over after {step} moves — final reward {float(t.reward): .1f}")
        return float(t.reward)

    # --- Run multiple games and collect stats ---
    agent = load_agent(checkpoint_tag)
    wins = draws = losses = 0

    for game in range(1, num_games + 1):
        result = play_one_game(agent)
        if result == 1.0:
            wins += 1
        elif result == 0.0:
            draws += 1
        else:
            losses += 1
        print(f"Game {game:2d}: reward={result:+.1f}")

    # --- Summary ---
    win_rate = wins / num_games
    print(f"\nSummary over {num_games} games vs. random:")
    print(f"  Wins:   {wins}")
    print(f"  Draws:  {draws}")
    print(f"  Losses: {losses}")
    print(f"→ Win rate: {win_rate:.2%}")

    return {
        'wins': wins,
        'draws': draws,
        'losses': losses,
        'win_rate': win_rate,
    }


In [12]:
results = smoke_test_vs_random(checkpoint_tag='ckpt-12', num_games=10)

✔︎ Loaded model from 'checkpoints/ckpt-12'

=== New Game vs. Random ===
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   1 — X → idx 24 | ✅ Accepted on target | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . X . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   2 — O → idx 71 | ✅ Accepted on target | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . X . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        

#### Use ckpt-12 to self-play

In [14]:
def load_latest_checkpoint(model_builder, checkpoint_dir='checkpoints'):
    """
    Reconstructs the model using the provided builder function
    and loads the latest checkpoint from the given directory.

    Args:
        model_builder (function): Function that returns a compiled model instance.
        checkpoint_dir (str): Directory where checkpoints are stored.

    Returns:
        model (tf.keras.Model): The model with restored weights.
    """
    model = model_builder()
    ckpt = tf.train.Checkpoint(model=model)
    latest = tf.train.latest_checkpoint(checkpoint_dir)

    if latest:
        ckpt.restore(latest).expect_partial()
        print(f"✔︎ Restored model from {latest}")
    else:
        raise RuntimeError(f"No checkpoint found in `{checkpoint_dir}/`")

    return model


In [15]:
def generate_self_play_games(model, replay_buffer, num_games=50, simulations=50, log_every=10):
    """
    Generates self-play games using the provided model and stores the results in the replay buffer.

    Args:
        model (tf.keras.Model): The neural network used for policy and value evaluation.
        replay_buffer: A buffer object with an `.add()` method and `.num_frames()` method.
        num_games (int): Number of self-play games to generate.
        simulations (int): Number of MCTS simulations per move.
        log_every (int): Frequency of logging progress.
    """
    for i in range(num_games):
        self_play_episode(model, replay_buffer, simulations=simulations)
        if (i + 1) % log_every == 0:
            print(f"  • Generated {i + 1} / {num_games} self‑play games")
    print("✔︎ Self‑play data in buffer:", replay_buffer.num_frames().numpy())


In [16]:
# reset replay buffer
reset_replay_buffer()
print("Buffer size:", replay_buffer.num_frames().numpy())

Buffer size: 0


In [17]:
model = load_latest_checkpoint(build_model)
generate_self_play_games(model, replay_buffer, num_games=50)

✔︎ Restored model from checkpoints/ckpt-12
  • Generated 10 / 50 self‑play games
  • Generated 20 / 50 self‑play games
  • Generated 30 / 50 self‑play games
  • Generated 40 / 50 self‑play games
  • Generated 50 / 50 self‑play games
✔︎ Self‑play data in buffer: 2833


In [18]:
train(n_iterations=1000)

Step 0    — loss=3.7844 — mean target_v=0.027
Step 100  — loss=3.9583 — mean target_v=0.035
Step 200  — loss=4.0196 — mean target_v=0.023
Step 300  — loss=3.9847 — mean target_v=0.016
Step 400  — loss=3.9266 — mean target_v=0.020
Step 500  — loss=3.9474 — mean target_v=0.027
Step 600  — loss=3.9466 — mean target_v=0.031
Step 700  — loss=3.8639 — mean target_v=0.027
Step 800  — loss=3.8942 — mean target_v=0.027
Step 900  — loss=3.9223 — mean target_v=0.035
Step 1000 — loss=3.9529 — mean target_v=0.023


In [19]:
tf.train.latest_checkpoint('checkpoints')

'checkpoints/ckpt-23'

In [25]:
results = smoke_test_vs_random(checkpoint_tag='ckpt-23', num_games=10)

✔︎ Loaded model from 'checkpoints/ckpt-23'

=== New Game vs. Random ===
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   1 — X → idx 11 | ✅ Accepted | reward= 0.0
        . . . .         
        . . . .         
        . . . X         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   2 — O → idx 74 | ✅ Accepted | reward= 0.0
        . . . .         
        . . . .         
        . . . X         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
   

#### Round-robin tournament

compare two models' performance. If win rate > 55%, promote new model as current best. Otherwise, discard it

In [21]:
# evaluate_versions_verbose.py

import os, glob
import numpy as np
import tensorflow as tf

# ── CONFIG ────────────────────────────────────────────────────────────────
# Define which versions to evaluate against each other
CURRENT_VERSION  = ''     # e.g., 'ckpt-50'
OLD_VERSIONS     = ['']   # e.g., ['ckpt-30', 'ckpt-40']
MCTS_SIMS        = 20     # Number of MCTS simulations per move
GAMES_PER_PAIR   = 10     # Games per matchup for quick testing

# ── HELPERS ────────────────────────────────────────────────────────────────

def print_board(board):
    """Display the board in a human-readable format."""
    for i in range(board.shape[0]):
        row = ""
        for j in range(board.shape[1]):
            if not CROSS_MASK[i, j]:
                row += "  "  # Non-playable cell
            else:
                row += {EMPTY: ". ",
                        P1:   "X ",
                        P2:   "O "}[board[i, j]]
        print(row)
    print()

def detect_reason(prev, new, action):
    """Diagnose what happened after a move was attempted."""
    r, c = LEGAL_IDXS[action]
    if prev[r, c] != EMPTY:
        return "🛑 Occupied → forfeited"
    diff = (new != prev) & CROSS_MASK
    locs = list(zip(*np.where(diff)))
    if len(locs) == 1:
        tr, tc = locs[0]
        return "✅ Accepted" if (tr, tc) == (r, c) else f"↪️ Redirected to {locs[0]}"
    elif not locs:
        return "🚫 Forfeited"
    else:
        return f"❓ Unexpected at {locs}"

def load_agent_tag(tag):
    """Load a model from a checkpoint tag or directory."""
    model = build_model()
    ckpt  = tf.train.Checkpoint(model=model)

    # Try to load from subdirectory first
    subdir = os.path.join('checkpoints', tag)
    if os.path.isdir(subdir):
        latest = tf.train.latest_checkpoint(subdir)
    else:
        # Try matching by file prefix
        files = glob.glob(os.path.join('checkpoints', f"{tag}*"))
        prefixes = {os.path.splitext(f)[0] for f in files}
        latest = max(prefixes) if prefixes else None

    # Fallback to latest overall
    if not latest:
        latest = tf.train.latest_checkpoint('checkpoints')
    if not latest:
        raise FileNotFoundError(f"No checkpoint for tag {tag}")

    ckpt.restore(latest).expect_partial()
    print(f"✔︎ Loaded checkpoint: {latest}")
    return model

def play_verbose(agent_X, agent_O, sims=MCTS_SIMS):
    """Play a single verbose game between agent_X and agent_O using MCTS."""
    env    = CrossTicTacToe()
    t      = env.reset()
    prev   = t.observation.copy()
    mcts_X = MCTS(agent_X, sims)
    mcts_O = MCTS(agent_O, sims)
    turn   = 0

    print("\n―― New Game ――")
    print_board(t.observation)

    while not t.is_last():
        mover = env._current_player
        mover_char = "X" if mover == P1 else "O"
        turn += 1

        if mover == P1:
            pi, root = mcts_X.run(env)
            policy = root.N / root.N.sum() if root.N.sum() > 0 else root.P
            action = int(np.random.choice(len(policy), p=policy))
        else:
            pi, root = mcts_O.run(env)
            policy = root.N / root.N.sum() if root.N.sum() > 0 else root.P
            action = int(np.random.choice(len(policy), p=policy))

        t = env.step(action)
        new = t.observation
        reason = detect_reason(prev, new, action)
        print(f"Turn {turn:2d} — {mover_char} → idx {action:2d} | {reason} | reward={float(t.reward): .1f}")
        print_board(new)
        prev = new.copy()

    return float(t.reward)


In [None]:
# ── ROUND‑ROBIN LOOP ───────────────────────────────────────────────────────
def evaluate_model_vs_older_versions(
    CURRENT_VERSION,
    OLD_VERSIONS,
    GAMES_PER_PAIR=10,
    MCTS_SIMS=50,
):
    """
    Run head-to-head matches between the current model and each older version.

    Args:
        CURRENT_VERSION (str): Checkpoint tag for the current model (e.g., 'ckpt-50').
        OLD_VERSIONS (list of str): List of older checkpoint tags to compare against.
        GAMES_PER_PAIR (int): Number of games to play per pair (even → equal sides).
        MCTS_SIMS (int): MCTS simulations per move.

    Output:
        Prints W/D/L and win-rate for the current model vs. each older one.
    """
    
    current = load_agent_tag(CURRENT_VERSION)

    for old_tag in OLD_VERSIONS:
        opponent = load_agent_tag(old_tag)
        w_cur = w_old = d = 0  # Win counters for current, old, and draws

        print(f"\n=== {CURRENT_VERSION} vs {old_tag} ({GAMES_PER_PAIR} games) ===")
        for i in range(1, GAMES_PER_PAIR + 1):
            print(f"\n→ Game {i}/{GAMES_PER_PAIR}")
            
            if i % 2 == 1:
                # Odd-numbered game: current plays as X (first), opponent as O
                result = play_verbose(current, opponent, sims=MCTS_SIMS)
            else:
                # Even-numbered game: sides are swapped
                # Reverse the reward so that it's always from current model's perspective
                result = -play_verbose(opponent, current, sims=MCTS_SIMS)

            # Track outcome
            if result == 1:
                w_cur += 1
            elif result == -1:
                w_old += 1
            else:
                d += 1

        rate = w_cur / GAMES_PER_PAIR
        print(f"\n★ Result: {CURRENT_VERSION} W/D/L = {w_cur}/{d}/{w_old}  Win‑rate = {rate:.0%}")


In [23]:
evaluate_model_vs_older_versions(
    CURRENT_VERSION = 'ckpt-23',
    OLD_VERSIONS = ['ckpt-12'],
    GAMES_PER_PAIR=10,
    MCTS_SIMS=50,
)

✔︎ Loaded checkpoint: checkpoints/ckpt-23
✔︎ Loaded checkpoint: checkpoints/ckpt-12

=== ckpt-23 vs ckpt-12 (10 games) ===

→ Game 1/10

―― New Game ――
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  1 — X → idx 78 | ↪️ Redirected to (11, 7) | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . X         

Turn  2 — O → idx  0 | ✅ Accepted | reward= 0.0
        O . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . .

In [24]:
evaluate_model_vs_older_versions(
    CURRENT_VERSION = 'ckpt-12',
    OLD_VERSIONS = ['ckpt-23'],
    GAMES_PER_PAIR=10,
    MCTS_SIMS=50,
)

✔︎ Loaded checkpoint: checkpoints/ckpt-12
✔︎ Loaded checkpoint: checkpoints/ckpt-23

=== ckpt-12 vs ckpt-23 (10 games) ===

→ Game 1/10

―― New Game ――
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  1 — X → idx  2 | ✅ Accepted | reward= 0.0
        . . X .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  2 — O → idx 32 | ↪️ Redirected to (4, 5) | reward= 0.0
        . . X .         
        . . . .         
        . . . .         
        . . . .         
. . . . . O . . . . . . 
. . . . . . 

#### Use ckpt-23 to self-play

In [27]:
# reset replay buffer
reset_replay_buffer()
print("Buffer size:", replay_buffer.num_frames().numpy())

Buffer size: 0


In [28]:
model = load_latest_checkpoint(build_model)
generate_self_play_games(model, replay_buffer, num_games=50)

✔︎ Restored model from checkpoints/ckpt-23
  • Generated 10 / 50 self‑play games
  • Generated 20 / 50 self‑play games
  • Generated 30 / 50 self‑play games
  • Generated 40 / 50 self‑play games
  • Generated 50 / 50 self‑play games
✔︎ Self‑play data in buffer: 2696


In [29]:
train(n_iterations=1000)

Step 0    — loss=3.9375 — mean target_v=0.020
Step 100  — loss=3.9416 — mean target_v=0.027
Step 200  — loss=3.8923 — mean target_v=0.012
Step 300  — loss=3.8947 — mean target_v=0.027
Step 400  — loss=3.8779 — mean target_v=0.035
Step 500  — loss=3.9231 — mean target_v=0.020
Step 600  — loss=3.9081 — mean target_v=0.035
Step 700  — loss=3.8520 — mean target_v=0.016
Step 800  — loss=3.8670 — mean target_v=0.031
Step 900  — loss=3.8446 — mean target_v=0.027
Step 1000 — loss=3.8993 — mean target_v=0.023


In [30]:
tf.train.latest_checkpoint('checkpoints')

'checkpoints/ckpt-34'

In [31]:
results = smoke_test_vs_random(checkpoint_tag='ckpt-34', num_games=10)

✔︎ Loaded model from 'checkpoints/ckpt-34'

=== New Game vs. Random ===
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   1 — X → idx 77 | 🚫 Forfeited | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Step   2 — O → idx 54 | 🚫 Forfeited | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
 

In [32]:
evaluate_model_vs_older_versions(
    CURRENT_VERSION = 'ckpt-34',
    OLD_VERSIONS = ['ckpt-23'],
    GAMES_PER_PAIR=10,
    MCTS_SIMS=50,
)

✔︎ Loaded checkpoint: checkpoints/ckpt-34
✔︎ Loaded checkpoint: checkpoints/ckpt-23

=== ckpt-34 vs ckpt-23 (10 games) ===

→ Game 1/10

―― New Game ――
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  1 — X → idx 58 | ✅ Accepted | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . X . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  2 — O → idx 12 | ✅ Accepted | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        O . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 


In [33]:
evaluate_model_vs_older_versions(
    CURRENT_VERSION = 'ckpt-23',
    OLD_VERSIONS = ['ckpt-34'],
    GAMES_PER_PAIR=10,
    MCTS_SIMS=50,
)

✔︎ Loaded checkpoint: checkpoints/ckpt-23
✔︎ Loaded checkpoint: checkpoints/ckpt-34

=== ckpt-23 vs ckpt-34 (10 games) ===

→ Game 1/10

―― New Game ――
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  1 — X → idx 52 | ↪️ Redirected to (6, 1) | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . . . . . . . 
. X . . . . . . . . . . 
. . . . . . . . . . . . 
        . . . .         
        . . . .         
        . . . .         
        . . . .         

Turn  2 — O → idx 68 | ✅ Accepted | reward= 0.0
        . . . .         
        . . . .         
        . . . .         
        . . . .         
. . . . . . . . . . . . 
. . . . . . 