In [1]:
from collections.abc import Sequence
import io
import os
import chess
import chess.engine
import chess.pgn
import numpy as np
from tqdm import tqdm
import pandas as pd

In [2]:
def get_puzzle_prob_from_pandas_row(
    puzzle,
):
  """Scores one puzzle record.

  Args:
    puzzle: Mapping-like record (e.g. a pandas row) providing a 'PGN' entry
      with the game text and a 'Moves' entry with space-separated moves.

  Returns:
    A `(number_of_moves, evaluate_puzzle_result)` tuple, where the puzzle is
    evaluated from the position at the end of the game parsed from the PGN.

  Raises:
    ValueError: If no game can be read from the PGN text.
  """
  pgn_stream = io.StringIO(puzzle['PGN'])
  parsed_game = chess.pgn.read_game(pgn_stream)
  if parsed_game is None:
    raise ValueError(f'Failed to read game from PGN {puzzle["PGN"]}.')
  # The puzzle starts where the recorded game stops.
  final_position = parsed_game.end().board()
  solution = puzzle['Moves'].split(' ')
  probability = evaluate_puzzle(board=final_position, moves=solution)
  return len(solution), probability


def evaluate_puzzle(
    board: chess.Board,
    moves: Sequence[str],
) -> float:
  """Returns the chance that uniformly random play picks every odd-indexed move.

  All of `moves` are played on `board` in order. Right before each
  odd-indexed move is pushed, the running probability is divided by the
  number of legal moves in the current position, i.e. multiplied by the
  chance of choosing that one move uniformly at random among the legal ones.

  Args:
    board: Position from which the first move is played. It is mutated in
      place: every move in `moves` is pushed onto it.
    moves: Moves in UCI notation. Presumably even indices are the opponent's
      moves and odd indices the solver's replies -- confirm against the
      dataset.

  Returns:
    The product of 1 / (number of legal moves) over the odd-indexed moves, or
    1.0 when there are no odd-indexed moves.
  """
  prob = 1.0
  for move_idx, move in enumerate(moves):
    if move_idx % 2 == 1:
      # Count the alternatives *before* the move is played.
      prob /= board.legal_moves.count()
    board.push(chess.Move.from_uci(move))
  return prob

def get_interval_str(value, start=200, end=3000, step=200):
  """Returns the label "<lower>-<upper>" of the `step`-wide bucket holding `value`.

  Buckets are half-open, [lower, lower + step), and are laid out from `start`.

  Args:
    value: Number to classify. Floats are accepted; 1500.0 lands in the same
      bucket, with the same label, as 1500.
    start: Inclusive lower bound of the first bucket.
    end: Exclusive upper bound of the accepted range.
    step: Width of every bucket.

  Returns:
    The bucket label, e.g. "1400-1600" for 1500 with the default arguments.

  Raises:
    ValueError: If `value` is NaN or lies outside [start, end).
  """
  # A single positive range test also rejects NaN (every comparison with NaN
  # is False), which would otherwise be formatted as "nan-nan".
  if not start <= value < end:
    raise ValueError(f"Input {value} is out of range.")
  # int() keeps a float `value` from leaking a ".0" suffix into the label, so
  # the result still matches labels that were built from integers.
  bucket = int((value - start) // step)
  lower = start + bucket * step
  return f"{lower}-{lower + step}"


In [3]:
# Locate the puzzle CSV relative to the current working directory.
puzzles_path = os.path.join(
    os.getcwd(),
    '../data/chess_data/puzzles.csv',
)
# The loop below reads the 'PGN', 'Moves' and 'Rating' entries of every row.
puzzles = pd.read_csv(puzzles_path)
# One column per 200-point rating bucket, labelled "200-400" ... "2800-3000"
# (the labels get_interval_str produces with its default arguments). Rows are
# added on demand, one per puzzle length.
result_df = pd.DataFrame(columns=[f"{i}-{i + 200}" for i in range(200, 3000, 200)])
result_df.index.name = 'puzzle_len'

for puzzle_id, puzzle in tqdm(puzzles.iterrows(), total=len(puzzles), desc="Evaluating puzzles"):
  puzzle_len, prob = get_puzzle_prob_from_pandas_row(
    puzzle=puzzle,
  )
  # Raises ValueError for ratings outside [200, 3000).
  interval = get_interval_str(puzzle['Rating'])
  if puzzle_len not in result_df.index:
      # First puzzle of this length: start every bucket of the new row at [0, 0].
      result_df.loc[puzzle_len] = {col: np.array([0,0]) for col in result_df.columns}
  # Each cell accumulates [sum of probabilities, number of puzzles].
  result_df.loc[puzzle_len, interval] = result_df.loc[puzzle_len, interval] + [prob,1]
# Order the rows by puzzle length.
result_df = result_df.sort_index()

Evaluating puzzles: 100%|██████████| 10000/10000 [00:11<00:00, 875.13it/s]


In [4]:
def sum_arrays(col):
    """Stack the items of `col` along a new leading axis and sum over that axis.

    The items must share a shape; the result has that same shape.
    """
    stacked = np.stack(col)
    return stacked.sum(axis=0)

# Column-wise totals: for each column, add up the cells of every row.
sums = {col: sum_arrays(result_df[col]) for col in result_df.columns}
# One-row frame indexed 'all'; each total is wrapped in a one-element list.
new_row_df = pd.DataFrame({col: [val] for col, val in sums.items()}, index=['all'])

# NOTE: re-running this cell appends a further 'all' row, whose totals then
# also include the previously appended one.
result_df = pd.concat([result_df, new_row_df])

In [5]:
def sum_arrays_row(row):
    """Add the entries of `row` together element-wise.

    The entries are stacked into a single array (so they must share a shape)
    and reduced over the stacking axis.
    """
    return np.stack(row).sum(axis=0)

# Row-wise totals: for each row, add up the cells of every column.
# NOTE: if an 'all' column already exists (e.g. this cell is re-run), it is
# part of the row and therefore included in the new total.
sums_col = result_df.apply(sum_arrays_row, axis=1)
result_df['all'] = sums_col

In [5]:
# Collapse every [probability_sum, puzzle_count] cell into its mean probability;
# cells with a zero count become NaN instead of dividing by zero.
# `DataFrame.applymap` is deprecated since pandas 2.1 in favour of
# `DataFrame.map`, so prefer `map` and fall back to `applymap` on older pandas.
_elementwise = result_df.map if hasattr(pd.DataFrame, 'map') else result_df.applymap
result_df = _elementwise(lambda x: np.nan if x[1] == 0 else x[0] / x[1])

  result_df = result_df.applymap(lambda x: np.nan if x[1] == 0 else x[0] / x[1])
