In [None]:
import pandas as pd
import numpy as np
import random
from sklearn.model_selection import train_test_split
import chess

PATH = 'data/chessData.csv'
random.seed(0)

In [None]:
df = pd.read_csv(PATH, encoding="utf-8")
# 100k datapoints. White's turn. Preprocess to remove "mate in X" evaluations with '#' character. All values between -2k and 2k centipawns. No 0-evaluation.
df['Evaluation'] = df['Evaluation'].apply(pd.to_numeric, errors='coerce')
df = df[(-2000 <= df['Evaluation']) & (df['Evaluation'] <= 2000) & (df['Evaluation'] != 0) & (df['FEN'].apply(lambda fen: fen.split()[1]) == 'w')].dropna()[:100000]

In [None]:
y = df['Evaluation']
fig = y.plot.hist(bins=100)
fig.set_title('Distribution of Evaluation Scores over Dataset')
fig.set_xlabel('Evaluation Score (Centipawns)')

In [None]:
### 80/10/10 train/val/test split
df_train, df_test  = train_test_split(df, test_size=0.2, random_state=1)
df_val, df_test= train_test_split(df_test, test_size=0.5, random_state=1)
print('Train/Validation/Test Splits: ' , df_train.shape[0], df_val.shape[0], df_test.shape[0])

In [None]:
"""
Credits to @shailesh for portions of this code.
https://github.com/ShaileshSridhar2403/neuralStockfish
"""
pieces = (chess.PAWN,chess.KNIGHT,chess.BISHOP,chess.ROOK,chess.QUEEN,chess.KING)
colours = (chess.WHITE,chess.BLACK)

def fenToVec(fen):
	"""
	Input: 
	FEN string.
	
	Output: 768-vector. Each of the 12 sets (where each set is one of the unique pieces) 
	of 64 elements (where each element is a square) is a one-hot encoding of whether the
	piece is on the square. 12 * 64 = 768.
	"""
	posFen = fen.split()[0]
	board = chess.BaseBoard(posFen)
	l = []
	
	for colour in colours:
		for piece in pieces:
			v = np.zeros(64)
			for i in board.pieces(piece,colour):
				v[i] = 1
			l.append(v)
	l = np.concatenate(l)
	return l


def vecToFen(vec):
	"""
	Reverses above function.
	"""
	vecList = np.split(vec,12)
	whiteList = vecList[:6]
	blackList = vecList[6:]
	board = chess.BaseBoard()
	board.clear_board()
	for pieceType in range(len(whiteList)):
		pieceArr = whiteList[pieceType]
		for ind in range(len(pieceArr)):
			if pieceArr[ind]:
				board.set_piece_at(ind ,chess.Piece(pieces[pieceType],chess.WHITE))
				
	for pieceType in range(len(blackList)):
		pieceArr = blackList[pieceType]
		for ind in range(len(pieceArr)):
			if pieceArr[ind]:
				board.set_piece_at(ind ,chess.Piece(pieces[pieceType],chess.BLACK))
	
	return board.board_fen()

In [None]:
def fenToVec2D(x):
    return pd.DataFrame(x.apply(fenToVec).values.tolist())

In [None]:
class BaselineModel:
    def __init__(self):
        # Value of pieces in centipawns.
        self.values = {chess.PAWN:100, chess.KNIGHT:300, chess.BISHOP:300, chess.ROOK:500, chess.QUEEN:900, chess.KING:0}

    def predict(self, x):
        board = chess.BaseBoard(x.split()[0])
        pred = sum((1 if piece.color == chess.WHITE else -1) * self.values[piece.piece_type] for piece in board.piece_map().values())
        return pred if pred else random.uniform(-1, 1)

In [None]:
def eval(y, y_pred):
    """
    Returns MSE of evaluation scores and accuracy of sign of evaluation 
    scores (which indicate winning player).
    """
    mse = np.mean((y - y_pred)**2)
    accuracy = np.mean(np.sign(y) == np.sign(y_pred))
    return mse, accuracy

In [None]:
### Baseline Test
baseline = BaselineModel()

