In [1]:
import numpy as np
import pandas as pd
import plotly.express as px

from os.path import getsize

from chessf.engine import Stockfish

# import statsmodels.api as sm

**Parsing functions**

In [2]:
# Lichess dump of rated Chess960 games (file name says October 2025).
PGN_FILE = "pgn/lichess_db_chess960_rated_2025-10.pgn"
print(f"PGN file size (bytes): {getsize(PGN_FILE):,}")

# The handle is kept open at module level on purpose: later cells read games
# from it one after another, so its cursor is the parsing state.
# Decode explicitly as UTF-8 rather than the platform default (e.g. cp1252 on
# Windows) so that reading does not depend on the machine's locale.
f = open(PGN_FILE, mode="r", encoding="utf-8")

PGN file size (bytes): 898,587,353


In [3]:
# Peek at the first 500 lines of the dump to see how a game is laid out
# (header tags, a blank line, then the movetext on a single line).
# Note: this advances the shared file cursor `f`.
for _ in range(500):
    print(f.readline(), end="")

[Event "Rated Chess960 game"]
[Site "https://lichess.org/JNTEWgur"]
[Date "2025.10.01"]
[Round "-"]
[White "Blindcatkelevra"]
[Black "paparoach"]
[Result "1-0"]
[UTCDate "2025.10.01"]
[UTCTime "00:37:15"]
[WhiteElo "2053"]
[BlackElo "1982"]
[WhiteRatingDiff "+8"]
[BlackRatingDiff "-4"]
[TimeControl "30+1"]
[Termination "Normal"]
[FEN "bbrqnnkr/pppppppp/8/8/8/8/PPPPPPPP/BBRQNNKR w KQkq - 0 1"]
[SetUp "1"]
[Variant "Chess960"]

1. d4 { [%clk 0:00:30] } 1... b6 { [%clk 0:00:30] } 2. c4 { [%clk 0:00:30] } 2... d5 { [%clk 0:00:31] } 3. b3 { [%clk 0:00:30] } 3... dxc4 { [%clk 0:00:31] } 4. bxc4 { [%clk 0:00:31] } 4... Ng6 { [%clk 0:00:30] } 5. e4 { [%clk 0:00:31] } 5... Nf6 { [%clk 0:00:29] } 6. e5 { [%clk 0:00:29] } 6... Nd7 { [%clk 0:00:30] } 7. Nd3 { [%clk 0:00:27] } 7... e6 { [%clk 0:00:29] } 8. Ng3 { [%clk 0:00:27] } 8... O-O { [%clk 0:00:30] } 9. O-O { [%clk 0:00:27] } 9... c5 { [%clk 0:00:30] } 10. Nh5 { [%clk 0:00:13] } 10... cxd4 { [%clk 0:00:28] } 11. f4 { [%clk 0:00:13] } 11... f5

In [None]:
def parse_key_value(string):
    """Split one PGN header line such as ``[White "name"]`` into (key, value).

    Surrounding whitespace, the square brackets and the quotes around the
    value are removed. Raises ValueError when the line has no value part.
    """
    inner = string.strip()[1:-1]              # drop "[" and "]"
    key, quoted = inner.split(maxsplit=1)     # value may itself contain spaces
    value = quoted[1:-1]                      # drop the surrounding quotes
    return key, value

def parse_moves(string):
    """Extract moves, evaluations and clock readings from a movetext line.

    Parameters
    ----------
    string : str
        One movetext line, e.g.
        ``1. e4 { [%eval 0.17] [%clk 0:00:30] } 1... c5 { [%clk 0:00:29] }``.

    Returns
    -------
    tuple of (list[str], list[str], list[str])
        ``moves``: every whitespace-separated token that starts with a letter.
        ``evals``: the value following each ``[%eval`` tag, closing ``]`` removed.
        ``clock``: the value following each ``[%clk`` tag, closing ``]`` removed.

    Values are located by their tag instead of by their shape. The previous
    shape test (``"." in token``) silently dropped mate scores such as ``#3``,
    which left ``evals`` out of step with ``moves``.
    """
    tokens = string.split()
    moves = [token for token in tokens if token[0].isalpha()]

    # Pair every token with its successor; a tag at the very end of the line
    # simply has no successor and is ignored.
    tagged = list(zip(tokens, tokens[1:]))
    evals = [value.rstrip("]") for tag, value in tagged if tag == "[%eval"]
    clock = [value.rstrip("]") for tag, value in tagged if tag == "[%clk"]
    return moves, evals, clock

