In [3]:
# import necessary packages
import chess
import chess.pgn
import pickle
import h5py
import numpy as np
import random

In [4]:
# (color, piece_type) -> channel index used by the board encoding.
# The first color (True) with piece types 1-6 takes channels 0-5,
# the second color (False) with piece types 1-6 takes channels 6-11.
piece_to_ind = {
    (color, piece_type): 6 * color_pos + (piece_type - 1)
    for color_pos, color in enumerate((True, False))
    for piece_type in range(1, 7)
}

# PGN result string -> numeric game value
# (1.0 for '1-0', -1.0 for '0-1', 0.0 for a draw).
outcome_to_val = {
    '1-0': 1.0,
    '0-1': -1.0,
    '1/2-1/2': 0.0,
}

In [34]:
def convert_board(board):
    '''
    One-hot encode a board position.

    input: board object; it is queried with board.piece_at(square) for the
        squares 0..63 and each returned piece must expose .color and
        .piece_type (presumably a python-chess chess.Board)
    output: 8x8x12 numpy array of zeros and ones
        square s is stored at [s // 8, s % 8]; the last axis has one channel
        per (color, piece_type) pair as given by piece_to_ind, so an
        occupied square has exactly one 1 and an empty square has none
    '''
    encoded = np.zeros((8, 8, 12))

    for square in range(64):
        occupant = board.piece_at(square)
        if occupant:
            row, col = divmod(square, 8)
            channel = piece_to_ind[(occupant.color, occupant.piece_type)]
            # mark the channel of the piece standing on this square
            encoded[row, col, channel] = 1

    return encoded

In [6]:
# play one random legal move on the given board
def get_random_next(board):
    '''
    input: board object exposing legal_moves and push()
    output: the SAME board object after one uniformly chosen legal move has
        been pushed onto it (the argument is modified in place, so pass a
        copy if the original position is still needed)
    raises: IndexError (from random.choice) when there is no legal move
    '''
    candidates = list(board.legal_moves)
    chosen = random.choice(candidates)
    board.push(chosen)
    return board

In [36]:
# Generates training data based on single board transitions
def gen_board_pair_data(infile, outfile):
    '''
    Turn every main-line move of every game in a PGN file into one HDF5 row.

    input: path of a PGN file, output path WITHOUT the '.hdf5' extension
    output: None; writes outfile + '.hdf5' containing six datasets that all
        have one row per recorded move:
        1. f_boards   (N, 8, 8, 12) int8   board layout before the move
        2. s_boards   (N, 8, 8, 12) int8   layout after the move played in the real game
        3. r_boards   (N, 8, 8, 12) int8   layout after a random legal move from f_boards
        4. playing    (N,) int8            1 when White is to move, -1 when Black is
        5. results    (N,) int8            game outcome looked up in outcome_to_val
        6. move_props (N,) float32         index of the move within its game (1-based)
                                           divided by the number of moves in that game

    Games whose 'Result' header is not a key of outcome_to_val (for example
    '*' for an unfinished game) are skipped instead of aborting the run with
    a KeyError half-way through the file.
    '''
    board_names = ['f_boards', 's_boards', 'r_boards']
    # move_props holds fractions in (0, 1]; it must be a float dataset, an
    # int8 one would truncate every value on write.
    scalar_specs = [('playing', 'b'), ('results', 'b'), ('move_props', 'f4')]

    # Context managers close both files even when a game fails to convert.
    with open(infile) as pgn, h5py.File(outfile + '.hdf5', 'w') as out:
        f_boards, s_boards, r_boards = [
            out.create_dataset(dname, (0, 8, 8, 12), dtype='b',
                               maxshape=(None, 8, 8, 12),
                               chunks=True)
            for dname in board_names]
        playing, results, move_props = [
            out.create_dataset(dname, (0,), dtype=dtype,
                               maxshape=(None,),
                               chunks=True)
            for dname, dtype in scalar_specs]
        datasets = [f_boards, s_boards, r_boards, playing, results, move_props]

        line_num = 0   # next row to write
        size = 0       # rows currently allocated in every dataset

        # read_game returns None once the PGN file is exhausted
        cur_game = chess.pgn.read_game(pgn)
        while cur_game is not None:
            # 0.0 (draw) is a valid outcome, hence the explicit None test
            outcome = outcome_to_val.get(cur_game.headers.get('Result'))
            if outcome is not None:
                first_row = line_num
                # Keep one board and push moves onto it instead of calling
                # node.board() (which replays the whole game) several times
                # per move.
                board = cur_game.board()
                node = cur_game
                while not node.is_end():
                    # grow geometrically so resizing stays cheap
                    if line_num + 1 >= size:
                        out.flush()
                        size = 2 * size + 1
                        for dset in datasets:
                            dset.resize(size=size, axis=0)

                    next_node = node.variation(0)

                    # first board layout
                    f_boards[line_num] = convert_board(board)

                    # random move decision following first board position;
                    # get_random_next mutates its argument, so hand it a copy
                    r_boards[line_num] = convert_board(
                        get_random_next(board.copy()))

                    # board.turn is True for White; reading it from the board
                    # stays correct for games that start from a set-up
                    # position with Black to move
                    playing[line_num] = 1 if board.turn else -1
                    results[line_num] = outcome

                    # board layout following the first one in the actual game
                    board.push(next_node.move)
                    s_boards[line_num] = convert_board(board)

                    node = next_node
                    line_num += 1

                # fraction of the game completed after each move: 1/n ... n/n
                move_total = line_num - first_row
                if move_total:
                    move_props[first_row:line_num] = (
                        np.arange(1, move_total + 1) / float(move_total))

            # move on to next game in the PGN file
            cur_game = chess.pgn.read_game(pgn)

        # trim the over-allocated tail before the file is closed
        for dset in datasets:
            dset.resize(size=line_num, axis=0)