y_pred = df_train['FEN'].apply(baseline.predict)
mse, accuracy = eval(df_train['Evaluation'], y_pred)
print(f'Baseline Train Error: {mse}.')
print(f'Baseline Train Accuracy: {accuracy}.')

y_pred = df_val['FEN'].apply(baseline.predict)
mse, accuracy = eval(df_val['Evaluation'], y_pred)
print(f'Baseline Validation Error: {mse}.')
print(f'Baseline Validation Accuracy: {accuracy}.')

y_pred = df_test['FEN'].apply(baseline.predict)
mse, accuracy = eval(df_test['Evaluation'], y_pred)
print(f'Baseline Test Error: {mse}.')
print(f'Baseline Test Accuracy: {accuracy}.')

In [52]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MLPModel(nn.Module):
    def __init__(self):
        super(MLPModel, self).__init__()
        DROPOUT_PROB = 0.5
        INPUT_SIZE = 768
        self.dropout = nn.Dropout(DROPOUT_PROB)
        self.linear1 = nn.Linear(INPUT_SIZE, INPUT_SIZE // 2)
        self.linear2 = nn.Linear(INPUT_SIZE // 2, INPUT_SIZE // 4)
        self.linear3 = nn.Linear(INPUT_SIZE // 4, 1)

    def predict(self, x, device):
        input = torch.as_tensor(fenToVec2D(x).values, dtype=torch.float32)
        input = input.to(device)
        input = F.relu(self.linear1(input))
        input = self.dropout(input)
        input = F.relu(self.linear2(input))
        input = self.dropout(input)
        output = self.linear3(input)
        return output

In [53]:
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        DROPOUT_PROB = 0.5
        INPUT_SIZE = 128
        self.dropout = nn.Dropout(DROPOUT_PROB)
        self.linear1 = nn.Linear(INPUT_SIZE, INPUT_SIZE // 2)
        self.linear2 = nn.Linear(INPUT_SIZE // 2, INPUT_SIZE // 4)
        self.linear3 = nn.Linear(INPUT_SIZE // 4, 1)
        self.conv1 = nn.Conv2d(in_channels = 12, out_channels = 32, kernel_size = 2, stride = (2,2))
        self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 2, stride = (2,2))
        self.conv3 = nn.Conv2d(in_channels = 64, out_channels = INPUT_SIZE, kernel_size = 2, stride = (2,2))
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

    def predict(self, x, device):
        input = torch.as_tensor(fenToVec2D(x).values, dtype=torch.float32)
        input = input.to(device)
        input = input.reshape(-1, 12, 8, 8)
        input = self.pool(self.conv1(input))
        input = self.pool(self.conv2(input))
        input = self.dropout(self.pool(self.conv3(input)))
        input = torch.flatten(input, start_dim=1)   
        input = F.relu(self.linear1(input))
        input = self.dropout(input)
        input = F.relu(self.linear2(input))
        input = self.dropout(input)
        output = self.linear3(input)
        return output

In [54]:
from math import ceil

def batches(dataset, sz=256):
    return enumerate(dataset[i*sz:(i+1)*sz] for i in range(ceil(dataset.shape[0] / sz)))

In [55]:
from tqdm import tqdm

def test(model, device, dataset, flag):
    model.eval()
    with torch.no_grad():
        truth = []
        predictions = []
        for step, batch in tqdm(batches(dataset), desc=f"{flag} eval"):
            b_fens, b_labels = batch['FEN'], batch['Evaluation']
            truth.extend(b_labels)
            logits = model.predict(b_fens, device)
            logits = logits.detach().cpu().numpy()
            predictions.extend(logits)
        mse, accuracy = eval(np.array(truth).flatten(), np.array(predictions).flatten())
    return mse, accuracy

In [56]:
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau

def train(modelType, trainset, valset, testset, lr, epochs):
    print(f"Beginning training with {epochs} epochs.")

    device = torch.device('cuda')

    model = modelType()
    model = model.to(device)

    optimizer = AdamW(model.parameters(), lr=lr) #weight decay 0.01
    scheduler = ReduceLROnPlateau(optimizer, 'min')
    
    train_mse, train_acc = test(model, device, trainset, 'train')
    val_mse, val_acc = test(model, device, valset, 'val')

    for epoch in range(epochs):
        print(f"epoch number: {epoch}, train acc: {train_acc}, val acc: {val_acc}, train mse: {train_mse}, val mse: {val_mse}")
        model.train()

        for step, batch in tqdm(batches(trainset), desc='train'):
            b_fens, b_labels = batch['FEN'], batch['Evaluation']

            optimizer.zero_grad()

            logits = model.predict(b_fens, device)

            b_labels = torch.as_tensor(b_labels.values).float()
            b_labels = b_labels.to(device)

            loss = F.mse_loss(logits.flatten(), b_labels)
            loss.backward()
            optimizer.step()

        train_mse, train_acc = test(model, device, trainset, 'train')
        val_mse, val_acc = test(model, device, valset, 'val')
        
        scheduler.step(val_mse)
    
    test_mse, test_acc = test(model, device, testset, 'test')
    print(f"epoch number: {epochs}, train acc: {train_acc}, val acc: {val_acc}, test acc: {test_acc}, train mse: {train_mse}, val mse: {val_mse}, test mse: {test_mse}")

In [51]:
### MLP Test
train(MLPModel, df_train, df_val, df_test, 3e-3, 40)

Beginning training with 40 epochs.


train eval: 313it [00:24, 12.95it/s]
val eval: 40it [00:02, 13.44it/s]


epoch number: 0, train acc: 0.52055, val acc: 0.5209, train mse: 81869.85720377135, val mse: 79586.3116732814


train: 313it [00:23, 13.24it/s]
train eval: 313it [00:23, 13.44it/s]
val eval: 40it [00:03, 13.01it/s]


epoch number: 1, train acc: 0.7855125, val acc: 0.7858, train mse: 61469.91131155135, val mse: 62006.08962090879


train: 313it [00:24, 12.95it/s]
train eval: 313it [00:22, 13.66it/s]
val eval: 40it [00:02, 13.91it/s]


epoch number: 2, train acc: 0.826275, val acc: 0.8157, train mse: 50379.3557394038, val mse: 53393.04936509303


train: 313it [00:23, 13.17it/s]
train eval: 313it [00:23, 13.54it/s]
val eval: 40it [00:02, 13.76it/s]


epoch number: 3, train acc: 0.84495, val acc: 0.8308, train mse: 40533.96615000953, val mse: 46293.60579464734


train: 313it [00:23, 13.32it/s]
train eval: 313it [00:23, 13.58it/s]
val eval: 40it [00:03, 13.12it/s]


epoch number: 4, train acc: 0.8540625, val acc: 0.8376, train mse: 32934.38919789142, val mse: 40959.773220352065


train: 313it [00:23, 13.40it/s]
train eval: 313it [00:23, 13.14it/s]
val eval: 40it [00:02, 13.75it/s]


epoch number: 5, train acc: 0.85665, val acc: 0.8391, train mse: 27697.23667456863, val mse: 37428.83589791022


train: 313it [00:24, 12.93it/s]
train eval: 313it [00:23, 13.17it/s]
val eval: 40it [00:03, 12.20it/s]


epoch number: 6, train acc: 0.862525, val acc: 0.8458, train mse: 24648.926490234633, val mse: 35482.756128927955


train: 313it [00:23, 13.08it/s]
train eval: 313it [00:23, 13.38it/s]
val eval: 40it [00:03, 13.07it/s]


epoch number: 7, train acc: 0.86315, val acc: 0.8456, train mse: 22348.20323381009, val mse: 33863.55325828645


train: 170it [00:13, 12.98it/s]


KeyboardInterrupt: 

In [58]:
### CNN Test
trained_model = train(CNNModel, df_train, df_val, df_test, 3e-4, 80)

Beginning training with 80 epochs.


train eval: 0it [00:00, ?it/s]


RuntimeError: Given input size: (128x1x1). Calculated output size: (128x0x0). Output size is too small

In [57]:
import chess.svg

def experimentation(model, examples):
    for example in examples.iterrows():
        board = chess.BaseBoard(example['FEN'])
        chess.svg.board(board, squares=chess.SquareSet(chess.BB_DARK_SQUARES & chess.BB_FILE_B), size=350)