In [1]:
import chess
import chess.pgn
import os
import re
import numpy as np
import pandas as pd
from tqdm import tqdm

In [2]:
def load_pgns(file_path, num_games=None, start_index=0, encoding="utf-8"):
    """Read games from a PGN file with python-chess.

    Args:
        file_path: Path of the PGN file to read.
        num_games: Maximum number of games to load. ``None`` loads every
            remaining game; ``0`` loads none.
        start_index: Number of games to skip before loading starts.
        encoding: Text encoding used to open the file.

    Returns:
        A list of the objects returned by ``chess.pgn.read_game``, in file
        order. The list is shorter than ``num_games`` when the file runs out
        of games first.
    """
    games = []
    with open(file_path, "r", encoding=encoding) as file:
        for _ in tqdm(range(start_index), desc='Skipping games', unit='game', leave=False):
            # skip_game returns False once no further game is found, and
            # avoids building a full game tree for games we throw away.
            if not chess.pgn.skip_game(file):
                break

        if num_games is None:
            # int() never returns the sentinel 1, so this iterator is endless;
            # the loop below ends when read_game reports end of file.
            attempts = iter(int, 1)
        else:
            # An explicit limit (including 0) is honoured as given.
            attempts = tqdm(range(num_games), desc='Loading games', unit='game', leave=True)

        for _ in attempts:
            game = chess.pgn.read_game(file)
            if game is None:
                break
            games.append(game)
    return games

In [3]:
%%time
# The sample PGN is looked up in a `data` directory located in the parent of
# the current working directory.
assets_path = os.path.join(os.path.dirname(os.getcwd()), 'data')
single_path = os.path.join(assets_path, 'kgames_sample.pgn')
# Load at most 1000 games from the sample file.
games = load_pgns(single_path, 1000)
# Keep the first loaded game (used by later cells) and the board object that
# game.board() returns for it.
game = games[0]
board = game.board()

                                      

Loading games: 100%|██████████| 1000/1000 [00:02<00:00, 405.78game/s]

CPU times: total: 1.36 s
Wall time: 2.48 s





In [4]:
def get_eval_from_result(result):
    """Map a PGN result string onto a capped pseudo-evaluation.

    Returns 64 for '1-0', -64 for '0-1', and 0 for any other value.
    """
    for outcome, score in (('1-0', 64), ('0-1', -64)):
        if result == outcome:
            return score
    return 0
        
def extract_eval_clk_from_pgn(input, result=''):
    """Extract the evaluation and clock reading from a PGN move comment.

    Args:
        input: Comment text that may contain ``[%eval ...]`` and
            ``[%clk ...]`` annotations.
        result: Game result string, used only when the comment carries no
            evaluation at all.

    Returns:
        A tuple ``(eval, clk)``:
          * ``eval`` is the matched text of a numeric ``[%eval x]`` annotation
            (a string); or 64 / -64 for a mate annotation ``[%eval #n]``
            (64 when ``n`` is positive, -64 otherwise); or, when neither is
            present, the value of ``get_eval_from_result(result)``
            (64, -64, or 0).
          * ``clk`` is the matched ``[%clk ...]`` text, or ``None`` when the
            comment has no clock annotation.
    """
    clk_match = re.search(r"\[%clk\s+([0-9:]+)\]", input)
    clk = clk_match.group(1) if clk_match else None

    eval_match = re.search(r"\[%eval\s+([0-9.-]+)\]", input)
    if eval_match:
        return eval_match.group(1), clk

    mate_match = re.search(r"\[%eval\s+#([0-9.-]+)\]", input)
    if mate_match:
        # Only the sign of the mate distance matters: it says which side mates.
        mate_in = int(mate_match.group(1))
        return (64 if mate_in > 0 else -64), clk

    # No evaluation in the comment: fall back on the game result. The value is
    # returned as-is so that a draw / unknown result stays 0 instead of being
    # forced to -64.
    return get_eval_from_result(result), clk

def time_control_to_list(time_control_obj):
    """Split a '+'-separated time-control string into a list of floats.

    The list is padded with 0 up to two entries. A first entry below 30 is
    multiplied by 60 (treated as minutes rather than seconds).
    """
    parts = list(map(float, time_control_obj.split('+')))
    missing = 2 - len(parts)
    parts.extend([0] * missing)
    base = parts[0]
    if base < 30:
        parts[0] = base * 60
    return parts
        
def eval_to_cp(eval):
    """Convert an evaluation (number or numeric string) to a float scaled by 100."""
    return 100 * float(eval)

