In [31]:
import chess
import chess.pgn
import chess.engine
import io

import numpy as np
from tqdm.auto import tqdm
import pandas as pd

In [32]:
DATA_DIR = "data/"
STOCKFISH_DIR = 'stockfish/'
ARCHIVE_DIR = DATA_DIR + 'archives/'
BITBOARD_DIR = DATA_DIR + 'bitboards/'
ELITE_DATA_BASE_URL  = "https://database.nikonoel.fr/"
STOCKFISH_DOWNSTREAM = "https://github.com/official-stockfish/Stockfish/releases/latest/download/"

SAMPLE_ZIP = "lichess_elite_2021-11.zip"
SAMPLE_PGN = "lichess_elite_2021-11.pgn"
SAMPLE_BITBOARD = "elite_bitboard.csv"
ELITE_DATA_SAMPLE_URL = ELITE_DATA_BASE_URL + SAMPLE_ZIP
SAMPLE_ZIP_FILE = ARCHIVE_DIR + SAMPLE_ZIP
SAMPLE_PGN_FILE  = DATA_DIR + SAMPLE_PGN

STOCKFISH_AVX512_TAR = "stockfish-ubuntu-x86-64-avx512.tar"
STOCKFISH_AVX512 = "stockfish-ubuntu-x86-64-avx512"
STOCKFISH_AVX512_URL = STOCKFISH_DOWNSTREAM + STOCKFISH_AVX512_TAR
STOCKFISH_AVX512_EXE = STOCKFISH_DIR + STOCKFISH_AVX512

SAMPLE_BITBOARD_FILE = BITBOARD_DIR + SAMPLE_BITBOARD 

In [33]:
![ ! -f $SAMPLE_ZIP_FILE ] && wget $ELITE_DATA_SAMPLE_URL -P $ARCHIVE_DIR
![ ! -f $SAMPLE_PGN_FILE ] && unzip $SAMPLE_ZIP_FILE -d $DATA_DIR

In [34]:
![ ! -f $STOCKFISH_AVX512_TAR ] && wget $STOCKFISH_AVX512_URL
![ ! -f $STOCKFISH_AVX512_EXE ] && tar -xf $STOCKFISH_AVX512_TAR

In [66]:
class PGNProvider:
    
    def __init__(self, path : str, skip : int = 0):
        c = 0
        for line in open(path, "r"):
            if line == '\n':
                c += 1
        
        self.len = (c + 1) // 2
        self.read_count = 0
        self.pgn_file = open(path, "r")

        if skip >= self.len:
            raise Exception("Not enough games to skip")
        
        for _ in range(skip):
            while self.pgn_file.readline() != '\n': pass
            while self.pgn_file.readline() != '\n': pass
            self.len -= 1
    
    def __iter__(self):
        return self
    
    def __len__(self):
        return self.len
    
    def __next__(self):
        pgn_string = ""
        c = False

        while True:
            line = self.pgn_file.readline()
            
            if line == '':
                raise StopIteration
            
            if (c and line == '\n') or line == '':
                break
            elif line == '\n':
                c = True
            pgn_string += line   
        self.read_count += 1
        return pgn_string
    
    def __str__(self):
        return f"Read positions : {self.read_count} Limit : {self.limit} PGN file : {self.pgn_file}"

