In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import os
import chess.pgn
import numpy as np

# Pick the compute device once; every later cell moves tensors and the model onto `device`.
if not torch.cuda.is_available():
    device = torch.device("cpu")
    print('CUDA is not available, using CPU')
else:
    device = torch.device("cuda")
    print('Using CUDA device:', device)

Using CUDA device: cuda


In [2]:
def create_uci_labels():
    """
    Build the list of UCI move strings used as the model's output labels.

    The list holds, for every origin square, every on-board destination that
    lies on the same rank, file or diagonal or is a knight jump away, followed
    by the promotion moves (straight and diagonal, onto rank 1 and rank 8) with
    a piece suffix from 'q', 'r', 'b', 'n'.

    :return: list of UCI strings such as 'e2e4' or 'a7a8q'; the position of a
             string in the list is its label index.
    """
    files = 'abcdefgh'
    ranks = '12345678'
    promotion_pieces = 'qrbn'
    knight_jumps = ((-2, -1), (-1, -2), (-2, 1), (1, -2),
                    (2, -1), (-1, 2), (2, 1), (1, 2))
    on_board = range(8)

    labels = []

    # Non-promotion moves: sliding lines and knight jumps from every square.
    for file_from in on_board:
        for rank_from in on_board:
            targets = []
            targets.extend((t, rank_from) for t in range(8))                      # same rank
            targets.extend((file_from, t) for t in range(8))                      # same file
            targets.extend((file_from + t, rank_from + t) for t in range(-7, 8))  # diagonal
            targets.extend((file_from + t, rank_from - t) for t in range(-7, 8))  # anti-diagonal
            targets.extend((file_from + df, rank_from + dr) for df, dr in knight_jumps)

            origin = files[file_from] + ranks[rank_from]
            for file_to, rank_to in targets:
                if (file_to, rank_to) == (file_from, rank_from):
                    continue  # a move must change square
                if file_to not in on_board or rank_to not in on_board:
                    continue  # destination fell off the board
                labels.append(origin + files[file_to] + ranks[rank_to])

    # Promotion moves: rank 2 -> 1 and rank 7 -> 8, straight ahead or capturing sideways.
    for idx, file_letter in enumerate(files):
        for piece in promotion_pieces:
            labels.append(file_letter + '2' + file_letter + '1' + piece)
            labels.append(file_letter + '7' + file_letter + '8' + piece)
            if idx > 0:
                left_file = files[idx - 1]
                labels.append(file_letter + '2' + left_file + '1' + piece)
                labels.append(file_letter + '7' + left_file + '8' + piece)
            if idx < 7:
                right_file = files[idx + 1]
                labels.append(file_letter + '2' + right_file + '1' + piece)
                labels.append(file_letter + '7' + right_file + '8' + piece)
    return labels

# Label table (index -> UCI string) and its inverse (UCI string -> index).
uci_table = create_uci_labels()
uci_dict = dict(zip(uci_table, range(len(uci_table))))

In [3]:
# Material value per piece symbol. Lowercase symbols carry positive values and
# the matching uppercase symbols carry the same value negated.
_LOWERCASE_PIECE_VALUES = {"p": 1, "n": 3, "b": 3, "r": 5, "q": 9, "k": 1000}

piece_values = dict(_LOWERCASE_PIECE_VALUES)
piece_values.update(
    (symbol.upper(), -value) for symbol, value in _LOWERCASE_PIECE_VALUES.items()
)