In [15]:
def gen_player_data(infile, outfile, player_name,
                    boards_pickle="full_boards_tal.pkl"):
    '''
    Extract the moves made by one player from a PGN file of that player's games.

    input: PGN file containing step-by-step gameplay for a specific player,
        output path WITHOUT the '.hdf5' extension, name of the player (matched
        as a substring of the 'White' header), and optionally the path of the
        pickle file for the raw board objects (defaults to the file name this
        function has always written)
    output: None; writes outfile + '.hdf5' with one row per move of the player:
        1. f_boards (N, 8, 8, 12) int8   board layout before the player's move
        2. s_boards (N, 8, 8, 12) int8   board layout after the player's move
        3. p_color  (N,) int8            1 if the player was White, -1 otherwise
        and pickles the list of board objects matching f_boards to boards_pickle
    '''
    full_boards = []

    # Context managers close both files even when a game fails to convert.
    with open(infile) as pgn, h5py.File(outfile + '.hdf5', 'w') as out:
        f_boards, s_boards = [
            out.create_dataset(dname, (0, 8, 8, 12), dtype='b',
                               maxshape=(None, 8, 8, 12),
                               chunks=True)
            for dname in ['f_boards', 's_boards']]
        p_color = out.create_dataset('p_color', (0,), dtype='b',
                                     maxshape=(None,),
                                     chunks=True)
        datasets = [f_boards, s_boards, p_color]

        line_num = 0   # next row to write
        size = 0       # rows currently allocated in every dataset

        # read_game returns None once the PGN file is exhausted
        cur_game = chess.pgn.read_game(pgn)
        while cur_game is not None:
            # NOTE(review): a game whose White header does not contain
            # player_name is treated as "player is Black" without checking
            # the Black header -- confirm the input only holds this
            # player's games.
            player = 1 if player_name in cur_game.headers['White'] else -1

            # Keep one board and push moves onto it instead of calling
            # node.board() (which replays the whole game) for every position.
            board = cur_game.board()
            node = cur_game
            while not node.is_end():
                next_node = node.variation(0)

                # board.turn is True for White; reading it from the board
                # stays correct for games that start from a set-up position
                # with Black to move.
                players_turn = (1 if board.turn else -1) == player

                # ensures only player's moves are recorded
                if players_turn:
                    # grow geometrically so resizing stays cheap
                    if line_num + 1 >= size:
                        out.flush()
                        size = 2 * size + 1
                        for dset in datasets:
                            dset.resize(size=size, axis=0)

                    # the shared board keeps changing, so store a copy
                    full_boards.append(board.copy())
                    # first position boards
                    f_boards[line_num] = convert_board(board)

                board.push(next_node.move)

                if players_turn:
                    # second position boards
                    s_boards[line_num] = convert_board(board)
                    # color of moving player
                    p_color[line_num] = player
                    line_num += 1

                node = next_node
            cur_game = chess.pgn.read_game(pgn)

        # trim the over-allocated tail before the file is closed
        for dset in datasets:
            dset.resize(size=line_num, axis=0)

    with open(boards_pickle, "wb") as pickle_file:
        pickle.dump(full_boards, pickle_file)

In [None]:
def main():
    '''
    Build both HDF5 files: board-pair training data from the FICS game dump
    and the player-specific data from the Tal game collection.
    '''
    games_pgn = 'ficsgamesdb_202201_standard2000_nomovetimes_243266.pgn'
    tal_pgn = 'tal.pgn'

    gen_board_pair_data(infile=games_pgn, outfile='tal_training')
    gen_player_data(infile=tal_pgn, outfile='tal_player', player_name='Tal')


if __name__ == '__main__':
    main()

In [None]:
# completion marker printed when this cell is run
print("Tal complete!")