def clk_to_time(clk):
    """Convert an 'H:M:S' clock string into a whole number of seconds."""
    hours, minutes, seconds = (part for part in clk.split(":"))
    return 3600 * int(hours) + 60 * int(minutes) + int(seconds)

def eval_to_game_state(value, cuts = None):
    """Bucket an evaluation into a discrete game-state score.

    Args:
        value: Evaluation to classify, on the same scale as ``cuts``.
        cuts: Descending sequence of bucket boundaries (list or numpy array).
            Bucket ``i`` covers ``cuts[i] >= value > cuts[i + 1]``. Defaults
            to eleven buckets between +inf and -inf.

    Returns:
        ``round(1 - i / 10, 2)`` for the first bucket ``i`` containing
        ``value`` (1.0 for the topmost bucket, decreasing by 0.1 per bucket),
        or ``None`` when no bucket contains it (e.g. NaN, or a value equal to
        the final boundary).
    """
    # `is None` rather than `== None`: comparing a numpy array with `==`
    # yields an array, whose truth value is ambiguous and raises.
    if cuts is None:
        cuts = [np.inf, 375, 250, 150, 75, 25, -25, -75, -150, -250, -375, -np.inf]
    for i, (upper, lower) in enumerate(zip(cuts[:-1], cuts[1:])):
        if upper >= value > lower:
            return round(1 - i / 10, 2)
    return None

def fen_to_array(fen, mask=False):
    """Encode the piece placement of a FEN string as a numpy array.

    Only the first whitespace-separated field (piece placement) is read, so
    both a full FEN and a board-only FEN are accepted.

    Args:
        fen: FEN string.
        mask: When False, return 64 integers, one per square. When True,
            return twelve 0/1 planes of 64 squares each, flattened to 768
            values.

    Returns:
        ``mask=False``: array of 64 ints indexed by ``rank * 8 + file`` with
        the first FEN row stored at rank 7. Lowercase pieces p, n, b, r, q, k
        map to -1..-6, uppercase P, N, B, R, Q, K to 1..6, empty squares to 0.
        ``mask=True``: the planes for codes -6, -5, ..., -1, 1, ..., 6, in
        that order, concatenated.

    Raises:
        ValueError: If ``fen`` contains no fields.
        KeyError: If the placement field contains an unknown piece letter.
    """
    piece_mapping = {'p': -1, 'n': -2, 'b': -3, 'r': -4, 'q': -5, 'k': -6,
                     'P': 1, 'N': 2, 'B': 3, 'R': 4, 'Q': 5, 'K': 6}
    fields = fen.split()
    if not fields:
        raise ValueError("FEN string is empty")
    board_fen = fields[0]

    board_array = [0] * 64
    rank = 7
    file = 0
    for char in board_fen:
        if char.isdigit():
            # A digit is a run of that many empty squares.
            file += int(char)
        elif char == '/':
            # Row separator: move down one rank, back to the first file.
            rank -= 1
            file = 0
        else:
            board_array[rank * 8 + file] = piece_mapping[char]
            file += 1

    squares = np.array(board_array)
    if mask:
        planes = [(squares == val).astype(int) for val in range(-6, 7) if val != 0]
        return np.array(planes).flatten()
    return squares

def determine_move_quality(df):
    """Assign a quality class from 1 to 5 to every row of a per-move DataFrame.

    Args:
        df: DataFrame with a numeric 'game_state' column, one row per move.
            Rows are processed by position; the sign factor below alternates
            with the row position (+1 for position 0, -1 for position 1, ...).
            Presumably position 0 is White's first move and 'game_state' is
            expressed from White's side -- TODO confirm against the caller.

    Returns:
        A list of ints, one per row: 1 = Great, 2 = Good, 3 = Inaccuracy,
        4 = Mistake (the default), 5 = Blunder.
    """
    # game_state re-centred on 0.5 and scaled by 2 (20 / 10), with the sign
    # flipped on every other row.
    subjective_state = 20 * (df['game_state'] - 0.5) * np.power(-1, np.arange(len(df))) / 10
    # Negated row-to-row change of game_state (-10 / 10 == -1), again with the
    # alternating sign. A seed value of 0.6 is prepended so the first row has
    # a predecessor to diff against; .iloc[1:] drops the seed's own NaN entry.
    delta_state = -10 * pd.concat([pd.Series([0.6]), df['game_state']]).diff().iloc[1:] * np.power(-1, np.arange(len(df))) / 10
    # Position categories based on the magnitude of subjective_state.
    near_equality = np.abs(subjective_state) < 0.5
    decisive_advantage = np.abs(subjective_state) > 0.7
    fate_sealed = np.abs(subjective_state) > 0.9

    # The assignments below run in order, so a later mask overwrites the class
    # set by an earlier one wherever both apply.
    move_quality = np.full(len(df), 4) # Default to 4 (Mistake)
    # The delta threshold for a blunder is lower when the position is already
    # lopsided (decisive_advantage / fate_sealed).
    move_quality[(delta_state > .45) | ((delta_state > .35) & decisive_advantage) | ((delta_state > .25) & fate_sealed)] = 5 # Blunder
    move_quality[((delta_state < .25) & ~decisive_advantage) | ((delta_state < .35) & near_equality)] = 3 # Inaccuracy
    move_quality[(delta_state < .15)] = 2 # Good
    # Rows whose subjective_state is -1 or lower are never upgraded to Great.
    move_quality[(delta_state < .05) & (subjective_state > -1)] = 1 # Great
    return move_quality.tolist()