def createStateObj(board):
    """
    Encode a board position as an (18, 8, 8) float32 array of feature planes.

    Plane layout:
      * 0-5   : pieces whose colour is chess.WHITE, indexed by piece_type - 1
      * 6-11  : all other pieces, indexed by piece_type + 5
      * 12-15 : constant planes for queenside/kingside castling rights of
                WHITE, then queenside/kingside castling rights of BLACK
      * 16    : constant plane holding float(board.turn)
      * 17    : constant plane holding the sum of piece_values over all pieces

    Within a piece plane the cell is [square // 8, square % 8].

    NOTE: earlier versions iterated over the piece map twice (nested), which
    multiplied plane 17 by the number of pieces on the board. Checkpoints
    trained on that encoding will see a differently scaled plane 17.

    :param board: object exposing piece_map(), turn and the castling-rights
                  queries used below (a python-chess Board in this notebook).
    :return: numpy array of shape (18, 8, 8), dtype float32.
    """
    state = np.zeros((18, 8, 8), dtype=np.float32)
    net_piece_value = 0

    # Single pass over the pieces: mark the occupancy plane and accumulate material.
    for square, piece in board.piece_map().items():
        net_piece_value += piece_values[str(piece)]
        if piece.color == chess.WHITE:
            plane = piece.piece_type - 1
        else:
            plane = piece.piece_type + 5
        state[plane, square // 8, square % 8] = 1.0

    # Scalar features are broadcast over a whole 8x8 plane each.
    state[12, :, :] = float(board.has_queenside_castling_rights(chess.WHITE))
    state[13, :, :] = float(board.has_kingside_castling_rights(chess.WHITE))
    state[14, :, :] = float(board.has_queenside_castling_rights(chess.BLACK))
    state[15, :, :] = float(board.has_kingside_castling_rights(chess.BLACK))
    state[16, :, :] = float(board.turn)
    state[17, :, :] = float(net_piece_value)

    return state

# Result headers accepted for training; games with any other result are skipped.
tag = ('1/2-1/2', '1-0', '0-1')

def createData(fp, n_data=10000):
    """
    Read up to n_data games from an open PGN stream and turn them into samples.

    For every move of a game except its final one, the position *before* the
    move (encoded by createStateObj) becomes an input and a one-hot vector over
    the UCI label table becomes the target. A game is abandoned at the first
    move whose UCI string is not in uci_dict; samples gathered from that game
    before that point are kept.

    :param fp: open text handle positioned inside a PGN file.
    :param n_data: maximum number of games to read in this call.
    :return: (X, y) numpy arrays in matching, randomly shuffled order. X has
             shape (N, 18, 8, 8) and y has shape (N, number_of_labels); N is 0
             when none of the games read produced a sample.
    :raises StopIteration: when the stream is exhausted and fewer than 10
             samples were collected in this call.
    """
    n_labels = len(uci_dict)
    X = []
    y = []

    for _ in range(n_data):
        game = chess.pgn.read_game(fp)
        if game is None:
            # End of file: hand back a final partial batch if it is big enough,
            # otherwise signal exhaustion to the caller.
            if len(X) < 10:
                raise StopIteration
            break

        if game.headers["Result"] not in tag:
            continue

        board = game.board()
        moves = list(game.mainline_moves())
        for move in moves[:-1]:
            label_idx = uci_dict.get(str(move))
            if label_idx is None:
                # Move is not representable in the label table; drop the rest of the game.
                break
            X.append(createStateObj(board))
            y_ele = np.zeros(n_labels)
            y_ele[label_idx] = 1.0
            y.append(y_ele)
            board.push(move)

    if not X:
        # Nothing usable was read; return correctly shaped empty arrays
        # instead of failing while unzipping an empty list.
        return np.zeros((0, 18, 8, 8), dtype=np.float32), np.zeros((0, n_labels))

    # Shuffle inputs and targets with one shared permutation (also on the
    # end-of-file path, so every returned batch is shuffled).
    order = list(range(len(X)))
    random.shuffle(order)
    return np.array([X[i] for i in order]), np.array([y[i] for i in order])

In [4]:
class MyNeuralNet(nn.Module):
    """
    Small convolutional policy network over 18-plane 8x8 board encodings.

    Pipeline: conv(18->128) -> batchnorm -> relu -> conv(128->128) -> batchnorm
    -> relu -> flatten -> linear(8*8*128 -> 1968) -> softmax.

    forward() returns softmax *probabilities*, not logits. Losses that apply
    their own log-softmax (e.g. nn.CrossEntropyLoss) must not be fed this
    output directly.
    """

    def __init__(self):
        super(MyNeuralNet, self).__init__()
        # Attribute names are part of the checkpoint format (state_dict keys).
        self.conv1 = nn.Conv2d(18, 128, 3, padding=1)
        self.bn1 = nn.BatchNorm2d(128)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(128, 128, 3, padding=1)
        self.bn2 = nn.BatchNorm2d(128)
        self.relu2 = nn.ReLU()
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(8*8*128, 1968)
        self.sm = nn.Softmax(dim=1)

    def forward(self, x):
        """
        Map a batch of board encodings to move probabilities.

        :param x: float tensor of board planes, shaped (N, 18, 8, 8).
        :return: tensor of shape (N, 1968); each row sums to 1.
        """
        x = self.conv1(x)
        # Forces a 4-D (N, 128, 8, 8) layout; a no-op for batched input.
        # Presumably also restores the batch axis for unbatched input -- TODO confirm.
        x = x.view(-1, 128, 8, 8)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        x = self.flatten(x)
        x = self.fc(x)
        x = self.sm(x)
        return x

    def predict(self, x):
        """
        Run inference on a single board encoding.

        The input is moved to the device the model's parameters live on, and
        the forward pass runs in eval mode so BatchNorm uses its running
        statistics and does not update them. The previous train/eval mode is
        restored afterwards.

        :param x: array-like or tensor with 18*8*8 elements (one position).
        :return: tensor of shape (1, 1968) with move probabilities, on the
                 model's device.
        """
        target_device = next(self.parameters()).device
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                x = torch.as_tensor(x, dtype=torch.float, device=target_device)
                x = x.reshape(1, 18, 8, 8)
                return self.forward(x)
        finally:
            self.train(was_training)

In [5]:
class DataLoader:
    """
    Streams training batches out of one PGN file.

    The file is opened on construction and kept open between get_data() calls
    so that successive calls continue where the previous one stopped. When the
    file is exhausted the handle is rewound (closed and reopened) and
    StopIteration is raised so the caller can move on to another file.
    """

    def __init__(self, filename, log_path='log_training.txt'):
        """
        :param filename: path of the PGN file to read.
        :param log_path: file that receives a line each time the end of the
                         PGN file is reached.
        """
        self.filename = filename
        self.log_path = log_path
        self.fp = open(self.filename, 'r')

    def _create_data(self, step):
        """Read one batch of `step` games; rewind and re-raise at end of file."""
        try:
            return createData(self.fp, step)
        except StopIteration:
            # Rewind so this loader can be reused on a later pass.
            self.fp.close()
            self.fp = open(self.filename, 'r')
            print("reached end of", self.filename)
            try:
                with open(self.log_path, 'a') as tfp:
                    tfp.write(f"reached end of {self.filename}\n")
            except OSError:
                # Logging is best-effort; an unwritable log must not stop training.
                pass
            raise

    def get_data(self, step=10000):
        """
        Return the next (X, y) batch built from up to `step` games.

        :raises StopIteration: when the file has been exhausted.
        """
        return self._create_data(step)

    def close(self):
        """Release the underlying file handle."""
        self.fp.close()

In [6]:
class TestLoader(DataLoader):
    """
    Loads one fixed evaluation batch (the first 250 games of the file) and
    keeps it on `device` as float32 tensors in self.X and self.y.

    The file handle is closed once the batch has been read, including when
    reading fails, so the loader never leaks an open file.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        try:
            X, y = self.get_data(250)
        finally:
            # Only one batch is ever read; release the handle either way.
            self.fp.close()
        self.X = torch.tensor(X, dtype=torch.float32).to(device)
        self.y = torch.tensor(y, dtype=torch.float32).to(device)

In [13]:
# One loader per monthly PGN file, listed as (year, months) spans in training order.
# NOTE(review): the 2016 span appears twice, so every 2016 file is opened twice
# and visited twice per cycle. This looks like a copy-paste slip (2017 intended?)
# -- confirm before changing, since it alters the training data mix.
_TRAINING_SPANS = [
    (2015, range(8, 13)),
    (2016, range(1, 13)),
    (2016, range(1, 13)),
]

training_iterators = [
    DataLoader(f"filtered/output-{yr}_{mo:02d}.pgn")
    for yr, months in _TRAINING_SPANS
    for mo in months
]

# Index of the loader the training loop is currently drawing from.
training_idx = 0

In [None]:
# Held-out evaluation batch, loaded once and kept on the device.
TEST_PGN_PATH = "processed/db2023.pgn"
testing_iterator = TestLoader(TEST_PGN_PATH)

In [10]:
model = MyNeuralNet()

# map_location remaps the stored tensors onto the active device, so a
# checkpoint saved from a CUDA run still loads on a CPU-only machine.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(torch.load('models/model_16109.pt', map_location=device))

model.to(device)

MyNeuralNet(
  (conv1): Conv2d(18, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu1): ReLU()
  (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu2): ReLU()
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc): Linear(in_features=8192, out_features=1968, bias=True)
  (sm): Softmax(dim=1)
)

In [None]:
# hyperparameters

lr = 0.0001
num_epochs = 30000

LOG_PATH = 'log25000.txt'
CHECKPOINT_DIR = 'models'

optimizer = optim.Adam(model.parameters(), lr=lr)
# The model's forward pass already ends in a softmax, so the loss must be the
# negative log-likelihood of those probabilities. nn.CrossEntropyLoss would
# apply log-softmax a second time and squash the gradients.
criterion = nn.NLLLoss()


def log_line(message):
    """Print a progress message and append it, newline-terminated, to LOG_PATH."""
    print(message)
    with open(LOG_PATH, 'a') as f:
        f.write(message + "\n")


def probability_loss(probs, target_idx):
    """Mean negative log-likelihood of class indices under softmax probabilities."""
    # Clamp to the smallest positive normal float so log() never sees 0.
    floor = torch.finfo(probs.dtype).tiny
    return criterion(torch.log(probs.clamp_min(floor)), target_idx)


# Checkpoints are written into this directory; make sure it exists up front.
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# Targets are one-hot rows; class indices are all the loss needs.
test_targets = testing_iterator.y.argmax(dim=1)

# Train the network (resumes the epoch counter of an earlier run)
for epoch in range(25000, num_epochs):
    log_line(f"Epoch {epoch + 1} of {num_epochs}")

    # Get the inputs; on end of file switch to the next loader and skip this step.
    try:
        X, y = training_iterators[training_idx].get_data(300)
    except StopIteration:
        training_idx = (training_idx + 1) % len(training_iterators)
        continue

    if len(X) == 0:
        # No usable positions in this batch of games.
        continue

    X = torch.tensor(X, dtype=torch.float32).to(device)
    # Reduce the one-hot rows to indices on the host before moving to the device.
    y = torch.as_tensor(y.argmax(axis=1), dtype=torch.long, device=device)

    # Forward + backward + optimize
    model.train()
    optimizer.zero_grad()
    outputs = model(X)
    loss = probability_loss(outputs, y)
    loss.backward()
    optimizer.step()

    log_line('[%d] training loss: %.5f' % (epoch + 1, loss.item()))

    # Free the training batch before the evaluation pass allocates its activations.
    del X, y, outputs, loss

    if epoch % 10 == 0:
        # Evaluate in eval mode so BatchNorm uses (and does not update) its running stats.
        model.eval()
        with torch.no_grad():
            test_loss = probability_loss(model(testing_iterator.X), test_targets)
        model.train()
        log_line('[%d] test loss: %.5f' % (epoch + 1, test_loss.item()))
        del test_loss

    if epoch % 100 == 9:
        torch.save(model.state_dict(), f"models/model_{epoch}.pt")