In [1]:
import numpy as np
import chess
import chess.pgn
from chess.engine import PovScore, Cp
from io import StringIO,TextIOWrapper
import h5py
import sys
import zstandard as zstd

We prepare the data in several stages.

1. Extract valid games (see below) into pgn files using string operations.
2. Create different hdf5 files for training, validation and testing containing (game,label) tensors for different time controls from multiple pgn files, filtering out invalid games using the python chess library.
3. Create oversampled training files.

Valid game files are based on the `TimeControl "{a time control}"` field, whether the term `[%eval` is in the string, whether the absolute values of `BlackRatingDiff` and `WhiteRatingDiff` are below a certain value, and whether `Termination` is *not* `Abandoned` or `Rules infraction`.

If all the above conditions are met we utilise the `Board` class to parse the pgn, also checking that game length is above some minimum.

In [2]:
# Maps the exact value of a game's TimeControl header (e.g. "300+0") to the base name of
# the output file for that game. Lookup is a plain dictionary membership test, so only
# the time controls listed here are accepted; the file on disk is "<name>.hdf5".
file_dict = {
    "300+0": "blitz",
    "300+3": "blitz",
    "60+0": "ultrabullet",
    "120+1": "bullet",
    "180+0": "superblitz",
    "180+2": "superblitz",
    "600+0": "rapid",
    "600+5": "rapid",
    "900+10": "rapid",
}

# A game is kept only when the absolute WhiteRatingDiff and BlackRatingDiff are both
# strictly below this value.
MAX_RATING_DIFF = 40

# A game is dropped when any of these strings occurs anywhere in its PGN text.
TERMINATION_STRINGS = {"Abandoned", "Rules infraction"}

# Number of moves (per player) encoded for each game.
NUM_MOVES = 40

In [15]:
_NUM_FEATURES = 136  # width of one encoded move row, see get_game_tensor for the layout


def _header_value(game_string, tag):
    """Return the quoted value that follows `tag "` in the raw PGN text, or None if the tag is absent."""
    _, marker, rest = game_string.partition(f'{tag} "')
    if not marker:
        return None
    return rest.partition('"')[0]


def _encode_board(board, mover, row, offset):
    """Write the 64 squares of `board` into row[offset:offset+64] from the mover's point of view.

    0 is an empty square, piece_type for the mover's pieces and piece_type + 7 for the
    opponent's pieces.
    """
    for square in range(64):
        piece = board.piece_at(square)  # looked up once per square
        if piece is None:
            row[offset + square] = 0
        elif piece.color == mover:
            row[offset + square] = piece.piece_type
        else:
            row[offset + square] = piece.piece_type + 7


def _encode_eval(score, mover, row, value_idx, flag_idx):
    """Write an evaluation into row[value_idx] with its mate flag in row[flag_idx].

    A missing evaluation is stored as flag 1 / value 0 ("mate in 0"), a mate score as
    flag 1 / number of moves, and anything else as flag 0 / the score() value, all from
    the mover's point of view.
    """
    if score is None:
        row[flag_idx] = 1
        row[value_idx] = 0
        return
    relative = score.pov(mover)
    if relative.is_mate():
        row[flag_idx] = 1
        row[value_idx] = relative.mate()
    else:
        row[flag_idx] = 0
        row[value_idx] = relative.score()


