In [3]:
import pkg_resources
pkg_resources.require("numpy==1.22.2")  # modified to use specific numpy
import numpy as np
print(np.__version__)
print(np.__path__)

1.22.2
['/opt/anaconda3/lib/python3.8/site-packages/numpy']


In [4]:
# libraries for data cleaning and preparation for model
import os
import os.path
import chess
import chess.pgn

# libraries for GAN model

from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import vstack
from numpy.random import randn
from numpy.random import randint

In [6]:
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers.convolutional import Conv2D,Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras.initializers import RandomNormal
from keras.layers import LeakyReLU
from keras.layers import BatchNormalization
from keras.layers import Activation,Reshape
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dropout


ImportError: cannot import name 'plot_model' from 'keras.utils' (/opt/anaconda3/lib/python3.8/site-packages/keras/utils/__init__.py)

In [174]:
# make a list of all pgn files for one player 
inputs = []
data = "/Users/zoepratt/Documents/GitHub/Top-Chess-Players/data/Raw_game/Raw_game/Alekhine"
for file in os.listdir(data):
    if file.endswith(".pgn"):
        inputs.append(os.path.join(data, file))
 
 # concatanate all pgn files in a file called names for the player
with open('merged_file.pgn', 'w') as outfile:
    for fname in inputs:
        with open(fname, encoding="utf-8", errors = 'ignore') as infile:
            outfile.write(infile.read())
            outfile.write('---')

In [175]:
def extractdata(pgn):
    '''
    input: pgn files of player
    output: 
        function returns step-by-step gameplay as a list
        function returns which player is playing White as a list
    
    list 'side' will be used to ensure only moves made by 
    intended player will be used in creation of the GAN
    '''
    
    side = []
    game_moves = []
    length = 20 #used for training purposes, to remove for DA servers
    for index in range(length):
        try:
            if chess.pgn.read_game(pgn).mainline_moves():
                # extracts game moves from the pgn files
                game_moves.append(chess.pgn.read_game(pgn).mainline_moves()) 
                
                # extracts player's name playing white from pgn files
                side.append(chess.pgn.read_game(pgn).headers["White"]) 
                
        except:
            print(index,chess.pgn.read_game(pgn))
            pass

    return game_moves, side

In [176]:
def categorize_moves(game_moves, side, name):
    '''
    input: game_moves and side list from extractdata function
    output:
        function returns 2 lists, which contain all of player's move
        list PW: player's moves when they are playing white
        list PB: player's moves when they are playing black 
    '''
    
    PW = [] # empty list for all moves when player playing white
    PB = [] # empty list for all moves when player playing black
    
    match = 0
    
    for game in game_moves:
        board = chess.Board() # saves FEN notation of chess board
        white = side[match]
        if white == name:
            identifier = 0
        else:
            identifier = 1
        
        play = 0
        for move in game:
            if play % 2 == identifier: # creates list PW of moves when the player is playing white
                PW.append(board.copy())
            board.push(move) # move game forward one move
            if play % 2 == identifier: # creates list PB of moves when the player is playing black
                PB.append(board.copy())
            play = play + 1
        match = match + 1
    
    return PW, PB

In [177]:
def make_matrix(board): 
    '''
    input: FEN notation of a board position
    output: matrix representing board position at a given moment
        each board square is an individual item in the matrix, blank squares are represented by a period
    '''
    
    pgn = board.epd() # convert FEN notation of board into EPD notation
    matrix = []  

    # retrieve only the first field from EPD notation: the piece placement
    pieces = pgn.split(" ", 1)[0] 
    
    # separate into placement of individual pieces
    rows = pieces.split("/")
    
    # separates rows so that each specific square on the board is its own list entry, formatted as a matrix
    for row in rows:
        game_row = []  
        for item in row:
            if item.isdigit():
            
            # replaces numbers in epd that represent the number of empty squares with a period for each empty square
            # example: '8' = '........'
            
                for i in range(0, int(item)):
                    game_row.append('.')
            else:
                game_row.append(item)
        matrix.append(game_row)

    return matrix

In [178]:
# translate game pieces to binary values using one-hot encoding
# each game piece is represented by a unique binary vector

chess_dict = {
    'p' : [1,0,0,0,0,0,0,0,0,0,0,0,0],
    'P' : [0,0,0,0,0,0,1,0,0,0,0,0,0],
    'n' : [0,1,0,0,0,0,0,0,0,0,0,0,0],
    'N' : [0,0,0,0,0,0,0,1,0,0,0,0,0],
    'b' : [0,0,1,0,0,0,0,0,0,0,0,0,0],
    'B' : [0,0,0,0,0,0,0,0,1,0,0,0,0],
    'r' : [0,0,0,1,0,0,0,0,0,0,0,0,0],
    'R' : [0,0,0,0,0,0,0,0,0,1,0,0,0],
    'q' : [0,0,0,0,1,0,0,0,0,0,0,0,0],
    'Q' : [0,0,0,0,0,0,0,0,0,0,1,0,0],
    'k' : [0,0,0,0,0,1,0,0,0,0,0,0,0],
    'K' : [0,0,0,0,0,0,0,0,0,0,0,1,0],
    '.' : [0,0,0,0,0,0,0,0,0,0,0,0,1],
}

In [179]:
def translate(matrix, chess_dict):
    '''
    input: matrix created in the previous function, chess dictionary
    output: layout of a chess board represented by a matrix of one-hot encoded values
    '''
    
    rows = []
    for row in matrix:
        pieces = []
        for piece in row:

            # appends one-hot endoded value associated with each chess piece, pulled from chess_dict
            pieces.append(chess_dict[piece])
        rows.append(pieces)

    return rows

