In this notebook I'm going to make a neural network that rates the quality and likelihood of a position on a chess board. The goal is to seed a neural network so that it can later be used as a heuristic for a UCT algorithm and learn through self play. Learning from scratch would take too long to converge to anything meaningful so I'll try to start here.

In [1]:
import chess
import chess.pgn

import pandas as pd
import numpy as np
import random 

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.optim as optim

import sqlite3

This is a huge file of past games I found on the internet. I think it's exact purpose is to provide lines to a chess engine. I have to parse the games into a format I can work with, for now the reward from the engine will be -1 for loss, 1 for win, and 0 for draw. I'm going to abstract away black/white by making the current player be "white" and switch the game result from -1 to 1 based on whether it's a win or loss when we hit the actual model training. Iteration is for when I start training a model so I can track results over time.

In [2]:
pgn_path = r'C:\Users\natha\Documents\PythonNotebooks\data\Chess\chess_games.pgn'

model_num = 5
db_path = f'data/Chess/training_data_{model_num}.db'

In [3]:
def populate_database():
    count = 0
    dfs = []
    
    with open(pgn_path, 'r') as pgn_file:
        game = chess.pgn.read_game(pgn_file)
        while game:
            iteration = 0
            board = chess.Board()
            score = game.headers['Result']
            if score == '1/2-1/2':
                score = 0
            elif score == '1-0':
                score = 1
            elif score == '0-1':
                score = -1
            
            rows = []
            try:
                for move in game.mainline_moves():
                    board.push(move)
                    rows.append((board.fen(), score, iteration))
                dfs.append(pd.DataFrame(data=rows, columns=['fen', 'score', 'iteration']))
                count += 1
        
                if count >= 1000:
                    pd.concat(dfs).to_sql('training', conn, if_exists='append', index=False)
                    count = 0
            except:
                pass
    
            try:
                game = chess.pgn.read_game(pgn_file)
            except:
                game = chess.pgn.read_game(pgn_file)

In [4]:
class ChessDataset(Dataset):
    symbol_to_channel = {
        'r': 0,
        'n': 1,
        'b': 2,
        'k': 3,
        'q': 4,
        'p': 5,
        'R': 6,
        'N': 7,
        'B': 8,
        'K': 9,
        'Q': 10,
        'P': 11
    }
    
    def __init__(self, db_path, total_rows, offset=0):        
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        get_rows = """SELECT fen, score, likelihood FROM training LIMIT ? OFFSET ?"""
        cursor.execute(get_rows, (total_rows, offset))
        self.data = []
        rows = cursor.fetchall()
        count = 0
        for row in rows:
            if count % (total_rows // 10) == 0:
                print(f'{count}/{total_rows}')
            count += 1
            self.data.append(self.fen_to_tensor(row[0], row[1], row[2]))
        
    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    @staticmethod
    def fen_to_tensor(fen, score, likelihood):
        array = np.zeros((12, 64))
        board = chess.Board(fen=fen)
        piece_map = board.piece_map()
        for square_num in piece_map:
            piece_type = str(piece_map[square_num])
            channel = ChessDataset.symbol_to_channel[piece_type]
            array[channel][square_num] = 1
        array = array.reshape(12,8,8)
        if board.turn == chess.BLACK:
            array = np.concatenate((array[6:], array[:6]), axis=0)
            array = np.array([np.flipud(sub_array) for sub_array in array])
        return torch.tensor(array, dtype=torch.float32), torch.tensor((score, 1.0), dtype=torch.float32)

In [5]:
dataset = ChessDataset(db_path=db_path, total_rows=1024000)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)

#validation_dataset = ChessDataset(db_path, total_rows=102400, offset=1024000)
#val_dataloader = DataLoader(validation_dataset, batch_size=256, shuffle=True)

0/1024000


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ChessNet(nn.Module):
    def __init__(self):
        super(ChessNet, self).__init__()
        # Define convolutional layers
        self.conv1 = nn.Conv2d(12, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        # Define fully connected layers
        self.fc1 = nn.Linear(128 * 8 * 8, 256)
        self.fc2 = nn.Linear(256, 1)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, x):
        # Convolutional layers with ReLU activation
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        
        # Flatten the output for the fully connected layers
        x = x.view(-1, 128 * 8 * 8)
        
        # Fully connected layers with ReLU activation
        x = F.relu(self.fc1(x))
        likelihood = torch.sigmoid(self.fc2(x))
        quality = torch.tanh(self.fc3(x))
        return quality, likelihood

