### Variational AutoEncoder Chess Position Generator

##### Inspiration
* Recently, I have been reading up about generative models, and one of them that caught my eye was the VAE.
* It allows you to generate new data that is similar to your training data.
* At the same time, I am interested in chess and have enjoyed solving chess puzzles for quite a while.
* However, the premise of a chess puzzle is that the player knows there exists an optimal move or sequence of moves that gives the player an advantage.
* This helps the player to improve in terms of tactics and pattern recognition, but in most cases when playing a game of chess, we do not know if there exists an optimal solution.
* This motivates the idea of an anti-puzzle, where the position provided may or may not have an optimal solution; when it does not, the "solution" is to play a move that maintains the status quo.
* We can train the VAE on a set of legal chess positions and have it generate more chess positions.
* Since the VAE has no notion of whether a position has an optimal solution, it is well suited to creating "anti-puzzle" positions.
* Furthermore, chess is a "constrained" game, where the rules are clear and we can check if the position generated by the VAE is a legal position or not.
* For this model, the goal is to simply generate new (legal) chess positions.

In [1]:
import math
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import backend as K
import matplotlib.pyplot as plt
import keras
from scipy.stats import norm
from keras import layers, models, metrics, losses, optimizers
from keras.callbacks import EarlyStopping
from sklearn.model_selection import KFold, train_test_split


##### Data Collection

* The easiest way to obtain chess positions is from my own games.
* I exported move data from some chess games that I have played online on Lichess, which comes as a .pgn file.
* This file gives the move order of each exported game, from which the chess positions can be deduced.
* For this, I used the python-chess library, which derives FEN positions from a PGN move list.
* Once we have the FEN positions, we can derive the input data to feed into our model.
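
The PGN-to-FEN step described above can be sketched with python-chess roughly as follows. This is a minimal sketch, not the exact script used for this notebook: the function name `fens_from_pgn` and the inline sample game are assumptions made for illustration, and recording the starting position of each game is a choice the notebook does not specify.

```python
import io

import chess.pgn  # provided by the python-chess package


def fens_from_pgn(pgn_text):
    """Return the FEN of every position reached in every game of a PGN string."""
    fens = []
    handle = io.StringIO(pgn_text)
    while True:
        game = chess.pgn.read_game(handle)  # returns None at end of input
        if game is None:
            break
        board = game.board()  # starting position of this game
        fens.append(board.fen())
        for move in game.mainline_moves():
            board.push(move)  # play the move, then record the new position
            fens.append(board.fen())
    return fens


# Hypothetical three-ply game, used only to show the call.
sample_pgn = '[Event "Example"]\n\n1. e4 e5 2. Nf3 *\n'
sample_fens = fens_from_pgn(sample_pgn)
print(len(sample_fens))
print(sample_fens[-1].split(" ")[0])
```

Writing each returned string on its own line would give a text file in the one-FEN-per-line layout that the loading cell further down reads.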

##### Data Representation
* Although the exported data does not give the chess positions in a form the model can consume directly, we can manipulate it into a form that works for the VAE.
* The current idea is to use an 8 x 8 x 12 matrix: each of the 12 piece types (K, Q, R, B, N, P, k, q, r, b, n, p) has its own 8 x 8 chessboard that denotes where pieces of that type stand.
* We can generate these because all the chess games I exported start from the standard position, and we mark a piece at a certain square with a 1 (a 0 means that the piece is not on that square).
* Coincidentally, this is a well-suited data set for generating anti-puzzles, as it is formed from the move sequences of real games, in which not every position has an optimal solution.

In [2]:
# __vsc_ipynb_file__ is set by the VS Code Jupyter extension; outside VS Code, set DIR manually
DIR = os.path.dirname(__vsc_ipynb_file__)
fen_data_path = os.path.join(DIR, "data", "fen-data.txt")

In [3]:
# channel index of each piece symbol: white pieces 0-5, black pieces 6-11
PIECE_TO_IDX = {c: i for i, c in enumerate('KQRBNPkqrbnp')}

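def generate_matrix_from_fen(fen_string):
    """Encode the board field of a FEN string as an 8 x 8 x 12 nested list of 0s and 1s."""
    # initialise an empty board, indexed as board[row][col][piece_index]
    board = [[[0 for k in range(12)] for j in range(8)] for i in range(8)]

    # process FEN string: keep only the piece-placement field, one entry per rank (rank 8 first)
    board_string = fen_string.strip().split(" ")[0].split("/")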
    row, col = 0, 0
    for board_row in board_string:
        for row_item in board_row:
            if row_item.isnumeric():
                col += int(row_item)
            else:
                board[row][col][PIECE_TO_IDX[row_item]] = 1
                col += 1
        row += 1
        col = 0
    return board
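
To turn a matrix back into something a chess tool can read, the encoding above can be inverted. The sketch below is an illustration under stated assumptions, not part of the original notebook: `matrix_to_fen_board` is a hypothetical helper, and `fen_to_matrix` is a compact NumPy copy of the encoder, repeated only so the example runs on its own.

```python
import numpy as np

PIECES = "KQRBNPkqrbnp"  # same channel order as PIECE_TO_IDX in the notebook


def fen_to_matrix(fen_string):
    """Compact copy of the notebook's encoder, repeated so this sketch runs alone."""
    board = np.zeros((8, 8, 12), dtype=int)
    for row, rank in enumerate(fen_string.strip().split(" ")[0].split("/")):
        col = 0
        for symbol in rank:
            if symbol.isdigit():
                col += int(symbol)  # a digit means that many empty squares
            else:
                board[row, col, PIECES.index(symbol)] = 1
                col += 1
    return board


def matrix_to_fen_board(matrix):
    """Rebuild the piece-placement field of a FEN string from an 8 x 8 x 12 matrix."""
    ranks = []
    for rank in np.asarray(matrix):
        text, empty = "", 0
        for square in rank:
            if square.any():
                if empty:
                    text += str(empty)  # flush the run of empty squares
                    empty = 0
                text += PIECES[int(square.argmax())]
            else:
                empty += 1
        if empty:
            text += str(empty)
        ranks.append(text)
    return "/".join(ranks)


start = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
round_trip = matrix_to_fen_board(fen_to_matrix(start))
print(round_trip == start.split(" ")[0])
```

Only the piece-placement field is recoverable this way. The side to move, castling rights and en passant square are not stored in the matrix, so they would have to be chosen separately.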

In [4]:
data = []
with open(fen_data_path) as file:
    for line in file:
        if not line.strip():
            continue  # skip blank lines, which are not valid FEN
        data.append(generate_matrix_from_fen(line))
data = np.array(data, dtype = 'float64')
print(data.shape)

(18175, 8, 8, 12)


In [5]:
# (hyper)parameters
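latent_dims = 8       # size of the latent vector z
hidden_layers = 3     # number of strided conv blocks in the encoder (mirrored in the decoder)
base_units = 2 << 5   # 64 filters in the first conv layer, doubled in each later layer
kernel_size = (3, 3)
strides = 2           # each conv layer halves the board's height and width
dropout_rate = 0.3
beta = 10 ** -1       # weight of the KL term in the total loss (0.1)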
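
These values fix the shapes that flow through the encoder, and the arithmetic can be checked by hand. The sketch below assumes only the standard rule that a strided convolution with "same" padding outputs ceil(size / stride) along each spatial axis; `conv_output_shapes` is a hypothetical helper written for this illustration.

```python
import math

input_shape = (8, 8, 12)
hidden_layers = 3
base_units = 2 << 5  # bit shift: 2 * 2**5 = 64
kernel_size = (3, 3)
strides = 2


def conv_output_shapes(input_shape, hidden_layers, base_units, strides):
    """Shapes after each strided Conv2D with padding="same": ceil(size / stride)."""
    height, width, _ = input_shape
    shapes = []
    for i in range(hidden_layers):
        height = math.ceil(height / strides)
        width = math.ceil(width / strides)
        shapes.append((height, width, base_units << i))  # filters double each layer
    return shapes


shapes = conv_output_shapes(input_shape, hidden_layers, base_units, strides)
print(shapes)

# Weights plus biases of the first convolution: kh * kw * in_channels * filters + filters
first_conv_params = kernel_size[0] * kernel_size[1] * input_shape[2] * base_units + base_units
print(first_conv_params)
```

The first shape and the parameter count agree with the `(None, 4, 4, 64)` output and the 6976 parameters reported for the first Conv2D layer in the encoder summary. After the third layer the board has shrunk to 1 x 1, so a fourth stride-2 layer would have nothing left to downsample.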

In [21]:
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = K.random_normal(shape = (batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

class vae_chess(models.Model):

    def __init__(self, latent_dims, hidden_layers, base_units, kernel_size, strides, dropout_rate, beta):
        super(vae_chess, self).__init__()

        self.latent_dims = latent_dims
        self.beta = beta

        self.encoder = self.generate_encoder_model(hidden_layers, base_units, kernel_size, strides, dropout_rate)
        self.decoder = self.generate_decoder_model(hidden_layers, base_units, kernel_size, strides, dropout_rate)
        self.encoder.summary()  # summary() prints directly and returns None
        self.decoder.summary()

        self.total_loss_tracker = metrics.Mean(name = "total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(name = "reconstruction_loss")
        self.kl_loss_tracker = metrics.Mean(name = "kl_loss")

    @property
    def metrics(self):
        return [self.total_loss_tracker, self.reconstruction_loss_tracker, self.kl_loss_tracker]

    def generate_encoder_model(self, hidden_layers, base_units, kernel_size, strides, dropout_rate):
        encoder_input = layers.Input(shape = (8, 8, 12), name = "encoder_input")

        for i in range(hidden_layers):
            conv_layer = layers.Conv2D(base_units << i, kernel_size, strides, padding = "same")(encoder_input if i == 0 else dropout_layer)
            batch_norm_layer = layers.BatchNormalization()(conv_layer)
            activation_layer = layers.Activation('relu')(batch_norm_layer)
            dropout_layer = layers.Dropout(dropout_rate)(activation_layer)
        self.pass_back_shape = K.int_shape(dropout_layer)[1:]

        flatten_layer = layers.Flatten()(dropout_layer)
        z_mean = layers.Dense(self.latent_dims, name = "z_mean")(flatten_layer)
        z_log_var = layers.Dense(self.latent_dims, name = "z_log_var")(flatten_layer)
        z = Sampling()([z_mean, z_log_var])

        return models.Model(encoder_input, [z_mean, z_log_var, z], name = "encoder")
    
    def generate_decoder_model(self, hidden_layers, base_units, kernel_size, strides, dropout_rate):
        decoder_input = layers.Input(shape = (self.latent_dims,), name = "decoder_input")  # trailing comma makes a 1-tuple

        before_reshape = layers.Dense(np.prod(self.pass_back_shape))(decoder_input)
        reshape_layer = layers.Reshape(self.pass_back_shape)(before_reshape)

        for i in range(hidden_layers - 1, -1, -1):
            conv_transpose_layer = layers.Conv2DTranspose(base_units << i, kernel_size, strides, padding = "same")(reshape_layer if i == hidden_layers - 1 else dropout_layer)
            batch_norm_layer = layers.BatchNormalization()(conv_transpose_layer)
            activation_layer = layers.Activation('relu')(batch_norm_layer)
            dropout_layer = layers.Dropout(dropout_rate)(activation_layer)

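        # sigmoid keeps outputs in [0, 1]; binary_crossentropy expects probabilities by default (from_logits = False)
        decoder_output = layers.Conv2DTranspose(12, kernel_size, 1, padding = "same", activation = "sigmoid")(dropout_layer)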

        return models.Model(decoder_input, decoder_output, name = "decoder")
    
    def call(self, inputs):
        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
        return z_mean, z_log_var, reconstruction
    
    def train_step(self, data):
        with tf.GradientTape() as tape:
            z_mean, z_log_var, reconstruction = self(data)
            reconstruction_loss = tf.reduce_mean(losses.binary_crossentropy(data, reconstruction, axis = (1, 2, 3)))
            kl_loss = tf.reduce_mean(tf.reduce_sum(-0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), axis = 1))
            total_loss = reconstruction_loss + self.beta * kl_loss
        
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {m.name : m.result() for m in self.metrics}
    
    def test_step(self, data):
        z_mean, z_log_var, reconstruction = self(data)
        reconstruction_loss = tf.reduce_mean(losses.binary_crossentropy(data, reconstruction, axis = (1, 2, 3)))
        kl_loss = tf.reduce_mean(tf.reduce_sum(-0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)), axis = 1))
        total_loss = reconstruction_loss + self.beta * kl_loss
        
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {m.name : m.result() for m in self.metrics}

vae = vae_chess(latent_dims, hidden_layers, base_units, kernel_size, strides, dropout_rate, beta)
vae.compile(optimizer = "adam")

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 encoder_input (InputLayer)     [(None, 8, 8, 12)]   0           []                               
                                                                                                  
 conv2d_18 (Conv2D)             (None, 4, 4, 64)     6976        ['encoder_input[0][0]']          
                                                                                                  
 batch_normalization_36 (BatchN  (None, 4, 4, 64)    256         ['conv2d_18[0][0]']              
 ormalization)                                                                                    
                                                                                                  
 activation_36 (Activation)     (None, 4, 4, 64)     0           ['batch_normalization_36[0]
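
The two pieces of the model that are specific to a VAE, the `Sampling` layer and the KL term in `train_step`, can be reproduced in plain NumPy to see what they compute. This is a sketch for intuition only; the function names `sample_latent` and `kl_divergence` are hypothetical, and the formulas are copied from the class above.

```python
import numpy as np


def sample_latent(z_mean, z_log_var, rng):
    """Reparameterisation: z = mean + exp(0.5 * log_var) * epsilon, epsilon ~ N(0, I)."""
    epsilon = rng.standard_normal(z_mean.shape)
    return z_mean + np.exp(0.5 * z_log_var) * epsilon


def kl_divergence(z_mean, z_log_var):
    """Batch mean of KL(N(mean, exp(log_var)) || N(0, I)), summed over latent dimensions."""
    per_sample = np.sum(-0.5 * (1 + z_log_var - np.square(z_mean) - np.exp(z_log_var)), axis=1)
    return per_sample.mean()


rng = np.random.default_rng(0)
z_mean = np.zeros((4, 8))     # batch of 4, latent_dims = 8
z_log_var = np.zeros((4, 8))  # log-variance 0 means unit variance

print(sample_latent(z_mean, z_log_var, rng).shape)
print(kl_divergence(z_mean, z_log_var))        # posterior equals the prior
print(kl_divergence(z_mean + 1.0, z_log_var))  # shifting the mean adds a penalty
```

The KL term is zero only when the encoder outputs a standard normal, and grows as the mean or variance drifts away from it. In the model this term is scaled by `beta` before being added to the reconstruction loss, so a smaller `beta` lets the encoder spread positions further apart in latent space.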

In [23]:
EPOCHS = 500
BATCH_SIZE = 256

vae.fit(data, epochs = EPOCHS, batch_size = BATCH_SIZE, shuffle = True, validation_split = 0.1)

Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500

KeyboardInterrupt: 
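
Once a model has been trained, its decoder output still has to be turned back into a board before legality can be checked. The sketch below shows one possible way, assuming the output has been mapped to per-square probabilities in [0, 1] (for example through a sigmoid). The function names, the 0.5 threshold and the hand-made `fake_output` array are all assumptions for illustration, not results from this notebook.

```python
import numpy as np


def probabilities_to_matrix(probabilities, threshold=0.5):
    """Keep at most one piece per square: the most likely channel, if it clears the threshold."""
    probabilities = np.asarray(probabilities, dtype=float)
    best_channel = probabilities.argmax(axis=-1)  # (8, 8) index of the likeliest piece
    best_value = probabilities.max(axis=-1)       # (8, 8) probability of that piece
    matrix = np.zeros_like(probabilities, dtype=int)
    rows, cols = np.nonzero(best_value >= threshold)
    matrix[rows, cols, best_channel[rows, cols]] = 1
    return matrix


def has_one_king_each(matrix):
    """Necessary (not sufficient) legality check: exactly one K (channel 0) and one k (channel 6)."""
    return matrix[..., 0].sum() == 1 and matrix[..., 6].sum() == 1


# Stand-in for one decoder output; a real one would come from the trained decoder.
fake_output = np.full((8, 8, 12), 0.05)
fake_output[7, 4, 0] = 0.9  # confident white king on e1
fake_output[0, 4, 6] = 0.8  # confident black king on e8
fake_output[3, 3, 5] = 0.4  # weak white pawn, below the threshold
board = probabilities_to_matrix(fake_output)
print(board.sum(), has_one_king_each(board))
```

Taking the arg-max per square guarantees that no square holds two pieces, which a plain element-wise threshold would not. The king count is only a first filter; a full legality check would also need to reject positions such as pawns on the first or last rank or both kings in check.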