In [180]:
def one_hot_matrix(X, Y):
    '''
    inputs:
        X: list of moves when player is playing white
        Y: list of moves when player is playing black
        
    translate and make_matrix functions to convert each instance of the game board into a one-hot encoded matrix
    then transforms matrix into numpy array
    
    outputs:
        X_array: numpy array of all moves when player is playing white
        Y_array: numpy array of all moves when player is playing black
    '''
    for i in range(len(X)):
        X[i] = translate(make_matrix(X[i]),chess_dict)
    for i in range(len(Y)):
        Y[i] = translate(make_matrix(Y[i]),chess_dict)
    X_array = np.array(X)
    Y_array = np.array(Y)

    return X_array, Y_array

In [182]:
def define_discriminator():
    '''
    create the PatchGAN discriminator model
    
    model takes a chess move from the source domain (real move) and a chess move from the target domain 
    (imitated move)and predicts the likelihood of whether the move from the target domain is a real or 
    generated version of the chess player's gameplay
    '''
    
    # initiatlize as randomly distributed with a SD of 0.02
    init = RandomNormal(stddev=0.02)
    in_src_image = Input(shape=image_shape)
    in_target_image = Input(shape=image_shape)
    merged = concatenate([in_src_image, in_target_image])
    d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merged)
    d = LeakyReLU(alpha=0.2)(d)
    d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
    d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
    d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
    d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
    d = BatchNormalization()(d)
    d = LeakyReLU(alpha=0.2)(d)
    d = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
    patch_out = Activation('sigmoid')(d)
    model = Model(inputs = [in_src_image, in_target_image], outputs = patch_out)
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, loss_weights=[0.5])
    return model

In [183]:
def define_encoder_block(layer_in, n_filters, batchnorm=True):
    init = RandomNormal(stddev=0.02)
    g = Conv2D(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
    if batchnorm:
        g = BatchNormalization()(g, training=True)
    g = LeakyReLU(alpha=0.2)(g)
    return g

In [184]:
def decoder_block(layer_in, skip_in, n_filters, dropout=True):
    init = RandomNormal(stddev=0.02)
    g = Conv2DTranspose(n_filters, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(layer_in)
    g = BatchNormalization()(g, training=True)
    if dropout:
        g = Dropout(0.5)(g, training=True)
    g = concatenate([g, skip_in])
    g = Activation('relu')(g)
    return g

In [185]:
def define_generator(image_shape=(8,8,13)):
    init = RandomNormal(stddev=0.02)
    in_image = Input(shape=image_shape)
    e1 = define_encoder_block(in_image, 64, batchnorm=False)
    e2 = define_encoder_block(e1, 128)
    b = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(e2)
    b = Activation('relu')(b)
    d6 = decoder_block(b, e2, 128, dropout=False)
    d7 = decoder_block(d6, e1, 64, dropout=False)
    g = Conv2DTranspose(13, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d7)
    out_image = Activation('softmax')(g)
    model = Model(in_image, out_image)
    return model

In [186]:
def define_gan(g_model, d_model, image_shape):
    d_model.trainable = False
    in_src = Input(shape=image_shape)
    gen_out = g_model(in_src)
    dis_out = d_model([in_src, gen_out])
    model = Model(in_src, [dis_out, gen_out])
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'mae'], optimizer=opt, loss_weights=[1,100])
    return model

In [187]:
def generate_real_samples(dataset, n_samples, patch_shape):
    trainA, trainB = dataset
    ix = randint(0, trainA.shape[0], n_samples)
    X1, X2 = trainA[ix], trainB[ix]
    y = ones((n_samples, patch_shape, patch_shape, 1))
    return [X1, X2], y
 
def generate_fake_samples(g_model, samples, patch_shape):
    X = g_model.predict(samples)
    y = zeros((len(X), patch_shape, patch_shape, 1))
    return X, y

In [188]:
def train(d_model, g_model, gan_model, dataset, n_epochs=100, n_batch=1):
    n_patch = d_model.output_shape[1]
    trainA, trainB = dataset
    bat_per_epo = int(len(trainA) / n_batch)
    n_steps = bat_per_epo * n_epochs
    for i in range(n_steps):
        [X_realA, X_realB], y_real = generate_real_samples(dataset, n_batch, n_patch)
        X_fakeB, y_fake = generate_fake_samples(g_model, X_realA, n_patch)
        d_loss1 = d_model.train_on_batch([X_realA, X_realB], y_real)
        d_loss2 = d_model.train_on_batch([X_realA, X_fakeB], y_fake)
        g_loss, _, _ = gan_model.train_on_batch(X_realA, [y_real, X_realB])
        print('>%d, d1[%.3f] d2[%.3f] g[%.3f]' % (i+1, d_loss1, d_loss2, g_loss))
    if (i+1) % (bat_per_epo * 10) == 0:
        clear_output()

In [189]:
image_shape = (8,8,13)
d_model = define_discriminator()
g_model = define_generator()
gan_model = define_gan(g_model, d_model, image_shape)
train(d_model, g_model, gan_model, [X,y])

NameError: name 'RandomNormal' is not defined

In [181]:
adams_pgn = open("/Users/zoepratt/Documents/GitHub/Top-Chess-Players/data/test_Adams.pgn")

def main_adams():
    game_moves, side = extractdata(adams_pgn)
    PW, PB = categorize_moves(game_moves, side, 'Adams, Michael')
    X_array, Y_array = one_hot_matrix(PW, PB)

main_adams()