def get_next_game():
    """Read the next game that has moves from the module-level PGN handle ``f``.

    Header lines (``[Key "value"]``) are collected into a dict. The first line
    starting with ``1.`` is taken as the movetext and adds the keys ``Moves``,
    ``Evals`` and ``Clock`` (see ``parse_moves``), after which the dict is
    returned. The file cursor is left just after that line.

    Raises
    ------
    EOFError
        When the end of the file is reached before another game with moves is
        found. (EOFError is an ``Exception`` subclass, so callers that catch
        ``Exception`` keep working.)
    """
    game = {}
    while True:
        line = f.readline()
        if line == "":
            raise EOFError("Reached the end of the PGN file before finding another game")

        if not line.strip():
            continue

        if line.startswith("["):
            key, value = parse_key_value(line)
            game[key] = value
        elif line.startswith("1."):
            moves, evals, clock = parse_moves(line)
            game["Moves"] = moves
            game["Evals"] = evals
            game["Clock"] = clock
            return game
        else:
            # Movetext without any move (for instance only a result token):
            # that game is skipped, so forget its headers instead of letting
            # them leak into the dict of the following game.
            game = {}

In [None]:
# Smoke test: parse and display the next game from the open PGN handle.
# Each run consumes one game, because the shared file cursor moves forward.
get_next_game()

In [None]:
class FilePGN:
    """Turns SAN move tokens taken from PGN movetext into plain tuples.

    Each move becomes ``(side, piece, disambiguation, to_square, promotion)``
    with the piece and promotion letters lower-cased.
    """

    def parse(self, game):
        """Convert a sequence of movetext tokens into a list of move tuples.

        Tokens that do not start with a letter (move numbers, braces, ...) are
        skipped. The remaining tokens are assumed to alternate white, black,
        white, ... starting with white.
        """
        san_tokens = (token for token in game if token[0].isalpha())
        return [
            self.parse_pgn_move(san, self.get_side(ply))
            for ply, san in enumerate(san_tokens)
        ]

    @staticmethod
    def get_side(k):
        """Return "w" for even half-move indices and "b" for odd ones."""
        return "b" if k % 2 else "w"

    @staticmethod
    def parse_pgn_move(move, side):
        """Break a single SAN move into its components.

        Capture, promotion, check, mate and annotation symbols (``x=+#?!``) are
        discarded first. Castling is rewritten as a king move between the
        hard-coded squares e1/e8 and g1/c1/g8/c8, i.e. it presumes the king
        starts on the e-file.
        """
        for symbol in "x=+#?!":
            move = move.replace(symbol, "")

        castling = {
            "O-O": {"w": "Ke1g1", "b": "Ke8g8"},
            "O-O-O": {"w": "Ke1c1", "b": "Ke8c8"},
        }
        if move in castling:
            move = castling[move][side]

        # A leading capital letter names the piece; otherwise it is a pawn move.
        piece = "p"
        if move[0].isupper():
            piece, move = move[0], move[1:]

        # A trailing capital letter is the promotion piece ("=" already removed).
        promotion_to = ""
        if move[-1].isupper():
            promotion_to, move = move[-1], move[:-1]

        # What is left is "<disambiguation><destination square>".
        return (side, piece.lower(), move[:-2], move[-2:], promotion_to.lower())

parser_moves = FilePGN()