In [67]:
class DataGenerator:

    def __init__(self, pgn_path : str, engine_path : str, pos_limit : int, skip : int = 0, avoid_repetitions : bool = True):
        self.pgn_file = pgn_path
        self.pos_limit = pos_limit
        self.avoid_repetitions = avoid_repetitions
        self.pgn_provider = PGNProvider(pgn_path, skip=skip)
        self.signatures = set()
        self.pos_total = 0
        self.pos_written = 0
        self.games = 0
        self.engine = chess.engine.SimpleEngine.popen_uci(engine_path)
        self.engine.configure({"Threads" : 16, "Hash" : 1024})

    def write(self, out_file : str):
        print("Writing header...")
        self.write_header(out_file)
        print("Done")
        print("Writing boards...")
        with tqdm(total=self.pos_limit, position=0, leave=True, desc="Positions") as pbar:
            for game_str in self.pgn_provider:
                game = chess.pgn.read_game(io.StringIO(game_str))
                pos_count = self.write_positions_from_game(out_file, game)
                self.games += 1 if pos_count > 0 else 0
                pbar.update(pos_count)
                if self.pos_written >= self.pos_limit: break
        print("Done")
        print(f"Read {self.pos_total} positions total")
        print(f"Written {self.pos_written} positions")
        print(f"Games : {self.games}")

    def write_positions_from_game(self, out_file, game : chess.pgn.Game) -> int:
        pos_count = 0
        while game.next() is not None:
            self.pos_total += 1
            pos_count += self.write_pos(out_file, game)
            if self.pos_written >= self.pos_limit: break
            game = game.next()
        return pos_count

    def write_header(self, out_file : str) -> None:
        header = []
        for side in chess.COLORS:
            for piece in chess.PIECE_TYPES:
                curr_piece = chess.piece_symbol(piece)
                if side == chess.WHITE:
                    header.append(curr_piece.upper())
                else:
                    header.append(curr_piece)
        header += ["score", "draw", "fen"]
        with open(out_file, "w") as f:
            f.write(";".join(header) + "\n")

    def write_pos(self, out_file : str, game : chess.pgn.Game) -> int:
        content = []
        bitboards = self.pgn_to_bitboard(game)
        if bitboards in self.signatures:
            return 0
        self.signatures.add(bitboards)        
        info = self.engine.analyse(game.board(), chess.engine.Limit(time=0.1))
        score = info['score']
        if score.is_mate():
            return 0
        fen = game.board().fen()
        score = score.relative.score()
        draw = 1 if -50 <= score <= 50 else 0
        
        for bitboard in bitboards:
            content.append(str(bitboard))
        content += [str(score), str(draw), fen]
        with open(out_file, "a") as f:
            f.write(";".join(content) + "\n")
        self.pos_written += 1
        return 1
    
    def pgn_to_bitboard(self, game : chess.pgn.Game) -> tuple[int]:
        result = []
        for side in chess.COLORS:
            for piece in chess.PIECE_TYPES:
                result.append(int(game.board().pieces(piece, side)))
        return tuple(result)

data_generator = DataGenerator(SAMPLE_PGN_FILE, STOCKFISH_AVX512_EXE, skip=144100, pos_limit=10000)

In [297]:
# data_generator.write(SAMPLE_BITBOARD_FILE)
# Writing header...
# Done
# Writing boards...
# 
# Positions:   0%|          | 0/100000 [00:00<?, ?it/s]
# Done
# Read 117526 positions total
# Written 100000 positions
# Games : 1441


In [68]:
data_generator.write(SAMPLE_BITBOARD_FILE)

Writing header...
Done
Writing boards...


Positions:   0%|          | 0/10000 [00:00<?, ?it/s]

Done
Read 11180 positions total
Written 10000 positions
Games : 137


In [3]:
bdf = pd.read_csv(SAMPLE_BITBOARD_FILE, sep=";", dtype="uint64", usecols=range(12))
metadf = pd.read_csv(SAMPLE_BITBOARD_FILE, sep=";", usecols=range(12,15))

In [9]:
metadf['draw'].to_numpy().size

100000

In [296]:
metadf['draw'].mean()

0.36333

In [11]:
def pd_bitboard_to_numpy(df : pd.DataFrame) -> np.ndarray :
    return np.unpackbits(np.ascontiguousarray(df.to_numpy()).view(np.uint8), axis=1)

In [12]:
pd_bitboard_to_numpy(bdf)

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], dtype=uint8)

In [1]:
with open("test", "wb") as f:
    f.write("123")

TypeError: a bytes-like object is required, not 'str'

In [4]:
a = 123123
a.to_bytes(length=3)

b'\x01\xe0\xf3'