In [None]:
import subprocess

from chessf.parser import FilePGN
from chessf.engine import Stockfish

import numpy as np
import pandas as pd
pd.set_option("display.max_rows", 200)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
torch.manual_seed(0)

import plotly.express as px
from time import perf_counter

## Prepare data

In [None]:
pgn_2017_02 = "pgn/lichess_db_standard_rated_2017-02.pgn"
file = FilePGN(pgn_2017_02)

In [None]:
stockfish_path = "stockfish/stockfish.exe"
stockfish = Stockfish(stockfish_path)

In [None]:
moves, info = file.get_and_parse_next_good_game()

In [None]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x / 300))

def inverse(x, a, b, c):
    return a/(x-b) + c

popt2 = [0.54774, -2.797004, 0.790583]
popt3 = [0.548174, 2.829586, -0.785972]

def eval_to_winodds(eval_type, eval_int):

    if eval_type == "cp":
        return sigmoid(eval_int)
    else:
        if eval_int > 0:
            return inverse(eval_int, *popt2)
        elif eval_int < 0:
            return inverse(eval_int, *popt3)
    return 0

In [None]:
def get_df():
    
    game_moves, info = file.get_and_parse_next_good_game()
    if len(game_moves) < 15:
        return [], []
    
    stockfish.start_new_game()

    eval_types, eval_ints = [], []
    bitboards = []
    for game_move in game_moves:

        (eval_type, eval_int), (second_type, second_int) = stockfish.get_eval(depth=3)
        win_odds = eval_to_winodds(eval_type, eval_int)
        win_odds_second = eval_to_winodds(second_type, second_int)

        (eval_type, eval_int), (second_type, second_int) = stockfish.get_eval(depth=1)
        win_odds_2 = eval_to_winodds(eval_type, eval_int)

        bbm = stockfish.get_bitboard_matrix().flatten().astype(np.float32)

        eval_and_more = np.array([win_odds, win_odds_second, win_odds_2, win_odds - win_odds_2, win_odds - win_odds_second])
        
        features = np.concat([bbm, eval_and_more])
        add_info = np.array([
            info['WhiteElo'] / 3000, 
            info['BlackElo'] / 3000, 
            (info['WhiteElo'] - info['BlackElo']) / 1000
        ])
        features = np.concat([features, add_info])
        
        bitboards.append( features.copy() )
        stockfish.make_pgn_move(game_move)            

    if stockfish.get_legal_moves():
        pass
    
    bitboards = np.array(bitboards)
    result_int = {"1-0": 1, "0-1": 0, "1/2-1/2": 0}.get(info["Result"], 0)
    results = result_int * np.ones(bitboards.shape[0], dtype=np.uint8)
    
    return bitboards[10:], results[10:]

In [None]:
def get_data(n_games=10):

    XX = np.zeros((1, 776), dtype=np.float32)
    YY = np.zeros(1, dtype=np.uint8)
    
    n_found = 0
    while n_found < n_games:
        X, Y = get_df()
        if not len(X):
            continue

        XX = np.concatenate((XX, X))
        YY = np.concatenate((YY, Y))
        
        n_found += 1

    return torch.Tensor(XX), torch.Tensor(YY)

In [None]:
%%time
test_data = get_data(n_games=300)

## NN

In [None]:
N_CLASSES = 2

class SimpleFC(nn.Module):
    def __init__(self):
        super().__init__()        
        self.norm_input = nn.BatchNorm1d(776)
        
        self.layer_1 = nn.Linear(776, 16)
        self.norm_1 = nn.BatchNorm1d(16)

        self.layer_2 = nn.Linear(16, 16)
        self.norm_2 = nn.BatchNorm1d(16)

        self.layer_3 = nn.Linear(16, 16)
        self.norm_3 = nn.BatchNorm1d(16)
        
        self.layer_out = nn.Linear(16, N_CLASSES) 
        
    def forward(self, x):
        x = self.norm_input(x)
        
        x = F.relu(self.layer_1(x))
        x = self.norm_1(x)

        x = F.relu(self.layer_2(x))
        x = self.norm_2(x)

        x = F.relu(self.layer_3(x))
        x = self.norm_3(x)
        
        x = F.sigmoid(self.layer_out(x))
        return x

In [None]:
model = SimpleFC()

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
train_losses = []
test_losses = []
test_accuracy = []

## Training

In [None]:
for epoch in range(10):
    # <train>
    
    model.train()
    running_loss = 0.0

    train_data = get_data(n_games=200)
    n_times_to_see_train = 2
    
    for i in range(n_times_to_see_train):

        inputs, labels = train_data
        labels = labels.type(torch.LongTensor)        
        
        optimizer.zero_grad()
    
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()

    mean_loss = running_loss / n_times_to_see_train
    train_losses.append(mean_loss)

    # <test>
    
    model.eval()
    correct, total = 0, 0
    running_loss = 0.0

    with torch.no_grad():
            
        images, labels = test_data
        labels = labels.type(torch.LongTensor)
        
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    mean_loss_test = running_loss / 1
    test_losses.append(mean_loss_test)

    accuracy = correct / total
    test_accuracy.append(accuracy)

    print(f"Epoch: {epoch}, Loss: {mean_loss:.6f}, Test: {mean_loss_test:.6f}, Acc: {accuracy:.6f}")

## Plot results

In [None]:
fig = px.line(y=test_losses, template='plotly_dark')
fig.update_traces(mode="lines")
fig.update_layout(xaxis_title="Эпоха", yaxis_title="Test Loss")
fig.update_layout(height=500, width=1000)
fig.show()

In [None]:
fig = px.line(y=train_losses, template='plotly_dark')
fig.update_traces(mode="lines")
fig.update_layout(xaxis_title="Эпоха", yaxis_title="Loss")
fig.update_layout(height=500, width=1000)
fig.show()

In [None]:
fig = px.line(y=test_accuracy, template='plotly_dark')
fig.update_traces(mode="lines")
fig.update_layout(xaxis_title="Эпоха", yaxis_title="Test Accuracy")
fig.update_layout(height=500, width=1000)
fig.show()

In [None]:
model.eval()
with torch.no_grad():
    images, labels = test_data
    labels = labels.type(torch.LongTensor)
    outputs = model(images)

pred = outputs[:, 1].numpy()
y = labels.numpy()
df = pd.DataFrame({"pred": pred, "y": y})

In [None]:
px.line(
    df.groupby( df["pred"] // 0.10 * 0.10 + 0.05 ).agg({"y": "mean"}).squeeze(),
    template='plotly_dark'
).update_layout(height=800, width=800, yaxis_range=(0, 1), xaxis_range=(0, 1)).update_traces(mode="lines+markers")