In [None]:
# Engine wrapper from the project-local `chessf` package, pointed at a Windows
# AVX2 Stockfish binary. The path is relative to the notebook's working directory.
# get_data() uses it to replay games and read `pieces_count` after each move.
stockfish = Stockfish("stockfish/stockfish-windows-x86-64-avx2.exe")

In [None]:
# Switch to the dump of standard rated games (file name says June 2018).
PGN_FILE = "pgn/lichess_db_standard_rated_2018-06.pgn"
print(f"PGN file size (bytes): {getsize(PGN_FILE):,}")

# Release the handle left open by an earlier cell, if there is one, before
# rebinding `f`; otherwise that file stays open for the life of the kernel.
try:
    f.close()
except NameError:
    pass

# Explicit UTF-8 so decoding does not depend on the platform default encoding.
f = open(PGN_FILE, mode="r", encoding="utf-8")

In [None]:
def get_data(n_games=10_000):
    """Build a table of material differences versus engine evaluation.

    For each of ``n_games`` games returned by ``get_next_analysed_game`` the
    moves are replayed through ``stockfish`` and the piece counts after every
    half-move are recorded next to that half-move's evaluation. The first 20
    half-moves of each game are discarded and at most 200 are used.

    NOTE(review): ``get_next_analysed_game`` is not defined in the visible
    cells. This function treats its result as one movetext string laid out in
    groups of eight whitespace-separated tokens (move at offset 1, evaluation
    at offset 4); other cells index the result like a sequence - confirm which
    one is current.

    Parameters
    ----------
    n_games : int, default 10_000
        Number of games to consume from the PGN handle.

    Returns
    -------
    pandas.DataFrame
        Float columns ``Eval``, ``Qq``, ``Rr``, ``Bb``, ``Nn``, ``Pp`` (white
        count minus black count per piece type). Rows whose evaluation is a
        mate score (starts with ``#``) are removed.
    """
    piece_columns = ["K", "Q", "R", "B", "N", "P", "k", "q", "r", "b", "n", "p"]
    opening_plies = 20     # half-moves skipped at the start of every game
    analysed_plies = 200   # half-moves kept at most per game

    # Collect one frame per game and concatenate once at the end; growing a
    # DataFrame with pd.concat inside the loop copies all previous rows on
    # every iteration.
    frames = []
    for i in range(n_games):
        print(i, end="\r")
        a = get_next_analysed_game()
        a = a.replace("[", "").replace("]", "")
        a = a.split()

        moves = a[1::8]
        evals = a[4::8]

        # A final mating move is dropped together with its evaluation slot.
        if moves and moves[-1].endswith("#"):
            moves = moves[:-1]
            evals = evals[:-1]

        moves = moves[:analysed_plies]
        evals = evals[:analysed_plies]

        moves = parser_moves.parse(moves)

        stockfish.start_new_game()
        d = []
        for move in moves:
            stockfish.make_pgn_move(move)
            d.append(list(stockfish.pieces_count))

        assert len(d) == len(evals)

        df = pd.DataFrame(d[opening_plies:], columns=piece_columns)
        df["Eval"] = evals[opening_plies:]

        # White-minus-black material difference for every non-king piece type.
        for white, black in zip("QRBNP", "qrbnp"):
            df[white + black] = df[white] - df[black]

        frames.append(df.drop(columns=piece_columns))

    if frames:
        big_df = pd.concat(frames)
    else:
        big_df = pd.DataFrame(columns=["Eval", "Qq", "Rr", "Bb", "Nn", "Pp"])

    # Mate scores ("#3", "#-2", ...) cannot be converted to float.
    big_df = big_df[~big_df["Eval"].str.startswith("#")]
    return big_df.astype(float)

In [None]:
# Build the regression table: replays games from the open PGN handle through the
# engine wrapper (10,000 games by default), so this cell also advances the cursor.
df = get_data()

In [None]:
# Design matrix: the material-difference columns plus an intercept; target: Eval.
# NOTE(review): `sm` is not defined anywhere in the visible cells - the line
# `import statsmodels.api as sm` in the imports cell is commented out, so this
# cell raises NameError on a fresh kernel until that import is restored.
X = sm.add_constant(df.drop(columns={"Eval"}))
Y = df["Eval"]