In [5]:
def game_to_df(game, mask=False):
    """Convert one parsed PGN game into a per-move DataFrame.

    Args:
        game: Game object exposing ``headers`` (with 'Result' and
            'TimeControl'), ``board()``, ``mainline_moves()`` and ``next()``,
            as returned by ``chess.pgn.read_game``.
        mask: Forwarded to ``fen_to_array``; selects the flattened 0/1 plane
            encoding instead of the 64-integer board encoding.

    Returns:
        A DataFrame indexed by ply (1, 2, ...), one row per mainline move:
          * 'lan': the move as produced by ``board.lan(move)``.
          * 'game_state': bucketed evaluation from ``eval_to_game_state``.
          * 'time_remain': clock reading from the move comment, in seconds.
          * 'time_spent': mover's previous clock plus the increment, minus
            'time_remain'.
          * 'fen_array' / 'fen_string': the position BEFORE the move, as an
            array and as a FEN string.
        A game without moves yields an empty frame with the same columns.

    Raises:
        AttributeError: If a move comment has no ``[%clk ...]`` annotation
            (``clk_to_time`` is then called with ``None``).
    """
    columns = ['lan', 'game_state', 'time_remain', 'time_spent', 'fen_array', 'fen_string']
    result = game.headers['Result']
    time_controls = time_control_to_list(game.headers['TimeControl'])
    # One running clock per side, indexed by ply parity; both start at
    # base time + increment.
    current_clock = [time_controls[0] + time_controls[1]] * 2

    board = game.board()
    node = game
    records = []
    for ply, move in enumerate(game.mainline_moves()):
        turn = ply % 2
        current_clock[turn] += time_controls[1]
        node = node.next()

        raw_eval, clk = extract_eval_clk_from_pgn(node.comment, result)
        game_state = eval_to_game_state(eval_to_cp(raw_eval))

        clk = clk_to_time(clk)
        t_spent = current_clock[turn] - clk
        current_clock[turn] = clk

        # lan and FEN are taken before the move is pushed, so each row
        # describes the position the move was played from.
        fen_str = board.fen()
        records.append({
            'lan': board.lan(move),
            'game_state': game_state,
            'time_remain': clk,
            'time_spent': t_spent,
            'fen_array': fen_to_array(fen_str, mask),
            'fen_string': fen_str,
        })
        board.push(move)

    # Build the frame once from the collected rows instead of growing it with
    # df.loc[...] per move (which copies the frame every time and leaves every
    # column as object dtype).
    df = pd.DataFrame(records, columns=columns)
    df.index = pd.RangeIndex(1, len(records) + 1)
    return df


In [6]:
# Convert the first loaded game into a per-move DataFrame using the default
# 64-integer board encoding.
df = game_to_df(game)
# Alternative: pass mask=True for the flattened 0/1 plane encoding.
#df = game_to_df(game, True)

# Attach the quality class of every move (1 = Great ... 5 = Blunder).
df['move_quality'] = determine_move_quality(df)
# Among moves rated worse than Good (class > 2), show the board array stored
# under row label 6; this raises KeyError if that row is not in the selection.
df[df['move_quality']>2]['fen_array'][6]

array([ 4,  2,  3,  0,  6,  3,  2,  4,  1,  1,  1,  0,  1,  1,  1,  1,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  5,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, -1, -1, -1,
       -1,  0, -1, -1, -1, -4, -2, -3, -5, -6, -3, -2, -4])