# Example: Improving LLM Chess Ability with Mixture-of-N Sampling


In [None]:
import asyncio
import logging
import random
from typing import Dict, List, Optional, Tuple
from uuid import UUID

import chess
import neatplot
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tensorzero import AsyncTensorZeroGateway
from tqdm import tqdm, trange
from utils import AbstractPlayer, proportion_ci, run_puzzle

In [None]:
# Module-level logger; TensorZeroPlayer and run_puzzles below log through it.
logger = logging.getLogger(__name__)
# Select neatplot's "notex" style for the plot at the end of the notebook
# (presumably the style that does not require a LaTeX installation — confirm in neatplot's docs).
neatplot.set_style("notex")

Below, we create a chess player class that takes the current state of the board and calls a TensorZero function to choose a move.
We give the LLM access to the current board state as ASCII, the color the player should play as, and the legal moves in Standard Algebraic Notation (SAN).
The TensorZero function returns a JSON object with the thinking and the move.
We log the thinking and the move, and then return the move to the caller.
We also return the episode ID, which we can use to give feedback on the move later.


In [None]:
class TensorZeroPlayer(AbstractPlayer):
    """
    Chess player that asks the `play_chess_board` TensorZero function for its moves.

    If the inference call (or reading its parsed output) raises, the error is
    logged and a random legal move is played instead.
    """

    def __init__(
        self, client: AsyncTensorZeroGateway, variant_name: Optional[str] = None
    ):
        # The variant name is forwarded unchanged on every inference call
        # (including when it is None).
        self.variant_name = variant_name
        self.client = client

    @staticmethod
    def _build_input(board: chess.Board, legal_moves_san: List[str]) -> dict:
        """Build the inference input: one user message describing the position."""
        # We pass the board state, the color of the player, and the legal moves in SAN to
        # TensorZero.
        position = {
            "board": str(board),
            "color": "white" if board.turn else "black",
            "legal_moves_san": legal_moves_san,
        }
        return {"messages": [{"role": "user", "content": position}]}

    async def play(
        self, board: chess.Board, episode_id: Optional[UUID] = None
    ) -> Tuple[str, Optional[UUID]]:
        """
        Choose a move for the side to move on `board`.

        Returns a `(move, episode_id)` tuple. `move` is the model's answer in SAN
        (Standard Algebraic Notation), or a random legal move if inference failed.
        `episode_id` is the one reported by the inference result, or the
        `episode_id` argument unchanged if inference failed.
        """
        legal_moves_san = list(map(board.san, board.legal_moves))

        try:
            response = await self.client.inference(
                function_name="play_chess_board",
                input=self._build_input(board, legal_moves_san),
                variant_name=self.variant_name,
                episode_id=episode_id,
            )
            parsed = response.output.parsed
            thinking = parsed["thinking"]
            logger.info(f"Player thinking: {thinking}")
            move = parsed["move"]
            logger.info(f"Player move: {move}")
            episode_id = response.episode_id
        except Exception as e:
            # Best-effort fallback: keep the game going with any legal move.
            logger.error(f"Error occurred: {type(e).__name__}: {e}")
            logger.info("Choosing a random legal move as fallback.")
            move = random.choice(legal_moves_san)
        return move, episode_id

Below is the main function that runs many chess puzzles and then sends feedback to TensorZero for each episode, indicating whether the puzzle was solved successfully.
We'll try a handful of different variants to see how they perform.


