In [97]:
import chess
import chess.pgn
import chess.engine
import re
import os
import sys
from io import StringIO
import subprocess
import numpy as np
from contextlib import contextmanager

In [None]:
from typing import List, Dict, Tuple, Generator

In [4]:
import pyperclip as pc

## Sample PGN Data

In [5]:
test_pgn = '''
[Event "Rated Bullet game"]
[Site "https://lichess.org/69CbaD8f"]
[Date "2021.10.07"]
[White "Cubigami"]
[Black "JoinedToday"]
[Result "1-0"]
[UTCDate "2021.10.07"]
[UTCTime "05:16:00"]
[WhiteElo "1930"]
[BlackElo "1912"]
[WhiteRatingDiff "+5"]
[BlackRatingDiff "-6"]
[Variant "Standard"]
[TimeControl "60+0"]
[ECO "B01"]
[Opening "Scandinavian Defense: Mieses-Kotroc Variation"]
[Termination "Normal"]
[Annotator "lichess.org"]

1. e4 { [%eval 0.24] [%clk 0:01:00] } 1... d5?! { (0.24 → 0.82) Inaccuracy. e5 was best. } { [%eval 0.82] [%clk 0:01:00] } (1... e5 2. Nf3 Nc6 3. Bb5 Nf6 4. O-O Nxe4 5. Re1 Nd6 6. Nxe5) 2. exd5 { [%eval 0.36] [%clk 0:01:00] } 2... Qxd5 { [%eval 0.66] [%clk 0:01:00] } { B01 Scandinavian Defense: Mieses-Kotroc Variation } 3. Nc3 { [%eval 0.46] [%clk 0:01:00] } 3... Qe6+?! { (0.46 → 1.12) Inaccuracy. Qa5 was best. } { [%eval 1.12] [%clk 0:01:00] } (3... Qa5 4. Nf3 Nf6 5. d4 Bf5 6. Ne5 c6 7. Bf4 Nbd7 8. Nc4) 4. Be2 { [%eval 0.94] [%clk 0:00:59] } 4... Qg6? { (0.94 → 2.18) Mistake. Nf6 was best. } { [%eval 2.18] [%clk 0:01:00] } (4... Nf6 5. d4 Qd6 6. Nf3 e6 7. O-O Be7 8. Nb5 Qd8 9. c4) 5. Nf3 { [%eval 1.51] [%clk 0:00:58] } 5... Nc6?? { (1.51 → 4.52) Blunder. Qxg2 was best. } { [%eval 4.52] [%clk 0:00:59] } (5... Qxg2 6. Rg1 Qh3 7. d4 Nf6 8. Rg3 Qf5 9. Ne5 c6 10. Rf3) 6. O-O?? { (4.52 → 0.37) Blunder. Nb5 was best. } { [%eval 0.37] [%clk 0:00:57] } (6. Nb5 Kd8 7. d4 a6 8. d5 axb5 9. dxc6+ Ke8 10. Ne5 Qe6) 6... h5?? { (0.37 → 5.60) Blunder. Bh3 was best. } { [%eval 5.6] [%clk 0:00:59] } (6... Bh3 7. Ne1 Bf5 8. Nd5 O-O-O 9. Ne3 Nf6 10. Nf3 Be4 11. d3) 7. h4?? { (5.60 → 0.08) Blunder. Nb5 was best. } { [%eval 0.08] [%clk 0:00:56] } (7. Nb5 Bh3) 7... a6?? { (0.08 → 2.63) Blunder. Bh3 was best. } { [%eval 2.63] [%clk 0:00:58] } (7... Bh3 8. Ng5 Bf5 9. Bb5 f6 10. Nf3 e6 11. Nd4 Ne7 12. Qf3 Kd7 13. Re1 a6 14. Ba4) 8. Bd3?? { (2.63 → -0.22) Blunder. d4 was best. } { [%eval -0.22] [%clk 0:00:53] } (8. d4) 8... Qf6?? { (-0.22 → 1.46) Blunder. Bf5 was best. } { [%eval 1.46] [%clk 0:00:56] } (8... Bf5) 9. Ne4 { [%eval 1.24] [%clk 0:00:51] } 9... Qe6?! { (1.24 → 2.16) Inaccuracy. Qg6 was best. } { [%eval 2.16] [%clk 0:00:54] } (9... Qg6 10. Ng3) 10. Nfg5? { (2.16 → 0.77) Mistake. Neg5 was best. } { [%eval 0.77] [%clk 0:00:51] } (10. Neg5 Qd6 11. Bc4 Nh6 12. c3 Bf5 13. Qb3 O-O-O 14. Bxf7 e5 15. Be6+ Bxe6 16. Qxe6+ Qxe6) 10... Qd7?? { (0.77 → 8.62) Blunder. Qd5 was best. } { [%eval 8.62] [%clk 0:00:52] } (10... Qd5 11. Nc3 Qd8 12. Bc4 e6 13. Re1 Be7 14. d3 Nh6 15. Qxh5 g6 16. Qd1 Nf5 17. g3) 11. Qf3?? { (8.62 → -0.59) Blunder. Nxf7 was best. } { [%eval -0.59] [%clk 0:00:50] } (11. Nxf7) 11... f6?? { (-0.59 → 8.00) Blunder. Ne5 was best. } { [%eval 8.0] [%clk 0:00:50] } (11... Ne5 12. Qf4 f6 13. Be2 Nc6 14. Nf3 e5 15. Qe3 Nge7 16. Qb3 Qd5 17. d3 Qxb3 18. axb3) 12. Nc5 { [%eval 7.68] [%clk 0:00:47] } 12... Qd8? { (7.68 → Mate in 1) Checkmate is now unavoidable. Ne5 was best. } { [%eval #1] [%clk 0:00:49] } (12... Ne5 13. Bg6+ Nxg6 14. Nxd7 Bxd7 15. Qxb7 Rd8 16. Qe4 Nxh4 17. Ne6 Bxe6 18. Qxe6 Rd6 19. Qc4) 13. Nge6? { (Mate in 1 → 8.51) Lost forced checkmate sequence. Bg6# was best. } { [%eval 8.51] [%clk 0:00:45] } (13. Bg6#) 13... Bxe6 { [%eval 8.53] [%clk 0:00:43] } 14. Nxe6 { [%eval 8.78] [%clk 0:00:45] } 14... Qd7? { (8.78 → Mate in 1) Checkmate is now unavoidable. Ne5 was best. } { [%eval #1] [%clk 0:00:43] } (14... Ne5 15. Qxb7) 15. Bg6# { [%clk 0:00:45] } { White wins by checkmate. } 1-0'''

