In [2]:
import io
import os
from collections import Counter
from concurrent.futures import ProcessPoolExecutor, as_completed

import chess
import chess.pgn


In [3]:
pgn_file = "/home/vandy/Work/MATH6310/blunder-analysis/data/raw/lichess_db_standard_rated_2025-08.pgn"

In [4]:
# Parallel PGN parser


def stream_games(pgn_path):
    """Stream raw PGN game texts from file. Yields one game's full text (string) at a time.

    Heuristic: a game contains tag-pairs (lines starting with '[') and moves.
    A blank line after tags+moves marks the boundary.
    """
    with open(pgn_path, encoding="utf-8", errors="replace") as f:
        buf = []
        seen_tag = False
        seen_move = False
        for line in f:
            buf.append(line)
            if line.startswith("["):
                seen_tag = True
            elif line.strip() == "":
                # blank line potentially ends a game
                if seen_tag and seen_move:
                    yield "".join(buf)
                    buf = []
                    seen_tag = False
                    seen_move = False
            # non-empty non-tag line -> move text (or comments)
            elif not line.startswith("["):
                seen_move = True
        # EOF: if any buffered lines, yield them as last game
        if buf:
            yield "".join(buf)


def stream_chunks(pgn_path, chunk_size_bytes=8 * 1024 * 1024):
    """Yield PGN text chunks sized for parallel parsing without splitting games."""
    if chunk_size_bytes <= 0:
        raise ValueError("chunk_size_bytes must be positive")
    with open(pgn_path, encoding="utf-8", errors="replace") as f:
        buffer = []
        size_acc = 0
        for line in f:
            buffer.append(line)
            size_acc += len(line.encode("utf-8"))
            if size_acc >= chunk_size_bytes and line.strip() == "":
                yield "".join(buffer)
                buffer = []
                size_acc = 0
        if buffer:
            yield "".join(buffer)


def chunk_to_games(chunk_text):
    """Split a chunk of PGN text into complete game strings."""
    games = []
    buf = []
    seen_tag = False
    seen_move = False
    for line in chunk_text.splitlines(keepends=True):
        buf.append(line)
        if line.startswith("["):
            seen_tag = True
        elif line.strip() == "":
            if seen_tag and seen_move:
                games.append("".join(buf))
                buf = []
                seen_tag = False
                seen_move = False
        else:
            seen_move = True
    if buf and "".join(buf).strip():
        games.append("".join(buf))
    return games


def parse_game_text(game_text):
    """Worker: parse a single game PGN text and return a small summary dict.

    Keep it small to reduce pickling cost.
    """
    try:
        game = chess.pgn.read_game(io.StringIO(game_text))
        if game is None:
            return None
        headers = dict(game.headers)
        # count moves
        move_count = sum(1 for _ in game.mainline_moves())
        return {
            "Event": headers.get("Event"),
            "White": headers.get("White"),
            "Black": headers.get("Black"),
            "Result": headers.get("Result"),
            "Moves": move_count,
        }
    except Exception as e:
        # return minimal error info, or you can log to file
        return {"error": str(e)}


def parse_chunk(chunk_text):
    """Worker helper: parse every game in a chunk."""
    return parse_batch(chunk_to_games(chunk_text))


def process_pgn_parallel(pgn_path, max_workers=None, chunk_size_bytes=8 * 1024 * 1024, max_games=10_000):
    """Read games from pgn_path and parse them in parallel by chunk."""
    results = []
    max_workers = max_workers or max(1, os.cpu_count() - 1)
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for chunk in stream_chunks(pgn_path, chunk_size_bytes=chunk_size_bytes):
            futures.append(executor.submit(parse_chunk, chunk))
        stop = False
        for fut in as_completed(futures):
            chunk_results = fut.result()
            for record in chunk_results:
                if record is None:
                    continue
                results.append(record)
                if len(results) >= max_games:
                    stop = True
                    break
            if stop:
                break
        if stop:
            for fut in futures:
                if not fut.done():
                    fut.cancel()
    return results[:max_games]


def parse_batch(game_texts):
    """Helper executed in worker process: parse a list of game texts."""
    out = []
    for gt in game_texts:
        out.append(parse_game_text(gt))
    return out


def parse_games_to_records(game_summaries):
    """Dummy hook: convert parsed game summaries into DB-ready records."""
    return game_summaries


def load_records_to_db(records, connection_params=None):
    """Dummy hook: persist records into a database."""
    return len(records)


# summaries = process_pgn_parallel(pgn_file, max_workers=6, max_games=10_000)
# Example: count games and aggregate results
# print(f"Parsed {len(summaries)} games")
# # e.g. count results

# c = Counter(s.get("Result") for s in summaries)
# print(c)

In [5]:
pgn_file

'/home/vandy/Work/MATH6310/blunder-analysis/data/raw/lichess_db_standard_rated_2025-08.pgn'

In [6]:
%%time
i = 0
for game in stream_games(pgn_file):
    game_text = parse_game_text(game)
    i += 1

    if i > 10000:
        break

print(f"Total Games Processed: {i}")

Total Games Processed: 10001
CPU times: user 11.8 s, sys: 16.7 ms, total: 11.8 s
Wall time: 11.8 s


In [None]:
MAX_GAMES = 10_000
results = []
max_workers = max(1, os.cpu_count() - 1)
with ProcessPoolExecutor(max_workers=max_workers) as executor:
    futures = []
    for chunk in stream_chunks(pgn_file, chunk_size_bytes=8 * 1024 * 1024):
        futures.append(executor.submit(parse_chunk, chunk))
    stop = False
    for fut in as_completed(futures):
        chunk_results = fut.result()
        for record in chunk_results:
            if record is None:
                continue
            results.append(record)
            if len(results) >= MAX_GAMES:
                stop = True
                break
        if stop:
            break
    if stop:
        for fut in futures:
            if not fut.done():
                fut.cancel()
results = results[:MAX_GAMES]