def get_game_tensor(game_string):
    """Encode one PGN game as two per-player tensors, or return None if the game is rejected.

    A game is rejected (None) when any of the following holds:
      * its TimeControl header is missing or not a key of file_dict;
      * it contains no '[%eval' annotation;
      * WhiteRatingDiff / BlackRatingDiff is missing, not an integer, or its absolute
        value is not strictly below MAX_RATING_DIFF;
      * any entry of TERMINATION_STRINGS occurs in the text;
      * the PGN cannot be read, or WhiteElo / BlackElo is missing or not an integer.

    Otherwise returns (gt1, gt2, white_elo, black_elo, file_name) where gt1 holds White's
    moves and gt2 Black's moves, each of shape (NUM_MOVES, 136); rows for moves that were
    never played stay zero. Row layout, always from the point of view of the side moving:
      0-63    board before the move
      64-127  board after the move
      128     move index (ply // 2)
      129     mover's clock before the move
      130     mover's clock after the move
      131     opponent's clock
      132/133 evaluation before the move (value / mate flag)
      134/135 evaluation after the move (value / mate flag)
    """
    # ---- cheap string-level filtering before paying for a full PGN parse ----
    time_control = _header_value(game_string, 'TimeControl')
    if time_control is None or time_control not in file_dict:
        return None

    if '[%eval' not in game_string:
        return None

    white_diff = _header_value(game_string, 'WhiteRatingDiff')
    black_diff = _header_value(game_string, 'BlackRatingDiff')
    if white_diff is None or black_diff is None:
        return None
    try:
        white_diff = int(white_diff)
        black_diff = int(black_diff)
    except ValueError:
        return None
    if abs(white_diff) >= MAX_RATING_DIFF or abs(black_diff) >= MAX_RATING_DIFF:
        return None

    if any(term in game_string for term in TERMINATION_STRINGS):
        return None

    # ---- parse the game ----
    game = chess.pgn.read_game(StringIO(game_string))
    if game is None:
        return None
    try:
        white_elo = int(game.headers['WhiteElo'])
        black_elo = int(game.headers['BlackElo'])
    except (KeyError, ValueError):
        return None

    gt1 = np.zeros((NUM_MOVES, _NUM_FEATURES))
    gt2 = np.zeros((NUM_MOVES, _NUM_FEATURES))

    board = game.board()
    white_time = 0
    black_time = 0
    current_eval = PovScore(Cp(0), chess.WHITE)
    mover = chess.WHITE
    node = game

    for ply in range(NUM_MOVES * 2):
        row = np.zeros(_NUM_FEATURES)

        # state before the move
        _encode_board(board, mover, row, 0)
        row[128] = ply // 2
        if mover == chess.WHITE:
            row[129] = white_time
            row[131] = black_time
        else:
            row[129] = black_time
            row[131] = white_time
        # The "before" evaluation belongs in slots 132/133; the no-eval case used to be
        # written to 134/135, where the "after" evaluation overwrote it.
        _encode_eval(current_eval, mover, row, 132, 133)

        node = node.next()
        if node is None:  # no more moves: the partially filled row is discarded
            break

        # state after the move
        if mover == chess.WHITE:
            white_time = node.clock()
        else:
            black_time = node.clock()
        current_eval = node.eval()
        board = node.board()

        _encode_board(board, mover, row, 64)
        row[130] = white_time if mover == chess.WHITE else black_time
        _encode_eval(current_eval, mover, row, 134, 135)

        if mover == chess.WHITE:
            gt1[ply // 2] = row
        else:
            gt2[ply // 2] = row

        mover = not mover

    return gt1, gt2, white_elo, black_elo, file_dict[time_control]


We want to read from an input file compressed with Zstandard (`.zst`) and write all the resulting game tensors and ratings to HDF5 files, one per time-control category.

In [83]:
CHUNKSIZE = 1000  # number of rows added whenever a dataset runs out of space


def _iter_game_strings(reader):
    """Yield one raw PGN string per game, splitting the line stream at every '[Event' line.

    Lines that appear before the first '[Event' line are skipped. The last game of the
    stream is yielded as well.
    """
    lines = []
    for line in reader:
        if line.startswith("[Event"):
            if lines:
                yield "".join(lines)
            lines = [line]
        elif lines:
            lines.append(line)
    if lines:
        yield "".join(lines)


def _append_game_pair(h5file, next_row, gt1, gt2, white_rating, black_rating):
    """Write both player tensors and ratings at rows next_row and next_row + 1.

    Creates the "game_tensors" and "ratings" datasets on first use and grows them by
    CHUNKSIZE rows when full. Returns the index of the next free row.
    """
    if "game_tensors" not in h5file:
        row_shape = tuple(gt1.shape)
        h5file.create_dataset("game_tensors",shape=(CHUNKSIZE,) + row_shape,maxshape=(None,) + row_shape,chunks=True,compression='gzip',compression_opts=9)
        h5file.create_dataset("ratings",shape=(CHUNKSIZE,1),chunks=True,maxshape=(None,1),compression='gzip',compression_opts=9)
        next_row = 0

    tensors = h5file["game_tensors"]
    ratings = h5file["ratings"]

    # +1 because two rows (one per player) are written at a time
    if next_row + 1 >= tensors.shape[0]:
        tensors.resize((tensors.shape[0] + CHUNKSIZE,) + tuple(tensors.shape[1:]))
        ratings.resize((ratings.shape[0] + CHUNKSIZE, 1))

    tensors[next_row] = gt1
    tensors[next_row + 1] = gt2
    ratings[next_row] = np.array([white_rating])
    ratings[next_row + 1] = np.array([black_rating])
    return next_row + 2


def write_to_hdf5(reader):
    """Convert every game readable from `reader` and append it to the per-category HDF5 files.

    `reader` is an iterable of text lines containing PGN games. Each game accepted by
    get_game_tensor contributes two rows (White, then Black) to the "game_tensors" and
    "ratings" datasets of "<category>.hdf5", where the category comes from file_dict.
    Files are opened in append mode, so rows are added after any existing ones.

    The datasets are trimmed to the rows actually written and the files are closed even
    if processing is interrupted, so a later call can keep appending.
    """
    files = {}
    file_indexes = {}
    try:
        # open all the files once so that we don't have to keep doing it
        for file_name in set(file_dict.values()):
            h5file = h5py.File(f"{file_name}.hdf5","a")
            files[file_name] = h5file
            # each file gets its own start index, taken from its own dataset
            if "game_tensors" in h5file:
                file_indexes[file_name] = h5file["game_tensors"].shape[0]
            else:
                file_indexes[file_name] = 0

        count = 0
        for game in _iter_game_strings(reader):
            game_tensors = get_game_tensor(game)
            if game_tensors is None:
                continue
            if count % 10000 == 0:
                print("read",count,"games")
            count += 1
            gt1,gt2,white_rating,black_rating,file_name = game_tensors
            file_indexes[file_name] = _append_game_pair(
                files[file_name], file_indexes[file_name], gt1, gt2, white_rating, black_rating
            )
    finally:
        for file_name, h5file in files.items():
            try:
                # drop the unused padding rows; files that never received a game have no datasets
                if file_name in file_indexes and "game_tensors" in h5file:
                    used = file_indexes[file_name]
                    tensors = h5file["game_tensors"]
                    tensors.resize((used,) + tuple(tensors.shape[1:]))
                    h5file["ratings"].resize((used, 1))
            finally:
                h5file.close()

Allow processing of "plain" files or "zst" files passed in from the command line.

In [84]:
def read_file(fn):
    """Feed the games stored in `fn` to write_to_hdf5.

    A name ending in ".pgn" is opened as plain text. A name ending in ".zst" is
    decompressed as a stream and decoded as UTF-8. Any other name is ignored.
    """
    if fn.endswith(".pgn"):
        with open(fn,"r") as plain_file:
            write_to_hdf5(plain_file)
        return

    if not fn.endswith(".zst"):
        return

    with open(fn,"rb") as compressed_file:
        decompressor = zstd.ZstdDecompressor()
        with decompressor.stream_reader(compressed_file) as raw_stream:
            # wrap the binary stream so that iteration yields decoded text lines
            decoded_lines = TextIOWrapper(raw_stream, encoding='utf-8')
            write_to_hdf5(decoded_lines)

In [85]:
# Convert two compressed PGN dumps. write_to_hdf5 opens the per-category .hdf5 files in
# append mode, so both calls write into the same set of output files.
read_file("data/all_data/lichess09.pgn.zst")
read_file("data/all_data/lichess05.pgn.zst")

read 0 games


KeyboardInterrupt: 

In [None]:
def create_bins(file, start_index, end_index, num_bins=48, min_rating=600, bin_width=50):
    """Group the rows file["ratings"][start_index:end_index] into rating bins.

    With the defaults there are 48 bins of width 50 starting at 600 (600-649, 650-699, ...).
    Ratings below the first bin are placed in the first bin and ratings above the last
    bin in the last bin.

    Returns a list of `num_bins` lists. Each inner list holds row positions relative to
    `start_index` (0 is the row at `start_index`).
    """
    # the file is only needed while the slice is read
    with h5py.File(file,"r") as f:
        ratings = np.asarray(f["ratings"][start_index:end_index]).reshape(-1)

    # ratings are stored as (N, 1) rows; flatten and convert so they can index a list
    bin_ids = np.clip((ratings.astype(np.int64) - min_rating) // bin_width, 0, num_bins - 1)

    bins = [[] for _ in range(num_bins)]  # independent lists, one per bin
    for position, bin_id in enumerate(bin_ids):
        bins[bin_id].append(position)
    return bins

def shuffle_bins(bins):
    """Shuffle `bins` in place: first the order of the bins, then the contents of each bin."""
    np.random.shuffle(bins)
    for bucket in bins:
        np.random.shuffle(bucket)



In [None]:
from tensorflow.keras.utils import Sequence

class DataGenerator(Sequence):
    """Serves (game_tensors, ratings) batches from rows [start_index, end_index) of an HDF5 file.

    Only full batches are produced; trailing rows that do not fill a batch are dropped.
    Batches are selected purely by the `index` passed to __getitem__.

    NOTE(review): the shuffled (rating-binned) batch path had an empty body in the
    original code. Bins are still built and reshuffled each epoch, but requesting a
    batch with shuffle=True raises NotImplementedError until that sampling is written.
    """

    def __init__(self, file, batch_size, start_index,end_index, shuffle=False):
        super().__init__()
        self.f = None  # set first so __del__ is safe if opening the file fails
        self.f = h5py.File(file,"r")
        self.batch_size = batch_size
        self.start_index = start_index
        self.end_index = end_index
        self.shuffle = shuffle
        self.bins = create_bins(file,start_index,end_index) if shuffle else None

        self.current_index = start_index
        self.current_bin = 0

    def __len__(self):
        """Number of full batches in [start_index, end_index)."""
        return (self.end_index - self.start_index) // self.batch_size

    def __getitem__(self, index):
        """Return batch number `index` as (x_batch, y_batch).

        Raises IndexError for an index outside [0, len(self)) and NotImplementedError
        when the generator was created with shuffle=True.
        """
        if self.shuffle:
            raise NotImplementedError("DataGenerator: shuffled (binned) batches are not implemented yet")
        if not 0 <= index < len(self):
            raise IndexError(f"batch index {index} out of range for {len(self)} batches")

        first = self.start_index + index * self.batch_size
        last = first + self.batch_size
        x_batch = self.f["game_tensors"][first:last]
        y_batch = self.f["ratings"][first:last]
        self.current_index = last  # kept for inspection; batch selection does not depend on it
        return x_batch,y_batch

    def on_epoch_end(self):
        """Reset the position counters and, when shuffling, reshuffle the bins."""
        self.current_bin = 0
        self.current_index = self.start_index
        if self.shuffle:
            shuffle_bins(self.bins)

    def __del__(self):
        # the attribute may be missing or None if __init__ failed early
        f = getattr(self, "f", None)
        if f is not None:
            f.close()