## Python chess basics
https://python-chess.readthedocs.io/en/latest/

In [6]:
def is_analyzed_by_lichess(game: chess.pgn.Game) -> bool:
    return 'Annotator' in game.headers and game.headers['Annotator'] == 'lichess.org'


def generate_games(pgn_filename: str) -> Generator[str, None, None]:
    with open(pgn_filename) as file:
        while True:
            game = chess.pgn.read_game(file)
            if game is not None:
                yield game
            else:
                return


EVAL_REGEX_PAT = '\[%eval ([+-]?(?:[0-9]*[.])?[0-9]+|#[0-9])]'
def generate_fens_with_cpl(single_game_pgn: str) \
        -> Generator[Tuple[str, float], None, None]:
    """
    For each ply in the PGN, return its FEN and the CP score of the position. The PGN
    must be annotated using Lichess's %eval comment format.
    :param single_game_pgn: A PGN string containing only one game
    """
    def get_score_from_comment(comment: str) -> float:
        """ Return the pawn-score in the comment if it contains "%eval". If "%eval" not in comment, raises ValueError. """
        if '%eval' not in comment:
            raise ValueError

        # Get score part using regex
        evals = re.findall(EVAL_REGEX_PAT, comment)
        assert len(evals) == 1, f'error: "%eval" tag appears more than once in move.comment = {comment}'
        score = evals[0]

        # Convert to float
        try:
            score = float(score)
        except ValueError:
            # Only explanation for ValueError should be that %eval's score is a checkmate, ex. '#-5' or '#9'
            # Convert to centipawn-score with mate_score, then /100 to get pawn-score
            # https://python-chess.readthedocs.io/en/latest/engine.html?highlight=mate_score#chess.engine.Score.score
            assert '#' in score
            score = chess.engine.Mate(int(score.lstrip('#'))).score(mate_score=10_000) / 100.0
        return score


    def get_cpl(from_score: float, to_score: float, after_move_by: chess.Color) -> int:
        """ Get centipawn loss (CPL) based on scores of consecutive moves and side to move. """
        return int((to_score - from_score) * 100) * ((-1) ** (not after_move_by))


    game: chess.pgn.Game = chess.pgn.read_game(StringIO(single_game_pgn))
    assert is_analyzed_by_lichess(game), f'error: PGN not analyzed by lichess. PGN: {game}'

    board = game.board()
    mainline_nodes: List[chess.pgn.ChildNode] = list(game.mainline())
    for i, node in enumerate(mainline_nodes):
        move: chess.Move = node.move
        comment: str = node.comment
        try:
            next_comment = mainline_nodes[i+1].comment
        except IndexError:
            # Mainline over -> can't get centipawn loss on this move
            return

        # Get centipawn loss (CPL)
        try:
            score = get_score_from_comment(comment)
            next_score = get_score_from_comment(next_comment)
        except ValueError:
            # If for some reason there is no "%eval" in either comment,
            # continue because CPL can't be calculated
            continue
        cpl = get_cpl(score, next_score, board.turn)

        board.push(move)
        yield board.fen(), cpl