In [None]:
async def run_puzzles(
    player: AbstractPlayer,
    puzzle_df: pd.DataFrame,
    variant_name: str,
    semaphore: asyncio.Semaphore,
    client: Optional[AsyncTensorZeroGateway] = None,
    disable_progress_bar: bool = False,
) -> List[bool]:
    """
    Run every puzzle in `puzzle_df` with `player` and return the list of successes.

    Args:
        player: The player handed to `run_puzzle` for every puzzle.
        puzzle_df: One puzzle per row; each row is passed to `run_puzzle` as a dict.
        variant_name: Label used in the progress-bar descriptions.
        semaphore: Passed to `run_puzzle`, and held around each feedback call.
        client: If given, a `puzzle_success` feedback value is sent for every
            episode that has an episode ID. If None, no feedback is sent.
        disable_progress_bar: Hide the inference and feedback progress bars.

    Returns:
        One boolean per puzzle, in the order the puzzles *completed* (not the
        row order of `puzzle_df`).

    Raises:
        Whatever `run_puzzle` or `client.feedback` raises. If a puzzle fails,
        the puzzles that are still pending are cancelled before re-raising.
    """
    successes: List[bool] = []
    episode_ids = []
    num_successes = 0
    total_puzzles = len(puzzle_df)

    # Schedule all puzzles up front; concurrency is governed by `semaphore`
    # inside `run_puzzle`.
    tasks = [
        asyncio.create_task(run_puzzle(puzzle_df.iloc[i].to_dict(), player, semaphore))
        for i in range(total_puzzles)
    ]

    try:
        # The context manager closes the bar even if a puzzle raises.
        with trange(
            total_puzzles,
            desc=f"[Inference] {variant_name}",
            disable=disable_progress_bar,
        ) as progress_bar:
            for finished in asyncio.as_completed(tasks):
                success, episode_id = await finished
                successes.append(success)
                episode_ids.append(episode_id)
                if success:
                    num_successes += 1
                current = len(successes)
                logger.info(
                    "Puzzle %d completed %s",
                    current,
                    "successfully" if success else "unsuccessfully",
                )
                # Running confidence interval over the puzzles finished so far.
                ci_lower, ci_upper = proportion_ci(num_successes, current)
                progress_bar.update(1)
                progress_bar.set_postfix(
                    {
                        "Success": f"{num_successes}/{current} CI: ({ci_lower:.2f}, {ci_upper:.2f})"
                    },
                    refresh=True,
                )
    finally:
        # On the success path every task is already done and this is a no-op.
        # After a failure, stop the remaining puzzles instead of leaving them
        # running (and issuing requests) in the background.
        for pending in tasks:
            if not pending.done():
                pending.cancel()

    if client is not None:
        # Feedback is sent one episode at a time: each call is awaited before
        # the next one starts.
        for success, episode_id in tqdm(
            zip(successes, episode_ids),
            total=len(successes),
            desc=f"[Feedback] {variant_name}",
            disable=disable_progress_bar,
        ):
            # Episodes without an ID are skipped.
            if episode_id:
                async with semaphore:
                    await client.feedback(
                        episode_id=episode_id,
                        metric_name="puzzle_success",
                        value=success,
                    )

    return successes

In [None]:
# Number of training puzzles to evaluate.
# 1000 examples gives statistically significant results but takes a while to run
# (10 minutes with 10 concurrent requests for the mixture-of-5 variants).
# For a quick test, you can use NUM_EXAMPLES = 10
NUM_EXAMPLES = 1000
puzzle_df = pd.read_csv("data/lichess_easy_puzzles_train.csv").head(NUM_EXAMPLES)

In [None]:
# Size of the semaphore shared by the TensorZero runs below.
# Reduce this value if you're getting rate-limited by OpenAI
MAX_CONCURRENT_T0_REQUESTS = 50
# Passed to run_puzzles, which hands it to run_puzzle and holds it around each feedback call
# (presumably run_puzzle acquires it per puzzle or per request — confirm in utils).
semaphore = asyncio.Semaphore(MAX_CONCURRENT_T0_REQUESTS)

In [None]:
# So we can plot later: maps each variant name to its success-rate summary,
# with the keys "mean", "ci_lower" and "ci_upper".
variant_stats: Dict[str, Dict[str, float]] = {}

First, let's try a reasonable prompt with GPT-4o Mini. You can check `config/functions/play_chess_board/chess_prompt/` to see the templates.


In [None]:
# Variant to evaluate next; used for the player, the progress-bar label, and the key in variant_stats.
variant_name = "baseline"

In [None]:
async with AsyncTensorZeroGateway("http://localhost:3000", timeout=180.0) as client:
    results = await run_puzzles(
        TensorZeroPlayer(client, variant_name),
        puzzle_df,
        variant_name,
        semaphore,
        client,
    )

In [None]:
# Summarize the run above and record it under the current variant name for plotting.
num_successes = sum(results)
total_puzzles = len(results)
success_rate = num_successes / total_puzzles
print(f"{variant_name}: {num_successes}/{total_puzzles} = {success_rate:.2f}")
ci_lower, ci_upper = proportion_ci(num_successes, total_puzzles)
print(f"{variant_name} confidence interval: ({ci_lower:.2f}, {ci_upper:.2f})")
variant_stats[variant_name] = dict(
    mean=success_rate,
    ci_lower=ci_lower,
    ci_upper=ci_upper,
)

Next, let's try mixture-of-5 sampling with the same prompt and model. You should see a statistically significant improvement in performance by spending more compute.


In [None]:
# Variant to evaluate next; used for the player, the progress-bar label, and the key in variant_stats.
variant_name = "mixture_of_5"

In [None]:
async with AsyncTensorZeroGateway("http://localhost:3000", timeout=180.0) as client:
    results = await run_puzzles(
        TensorZeroPlayer(client, variant_name),
        puzzle_df,
        variant_name,
        semaphore,
        client,
    )

