In [1]:
!pip install python-chess

Collecting python-chess
  Downloading python_chess-1.999-py3-none-any.whl (1.4 kB)
Collecting chess<2,>=1 (from python-chess)
  Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess, python-chess
Successfully installed chess-1.10.0 python-chess-1.999


In [2]:
import chess.pgn
import os
import numpy as np
import math
import random
from tqdm import tqdm

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
"""Data source:https://www.chess.com/forum/view/general/chess-pgn-database-over-9-million-games
which contains 9 million chess games in PGN format.
"""

class ChessDataset(Dataset):
    """Chess dataset class for batch loading PGN files."""
    def __init__(self, positions, moves):
        self.positions = positions
        self.moves = moves

    def __len__(self):
        return len(self.positions)

    def __getitem__(self, idx):
        position = self.positions[idx]
        move = self.moves[idx]
        move_encoded = encode_move(move)
        # Return class index instead of one-hot encoding
        return position, move_encoded  # Convert to integer


def board_to_tensor(board):
    """Encode the chess board to a 8x8x14 tensor representation."""
    piece_map = board.piece_map()
    tensor = np.zeros((8, 8, 14), dtype=np.float32)

    # Define piece types and their channels
    piece_channels = {
        chess.PAWN: 0,
        chess.KNIGHT: 1,
        chess.BISHOP: 2,
        chess.ROOK: 3,
        chess.QUEEN: 4,
        chess.KING: 5
    }

    for square, piece in piece_map.items():
        row = 7 - square // 8
        col = square % 8
        channels = piece_channels[piece.piece_type]
        if piece.color == chess.WHITE:
            tensor[row, col, channels] = 1
        else:
            tensor[row, col, channels + 6] = 1

    # Encode the player's turn channel
    tensor[:, :, 12] = 1 if board.turn == chess.WHITE else 0

    # Encode castling move
    if board.has_kingside_castling_rights(chess.WHITE):
        tensor[7, 7, 13] = 1  # Kingside castling rights for white
    if board.has_queenside_castling_rights(chess.WHITE):
        tensor[7, 0, 13] = 1  # Queenside castling rights for white
    if board.has_kingside_castling_rights(chess.BLACK):
        tensor[0, 7, 13] = 1  # Kingside castling rights for black
    if board.has_queenside_castling_rights(chess.BLACK):
        tensor[0, 0, 13] = 1  # Queenside castling rights for black

    # Encode en passant square
    if board.ep_square is not None:
        row = 7 - board.ep_square // 8
        col = board.ep_square % 8
        tensor[row, col, 13] = 1

    return torch.tensor(tensor).permute(2, 0, 1)


def parse_pgn_to_tensor(pgn_file):
    """Parse PGN file and return a list of tensors."""
    positions = []
    moves = []
    print(os.getcwd())
    with open(pgn_file, mode="rt", encoding='utf-8') as f:
        while True:
            game = chess.pgn.read_game(f)
            if game is None:
                break
            board = game.board()
            for move in game.mainline_moves():
                positions.append(board_to_tensor(board))
                moves.append(move)
                board.push(move)
    return positions, moves

def encode_move(move):
    """Encode the move to a 0-63 integer."""
    from_square = move.from_square
    to_square = move.to_square
    # Combine the squares into a single index
    return from_square * 64 + to_square

# Example usage

