In [1]:
import chess
import chess.pgn
import os
import sys
import numpy as np

sys.path.append("..")

from silvermind import states

In [2]:
# Worker budget used by the dataset builder: read from the NUM_CPU_CORES
# environment variable, falling back to an interactive prompt when the variable
# is unset or empty.
# .get() is required here: indexing os.environ raises KeyError for a missing
# variable, which would make the fallback below unreachable.
num_cpu_cores = os.environ.get("NUM_CPU_CORES")
if num_cpu_cores is None or num_cpu_cores == "":
    num_cpu_cores = input("NUM_CPU_CORES")
# Four cores are held back (presumably for the rest of the system), but the
# result is never allowed below one: a zero or negative worker count would
# stall any loop that schedules work in batches of this size.
num_cpu_cores = max(1, int(num_cpu_cores) - 4)

In [3]:
def make_dataset(location="../pgns", verbose=False, max_games_per_batch=sys.maxsize):
    """Build a position/outcome dataset from the ``.pgn`` files in ``location``.

    Each PGN file is parsed in its own process. For every game with a decisive
    result, the starting position and the position after each mainline move are
    serialized with ``states.BoardState(board).serialize()`` and labelled with
    the game's result: 1 for "1-0" and 0 for "0-1". Games with any other
    ``Result`` tag (draws, unfinished, malformed) are skipped.

    Args:
        location: Directory scanned (non-recursively) for files ending in
            ``.pgn``.
        verbose: When true, print progress messages.
        max_games_per_batch: Maximum number of games read from each file.

    Returns:
        Tuple ``(X, y)`` of numpy arrays. ``X`` holds the serialized positions
        and ``y`` the matching labels, concatenated in sorted file-name order.
        Both arrays are empty when ``location`` contains no ``.pgn`` file.
        Files whose worker stored no result are left out.

    Notes:
        At most ``num_cpu_cores`` workers (module-level value, clamped to at
        least one) run at the same time.
        The worker is a nested function, which cannot be pickled, so this
        relies on a multiprocessing start method that does not pickle the
        process target (i.e. "fork").
    """
    import multiprocessing

    # Only decisive games carry a label; every other Result value maps to None
    # through .get() instead of raising KeyError inside the worker process.
    result_labels = {"1-0": 1, "0-1": 0}

    def worker_make_dataset(return_dict, file_name, max_games=sys.maxsize):
        """Parse one PGN file and store its ``(X, y)`` lists in ``return_dict``."""
        games = []
        with open(file_name) as pgn:
            # Check the cap before parsing so that no game is parsed only to
            # be thrown away once the limit is reached.
            while len(games) < max_games:
                game = chess.pgn.read_game(pgn)
                if game is None:
                    break
                games.append(game)

        # Report roughly five times per file, based on the games actually read
        # (max_games defaults to sys.maxsize and is useless as a denominator).
        progress_step = max(1, len(games) // 5)

        X = []
        y = []
        for i, game in enumerate(games, start=1):
            result = result_labels.get(game.headers.get("Result"))
            if result is not None:
                board = game.board()
                X.append(states.BoardState(board).serialize())
                y.append(result)
                for move in game.mainline_moves():
                    board.push(move)
                    X.append(states.BoardState(board).serialize())
                    y.append(result)
            if verbose and i % progress_step == 0:
                print(f"batch {round(100 * i / len(games), 1)}% complete")

        return_dict[file_name] = (X, y)

    # Sorted so that the order of the resulting dataset does not depend on the
    # arbitrary order in which os.listdir returns directory entries.
    file_names = sorted(
        os.path.join(location, file_title)
        for file_title in os.listdir(location)
        if file_title.endswith(".pgn")
    )
    if not file_names:
        # Nothing to parse: skip process and manager creation entirely.
        return np.array([]), np.array([])

    # A non-positive batch size would never make progress through the workers.
    batch_size = max(1, num_cpu_cores)

    # The context manager shuts the manager's server process down when done.
    with multiprocessing.Manager() as manager:
        return_dict = manager.dict()

        if verbose:
            print("Creating workers")
        workers = [
            multiprocessing.Process(
                target=worker_make_dataset,
                args=(return_dict, file_name, max_games_per_batch),
            )
            for file_name in file_names
        ]

        if verbose:
            print("Activating workers")
        for batch_start in range(0, len(workers), batch_size):
            batch = workers[batch_start:batch_start + batch_size]
            for worker in batch:
                worker.start()
            for worker in batch:
                worker.join()

            if verbose:
                num_workers_executed = batch_start + len(batch)
                percent = round(100 * num_workers_executed / len(workers), 1)
                print(f"Dataset creation {percent}% complete...")

        # Copy into a plain dict while the manager is still alive.
        results = return_dict.copy()

    X = []
    y = []
    for file_name in file_names:
        if file_name not in results:
            # The worker for this file did not store a result (for example it
            # exited on an error); keep going with the remaining files.
            if verbose:
                print(f"No data returned for {file_name}; skipping")
            continue
        res_X, res_y = results[file_name]
        X.extend(res_X)
        y.extend(res_y)

    return np.array(X), np.array(y)

# Build a small sample dataset: at most 100 games are read from each PGN file.
X, y = make_dataset(max_games_per_batch=100, verbose=False)

# exist_ok=True replaces the isdir()-then-mkdir() sequence, which could fail if
# the directory appeared between the check and the creation.
os.makedirs("../dataset", exist_ok=True)

# The arrays are passed positionally, so numpy stores them under its default
# keys "arr_0" (X) and "arr_1" (y); kept that way so existing loaders still work.
np.savez("../dataset/tiny_game_data.npz", X, y)