In [None]:
# Summarize the run above and record it under the current variant name for plotting.
num_successes = sum(results)
total_puzzles = len(results)
success_rate = num_successes / total_puzzles
print(f"{variant_name}: {num_successes}/{total_puzzles} = {success_rate:.2f}")
ci_lower, ci_upper = proportion_ci(num_successes, total_puzzles)
print(f"{variant_name} confidence interval: ({ci_lower:.2f}, {ci_upper:.2f})")
variant_stats[variant_name] = dict(
    mean=success_rate,
    ci_lower=ci_lower,
    ci_upper=ci_upper,
)

The previous two variants used the same prompt and model.
Let's try a mixture-of-5 variant that uses different prompts for each candidate.
You should once again see an improvement in performance!
Try experimenting with different prompts to see if you can get even better performance.


In [None]:
# Variant to evaluate next; used for the player, the progress-bar label, and the key in variant_stats.
variant_name = "mixture_of_5_diverse_prompts"

In [None]:
async with AsyncTensorZeroGateway("http://localhost:3000", timeout=180.0) as client:
    results = await run_puzzles(
        TensorZeroPlayer(client, variant_name),
        puzzle_df,
        variant_name,
        semaphore,
        client,
    )

In [None]:
# Summarize the run above and record it under the current variant name for plotting.
num_successes = sum(results)
total_puzzles = len(results)
success_rate = num_successes / total_puzzles
print(f"{variant_name}: {num_successes}/{total_puzzles} = {success_rate:.2f}")
ci_lower, ci_upper = proportion_ci(num_successes, total_puzzles)
print(f"{variant_name} confidence interval: ({ci_lower:.2f}, {ci_upper:.2f})")
variant_stats[variant_name] = dict(
    mean=success_rate,
    ci_lower=ci_lower,
    ci_upper=ci_upper,
)

In [None]:
# Prepare data for plotting: one entry per variant recorded in variant_stats,
# in insertion order.
# Note: ci_lower / ci_upper are rebound here from the per-variant scalars used in
# earlier cells to arrays covering all variants.
variants = list(variant_stats.keys())
means = np.array([variant_stats[v]["mean"] for v in variants])
ci_lower = np.array([variant_stats[v]["ci_lower"] for v in variants])
ci_upper = np.array([variant_stats[v]["ci_upper"] for v in variants])

# Create the bar chart
fig, ax = plt.subplots(figsize=(12, 6))
x = np.arange(len(variants))
# yerr is given as [distance below the mean, distance above the mean], so the
# error bars span each variant's confidence interval.
bars = ax.bar(x, means, yerr=[means - ci_lower, ci_upper - means], capsize=5, alpha=0.8)

# Customize the chart
ax.set_ylabel("Mean Success Rate")
ax.set_title("Success Rate — Chess Puzzles")
ax.set_xticks(x)
ax.set_xticklabels(variants, rotation=45, ha="right")
# Optional: uncomment to pin the y-axis to the full [0, 1] range.
# ax.set_ylim(0, 1)  # Set y-axis limits from 0 to 1

# Label each bar with its value. The text is placed at half the bar's height
# (i.e. inside the bar), not on top of it.
for bar in bars:
    height = bar.get_height()
    ax.text(
        bar.get_x() + bar.get_width() / 2.0,
        height / 2,
        f"{height:.2f}",
        ha="center",
        va="bottom",
    )

plt.tight_layout()
plt.show()

## Bonus (Optional) — Stockfish

If you have [Stockfish](https://stockfishchess.org/) installed (`brew install stockfish` on Mac), you can try running the same puzzles with a real chess engine.


In [None]:
from utils import StockfishPlayer

In [None]:
# Stockfish is CPU bound so we should only run 1 at a time
semaphore = asyncio.Semaphore(1)

player = StockfishPlayer(3190)  # very powerful
results = await run_puzzles(player, puzzle_df, "stockfish", semaphore)

In [None]:
# Label these results as Stockfish's. Without this, `variant_name` still holds the
# last LLM variant evaluated, so the output would be mislabeled and that variant's
# entry in variant_stats would be overwritten with Stockfish's numbers.
variant_name = "stockfish"

# Summarize the Stockfish run and record it alongside the LLM variants.
num_successes = sum(results)
total_puzzles = len(results)
success_rate = num_successes / total_puzzles
print(f"{variant_name}: {num_successes}/{total_puzzles} = {success_rate:.2f}")
ci_lower, ci_upper = proportion_ci(num_successes, total_puzzles)
print(f"{variant_name} confidence interval: ({ci_lower:.2f}, {ci_upper:.2f})")
variant_stats[variant_name] = {
    "mean": success_rate,
    "ci_lower": ci_lower,
    "ci_upper": ci_upper,
}