In [7]:
list(generate_fens_with_cpl(test_pgn))

[('rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq - 0 1', 57),
 ('rnbqkbnr/ppp1pppp/8/3p4/4P3/8/PPPP1PPP/RNBQKBNR w KQkq - 0 2', 46),
 ('rnbqkbnr/ppp1pppp/8/3P4/8/8/PPPP1PPP/RNBQKBNR b KQkq - 0 2', 30),
 ('rnb1kbnr/ppp1pppp/8/3q4/8/8/PPPP1PPP/RNBQKBNR w KQkq - 0 3', 20),
 ('rnb1kbnr/ppp1pppp/8/3q4/8/2N5/PPPP1PPP/R1BQKBNR b KQkq - 1 3', 66),
 ('rnb1kbnr/ppp1pppp/4q3/8/8/2N5/PPPP1PPP/R1BQKBNR w KQkq - 2 4', 18),
 ('rnb1kbnr/ppp1pppp/4q3/8/8/2N5/PPPPBPPP/R1BQK1NR b KQkq - 3 4', 124),
 ('rnb1kbnr/ppp1pppp/6q1/8/8/2N5/PPPPBPPP/R1BQK1NR w KQkq - 4 5', 67),
 ('rnb1kbnr/ppp1pppp/6q1/8/8/2N2N2/PPPPBPPP/R1BQK2R b KQkq - 5 5', 301),
 ('r1b1kbnr/ppp1pppp/2n3q1/8/8/2N2N2/PPPPBPPP/R1BQK2R w KQkq - 6 6', 414),
 ('r1b1kbnr/ppp1pppp/2n3q1/8/8/2N2N2/PPPPBPPP/R1BQ1RK1 b kq - 7 6', 523),
 ('r1b1kbnr/ppp1ppp1/2n3q1/7p/8/2N2N2/PPPPBPPP/R1BQ1RK1 w kq - 0 7', 552),
 ('r1b1kbnr/ppp1ppp1/2n3q1/7p/7P/2N2N2/PPPPBPP1/R1BQ1RK1 b kq - 0 7', 254),
 ('r1b1kbnr/1pp1ppp1/p1n3q1/7p/7P/2N2N2/PPPPBPP1/R1BQ1RK1 w kq -

In [170]:
@contextmanager
def start_engine_process() -> Generator[subprocess.Popen, None, None]:
    # Open the exe using Popen
    p = subprocess.Popen("./stockfish_14_win_x64_avx2/stockfish_14_x64_avx2.exe",
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # Send commands to this open process by using stdin.write(*command*)
    send_commands(p, 'isready\n')
    # p.stdout.readline()

    try:
        yield p
    finally:
        p.terminate()


def send_commands(proc: subprocess.Popen, *commands: str) -> None:
    if not commands:
        return

    for cmd in commands:
        proc.stdin.write(cmd.encode())

    proc.stdin.flush()


def analyze_static(p: subprocess.Popen, fen: str) -> str:
    # Send commands to this open process by using stdin.write(*command*)
    send_commands(p, f'position fen {fen}\n', 'eval\n')

    # Communicate and get the output from the executable
    lines = []
    for _ in range(73):
        line = p.stdout.readline().decode().strip()
        lines.append(line.strip('\n\r'))

        # Static eval doesn't run on positions where either king is in check
        if 'none (in check)' in line:
            break

    # Output
    return '\n'.join(lines)

In [171]:
with start_engine_process() as p:
    output = analyze_static(p, 'r3r3/pppkb1pp/8/n2nN3/8/8/PPPP1PPP/RNB1K2R b KQ - 2 12')
    print(output)
    pc.copy(output)

Stockfish 14 by the Stockfish developers (see AUTHORS file)
readyok
info string NNUE evaluation using nn-3475407dc199.nnue enabled

Final evaluation: none (in check)


In [12]:
def export_csv(output_filename: str, data: np.array) -> None:
    ...


def prepare_data(pgn_filename: str) -> np.array:


with open('data/lichess_db_standard_rated_2015-09.pgn/lichess_db_standard_rated_2015-09.pgn') as file:
    i = 0
    pgn = ''
    while pgn is not None:
        pgn = chess.pgn.read_game(file)

    print(i, 'games found')

KeyboardInterrupt: 