In [None]:
# Ordinary least squares of the evaluation on the material differences.
model = sm.OLS(Y, X)

In [None]:
# Fit the regression; `results` holds the estimated coefficients.
results = model.fit()

In [None]:
# Coefficient table: one row per piece-type difference plus the constant.
print(results.summary())

In [None]:
def moves_to_df(moves):
    """Turn one analysed movetext string into a per-half-move DataFrame.

    The text is expected to consist of eight space-separated tokens per
    half-move once square brackets are removed (move number, move, ``{``,
    ``%eval``, value, ``%clk``, value, ``}``), followed by one trailing token
    that is discarded.

    Returns a frame with columns ``Move``, ``Eval``, ``MoveNumber`` (1-based
    full-move number) and ``MoveSide`` (0 for the first player of each pair,
    1 for the second), limited to the first 200 rows.
    """
    tokens = moves.translate(str.maketrans("", "", "[]")).split(" ")[:-1]

    # When the token count is not a multiple of eight (the last move is mate
    # and carries no evaluation), pad it with a placeholder "%eval #0" just
    # before the final three tokens so the stride-8 slicing stays aligned.
    if len(tokens) % 8:
        tokens.insert(-3, "%eval")
        tokens.insert(-3, "#0")

    # orient="index" followed by a transpose tolerates columns of unequal length.
    table = pd.DataFrame.from_dict(
        {"Move": tokens[1::8], "Eval": tokens[4::8]},
        orient="index",
    ).T

    table = table.assign(
        MoveNumber=lambda t: (t.index // 2) + 1,
        MoveSide=lambda t: t.index % 2,
    )

    # Only first 200 moves have analysis
    return table.head(200)

In [None]:
# Inspect the last element of the next analysed game's record; elsewhere in this
# notebook that element is passed to moves_to_df() as the movetext.
# NOTE(review): get_next_analysed_game is not defined in the visible cells.
get_next_analysed_game()[-1]

In [None]:
def get_and_parse_next_analysed_game():
    """Return ``(game, moves)`` for the next analysed game that passes the filter.

    ``game`` is the header dict produced by ``params_to_dict`` from every
    element of the record except the last; ``moves`` is the DataFrame built by
    ``moves_to_df`` from that last element. Records that fail the filter are
    skipped. A missing header or a non-numeric value raises (KeyError /
    ValueError) rather than being skipped here.

    A game is accepted when all of the following hold:
    - the base time (the part of TimeControl before "+") is 600, 900, 1200 or 1800;
    - both rating changes are at most 20 points in absolute value;
    - the two ratings differ by at most 200 points;
    - the termination is "Normal" or "Time forfeit".
    """
    accepted_base_times = ["600", "900", "1200", "1800"]
    accepted_terminations = ["Normal", "Time forfeit"]

    while True:
        data = get_next_analysed_game()
        game = params_to_dict(data[:-1])

        # Built as a list on purpose: every criterion is evaluated, so a
        # malformed header raises even when an earlier criterion already failed.
        criteria = [
            game["TimeControl"].split("+")[0] in accepted_base_times,
            abs(float(game["WhiteRatingDiff"])) <= 20,
            abs(float(game["BlackRatingDiff"])) <= 20,
            abs(int(game["WhiteElo"]) - int(game["BlackElo"])) <= 200,
            game["Termination"] in accepted_terminations,
        ]

        if all(criteria):
            return game, moves_to_df(data[-1])

In [None]:
def get_two_dfs(size=10_000):
    """Collect ``size`` accepted games and return them as two DataFrames.

    Parameters
    ----------
    size : int, default 10_000
        Number of accepted games to gather.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        The first frame has one row per game (header fields plus ``GameId``);
        the second is the concatenation of the per-game move frames, each
        tagged with the same ``GameId``. ``GameId`` is the last path component
        of the ``Site`` header. Both frames are empty when ``size`` is 0 or
        negative.

    Records that fail to parse are skipped on purpose (best effort).
    NOTE(review): if the underlying reader signals end-of-file with an
    ordinary exception, this loop retries forever once the file is exhausted -
    confirm how ``get_next_analysed_game`` behaves at EOF.
    """
    games_list = []
    moves_list = []
    while len(games_list) < size:
        try:
            a, b = get_and_parse_next_analysed_game()
        except Exception:
            # `except Exception` rather than a bare `except`, so that
            # KeyboardInterrupt / SystemExit can still stop a long run.
            continue

        game_id = a["Site"].split("/")[-1]
        a["GameId"] = game_id
        b["GameId"] = game_id

        games_list.append(a)
        moves_list.append(b)

    # pd.concat raises on an empty list; return an empty frame instead.
    if not moves_list:
        return pd.DataFrame(games_list), pd.DataFrame()

    return (
        pd.DataFrame(games_list),
        pd.concat(moves_list)
    )

**Start parsing**

In [None]:
# Reopen the standard rated dump so that batch parsing starts from the beginning.
PGN_FILE = "pgn/lichess_db_standard_rated_2018-06.pgn"
print(f"PGN file size (bytes): {getsize(PGN_FILE):,}")

# Close the handle left open by an earlier cell, if there is one, before
# rebinding `f`; otherwise it stays open for the life of the kernel.
try:
    f.close()
except NameError:
    pass

# Explicit UTF-8 so decoding does not depend on the platform default encoding.
f = open(PGN_FILE, mode="r", encoding="utf-8")

# To resume a previous run instead of starting over: set OFFSET to the cursor
# position printed at the end of that run, then skip ahead to the end of the
# game the offset falls into (the next movetext line).
# OFFSET = 0
# f.seek(OFFSET)
# while True:
#     line = f.readline()
#     if line.startswith("1."):
#         break

In [None]:
# Export settings: each parquet batch holds `batch_size` accepted games and
# `n_batches` batches are written, i.e. 250,000 games in total.
batch_size = 10_000
n_batches = 25

In [None]:
# Header fields kept in the per-game table, in output column order.
GAME_COLUMNS = [
    "GameId",
    "Result",
    "TimeControl",
    "Termination",

    "White",
    "Black",
    "WhiteElo",
    "BlackElo",
    "WhiteRatingDiff",
    "BlackRatingDiff",
    "ECO",
    "Opening",
]

# Header fields that arrive as text and are stored as integers.
INT_COLUMNS = ["WhiteElo", "BlackElo", "WhiteRatingDiff", "BlackRatingDiff"]

# Parse the dump batch by batch and store every batch as two parquet files.
# The `parsed/` directory must already exist.
for batch in range(1, n_batches + 1):

    df_games, df_moves = get_two_dfs(batch_size)

    # Select and cast in one expression that yields a new frame; assigning
    # columns on the result of a column selection triggers pandas'
    # SettingWithCopyWarning.
    df_games = df_games[GAME_COLUMNS].astype({column: int for column in INT_COLUMNS})

    # Keep only the games present in both tables, then check they agree.
    ids = set(df_games["GameId"]) & set(df_moves["GameId"])
    df_games = df_games[df_games["GameId"].isin(ids)]
    df_moves = df_moves[df_moves["GameId"].isin(ids)]
    assert set(df_games["GameId"]) == set(df_moves["GameId"])

    df_games.to_parquet(f"parsed/batch_{batch}_games.parquet")
    df_moves.to_parquet(f"parsed/batch_{batch}_moves.parquet")

    print(f"Saved batch #{batch}")

In [None]:
# Record where parsing stopped so a later run can resume from it with f.seek().
# NOTE(review): `f` is opened in text mode, where tell() returns an opaque
# position meant to be handed back to seek(); it may not be an exact byte count.
print(f"Last cursor position (bytes): {f.tell():,}")

In [None]:
# Last cursor position (bytes): 42,669,125,447