# positions, moves = parse_pgn_to_tensor(pgn_file_path)
# dataset = ChessDataset(positions, moves)
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
class NeuralNet(nn.Module):
    def __init__(self, input_shape=(8, 8, 14)):
        """input_shape: tuple, shape of the input tensor (height, width, channels)
            14 channels = 6 channels for white pieces: Pawn, Knight, Bishop, Rook, Queen, King
                          6 channels for black pieces:
                          1 channel for indicating if it's white's turn: All 1s if white, 0s if black
                          1 channel for special rules: E.g., castling rights or en passant availability
        """
        super(NeuralNet, self).__init__()

        # Define the neural network architecture
        # Convolutional layers
        self.conv1 = nn.Conv2d(input_shape[2], 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully connected layers
        self.fc1 = nn.Linear(64 * 4 * 4, 256)

        # Policy head
        self.policy_head = nn.Linear(256, 4096) # Assume there are 4096 possible moves in a turn

        # Value head
        self.value_head = nn.Linear(256, 1) # Output a evaluation score between -1 and 1

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool(x)

        x = x.view(-1, 64 * 4 * 4) # Flatten the tensor

        x = F.relu(self.fc1(x))

        policy = F.softmax(self.policy_head(x), dim=1)
        evaluation = torch.tanh(self.value_head(x))

        return policy, evaluation

    def train_model(self, model, train_loader, epochs, learning_rate=0.01):
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        criteration_policy = nn.CrossEntropyLoss()
        criteration_value = nn.MSELoss()

        model.train()
        for epoch in range(epochs):
            total_loss_policy = 0
            total_loss_value = 0

            for data in train_loader:
                inputs, policy_targets = data
                optimizer.zero_grad()

                policy_pred, value_pred = model(inputs)

                # Convert policy_targets to Tensor
                policy_targets = torch.tensor(policy_targets, dtype=torch.long)

                policy_loss = criteration_policy(policy_pred, policy_targets)
                value_loss = criteration_value(value_pred, torch.zeros_like(value_pred))

                loss = policy_loss + value_loss
                loss.backward()
                optimizer.step()

                total_loss_policy += policy_loss.item()
                total_loss_value += value_loss.item()

            print(f"Epoch {epoch + 1}/{epochs}, Policy loss: {total_loss_policy}, Value loss: {total_loss_value}")

# Example usage
pgn_file_path = "drive/MyDrive/Colab Notebooks/preprocessed_db.pgn"
positions, moves = parse_pgn_to_tensor(pgn_file_path)

dataset = ChessDataset(positions, moves)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

model = NeuralNet(input_shape=(8, 8, 14))
model.train_model(model=model, train_loader=dataloader, epochs=10)
torch.save(model.state_dict(), "chess_cnn_model.pth")


/content


In [None]:
class Node:
    def __init__(self, board):
        self.M = 0
        self.V = 0
        self.board = board
        self.visitedMovesAndNodes = []
        self.nonVisitedLegalMoves = []

        self.parent = None
        for move in board.legal_moves:
            self.nonVisitedLegalMoves.append(move)

    def isLeaf(self):
        return len(self.nonVisitedLegalMoves) != 0

    def isTerminal(self):
        return len(self.nonVisitedLegalMoves) == 0 and len(self.visitedMovesAndNodes) == 0

class MCTS:
    def __init__(self, board, maxIter):
        self.root = Node(board)
        self.maxIter = maxIter

    def selection(self, node):
        if node.isLeaf() or node.isTerminal():
            return node

        maxUCTChild = None
        maxUCTValue = -float('inf')
        for move, child in node.visitedMovesAndNodes:
            uctValue = self.uct(child, node)
            if uctValue > maxUCTValue:
                maxUCTValue = uctValue
                maxUCTChild = child
        if maxUCTChild is None:
            raise ValueError ("Could not identify child with best UCT value ")

        return self.selection(maxUCTChild)


    def uct(self, node, parent):
        return node.M + math.sqrt(2 * math.log(parent.V) / node.V)


    def expansion(self, node):
        move = random.choice(node.nonVisitedLegalMoves)
        node.nonVisitedLegalMoves.remove(move)
        board = node.board.copy()
        board.push(move)
        child = Node(board)
        child.parent = node
        node.visitedMovesAndNodes.append((move, child))
        return child

    def simulation(self, node):
        board = node.board.copy()
        while board.outcome(claim_draw = True) is None:
            move = random.choice(list(board.legal_moves))
            board.push(move)

        payout = 0.5
        outcome = board.outcome(claim_draw = True)

        if outcome == 1:
            payout = 1
        elif outcome == -1:
            payout = 0
        elif outcome == 0:
            payout = 0.5

        return payout

    def backpropagation(self, node, result):
        node.M = ((node.M * node.V) + result) / (node.V + 1)
        node.V += 1

        if node.parent is not None:
            self.backpropagation(node.parent, result)

if __name__ == "__main__":
    board = chess.Board("r1bqkb1r/pppp1ppp/2n2n2/4p2Q/2B1P3/8/PPPP1PPP/RNB1K1NR w KQkq - 4 4")
    root = MCTS(board, 200)
    for i in range(root.maxIter):
        node = root.selection(root.root)
        if not node.isTerminal():
            node = root.expansion(node)
        result = root.simulation(node)
        root.backpropagation(node, result)
    root.root.visitedMovesAndNodes.sort(key = lambda x: x[1].V, reverse = True)
    print([(m.uci(), child.M , child.V) for m, child in root.root.visitedMovesAndNodes[0:10]])