In [7]:
def generate_random_fens(batch_size):
    board = chess.Board()
    starting_pieces = list(board.piece_map().values())
    numbers = list(range(2, 33))
    weights = np.array(list(range(2, 33)))
    weights = weights / sum(weights)

    fens = []
    for i in range(batch_size):
        new_piece_map = {}
        random.shuffle(starting_pieces)

        random_count = np.random.choice(numbers, p=weights)
        count = 0
        for value in starting_pieces:
            new_piece_map[random.choice(chess.SQUARES)] = value
            count += 1
            if count >= random_count:
                break
        board.set_piece_map(new_piece_map)
        fens.append(board.fen())
    return fens
def get_random_tensors(batch_size):
    fens = generate_random_fens(64)
    input_tensors = []
    for fen in fens:
        input_tensor, _ = ChessDataset.fen_to_tensor(fen, 0, 0)
        input_tensors.append(input_tensor)
    inputs = torch.stack(input_tensors)
    labels = torch.zeros(64)
    return inputs, labels

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ChessNet()
model.to(device)

num_epochs = 12

quality_criterion = nn.MSELoss()  # You can use Mean Squared Error for quality
likelihood_criterion = nn.BCELoss()

optimizer = optim.Adam(model.parameters(), lr=0.001)

In [9]:
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, targets) in enumerate(dataloader, 0):
        inputs = inputs.squeeze().to(device)
        quality_targets = targets.squeeze()[:, 0].to(device)
        likelihood_targets = targets.squeeze()[:, 1].to(device)
    
        optimizer.zero_grad()
    
        quality_output, likelihood_output = model(inputs)
    
        # Calculate losses
        quality_loss = quality_criterion(quality_output.squeeze(), quality_targets)
        likelihood_loss = likelihood_criterion(likelihood_output.squeeze(), likelihood_targets)
    
        # Backpropagation for quality and likelihood on real data
        total_loss = quality_loss + likelihood_loss
        total_loss.backward()
    
        # Train likelihood on fake data
        fake_inputs, fake_likelihoods = get_random_tensors(inputs.size(0))
        fake_inputs = fake_inputs.to(device)
        fake_likelihoods = fake_likelihoods.to(device)
    
        _, fake_likelihood_output = model(fake_inputs)
    
        # Calculate likelihood loss on fake data
        fake_likelihood_loss = likelihood_criterion(fake_likelihood_output.squeeze(), fake_likelihoods)
    
        # Backpropagate on fake likelihood loss
        fake_likelihood_loss.backward()
    
        # Accumulate running loss
        running_loss += (quality_loss.item() + likelihood_loss.item() + fake_likelihood_loss.item())
    
        # Update parameters
        optimizer.step()
        
    print(f'Epoch [{epoch + 1}/{num_epochs}]\n  Test Loss: {running_loss / len(dataloader):.4f}')
    running_loss = 0.0


    #validation_loss = 0.0
    #running_quality_loss = 0.0
    #with torch.no_grad():
    #    for inputs_val, targets_val in val_dataloader:
    #        inputs_val = inputs_val.squeeze().to(device)
    #        quality_targets_val = targets_val.squeeze()[:, 0].to(device)
    #        likelihood_targets_val = targets_val.squeeze()[:, 1].to(device)
    #        
    #        quality_output_val, likelihood_output_val = model(inputs_val)
    #        
    #        # Calculate validation losses
    #        quality_loss_val = quality_criterion(quality_output_val.squeeze(), quality_targets_val)
    #        likelihood_loss_val = likelihood_criterion(likelihood_output_val.squeeze(), likelihood_targets_val)
    #        
    #        validation_loss += (quality_loss_val + likelihood_loss_val).item()
    #        running_quality_loss += quality_loss_val.item()
            
    #print(f'  Validation Loss: {validation_loss / len(val_dataloader):.4f}\t {running_quality_loss / len(val_dataloader):.4f}\n')
            

Epoch [1/12]
  Test Loss: 0.1792
Epoch [2/12]
  Test Loss: 0.0292
Epoch [3/12]
  Test Loss: 0.0233
Epoch [4/12]
  Test Loss: 0.0153
Epoch [5/12]
  Test Loss: 0.0109
Epoch [6/12]
  Test Loss: 0.0180
Epoch [7/12]
  Test Loss: 0.0163
Epoch [8/12]
  Test Loss: 0.0141
Epoch [9/12]
  Test Loss: 0.0087
Epoch [10/12]
  Test Loss: 0.0073
Epoch [11/12]
  Test Loss: 0.0116
Epoch [12/12]
  Test Loss: 0.0060


In [10]:
torch.save(model.state_dict(), f'data/Chess/model_{model_num+1}.pth')